Patterns align with MICROSERVICES.md and Recipe G in the repo.
## Primitives

| Primitive | Use |
|---|---|
| `Transport::emit_json` / `ClientProxy::emit` | Fire-and-forget integration events |
| `send` | Request/reply RPC over the same wire |
| `#[event_pattern]` | Micro handler consumes inbound events |
| `EventBus` + `#[on_event]` | In-process domain reactions |
## Integration vs domain

- Integration events cross service boundaries (`order.created` on NATS/Kafka).
- Domain events can stay inside one binary via `EventBus` until you need delivery guarantees.
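To make the in-process side concrete, here is a minimal stand-in for a domain event bus: a topic-to-handlers map with synchronous dispatch. `LocalBus` is illustrative only; in nestrs the real type is `EventBus` and `#[on_event]` does the subscription wiring for you.

```rust
use std::collections::HashMap;

// Illustrative in-process bus: topic -> list of handlers. Not a nestrs API.
pub struct LocalBus {
    handlers: HashMap<String, Vec<Box<dyn Fn(&str)>>>,
}

impl LocalBus {
    pub fn new() -> Self {
        LocalBus { handlers: HashMap::new() }
    }

    /// Subscribe a handler to a topic (what `#[on_event]` wires up for you).
    pub fn on(&mut self, topic: &str, handler: impl Fn(&str) + 'static) {
        self.handlers
            .entry(topic.to_string())
            .or_default()
            .push(Box::new(handler));
    }

    /// Deliver the payload to every subscriber; returns how many handlers ran.
    pub fn emit(&self, topic: &str, payload: &str) -> usize {
        match self.handlers.get(topic) {
            Some(hs) => {
                for h in hs {
                    h(payload);
                }
                hs.len()
            }
            None => 0,
        }
    }
}
```

The point of the sketch: domain reactions are plain function calls inside one binary, which is why they carry no delivery guarantee until you move them behind a broker.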
## In-process `#[on_event]` sketch

```rust
#[injectable]
struct OrderProjector;

#[event_routes]
impl OrderProjector {
    #[on_event("order.created")]
    async fn apply(&self, payload: serde_json::Value) {
        tracing::info!(?payload, "projection");
    }
}
```
Register `OrderProjector` in `providers`; `wire_on_event_handlers` runs on bootstrap for HTTP/micro apps.
Emit:

```rust
bus.emit(
    "order.created",
    &serde_json::json!({ "id": 99, "event_version": 1 }),
)
.await;
```

See `microservices_events.rs` for `ClientsModule` + `EventBus` wiring.
## HTTP → DB → emit

- Validate the body → transactional `OrderService::create`.
- After commit → `clients.expect("AUDIT").emit("order.created", &dto)`.
- If the emit can fail post-commit, move to the outbox pattern (below).
## End-to-end example: checkout, billing, fulfillment

This is a real production-style choreography:

- Checkout API creates the order and writes an outbox row.
- The outbox publisher emits `order.placed` to the broker.
- Billing consumes `order.placed` and emits `payment.captured`.
- Fulfillment consumes `payment.captured` and emits `shipment.requested`.
### Producer side: create order + outbox row

```rust
#[injectable]
pub struct CheckoutService {
    prisma: Arc<PrismaService>,
}

impl CheckoutService {
    pub async fn place_order(&self, body: CreateOrderDto) -> Result<OrderRow, HttpException> {
        // Pseudocode: run both inserts in one SQL transaction in your repo layer.
        let order = self
            .prisma
            .order()
            .create(OrderCreateInput {
                tenant_id: body.tenant_id.clone(),
                customer_id: body.customer_id.clone(),
                total_cents: body.total_cents,
                status: "pending".into(),
            })
            .await
            .map_err(HttpException::from)?;

        self.prisma
            .integration_outbox()
            .create(IntegrationOutboxCreateInput {
                topic: "order.placed".into(),
                payload: serde_json::json!({
                    "event_id": uuid::Uuid::new_v4().to_string(),
                    "event_version": 1,
                    "order_id": order.id,
                    "tenant_id": order.tenant_id,
                    "total_cents": order.total_cents,
                }),
                headers: serde_json::json!({
                    "correlation_id": body.correlation_id,
                }),
            })
            .await
            .map_err(HttpException::from)?;

        Ok(OrderRow {
            id: order.id.to_string(),
            status: order.status,
        })
    }
}
```
## Outbox (PostgreSQL / Prisma)

- **Transactional write**: persist the business row and the outbox row in one DB transaction.
- **Async publisher**: a worker polls the outbox, emits, and marks rows delivered with a retries + DLQ policy.
- **Idempotency**: consumers dedupe with `event_id` / `idempotency_key`; assume at-least-once delivery.

nestrs does not ship an outbox crate; model the table with `prisma_model!` or raw SQL.
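The lifecycle of one outbox row can be sketched as a small state machine. This is an illustrative model, not a nestrs type: `MAX_ATTEMPTS` is an assumed policy value, and in production the state lives in the `published_at` / `attempts` / `last_error` columns rather than an enum.

```rust
// Assumed retry budget before a row is dead-lettered.
const MAX_ATTEMPTS: u32 = 5;

#[derive(Debug, PartialEq)]
pub enum OutboxState {
    Pending { attempts: u32 },
    Published,
    DeadLetter { last_error: String },
}

/// Advance a row after one publish attempt: success marks it published,
/// failure bumps `attempts` until the row is dead-lettered.
pub fn after_attempt(state: OutboxState, result: Result<(), String>) -> OutboxState {
    match (state, result) {
        (OutboxState::Pending { .. }, Ok(())) => OutboxState::Published,
        (OutboxState::Pending { attempts }, Err(e)) => {
            if attempts + 1 >= MAX_ATTEMPTS {
                OutboxState::DeadLetter { last_error: e }
            } else {
                OutboxState::Pending { attempts: attempts + 1 }
            }
        }
        // Published / dead-lettered rows are never retried by the worker.
        (other, _) => other,
    }
}
```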
### Outbox table (PostgreSQL)

```sql
CREATE TABLE integration_outbox (
    id BIGSERIAL PRIMARY KEY,
    topic TEXT NOT NULL,
    payload JSONB NOT NULL,
    headers JSONB NOT NULL DEFAULT '{}',
    created_at TIMESTAMPTZ NOT NULL DEFAULT now(),
    published_at TIMESTAMPTZ,
    attempts INT NOT NULL DEFAULT 0,
    last_error TEXT
);

CREATE INDEX integration_outbox_pending
    ON integration_outbox (created_at)
    WHERE published_at IS NULL;
```
The HTTP handler inserts the business row and the outbox row in one transaction. A background tokio task (or a separate worker binary) runs `SELECT … FOR UPDATE SKIP LOCKED`, publishes via `ClientProxy::emit`, then sets `published_at`. After N failures, move rows to `integration_dead_letter` for manual replay.
### Publisher worker: ship pending outbox rows

```rust
#[injectable]
pub struct OutboxPublisher {
    clients: Arc<ClientsService>,
    prisma: Arc<PrismaService>,
}

impl OutboxPublisher {
    pub async fn publish_once(&self) -> Result<(), HttpException> {
        let rows = self
            .prisma
            .integration_outbox()
            .find_many_with_options(IntegrationOutboxFindManyOptions {
                r#where: IntegrationOutboxWhere::and(vec![
                    integration_outbox::published_at::is_null(),
                ]),
                order_by: Some(vec![integration_outbox::id::order(nestrs_prisma::SortOrder::Asc)]),
                take: Some(100),
                skip: Some(0),
                distinct: None,
            })
            .await
            .map_err(HttpException::from)?;

        for row in rows {
            let publish = self
                .clients
                .expect("DOMAIN_EVENTS")
                .emit(&row.topic, &row.payload)
                .await;

            match publish {
                Ok(_) => {
                    self.prisma
                        .integration_outbox()
                        .update(
                            integration_outbox::id::equals(row.id),
                            IntegrationOutboxUpdateInput {
                                published_at: Some(chrono::Utc::now()),
                                attempts: None,
                                last_error: None,
                            },
                        )
                        .await
                        .map_err(HttpException::from)?;
                }
                Err(err) => {
                    self.prisma
                        .integration_outbox()
                        .update(
                            integration_outbox::id::equals(row.id),
                            IntegrationOutboxUpdateInput {
                                published_at: None,
                                attempts: Some(row.attempts + 1),
                                last_error: Some(err.to_string()),
                            },
                        )
                        .await
                        .map_err(HttpException::from)?;
                }
            }
        }

        Ok(())
    }
}
```
## Integration event envelope (what crosses the broker)

```json
{
  "event_id": "0193a8e0-…",
  "event_type": "order.placed",
  "event_version": 2,
  "occurred_at": "2026-04-17T12:00:00Z",
  "producer": "checkout-api",
  "data": {
    "order_id": "ord_123",
    "total_cents": 4999,
    "customer_id": "cus_99"
  }
}
```
Consumers dedupe on `event_id`. `event_version` selects serde shapes. `occurred_at` drives SLAs and analytics.
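Version-selecting the payload shape can be sketched as a gate like the one below. It is an assumed example, not a nestrs helper: real code would deserialize into versioned serde structs, whereas this sketch branches on the already-extracted fields so it stays dependency-free.

```rust
// Two assumed payload shapes for the envelope above: v1 predates total_cents.
pub enum Decoded {
    V1 { order_id: String },
    V2 { order_id: String, total_cents: i64 },
}

/// Pick a shape from `event_version`; unknown versions are rejected so a
/// consumer never misreads a schema it does not understand.
pub fn decode(event_version: i32, order_id: &str, total_cents: Option<i64>) -> Result<Decoded, String> {
    match event_version {
        1 => Ok(Decoded::V1 { order_id: order_id.to_string() }),
        2 => match total_cents {
            Some(t) => Ok(Decoded::V2 { order_id: order_id.to_string(), total_cents: t }),
            None => Err("v2 requires total_cents".into()),
        },
        v => Err(format!("unknown event_version {v}")),
    }
}
```

Rejecting unknown versions (rather than best-effort parsing) is the safer default when producers upgrade before consumers.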
## CQRS read models

Subscribe to `order.created` (broker or `on_event`) and project into Redis, Mongo, or Elasticsearch for read-heavy queries; keep the HTTP write path small.
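A projection is just an idempotent upsert keyed by the entity id. The sketch below uses an in-memory `HashMap` as a stand-in for the real read store (Redis/Mongo/ES); `OrderView` and its fields are assumed for illustration.

```rust
use std::collections::HashMap;

// Assumed read-model row for an order.
#[derive(Debug, Clone, PartialEq)]
pub struct OrderView {
    pub total_cents: i64,
    pub status: String,
}

/// Apply one `order.created` event to the read model. Last-write-wins upsert
/// keeps the projection idempotent under at-least-once redelivery.
pub fn project(read_model: &mut HashMap<String, OrderView>, order_id: &str, total_cents: i64) {
    read_model.insert(
        order_id.to_string(),
        OrderView { total_cents, status: "pending".into() },
    );
}
```

Because the upsert is keyed by `order_id`, replaying the same event twice leaves the read model unchanged, which is exactly the property the consumer needs.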
## Choreography example (multiple services)

Choreography means no central orchestrator: each service reacts to the events it cares about.

- Checkout commits the `orders` row, writes the outbox row `order.placed`; the publisher emits to Kafka topic `prod.orders.events.v1`.
- Billing consumes `order.placed` and charges the card; emits `payment.captured` or `payment.failed` (another topic).
- Fulfillment listens for `payment.captured` and reserves inventory; emits `shipment.requested`.
- Notifications listens to multiple topics and sends email/SMS; idempotent on `event_id`.
nestrs `#[event_pattern]` handlers implement the consumers for steps 2–4 on apps built with `NestFactory::create_microservice_*`; `emit` / `ClientProxy` publish the next integration events.
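The happy-path chain above can be written down as a pure lookup, which is handy for documentation and tests. This is an illustrative function, not framework code; it covers only the success branch (Billing may also emit `payment.failed`, which starts a different chain).

```rust
/// Given a consumed topic, which integration event does the reacting service
/// emit next on the happy path? Topic names match the choreography above.
pub fn next_event(consumed: &str) -> Option<&'static str> {
    match consumed {
        "order.placed" => Some("payment.captured"),       // Billing
        "payment.captured" => Some("shipment.requested"), // Fulfillment
        "shipment.requested" => None,                     // end of the chain
        _ => None,
    }
}
```

Writing the chain as data makes the implicit coupling of choreography visible: no single service knows the whole flow, but the flow still exists and deserves a test.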
Consumer side: billing reacts to order.placed
#[derive(Debug, serde::Deserialize)]
pub struct OrderPlacedEvent {
pub event_id: String,
pub event_version: i32,
pub order_id: String,
pub tenant_id: String,
pub total_cents: i64,
}
#[injectable]
pub struct BillingEvents {
clients: Arc<ClientsService>,
}
#[micro_routes]
impl BillingEvents {
#[event_pattern("order.placed")]
async fn on_order_placed(&self, event: OrderPlacedEvent) -> Result<(), HttpException> {
// Charge card with PSP SDK here. Persist dedupe by event_id before or after the call.
self.clients
.expect("DOMAIN_EVENTS")
.emit(
"payment.captured",
&serde_json::json!({
"event_id": uuid::Uuid::new_v4().to_string(),
"event_version": 1,
"order_id": event.order_id,
"tenant_id": event.tenant_id,
"amount_cents": event.total_cents,
}),
)
.await
.map_err(InternalServerErrorException::new)?;
Ok(())
}
}
## Dead-letter and replay

When a handler fails after partial side effects, do not lose the message:

- Message brokers: configure a DLQ / retry topic with exponential backoff.
- In-process `EventBus`: log and metric failures; optionally persist failed payloads to SQL for replay tooling.
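A typical retry-delay policy for the broker path is exponential backoff with a cap. The sketch below is a minimal, assumed policy (base 500 ms, cap 60 s); neither nestrs nor any broker ships these exact numbers, so tune them to your topic.

```rust
use std::time::Duration;

/// Delay before retry number `attempt` (0-based): base * 2^attempt, capped.
/// Saturating math keeps large attempt counts from overflowing.
pub fn retry_delay(attempt: u32) -> Duration {
    let base_ms: u64 = 500;    // assumed first-retry delay
    let cap_ms: u64 = 60_000;  // assumed ceiling: one minute
    let exp = base_ms.saturating_mul(1u64 << attempt.min(20));
    Duration::from_millis(exp.min(cap_ms))
}
```

Adding jitter (a random fraction of the delay) on top of this curve is common to avoid thundering-herd retries; it is omitted here to keep the sketch deterministic.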
## What “exactly-once” really means

Brokers are at-least-once. Exactly-once processing is achieved only by idempotent handlers plus a dedupe store (`event_id` → processed timestamp). Design for duplicates from day one.
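The dedupe half of that recipe fits in a few lines. Here a `HashSet` stands in for the SQL dedupe table keyed by `event_id`; `process_once` is an illustrative name, not a nestrs API.

```rust
use std::collections::HashSet;

/// Run `handler` only if `event_id` has not been seen before.
/// Returns true when the handler actually ran, false on a duplicate.
pub fn process_once(
    seen: &mut HashSet<String>,
    event_id: &str,
    handler: impl FnOnce(),
) -> bool {
    // `insert` returns false if the id was already present: duplicate delivery.
    if !seen.insert(event_id.to_string()) {
        return false; // already processed, skip side effects
    }
    handler();
    true
}
```

In production, record the `event_id` in the same transaction as the handler's side effects; a separate "mark processed" write reopens the duplicate window the dedupe store exists to close.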
Keep `event_version` in payloads when evolving schemas so old consumers can branch safely.