Skip to content

Commit

Permalink
feat: add createPartition & deletePartition command
Browse files Browse the repository at this point in the history
  • Loading branch information
T1B0 committed Jul 4, 2024
1 parent 7bb8b32 commit 2286c10
Show file tree
Hide file tree
Showing 6 changed files with 83 additions and 7 deletions.
7 changes: 6 additions & 1 deletion src/tcp.client.ts
Original file line number Diff line number Diff line change
@@ -1,6 1,8 @@

import { createConnection, Socket } from 'node:net';
import { responseError } from './wire/error.utils.js';
import { translateCommandCode } from './wire/command.code.js';

// interface IggyClient {
// socket: Socket
// }
Expand Down Expand Up @@ -44,7 46,10 @@ export const sendCommandWithResponse = (s: Socket) =>
buffer.writeUint32LE(payloadSize, 0);
buffer.writeUint32LE(command, 4);

console.log('==> CMD', buffer.readInt32LE(4), 'LENGTH', buffer.readInt32LE(0));
console.log(
'==> CMD', buffer.readInt32LE(4),
translateCommandCode(command),
'LENGTH', buffer.readInt32LE(0));

const cmd = Buffer.concat([buffer, payload]);
console.log('==> sending cmd', command, cmd /**, cmd.toString()*/);
Expand Down
28 changes: 22 additions & 6 deletions src/tcp.e2e.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 3,6 @@ import { createClient, sendCommandWithResponse } from './tcp.client.js';

import { LOGIN } from './wire/session/login.command.js';
import { LOGOUT } from './wire/session/logout.command.js';
// import { LOGIN_WITH_TOKEN } from './wire/session/login-with-token.command.js';
import { CREATE_STREAM } from './wire/stream/create-stream.command.js';
import { GET_STREAM } from './wire/stream/get-stream.command.js';
import { GET_STREAMS } from './wire/stream/get-streams.command.js';
Expand All @@ -12,6 11,8 @@ import { CREATE_TOPIC } from './wire/topic/create-topic.command.js';
import { GET_TOPIC } from './wire/topic/get-topic.command.js';
import { GET_TOPICS } from './wire/topic/get-topics.command.js';
import { DELETE_TOPIC } from './wire/topic/delete-topic.command.js';
import { CREATE_PARTITION } from './wire/partition/create-partition.command.js';
import { DELETE_PARTITION } from './wire/partition/delete-partition.command.js';


try {
Expand All @@ -25,7 26,6 @@ try {
const r = await sendCommandWithResponse(s)(LOGIN.code, loginCmd);
console.log('RESPONSE_login', r, r.toString(), LOGIN.deserialize(r));


// CREATE_STREAM
const createStreamCmd = CREATE_STREAM.serialize(1, 'test-stream');
const r_createStream = await sendCommandWithResponse(s)(
Expand Down Expand Up @@ -59,15 59,31 @@ try {
const r_getTopic = await sendCommandWithResponse(s)(GET_TOPIC.code, gtp);
console.log('RESPONSE_getTopic', GET_TOPIC.deserialize(r_getTopic));

// CREATE_PARTITION
const cpa = CREATE_PARTITION.serialize(1, 'test-topic-44', 22);
const r_createPartition = await sendCommandWithResponse(s)(CREATE_PARTITION.code, cpa);
console.log('RESPONSE_createPartition', CREATE_PARTITION.deserialize(r_createPartition));

// DELETE_PARTITION
const dpa = DELETE_PARTITION.serialize(1, 'test-topic-44', 12);
const r_deletePartition = await sendCommandWithResponse(s)(DELETE_PARTITION.code, dpa);
console.log('RESPONSE_deletePartition', DELETE_PARTITION.deserialize(r_deletePartition));

// GET_TOPIC AGAIN
const r_getTopic2 = await sendCommandWithResponse(s)(GET_TOPIC.code, gtp);
console.log('RESPONSE_getTopic2', GET_TOPIC.deserialize(r_getTopic2));


// GET_TOPICS
const gtps = GET_TOPICS.serialize('test-stream');
const r_getTopics = await sendCommandWithResponse(s)(GET_TOPICS.code, gtps);
console.log('RESPONSE_getTopics', GET_TOPICS.deserialize(r_getTopics));

// DELETE TOPIC
const dtp = DELETE_TOPIC.serialize(1, 'test-topic-44', 3);
const r_deleteTopic = await sendCommandWithResponse(s)(DELETE_TOPIC.code, dtp);
console.log('RESPONSE_deleteTopic', DELETE_TOPIC.deserialize(r_deleteTopic));

// // DELETE TOPIC
// const dtp = DELETE_TOPIC.serialize(1, 'test-topic-44', 3);
// const r_deleteTopic = await sendCommandWithResponse(s)(DELETE_TOPIC.code, dtp);
// console.log('RESPONSE_deleteTopic', DELETE_TOPIC.deserialize(r_deleteTopic));

// DELETE STREAM
const dst = DELETE_STREAM.serialize(1);
Expand Down
1 change: 1 addition & 0 deletions src/tcp.token.e2e.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 2,7 @@
import { createClient, sendCommandWithResponse } from './tcp.client.js';

import { LOGIN } from './wire/session/login.command.js';
// import { LOGIN_WITH_TOKEN } from './wire/session/login-with-token.command.js';
import { GET_TOKENS } from './wire/token/get-tokens.command.js';
import { CREATE_TOKEN } from './wire/token/create-token.command.js';
import { DELETE_TOKEN } from './wire/token/delete-token.command.js';
Expand Down
14 changes: 14 additions & 0 deletions src/wire/partition/create-partition.command.ts
Original file line number Diff line number Diff line change
@@ -0,0 1,14 @@

import type { CommandResponse } from '../../tcp.client.js';
import type { Id } from '../identifier.utils.js';
import { serializePartitionParams } from './partition.utils.js';

export const CREATE_PARTITION = {
code: 402,
serialize: (streamId: Id, topicId: Id, partitionCount = 1) => {
return serializePartitionParams(streamId, topicId, partitionCount);
},
deserialize: (r: CommandResponse) => {
return r.status === 0 && r.data.length === 0;
}
};
19 changes: 19 additions & 0 deletions src/wire/partition/delete-partition.command.ts
Original file line number Diff line number Diff line change
@@ -0,0 1,19 @@

import type { CommandResponse } from '../../tcp.client.js';
import type { Id } from '../identifier.utils.js';
import { serializePartitionParams } from './partition.utils.js';


export const DELETE_PARTITION = {
code: 403,
serialize: (
streamId: Id,
topicId: Id,
partitionCount: number,
) => {
return serializePartitionParams(streamId, topicId, partitionCount);
},
deserialize: (r: CommandResponse) => {
return r.status === 0 && r.data.length === 0;
}
};
21 changes: 21 additions & 0 deletions src/wire/partition/partition.utils.ts
Original file line number Diff line number Diff line change
@@ -0,0 1,21 @@

import { serializeIdentifier, type Id } from '../identifier.utils.js';

export const serializePartitionParams = (
streamId: Id, topicId: Id, partitionCount = 1,
) => {

if (partitionCount < 1 || partitionCount > 1000)
throw new Error('Topic partition_count should be between 1 and 1000');

const streamIdentifier = serializeIdentifier(streamId);
const topicIdentifier = serializeIdentifier(topicId);
const b = Buffer.alloc(4);
b.writeUInt32LE(partitionCount, 0);

return Buffer.concat([
streamIdentifier,
topicIdentifier,
b,
])
};

0 comments on commit 2286c10

Please sign in to comment.