Skip to content

de-novo/nestjs-redis-streams

Repository files navigation

@de-novo/nestjs-redis-streams

Nest Logo

Redis Streams Transport Strategy for NestJS using ioredis library.

NPM Version NPM Downloads GitHub Issues or Pull Requests

Installation

with npm

npm i @de-novo/nestjs-redis-streams

with yarn

yarn add @de-novo/nestjs-redis-streams

with pnpm

pnpm i @de-novo/nestjs-redis-streams

Why

This open-source project was inspired by the discussions within the NestJS community, specifically Issue Add more transport strategies (microservices) #3960. The issue highlighted the need for expanding the microservices strategies in NestJS, including an interest in various data stream processing methods, with a particular focus on Redis streams. Redis streams are essential for efficiently supporting real-time data processing and messaging requirements in distributed systems.

The primary goal of this project is to facilitate the easy use of Redis streams within NestJS applications. By doing so, we aim to assist developers in implementing real-time data stream processing features more conveniently, thereby enhancing the performance and scalability of applications built on NestJS.

Run Example

git clone https://github.com/de-novo/nestjs-redis-stream.git
npm i && npm run build
# redis on
# Docker must be installed.
npm run redis:docker
# client
npm run example:client
# microservice(server)
npm run example:microservice
curl localhost:3000/test/send # use curl
# if you use postman GET: localhost:3000/test/send

Start

Use Server Side (like Kafka Consumer)

//  src/main.js
import { RedisStreamServer } from '@de-novo/nestjs-reids-stream';
import { NestFactory } from '@nestjs/core';
import { MicroserviceOptions } from '@nestjs/microservices';
import { AppModule } from './app.module';

async function bootstrap() {
  const app = await NestFactory.create(AppModule);
  const redis = await NestFactory.createMicroservice<MicroserviceOptions>(
    AppModule,
    {
      strategy: new RedisStreamServer({
        connection: {
          path: 'redis://localhost:6379',
        },
        streams: {
          consumer: 'test-1',
          consumerGroup: 'test-group',
          //   deleteMessagesAfterAck: true // not recommend
        },
      }),
    },
  );
  redis.listen();
  await app.listen(3000);
}
bootstrap();
import { Controller } from '@nestjs/common';
import { Ctx, MessagePattern, Payload } from '@nestjs/microservices';
import { AppService } from './app.service';

@Controller()
export class AppController {
  constructor(private readonly appService: AppService) {}

  @MessagePattern('message')
  sendTest(@Payload() data: any, @Ctx() ctx: any): boolean {
    console.log('data', data, ctx);
    return false;
  }
}

Use Client Side (like Kafka Producer)

// app.module
import { RedisStreamClientModule } from '@de-novo/nestjs-reids-stream';
import { Module } from '@nestjs/common';

import { AppController } from './app.controller';
import { AppService } from './app.service';

@Module({
  imports: [
    RedisStreamClientModule.forRoot({
      connection: {
        path: 'redis://localhost:6379',
      },
      streams: {
        consumer: 'test-1',
        block: 5000,
        consumerGroup: 'test-group',
      },
      responsePattern: ['test.send'],
    }),
  ],
  controllers: [AppController],
  providers: [AppService],
})
export class AppModule {}
import { RedisStreamClient } from '@de-novo/nestjs-reids-stream';
import { Injectable } from '@nestjs/common';
import { lastValueFrom } from 'rxjs';
@Injectable()
export class AppService {
  constructor(private readonly redisStreamClient: RedisStreamClient) {}

  async sendTest() {
    const res = await lastValueFrom(
      this.redisStreamClient.send('test.send', {
        value: { value_test: 'test' }, // @Payload payload => {value_test:'test'}
        headers: { header_test: 'test' }, // @Ctx ctx => {headers:{header_test:"test"}}
      }),
    );
    return res;
  }

  emitTest() {
    this.redisStreamClient.emit('test.emit', { test: 'test' });
    return 'Emit Test';
  }
}

About

No description, website, or topics provided.

Resources

License

Stars

Watchers

Forks

Releases

No releases published

Packages

No packages published