Skip to content

Commit

Permalink
feat: update modified commands for v0.3.0 server release (createTopic…
Browse files Browse the repository at this point in the history
…, updateTopic, login, createToken)
  • Loading branch information
T1B0 committed Jul 4, 2024
1 parent 79e8cde commit cb4f0d1
Show file tree
Hide file tree
Showing 13 changed files with 80 additions and 62 deletions.
5 changes: 1 addition & 4 deletions src/tcp.consumer-group.e2e.ts
Original file line number Diff line number Diff line change
Expand Up @@ -43,10 43,7 @@ try {
topicId,
name: 'test-cg-44',
partitionCount: 3,
compressionAlgorithm: 1,
messageExpiry: 0,
maxTopicSize: 0,
replicationFactor: 1
compressionAlgorithm: 1
};

// CREATE_TOPIC
Expand Down
5 changes: 1 addition & 4 deletions src/tcp.e2e.ts
Original file line number Diff line number Diff line change
Expand Up @@ -53,9 53,6 @@ try {
name: 'topic-name-44',
partitionCount: 3,
compressionAlgorithm: 1, // 1 = None, 2 = Gzip
messageExpiry: 0,
maxTopicSize: 0,
replicationFactor: 1
};

// CREATE_TOPIC
Expand All @@ -68,7 65,7 @@ try {

// UPDATE_TOPIC
const r_updateTopic = await updateTopic(s)({
streamId: topic1.streamId, topicId: topic1.topicId, name: topic1.name, messageExpiry: 42
streamId: topic1.streamId, topicId: topic1.topicId, name: topic1.name, messageExpiry: 42n
});
console.log('RESPONSE_updateTopic', r_updateTopic);

Expand Down
3 changes: 0 additions & 3 deletions src/tcp.send-message.e2e.ts
Original file line number Diff line number Diff line change
Expand Up @@ -40,9 40,6 @@ try {
name: topicId,
partitionCount: 3,
compressionAlgorithm: 1,
messageExpiry: 0,
maxTopicSize: 0,
replicationFactor: 1
};

// CREATE_TOPIC
Expand Down
2 changes: 1 addition & 1 deletion src/tcp.token.e2e.ts
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 12,7 @@ try {
console.log('RESPONSE_login', r);

// CREATE_TOKEN
const r_createToken = await createToken(s)({ name: 'yolo-token-test', expiry: 1800 });
const r_createToken = await createToken(s)({ name: 'yolo-token-test', expiry: 1800n });
console.log('RESPONSE_createToken', r_createToken);

// GET_TOKENS
Expand Down
4 changes: 2 additions & 2 deletions src/tcp.topic.e2e.ts
Original file line number Diff line number Diff line change
Expand Up @@ -30,8 30,8 @@ try {
name: 'test-topic-fuu',
partitionCount: 0,
compressionAlgorithm: 1,
messageExpiry: 0,
maxTopicSize: 0,
messageExpiry: 0n,
maxTopicSize: 0n,
replicationFactor: 1
});

Expand Down
2 changes: 1 addition & 1 deletion src/wire/session/login.command.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 15,7 @@ describe("Login Command", () => {
it("serialize credentials into a buffer", () => {
assert.deepEqual(
LOGIN.serialize(l1).length,
2 l1.username.length l1.password.length
2 l1.username.length l1.password.length 4 4
);
});

Expand Down
43 changes: 38 additions & 5 deletions src/wire/session/login.command.ts
Original file line number Diff line number Diff line change
Expand Up @@ -6,14 6,21 @@ import { wrapCommand } from '../command.utils.js';

export type LoginCredentials = {
username: string,
password: string
password: string,
version?: string,
context?: string
}

// LOGIN
export const LOGIN = {
code: 38,

serialize: ({username, password}: LoginCredentials) => {
serialize: ({
username,
password,
version,
context
}: LoginCredentials) => {
const bUsername = Buffer.from(username);
const bPassword = Buffer.from(password);

Expand All @@ -22,16 29,42 @@ export const LOGIN = {
if (bPassword.length < 1 || bPassword.length > 255)
throw new Error('Password should be between 1 and 255 bytes');

const l1 = Buffer.alloc(1);
const l2 = Buffer.alloc(1);
const l1 = Buffer.allocUnsafe(1);
const l2 = Buffer.allocUnsafe(1);
l1.writeUInt8(bUsername.length);
l2.writeUInt8(bPassword.length);

const binVersion: Buffer[] = [];
const l3 = Buffer.allocUnsafe(4);

if(version && version.length > 0) {
const bVersion = Buffer.from(version);
l3.writeUInt32LE(bVersion.length);
binVersion.push(l3, bVersion);
} else {
l3.writeUInt32LE(0);
binVersion.push(l3);
}

const binContext: Buffer[] = [];
const l4 = Buffer.allocUnsafe(4);

if(context && context.length > 0) {
const bContext = Buffer.from(context);
l4.writeUInt32LE(bContext.length);
binContext.push(l4, bContext);
} else {
l4.writeUInt32LE(0);
binContext.push(l4);
}

return Buffer.concat([
l1,
bUsername,
l2,
bPassword
bPassword,
...binVersion,
...binContext
])
},

Expand Down
4 changes: 2 additions & 2 deletions src/wire/token/create-token.command.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -9,14 9,14 @@ describe('CreateToken', () => {

const t1 = {
name: 'test-token',
expiry: 1234
expiry: 1234n
};

it('serialize 1 name & 1 uint32 into buffer', () => {

assert.deepEqual(
CREATE_TOKEN.serialize(t1).length,
4 1 t1.name.length
8 1 t1.name.length
);
});

Expand Down
8 changes: 4 additions & 4 deletions src/wire/token/create-token.command.ts
Original file line number Diff line number Diff line change
Expand Up @@ -5,21 5,21 @@ import { wrapCommand } from '../command.utils.js';

export type CreateToken = {
name: string,
expiry?: number
expiry?: bigint
}


export const CREATE_TOKEN = {
code: 42,

serialize: ({name, expiry = 600}: CreateToken): Buffer => {
serialize: ({name, expiry = 600n}: CreateToken): Buffer => {
const bName = Buffer.from(name);
if (bName.length < 1 || bName.length > 255)
throw new Error('Token name should be between 1 and 255 bytes');
const b1 = Buffer.alloc(1);
b1.writeUInt8(bName.length);
const b2 = Buffer.alloc(4);
b2.writeUInt32LE(expiry);
const b2 = Buffer.alloc(8);
b2.writeBigUInt64LE(expiry);
return Buffer.concat([
b1,
bName,
Expand Down
6 changes: 3 additions & 3 deletions src/wire/topic/create-topic.command.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -13,15 13,15 @@ describe('CreateTopic', () => {
name: 'test-topic',
partitionCount: 1,
compressionAlgorithm: 1, // 1 = None, 2 = Gzip
messageExpiry: 0,
maxTopicSize: 0,
messageExpiry: 0n,
maxTopicSize: 0n,
replicationFactor: 1
};

it('serialize 1 numeric id & 1 name into buffer', () => {
assert.deepEqual(
CREATE_TOPIC.serialize(t1).length,
6 4 4 1 4 8 1 1 t1.name.length
6 4 4 1 8 8 1 1 t1.name.length
);
});

Expand Down
24 changes: 9 additions & 15 deletions src/wire/topic/create-topic.command.ts
Original file line number Diff line number Diff line change
Expand Up @@ -23,8 23,8 @@ export type CreateTopic = {
name: string,
partitionCount: number,
compressionAlgorithm: CompressionAlgorithm,
messageExpiry?: number,
maxTopicSize?: number,
messageExpiry?: bigint,
maxTopicSize?: bigint,
replicationFactor?: number
};

Expand All @@ -36,8 36,8 @@ export const CREATE_TOPIC = {
name,
partitionCount,
compressionAlgorithm = CompressionAlgorithmKind.None,
messageExpiry = 0,
maxTopicSize = 0,
messageExpiry = 0n,
maxTopicSize = 0n,
replicationFactor = 1
}: CreateTopic
) => {
Expand All @@ -49,15 49,15 @@ export const CREATE_TOPIC = {
if (bName.length < 1 || bName.length > 255)
throw new Error('Topic name should be between 1 and 255 bytes');

const b = Buffer.allocUnsafe(4 4 1 4 8 1 1);
const b = Buffer.allocUnsafe(4 4 1 8 8 1 1);
b.writeUInt32LE(topicId, 0);
b.writeUInt32LE(partitionCount, 4);
b.writeUInt8(compressionAlgorithm, 8);

b.writeUInt32LE(messageExpiry, 9); // 0 is unlimited
b.writeBigUInt64LE(BigInt(maxTopicSize), 13); // optional, 0 is null
b.writeUInt8(replicationFactor, 21); // must be > 0
b.writeUInt8(bName.length, 22);
b.writeBigUInt64LE(messageExpiry, 9); // 0 is unlimited
b.writeBigUInt64LE(maxTopicSize, 17); // optional, 0 is null
b.writeUInt8(replicationFactor, 25); // must be > 0
b.writeUInt8(bName.length, 26);

return Buffer.concat([
streamIdentifier,
Expand All @@ -69,9 69,3 @@ export const CREATE_TOPIC = {
};

export const createTopic = wrapCommand<CreateTopic, Boolean>(CREATE_TOPIC);

// export const createTopic = (cli: Client) => async (arg: CreateTopic) => {
// return CREATE_TOPIC.deserialize(
// await cli.sendCommand(CREATE_TOPIC.code, CREATE_TOPIC.serialize(arg))
// );
// }
18 changes: 9 additions & 9 deletions src/wire/topic/topic.utils.ts
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 6,7 @@ export type BaseTopic = {
name: string,
createdAt: Date,
partitionsCount: number
messageExpiry: number,
messageExpiry: bigint,
maxTopicSize: bigint,
replicationFactor: number
sizeBytes: bigint,
Expand Down Expand Up @@ -37,17 37,17 @@ export const deserializeBaseTopic = (p: Buffer, pos = 0): BaseTopicSerialized =>
const id = p.readUInt32LE(pos);
const createdAt = toDate(p.readBigUint64LE(pos 4));
const partitionsCount = p.readUInt32LE(pos 12);
const messageExpiry = p.readUInt32LE(pos 16);
const maxTopicSize = p.readBigUInt64LE(pos 20);
const replicationFactor = p.readUInt8(pos 28);
const sizeBytes = p.readBigUInt64LE(pos 29);
const messagesCount = p.readBigUInt64LE(pos 37);
const messageExpiry = p.readBigUInt64LE(pos 16);
const maxTopicSize = p.readBigUInt64LE(pos 24);
const replicationFactor = p.readUInt8(pos 32);
const sizeBytes = p.readBigUInt64LE(pos 33);
const messagesCount = p.readBigUInt64LE(pos 41);

const nameLength = p.readUInt8(pos 45);
const name = p.subarray(pos 46, pos 46 nameLength).toString();
const nameLength = p.readUInt8(pos 49);
const name = p.subarray(pos 50, pos 50 nameLength).toString();

return {
bytesRead: 4 8 4 4 8 1 8 8 1 nameLength,
bytesRead: 4 8 4 8 8 1 8 8 1 nameLength,
data: {
id,
name,
Expand Down
18 changes: 9 additions & 9 deletions src/wire/topic/update-topic.command.ts
Original file line number Diff line number Diff line change
Expand Up @@ -8,8 8,8 @@ export type UpdateTopic = {
streamId: Id,
topicId: Id,
name: string,
messageExpiry?: number,
maxTopicSize?: number,
messageExpiry?: bigint,
maxTopicSize?: bigint,
replicationFactor?: number,
};

Expand All @@ -20,8 20,8 @@ export const UPDATE_TOPIC = {
streamId,
topicId,
name,
messageExpiry = 0,
maxTopicSize = 0,
messageExpiry = 0n,
maxTopicSize = 0n,
replicationFactor = 1,
}: UpdateTopic) => {
const streamIdentifier = serializeIdentifier(streamId);
Expand All @@ -31,11 31,11 @@ export const UPDATE_TOPIC = {
if (bName.length < 1 || bName.length > 255)
throw new Error('Topic name should be between 1 and 255 bytes');

const b = Buffer.allocUnsafe(4 8 1 1);
b.writeUInt32LE(messageExpiry, 0); // 0 is unlimited ???
b.writeBigUInt64LE(BigInt(maxTopicSize), 4); // optional, 0 is null
b.writeUInt8(replicationFactor, 12); // must be > 0
b.writeUInt8(bName.length, 13);
const b = Buffer.allocUnsafe(8 8 1 1);
b.writeBigUInt64LE(messageExpiry, 0); // 0 is unlimited ???
b.writeBigUInt64LE(maxTopicSize, 8); // optional, 0 is null
b.writeUInt8(replicationFactor, 16); // must be > 0
b.writeUInt8(bName.length, 17);

return Buffer.concat([
streamIdentifier,
Expand Down

0 comments on commit cb4f0d1

Please sign in to comment.