npm i @de-novo/nestjs-redis-streams
yarn add @de-novo/nestjs-redis-streams
pnpm i @de-novo/nestjs-redis-streams
This open-source project was inspired by discussions within the NestJS community, specifically issue #3960, "Add more transport strategies (microservices)". The issue highlighted the need to expand the microservice transport strategies available in NestJS, including interest in various data stream processing methods, with a particular focus on Redis streams. Redis streams are essential for efficiently supporting real-time data processing and messaging requirements in distributed systems.
The primary goal of this project is to facilitate the easy use of Redis streams within NestJS applications. By doing so, we aim to assist developers in implementing real-time data stream processing features more conveniently, thereby enhancing the performance and scalability of applications built on NestJS.
# Clone the repository, then install dependencies and build from inside it.
git clone https://github.com/de-novo/nestjs-redis-stream.git
cd nestjs-redis-stream
npm i && npm run build
# Start Redis.
# Docker must be installed.
npm run redis:docker
# Start the example client.
# NOTE(review): presumably the example processes stay in the foreground, so
# run each one in its own terminal - confirm.
npm run example:client
# Start the example microservice (server).
npm run example:microservice
# Send a test request with curl while the examples are running.
curl localhost:3000/test/send
# With Postman, send GET localhost:3000/test/send instead.
// src/main.ts
import { RedisStreamServer } from '@de-novo/nestjs-redis-streams';
import { NestFactory } from '@nestjs/core';
import { MicroserviceOptions } from '@nestjs/microservices';
import { AppModule } from './app.module';

/**
 * Starts the example: a Nest application listening on port 3000, plus a
 * microservice created from the same AppModule that uses RedisStreamServer
 * as its custom transport strategy.
 *
 * @returns A promise that resolves once both listeners have started.
 */
async function bootstrap(): Promise<void> {
  const app = await NestFactory.create(AppModule);
  const redis = await NestFactory.createMicroservice<MicroserviceOptions>(
    AppModule,
    {
      strategy: new RedisStreamServer({
        connection: {
          path: 'redis://localhost:6379',
        },
        streams: {
          consumer: 'test-1',
          consumerGroup: 'test-group',
          // deleteMessagesAfterAck: true // not recommended
        },
      }),
    },
  );
  // Await the microservice start so a failure rejects bootstrap() at this
  // point instead of surfacing later as a detached promise rejection.
  await redis.listen();
  await app.listen(3000);
}
bootstrap();
import { Controller } from '@nestjs/common';
import { Ctx, MessagePattern, Payload } from '@nestjs/microservices';
import { AppService } from './app.service';
/**
 * Example microservice controller: receives messages for the 'message'
 * pattern and logs what arrived.
 *
 * NOTE(review): the client example in this document sends 'test.send' and
 * emits 'test.emit', while this handler is registered for 'message' -
 * confirm that the pattern names are meant to differ.
 */
@Controller()
export class AppController {
  constructor(private readonly appService: AppService) {}

  /**
   * Logs the incoming payload and context, then replies with `false`.
   *
   * @param data - Value injected by `@Payload()`.
   * @param ctx - Value injected by `@Ctx()`.
   * @returns Always `false`.
   */
  @MessagePattern('message')
  sendTest(@Payload() data: any, @Ctx() ctx: any): boolean {
    console.log('data', data, ctx);

    return false;
  }
}
// src/app.module.ts
import { RedisStreamClientModule } from '@de-novo/nestjs-redis-streams';
import { Module } from '@nestjs/common';
import { AppController } from './app.controller';
import { AppService } from './app.service';

/**
 * Root module of the example. Registers RedisStreamClientModule with the
 * connection and stream settings below, and wires up AppController and
 * AppService.
 */
@Module({
  imports: [
    RedisStreamClientModule.forRoot({
      connection: {
        path: 'redis://localhost:6379',
      },
      streams: {
        consumer: 'test-1',
        // NOTE(review): presumably the blocking-read timeout in
        // milliseconds - confirm against the library options.
        block: 5000,
        consumerGroup: 'test-group',
      },
      // 'test.send' is the pattern AppService.sendTest() sends.
      // NOTE(review): presumably lists the patterns whose responses the
      // client listens for - confirm.
      responsePattern: ['test.send'],
    }),
  ],
  controllers: [AppController],
  providers: [AppService],
})
export class AppModule {}
import { RedisStreamClient } from '@de-novo/nestjs-redis-streams';
import { Injectable } from '@nestjs/common';
import { lastValueFrom } from 'rxjs';

/**
 * Example service showing the two ways to publish through
 * RedisStreamClient: `send` (result awaited) and `emit` (result not awaited).
 */
@Injectable()
export class AppService {
  constructor(private readonly redisStreamClient: RedisStreamClient) {}

  /**
   * Sends a message on the 'test.send' pattern and waits for the result.
   *
   * @returns The last value produced by the observable that `send` returns.
   */
  async sendTest() {
    const res = await lastValueFrom(
      this.redisStreamClient.send('test.send', {
        value: { value_test: 'test' }, // handler side: @Payload() payload => { value_test: 'test' }
        headers: { header_test: 'test' }, // handler side: @Ctx() ctx => { headers: { header_test: 'test' } }
      }),
    );
    return res;
  }

  /**
   * Emits a message on the 'test.emit' pattern without waiting for any
   * result.
   *
   * @returns The fixed string 'Emit Test'.
   */
  emitTest(): string {
    this.redisStreamClient.emit('test.emit', { test: 'test' });
    return 'Emit Test';
  }
}