Skip to main content
nestrs-microservices provides message-based transport adapters for nestrs, analogous to @nestjs/microservices. It supports request-reply patterns (#[message_pattern]), fire-and-forget events (#[event_pattern]), and in-process pub/sub via EventBus. The HTTP and microservice servers can share a single process in hybrid mode.
# Cargo.toml
[dependencies]
nestrs = { version = "*", features = ["microservices"] }

Feature flags

Enable transports by adding the corresponding feature to your nestrs dependency:
FeatureTransport
microservicesTCP (default; always included when this group is enabled)
microservices-natsNATS
microservices-redisRedis Pub/Sub
microservices-kafkaApache Kafka
microservices-mqttMQTT
microservices-rabbitmqRabbitMQ (AMQP)
microservices-grpcgRPC (tonic)
[dependencies]
nestrs = { version = "*", features = [
    "microservices",
    "microservices-nats",
    "microservices-redis",
] }

Transport trait

The core async interface for all transport adapters. You use this indirectly through ClientProxy.
#[async_trait]
pub trait Transport: Send + Sync + 'static {
    // Request-reply: send a message and wait for a response
    async fn send_json(
        &self,
        pattern: &str,
        payload: serde_json::Value,
    ) -> Result<serde_json::Value, TransportError>;

    // Fire-and-forget: emit an event with no response expected
    async fn emit_json(
        &self,
        pattern: &str,
        payload: serde_json::Value,
    ) -> Result<(), TransportError>;
}

Transport options structs

Each transport has its own options type passed to NestFactory::create_microservice_*:
TransportServer optionsClient options
TCPTcpMicroserviceOptionsTcpTransportOptions
NATSNatsMicroserviceOptionsNatsTransportOptions
RedisRedisMicroserviceOptionsRedisTransportOptions
gRPCGrpcMicroserviceOptionsGrpcTransportOptions
RabbitMQRabbitMqMicroserviceOptionsRabbitMqTransportOptions
KafkaKafkaMicroserviceOptionsKafkaTransportOptions
MQTTMqttMicroserviceOptionsMqttTransportOptions

Message handlers

Declare message handlers in a #[micro_routes] impl block on a struct that is registered as a provider:
use nestrs::prelude::*;

#[injectable]
struct MathService;

#[micro_routes]
impl MathService {
    #[message_pattern("math.add")]
    async fn add(&self, payload: serde_json::Value) -> Result<serde_json::Value, TransportError> {
        let a = payload["a"].as_f64().unwrap_or(0.0);
        let b = payload["b"].as_f64().unwrap_or(0.0);
        Ok(serde_json::json!({ "result": a + b }))
    }

    #[event_pattern("user.created")]
    async fn on_user_created(&self, payload: serde_json::Value) {
        tracing::info!("new user: {:?}", payload);
    }
}

#[module(providers = [MathService], microservices = [MathService])]
struct AppModule;

Cross-cutting on message handlers

Apply guards, interceptors, and pipes to individual message handlers using their microservice variants:
AttributePurpose
#[use_micro_guards(G)]Run before the handler; G implements MicroCanActivate
#[use_micro_interceptors(I)]Observe inbound patterns; I implements MicroIncomingInterceptor
#[use_micro_pipes(P)]Transform inbound JSON; P implements MicroPipeTransform
Pipeline order is: interceptors → guards → pipes → handler.
#[micro_routes]
impl SecureService {
    #[message_pattern("secure.action")]
    #[use_micro_guards(TokenGuard)]
    async fn action(&self, payload: serde_json::Value) -> Result<serde_json::Value, TransportError> {
        Ok(serde_json::json!({ "ok": true }))
    }
}

MicroCanActivate trait

#[async_trait]
pub trait MicroCanActivate: Default + Send + Sync + 'static {
    async fn can_activate_micro(
        &self,
        pattern: &str,
        payload: &serde_json::Value,
    ) -> Result<(), TransportError>;
}

ClientProxy

A typed wrapper around a Transport for outgoing messages. Use send for request-reply and emit for fire-and-forget.
pub struct ClientProxy { /* private */ }

impl ClientProxy {
    pub async fn send<TReq, TRes>(
        &self,
        pattern: &str,
        payload: &TReq,
    ) -> Result<TRes, TransportError>
    where
        TReq: Serialize + Send + Sync,
        TRes: for<'de> Deserialize<'de> + Send;

    pub async fn emit<TReq>(
        &self,
        pattern: &str,
        payload: &TReq,
    ) -> Result<(), TransportError>
    where
        TReq: Serialize + Send + Sync;
}
use nestrs::prelude::*;

#[injectable]
struct OrdersService {
    client: std::sync::Arc<ClientProxy>,
}

impl OrdersService {
    pub async fn notify_billing(&self, order_id: &str) -> Result<(), TransportError> {
        self.client.emit("billing.order_created", &serde_json::json!({
            "order_id": order_id
        })).await
    }
}

ClientsModule

Registers named ClientProxy instances and a ClientsService into a DynamicModule for import into any module.
pub struct ClientsModule;

impl ClientsModule {
    pub fn register(configs: &[ClientConfig]) -> DynamicModule
}
When exactly one ClientConfig is provided, ClientProxy itself is exported as a default client (no name lookup needed). With multiple configs, use ClientsService::expect(name).
use nestrs::prelude::*;
use nestrs::microservices::{ClientConfig, TcpTransportOptions};

#[module(imports = [
    ClientsModule::register(&[
        ClientConfig::tcp("billing", TcpTransportOptions::default()),
        ClientConfig::tcp("notifications", TcpTransportOptions::default()),
    ]),
])]
struct AppModule;

ClientConfig

Builder for named transport clients:
pub struct ClientConfig {
    pub name: &'static str,
    pub transport: Arc<dyn Transport>,
}

impl ClientConfig {
    pub fn tcp(name: &'static str, options: TcpTransportOptions) -> Self
    pub fn nats(name: &'static str, options: NatsTransportOptions) -> Self   // feature: nats
    pub fn redis(name: &'static str, options: RedisTransportOptions) -> Self // feature: redis
    pub fn grpc(name: &'static str, options: GrpcTransportOptions) -> Self   // feature: grpc
    pub fn kafka(name: &'static str, options: KafkaTransportOptions) -> Self // feature: kafka
    pub fn mqtt(name: &'static str, options: MqttTransportOptions) -> Self   // feature: mqtt
    pub fn rabbitmq(name: &'static str, options: RabbitMqTransportOptions) -> Self // feature: rabbitmq
}

ClientsService

Provides named ClientProxy lookup after ClientsModule::register has been imported.
pub struct ClientsService { /* private */ }

impl ClientsService {
    pub fn get(&self, name: &str) -> Option<ClientProxy>
    pub fn expect(&self, name: &str) -> ClientProxy // panics with helpful message if missing
}
#[injectable]
struct MyService {
    clients: std::sync::Arc<ClientsService>,
}

impl MyService {
    pub async fn send_billing(&self) -> Result<(), TransportError> {
        let proxy = self.clients.expect("billing");
        proxy.emit("billing.event", &serde_json::json!({})).await
    }
}

EventBus

In-process pub/sub bus. Automatically registered when you use ClientsModule. Use EventBus::publish to emit events and #[on_event] inside #[event_routes] to subscribe.
use nestrs::prelude::*;

// Publishing
#[injectable]
struct PublisherService {
    bus: std::sync::Arc<EventBus>,
}

impl PublisherService {
    pub async fn emit_user_created(&self, id: &str) {
        self.bus.publish("user.created", serde_json::json!({ "id": id })).await;
    }
}

// Subscribing
#[injectable]
struct AuditService;

#[event_routes]
impl AuditService {
    #[on_event("user.created")]
    async fn log_user_created(&self, payload: serde_json::Value) {
        tracing::info!("audit: user created {:?}", payload);
    }
}

Hybrid mode (HTTP + microservice)

Run both an HTTP server and a microservice transport in the same process:
use nestrs::prelude::*;
use nestrs::microservices::TcpMicroserviceOptions;

#[module]
struct AppModule;

#[tokio::main]
async fn main() {
    NestFactory::create_microservice::<AppModule>(TcpMicroserviceOptions::default())
        .also_listen_http(3000)
        .configure_http(|app| {
            app.set_global_prefix("api")
               .enable_health_check("/health")
               .enable_cors(CorsOptions::permissive())
        })
        .listen()
        .await;
}

TransportError

The error type returned from message handlers and ClientProxy methods.
pub struct TransportError {
    pub message: String,
    pub details: Option<serde_json::Value>,
}

impl TransportError {
    pub fn new(message: impl Into<String>) -> Self
    pub fn with_details(self, details: serde_json::Value) -> Self
}