Exchanges & acknowledgements
By default a listener consumes a durable queue directly, which is all you need for simple point-to-point messaging. RabbitMQ's exchanges unlock fan-out and topic routing, and acknowledgements give you delivery guarantees.
Exchanges
Binding a consumer to an exchange
Pass binding options as the second argument of @OnRabbitMQMessage. The queue is asserted, the exchange is asserted, and the queue is bound to it with the given routing key:
import { OnRabbitMQMessage } from 'jsr:@danet/rabbitmq';
@Injectable()
export class OrderNotifier {
@OnRabbitMQMessage('order.shipped.email', {
exchange: 'orders',
exchangeType: 'topic', // 'direct' | 'topic' | 'fanout' | 'headers'
routingKey: 'order.shipped',
})
sendShippingEmail(payload: { orderId: number }) {
// ...
}
}Two consumers binding different queues to the same topic/fanout exchange both receive a copy of each matching message — that is the fan-out pattern.
Publishing to an exchange
Use publish(exchange, routingKey, data, exchangeType?, options?):
this.rabbit.publish('orders', 'order.shipped', { orderId: 1 }, 'topic');The exchange is asserted (created if missing) before publishing, and the payload is JSON-encoded and persistent by default.
Acknowledgements
Messages are acknowledged automatically:
- the handler resolves → the message is acked;
- the handler throws → the message is nacked without requeue, so a dead-letter policy can pick it up instead of redelivering in a hot loop.
To take over acking yourself, opt out of auto-ack with consumeOptions:
@OnRabbitMQMessage('order.created', {
consumeOptions: { noAck: true },
})
handle(payload: OrderCreated) {
// you are responsible for acking — nothing is acked or nacked for you
}Use prefetch in RabbitMQModule.forRoot({ url, prefetch }) to cap how many unacknowledged messages the broker delivers at once (QoS), which is how you spread work across multiple consumer instances.
Sending messages from a non-Danet app
Unlike Danet's KV Queue — where Deno.kv exposes a single global queue, so messages must be wrapped as { type, data } to be routed — RabbitMQ has native queues and exchanges. No wrapper is needed: the message body is simply the JSON of your payload.
A message published with:
this.rabbit.sendMessage('order.created', { orderId: 1 });is delivered with body:
{ "orderId": 1 }@OnRabbitMQMessage('order.created') consumes the order.created queue and JSON.parses the body. So any RabbitMQ producer — in any language — that publishes JSON to the same queue (or to an exchange bound to it) will be consumed by your Danet handler, and vice-versa.
Limitations
- One handler per queue. A queue maps to a single
@OnRabbitMQMessagemethod within an app. - No DTO validation. Payloads are
JSON.parsed as-is; the class-validator based validation used by@Bodydoes not apply. Validate inside the handler if you need it. - No wildcard channel matching in the decorator. Routing is delegated to RabbitMQ — use a
topicexchange and routing-key patterns (e.g.order.*) instead.
Danet