Skip to content

Commit

Permalink
fix: add e2e tests, fix create-group return
Browse files Browse the repository at this point in the history
  • Loading branch information
T1B0 committed Jul 25, 2024
1 parent d537aa7 commit ad52b43
Show file tree
Hide file tree
Showing 13 changed files with 273 additions and 294 deletions.
3 changes: 0 additions & 3 deletions src/client/client.socket.ts
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 27,6 @@ export const wrapSocket = (socket: Socket) =>
});



type WriteCb = ((error: Error | null | undefined) => void) | undefined

type Job = {
Expand All @@ -38,7 37,6 @@ type Job = {
};



export class CommandResponseStream extends Duplex {
private _socket: Socket;
private _readPaused: boolean;
Expand All @@ -47,7 45,6 @@ export class CommandResponseStream extends Duplex {
isAuthenticated: boolean;
userId?: number;


constructor(socket: Socket) {
super();
this._socket = this._wrapSocket(socket);
Expand Down
21 changes: 18 additions & 3 deletions src/client/client.ts
Original file line number Diff line number Diff line change
Expand Up @@ -61,15 61,22 @@ export class Client extends CommandAPI {

export class SingleClient extends CommandAPI {
_config: ClientConfig
destroy: () => void

constructor(config: ClientConfig) {
super(async () => {
const c = await rawClientGetter(config);
const cliP = rawClientGetter(config);
const init = async () => {
const c = await cliP;
if (!c.isAuthenticated)
await c.authenticate(config.credentials);
return c;
});
};
super(init);
this._config = config;
this.destroy = async () => {
const s = await this.clientProvider();
s.destroy();
};
}
};

Expand All @@ -79,3 86,11 @@ export class SimpleClient extends CommandAPI {
super(() => Promise.resolve(client));
}
};

export const getClient = async (config: ClientConfig) => {
const cli = await rawClientGetter(config);
if (!cli.isAuthenticated)
await cli.authenticate(config.credentials);
const api = new SimpleClient(cli);
return api;
};
99 changes: 0 additions & 99 deletions src/consumer-stream.e2e.ts

This file was deleted.

114 changes: 114 additions & 0 deletions src/e2e/tcp.consumer-group.e2e.ts
Original file line number Diff line number Diff line change
@@ -0,0 1,114 @@

import { after, describe, it } from 'node:test';
import assert from 'node:assert/strict';
import { Client, SingleClient } from '../client/client.js';
import { ConsumerKind, PollingStrategy, Partitioning } from '../wire/index.js';
import { generateMessages } from '../tcp.sm.utils.js';

describe('e2e -> consumer-group', async () => {

const c = new SingleClient({
transport: 'TCP',
options: { port: 8090, host: '127.0.0.1' },
credentials: { username: 'iggy', password: 'iggy' }
});

const streamId = 555;
const topicId = 666;

const stream = {
streamId,
name: 'e2e-consumer-group-stream'
};

const topic = {
streamId,
topicId,
name: 'e2e-consumer-group-topic',
partitionCount: 3,
compressionAlgorithm: 1
};

let payloadLength = 0;

await c.stream.create(stream);
await c.topic.create(topic);

const groupId = 333;
const group = { streamId, topicId, groupId, name: 'e2e-cg-1' };

it('e2e -> consumer-group::create', async () => {
const r = await c.group.create(group);
assert.deepEqual(
r, { id: groupId, name: 'e2e-cg-1', partitionsCount: 3, membersCount: 0 }
);
});

it('e2e -> consumer-group::get', async () => {
const gg = await c.group.get({ streamId, topicId, groupId });
assert.deepEqual(
gg, { id: groupId, name: 'e2e-cg-1', partitionsCount: 3, membersCount: 0 }
);
});

it('e2e -> consumer-group::list', async () => {
const lg = await c.group.list({ streamId, topicId });
assert.deepEqual(
lg,
[{ id: groupId, name: 'e2e-cg-1', partitionsCount: 3, membersCount: 0 }]
);
});

it('e2e -> consumer-stream::send-messages', async () => {
const ct = 1000;
const mn = 200;
for (let i = 0; i <= ct; i = mn) {
c.message.send({
streamId, topicId,
messages: generateMessages(mn),
partition: Partitioning.MessageKey(`key-${ i % 400 }`)
});
}
payloadLength = ct;
});

it('e2e -> consumer-group::join', async () => {
assert.ok(await c.group.join({ streamId, topicId, groupId }));
});

it('e2e -> consumer-group::poll', async () => {
const pollReq = {
streamId,
topicId,
consumer: { kind: ConsumerKind.Group, id: groupId },
partitionId: 0,
pollingStrategy: PollingStrategy.Next,
count: 100,
autocommit: true
};
let ct = 0;
while (ct < payloadLength) {
const { messages, ...resp } = await c.message.poll(pollReq);
assert.equal(messages.length, resp.messageCount);
ct = messages.length;
}
assert.equal(ct, payloadLength);

const { messages, ...resp } = await c.message.poll(pollReq);
assert.equal(resp.messageCount, 0);
});

it('e2e -> consumer-group::leave', async () => {
assert.ok(await c.group.leave({ streamId, topicId, groupId }));
});

it('e2e -> consumer-group::delete', async () => {
assert.ok(await c.group.delete({ streamId, topicId, groupId }));
});

after(async () => {
assert.ok(await c.stream.delete(stream));
assert.ok(await c.session.logout());
await c.destroy();
});
});
92 changes: 92 additions & 0 deletions src/e2e/tcp.consumer-stream.e2e.ts
Original file line number Diff line number Diff line change
@@ -0,0 1,92 @@

import { after, describe, it } from 'node:test';
import assert from 'node:assert/strict';
import { Duplex, Readable } from 'node:stream';
import { consumerStream, groupConsumerStream } from '../stream/consumer-stream.js';
import { Client } from '../client/index.js';
import {
PollingStrategy, ConsumerKind, Partitioning, PollMessagesResponse
} from '../wire/index.js';
import { sendSomeMessages } from '../tcp.sm.utils.js';

describe('e2e -> consumer-stream', async () => {

const credentials = { username: 'iggy', password: 'iggy' };
const opt = {
transport: 'TCP' as const,
options: { port: 8090, host: '127.0.0.1' },
credentials
};

const c = new Client(opt);

const streamId = 137;
const topicId = 123;
const groupId = 12;

await c.stream.create({ streamId, name: `e2e-stream-${streamId}` })
await c.topic.create({
streamId,
topicId,
name: 'test-topic-stream-cg',
partitionCount: 3,
compressionAlgorithm: 1
});

let ct = 1000;

it('e2e -> consumer-stream::send-messages', async () => {
for (let i = 0; i <= ct; i = 100) {
await sendSomeMessages(c.clientProvider)(
streamId, topicId, Partitioning.MessageKey(`k-${i % 400}`)
);
}
});

it('e2e -> consumer-stream::poll-stream', async () => {
// POLL MESSAGE
const pollReq = {
groupId,
streamId,
topicId,
pollingStrategy: PollingStrategy.Next,
count: 100,
interval: 500
};

const pollStream = groupConsumerStream(opt);
const s = await pollStream(pollReq);
const s2 = await pollStream(pollReq);
let recv = 0;
const dataCb = (str: Readable) => (d: PollMessagesResponse) => {
recv = d.messageCount;
if (recv === ct)
str.destroy();
};

s.on('data', dataCb(s));
s2.on('data', dataCb(s2));

// promise prevent test from finishing before end is handled
await new Promise((resolve, reject) => {
s.on('error', (err) => {
console.error('=>>Stream ERROR::', err)
s.destroy(err);
reject(err);
});

s.on('end', async () => {
resolve(assert.equal(ct, recv));
});
});

after(async () => {
assert.ok(await c.stream.delete({ streamId }));
assert.ok(await c.session.logout());
await c.destroy();
});


});

});
2 changes: 1 addition & 1 deletion src/e2e/tcp.parallel.e2e.ts
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 43,7 @@ describe('e2e -> parallel', async () => {
'hostname', 'osName', 'osVersion', 'kernelVersion'
]
);
resp.forEach(assert.ok);
resp.forEach(r => assert.ok(r));
});

it('e2e -> parallel::logout', async () => {
Expand Down
Loading

0 comments on commit ad52b43

Please sign in to comment.