Haredo
Haredo version 2 introduces breaking changes. See 2.0 Changes
Yet another RabbitMQ library
Motivation
For a long time I've been using tortoise as my go-to RabbitMQ client. I quite like the chaining API it has but tortoise does have it's downsides (it's not being maintained, accessing message metadata needs you to not use arrow functions, missing typings, etc.)
Features
- TypeScript
- Chaining based API
- Graceful closing
- RPC
Usage
Working examples are available on github
Initializing
import { haredo } from 'haredo';
const rabbit = haredo({
connection: 'amqp://localhost:5672/'
});
Listening for messages
rabbit.queue('my-queue')
.bindExchange('testExchange', '#', 'topic', { durable: false }) // Can be omitted if you don't want to bind the queue to an exchange right now
.subscribe(async (message) => {
console.log(message);
});
Publishing to an exchange
rabbit.exchange('my-exchange').publish({ id: 5, status: 'active' }, 'item.created');
Publishing to a queue
rabbit.queue('my-queue').publish({ id: 5, status: 'inactive' });
Limit concurrency
rabbit.queue('my-queue')
.prefetch(5) // same as .concurrency(5)
.subscribe(async (message) => {
console.log(message);
});
RPC
rabbit.queue('sum')
// With autoReply on, returned value from callback is automatically replied
// Alternative is to use the reply/1 method on the message
.autoReply()
.subscribe(({ data }) => data[0] data[1]);
const response = await rabbit.queue('sum').rpc([30, 12])
Delayed messages
Note: this requires RabbitMQ Delayed Message Plugin to be installed and enabled on the server.
interface Message {
id: number;
}
const delayedExchange = e<Message>('my-delayed-exchange', 'x-delayed-message').delayed('topic');
await rabbit.queue('my-queue')
.bindExchange(delayedExchange, '#')
.subscribe(({ data, timestamp }) => {
console.log(`Received message in ${ Date.now() - timestamp }ms id:${ data.id } `);
});
const delayedMessage = preparedMessage().routingKey('item').delay(2000);
let id = 0;
while (true) {
id = 1;
console.log('Publishing message', id);
const msg = delayedMessage.json({ id }).timestamp(Date.now());
await rabbit
.exchange(delayedExchange)
.publish(msg);
await delay(2000);
}
Quorum queues with delivery limits
Node: requires RabbitMQ 3.8.0 or higher, see Quorum Queues Overview for more information.
Message throttling
await rabbit.queue('my-queue')
.backoff(standardBackoff({
failThreshold: 3,
failSpan: 5000,
failTimeout: 5000
}))
.subscribe(() => {
throw new Error('Nack this message for me')
});
Dead letter
Middleware
import { Middleware } from 'haredo';
const timeMessage: Middleware = ({ queue }, next) => {
const start = Date.now();
await next();
console.log(`Message took ${ Date.now() - start }ms`);
}
await rabbit.queue('my-queue')
.use(timeMessage)
.subscribe(() => {
throw new Error('Nack this message for me')
});
Graceful shutdown
Calling consumer.close() will send cancel to channel and wait for existing messages to be handled before resolving the returned promise.
Calling haredoInstance.close() will gracefully close all of it's consumers