Skip to main content
Patterns align with MICROSERVICES.md and Recipe G in the repo.

Primitives

PrimitiveUse
Transport::emit_json / ClientProxy::emitFire-and-forget integration events
sendRequest/reply RPC over the same wire
#[event_pattern]Micro handler consumes inbound events
EventBus + #[on_event]In-process domain reactions

Integration vs domain

  • Integration events cross service boundaries (order.created on NATS/Kafka).
  • Domain events can stay inside one binary via EventBus until you need delivery guarantees.

In-process #[on_event] sketch

#[injectable]
struct OrderProjector;

#[event_routes]
impl OrderProjector {
    #[on_event("order.created")]
    async fn apply(&self, payload: serde_json::Value) {
        tracing::info!(?payload, "projection");
    }
}
Register OrderProjector in providers; wire_on_event_handlers runs on bootstrap for HTTP/micro apps. Emit:
bus.emit(
    "order.created",
    &serde_json::json!({ "id": 99, "event_version": 1 }),
)
.await;
See microservices_events.rs for ClientsModule + EventBus wiring.

HTTP → DB → emit

  1. Validate body → transactional OrderService::create.
  2. After commit → clients.expect("AUDIT").emit("order.created", &dto).
  3. If emit can fail post-commit, move to outbox (below).

End-to-end example: checkout, billing, fulfillment

This is a real production-style choreography:
  • Checkout API creates the order and writes an outbox row.
  • Outbox publisher emits order.placed to the broker.
  • Billing consumes order.placed and emits payment.captured.
  • Fulfillment consumes payment.captured and emits shipment.requested.

Producer side: create order + outbox row

#[injectable]
pub struct CheckoutService {
    prisma: Arc<PrismaService>,
}

impl CheckoutService {
    pub async fn place_order(&self, body: CreateOrderDto) -> Result<OrderRow, HttpException> {
        // Pseudocode: run both inserts in one SQL transaction in your repo layer.
        let order = self
            .prisma
            .order()
            .create(OrderCreateInput {
                tenant_id: body.tenant_id.clone(),
                customer_id: body.customer_id.clone(),
                total_cents: body.total_cents,
                status: "pending".into(),
            })
            .await
            .map_err(HttpException::from)?;

        self.prisma
            .integration_outbox()
            .create(IntegrationOutboxCreateInput {
                topic: "order.placed".into(),
                payload: serde_json::json!({
                    "event_id": uuid::Uuid::new_v4().to_string(),
                    "event_version": 1,
                    "order_id": order.id,
                    "tenant_id": order.tenant_id,
                    "total_cents": order.total_cents,
                }),
                headers: serde_json::json!({
                    "correlation_id": body.correlation_id,
                }),
            })
            .await
            .map_err(HttpException::from)?;

        Ok(OrderRow {
            id: order.id.to_string(),
            status: order.status,
        })
    }
}

Outbox (PostgreSQL / Prisma)

1

Transactional write

Persist business row and outbox row in one DB transaction.
2

Async publisher

Worker polls outbox, emit, marks delivered with retries + DLQ policy.
3

Idempotency

Consumers dedupe with event_id / idempotency_key—assume at-least-once delivery.
nestrs does not ship an outbox crate—model the table with prisma_model! or raw SQL.

Outbox table (PostgreSQL)

CREATE TABLE integration_outbox (
  id           BIGSERIAL PRIMARY KEY,
  topic        TEXT NOT NULL,
  payload      JSONB NOT NULL,
  headers      JSONB NOT NULL DEFAULT '{}',
  created_at   TIMESTAMPTZ NOT NULL DEFAULT now(),
  published_at TIMESTAMPTZ,
  attempts     INT NOT NULL DEFAULT 0,
  last_error   TEXT
);

CREATE INDEX integration_outbox_pending
  ON integration_outbox (created_at)
  WHERE published_at IS NULL;
The HTTP handler inserts business row + outbox row in one transaction. A background tokio task (or separate worker binary) SELECT … FOR UPDATE SKIP LOCKED, publishes via ClientProxy::emit, then sets published_at. After N failures, move rows to integration_dead_letter for manual replay.

Publisher worker: ship pending outbox rows

#[injectable]
pub struct OutboxPublisher {
    clients: Arc<ClientsService>,
    prisma: Arc<PrismaService>,
}

impl OutboxPublisher {
    pub async fn publish_once(&self) -> Result<(), HttpException> {
        let rows = self
            .prisma
            .integration_outbox()
            .find_many_with_options(IntegrationOutboxFindManyOptions {
                r#where: IntegrationOutboxWhere::and(vec![
                    integration_outbox::published_at::is_null(),
                ]),
                order_by: Some(vec![integration_outbox::id::order(nestrs_prisma::SortOrder::Asc)]),
                take: Some(100),
                skip: Some(0),
                distinct: None,
            })
            .await
            .map_err(HttpException::from)?;

        for row in rows {
            let publish = self
                .clients
                .expect("DOMAIN_EVENTS")
                .emit(&row.topic, &row.payload)
                .await;

            match publish {
                Ok(_) => {
                    self.prisma
                        .integration_outbox()
                        .update(
                            integration_outbox::id::equals(row.id),
                            IntegrationOutboxUpdateInput {
                                published_at: Some(chrono::Utc::now()),
                                attempts: None,
                                last_error: None,
                            },
                        )
                        .await
                        .map_err(HttpException::from)?;
                }
                Err(err) => {
                    self.prisma
                        .integration_outbox()
                        .update(
                            integration_outbox::id::equals(row.id),
                            IntegrationOutboxUpdateInput {
                                published_at: None,
                                attempts: Some(row.attempts + 1),
                                last_error: Some(err.to_string()),
                            },
                        )
                        .await
                        .map_err(HttpException::from)?;
                }
            }
        }

        Ok(())
    }
}

Integration event envelope (what crosses the broker)

{
  "event_id": "0193a8e0-…",
  "event_type": "order.placed",
  "event_version": 2,
  "occurred_at": "2026-04-17T12:00:00Z",
  "producer": "checkout-api",
  "data": {
    "order_id": "ord_123",
    "total_cents": 4999,
    "customer_id": "cus_99"
  }
}
Consumers dedupe on event_id. event_version selects serde shapes. occurred_at drives SLAs and analytics.

CQRS read models

Subscribe to order.created (broker or on_event) and project into Redis, Mongo, or Elasticsearch for read-heavy queries—keep HTTP write path small.

Choreography example (multiple services)

Choreography means no central orchestrator: each service reacts to events it cares about.
  1. Checkout commits orders row, writes outbox row order.placed, publisher emits to Kafka topic prod.orders.events.v1.
  2. Billing consumes order.placed, charges card; emits payment.captured or payment.failed (another topic).
  3. Fulfillment listens for payment.captured, reserves inventory; emits shipment.requested.
  4. Notifications listens to multiple topics and sends email/SMS—idempotent on event_id.
nestrs #[event_pattern] handlers implement step 2–4 consumers on NestFactory::create_microservice_*; emit / ClientProxy emit the next integration events.

Consumer side: billing reacts to order.placed

#[derive(Debug, serde::Deserialize)]
pub struct OrderPlacedEvent {
    pub event_id: String,
    pub event_version: i32,
    pub order_id: String,
    pub tenant_id: String,
    pub total_cents: i64,
}

#[injectable]
pub struct BillingEvents {
    clients: Arc<ClientsService>,
}

#[micro_routes]
impl BillingEvents {
    #[event_pattern("order.placed")]
    async fn on_order_placed(&self, event: OrderPlacedEvent) -> Result<(), HttpException> {
        // Charge card with PSP SDK here. Persist dedupe by event_id before or after the call.
        self.clients
            .expect("DOMAIN_EVENTS")
            .emit(
                "payment.captured",
                &serde_json::json!({
                    "event_id": uuid::Uuid::new_v4().to_string(),
                    "event_version": 1,
                    "order_id": event.order_id,
                    "tenant_id": event.tenant_id,
                    "amount_cents": event.total_cents,
                }),
            )
            .await
            .map_err(InternalServerErrorException::new)?;
        Ok(())
    }
}

Dead-letter and replay

When a handler throws after partial side effects, do not lose the message:
  • Message brokers: configure DLQ / retry topic with exponential backoff.
  • In-process EventBus: log + metric failures; optionally persist failed payloads to SQL for replay tooling.

What “exactly-once” really means

Brokers are at-least-once. Exactly-once processing is achieved only by idempotent handlers + dedupe store (event_id → processed timestamp). Design for duplicates from day one.
Keep event_version in payloads when evolving schemas so old consumers can branch safely.