Skip to content

Commit

Permalink
feat: add SendMessages command
Browse files Browse the repository at this point in the history
  • Loading branch information
T1B0 committed Jul 4, 2024
1 parent 40d91dd commit e1d39d8
Show file tree
Hide file tree
Showing 11 changed files with 522 additions and 7 deletions.
11 changes: 10 additions & 1 deletion package-lock.json

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

3 changes: 2 additions & 1 deletion package.json
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 14,8 @@
"license": "MIT",
"dependencies": {
"@fails-components/webtransport": "^1.0.8",
"@fails-components/webtransport-transport-http3-quiche": "^1.0.8"
"@fails-components/webtransport-transport-http3-quiche": "^1.0.8",
"uuidv7": "^0.6.3"
},
"devDependencies": {
"@types/node": "^20.10.7",
Expand Down
13 changes: 8 additions & 5 deletions src/tcp.client.ts
Original file line number Diff line number Diff line change
Expand Up @@ -47,16 47,19 @@ export const sendCommandWithResponse = (s: Socket) =>
head.writeUint32LE(command, 4);

console.log(
'==> CMD', head.readInt32LE(4),
'==> CMD', command,
translateCommandCode(command),
'LENGTH', head.readInt32LE(0));
head.subarray(4, 8).toString('hex'),
'LENGTH', payloadSize,
head.subarray(0, 4).toString('hex')
);

const cmd = Buffer.concat([head, payload]);
console.log('==> sending cmd', command, cmd /**, cmd.toString()*/);
console.log(
'==> sending cmd', command, cmd.toString('hex')
);
console.log('==> socket write', s.write(cmd));

console.log('==> full cmd', cmd.toString('hex'));

return new Promise((resolve, reject) => {
const dataCb = (d: Buffer, l: number) => {
console.log('<== #DATA', d, l);
Expand Down
54 changes: 54 additions & 0 deletions src/tcp.send-message.e2e.ts
Original file line number Diff line number Diff line change
@@ -0,0 1,54 @@

import { createClient, sendCommandWithResponse } from './tcp.client.js';
import { v7 } from './wire/uuid.utils.js';

import { LOGIN } from './wire/session/login.command.js';
import { SEND_MESSAGE } from './wire/message/send-message.command.js';
import { CREATE_TOPIC } from './wire/topic/create-topic.command.js';
import { CREATE_STREAM } from './wire/stream/create-stream.command.js';
import { LOGOUT } from './wire/session/logout.command.js';


try {
// create socket
const s = await createClient('127.0.0.1', 8090);
console.log('CLI', s.readyState);

// LOGIN
const loginCmd = LOGIN.serialize('iggy', 'iggy');
console.log('LOGIN', loginCmd.toString());
const r = await sendCommandWithResponse(s)(LOGIN.code, loginCmd);
console.log('RESPONSE_login', r, r.toString(), LOGIN.deserialize(r));

// // CREATE_STREAM
// const createStreamCmd = CREATE_STREAM.serialize(101, 'test-send-message');
// const r_createStream = await sendCommandWithResponse(s)(
// CREATE_STREAM.code, createStreamCmd
// );
// console.log('RESPONSE_createStream', CREATE_STREAM.deserialize(r_createStream));

// // CREATE_TOPIC
// const ctp = CREATE_TOPIC.serialize(
// 101, 1, 'test-topic-sm', 3, 0, 0, 1
// );
// const r_createTopic = await sendCommandWithResponse(s)(CREATE_TOPIC.code, ctp);
// console.log('RESPONSE_createTopic', CREATE_TOPIC.deserialize(r_createTopic));

// SEND MESSAGE
const cmdSm = SEND_MESSAGE.serialize(
101, 'test-topic-sm',
[{ id: v7(), payload: 'yolo msg' }]
);

const r1 = await sendCommandWithResponse(s)(SEND_MESSAGE.code, cmdSm);
console.log('RESPONSE SEND_MESSAGE', SEND_MESSAGE.deserialize(r1));


// LOGOUT
const rOut = await sendCommandWithResponse(s)(LOGOUT.code, LOGOUT.serialize());
console.log('RESPONSE LOGOUT', LOGOUT.desserialize(rOut));


} catch (err) {
console.error('FAILED!', err);
}
84 changes: 84 additions & 0 deletions src/wire/message/header.type.ts
Original file line number Diff line number Diff line change
@@ -0,0 1,84 @@

export enum HeaderKind {
Raw = 1,
String = 2,
Bool = 3,
Int32 = 6,
Int64 = 7,
Int128 = 8,
Uint32 = 11,
Uint64 = 12,
Uint128 = 13,
Float = 14,
Double = 15
}

export type HeaderValueRaw = {
kind: HeaderKind.Raw,
value: Buffer
}

export type HeaderValueString = {
kind: HeaderKind.String,
value: string
}

export type HeaderValueBool = {
kind: HeaderKind.Bool,
value: boolean
}

export type HeaderValueInt32 = {
kind: HeaderKind.Int32,
value: number
}

export type HeaderValueInt64 = {
kind: HeaderKind.Int64,
value: bigint
}

export type HeaderValueInt128 = {
kind: HeaderKind.Int128,
value: Buffer // | ArrayBuffer // ?
}

export type HeaderValueUint32 = {
kind: HeaderKind.Uint32,
value: number
}

export type HeaderValueUint64 = {
kind: HeaderKind.Uint64,
value: bigint
}

export type HeaderValueUint128 = {
kind: HeaderKind.Uint128,
value: Buffer // | ArrayBuffer // ?
}

export type HeaderValueFloat = {
kind: HeaderKind.Float,
value: number
}

export type HeaderValueDouble = {
kind: HeaderKind.Double,
value: number
}

export type HeaderValue =
HeaderValueRaw |
HeaderValueString |
HeaderValueBool |
HeaderValueInt32 |
HeaderValueInt64 |
HeaderValueInt128 |
HeaderValueUint32 |
HeaderValueUint64 |
HeaderValueUint128 |
HeaderValueFloat |
HeaderValueDouble;

export type Headers = Record<string, HeaderValue>;
155 changes: 155 additions & 0 deletions src/wire/message/header.utils.ts
Original file line number Diff line number Diff line change
@@ -0,0 1,155 @@

import {
boolToBuf,
int32ToBuf,
int64ToBuf,
uint32ToBuf,
uint64ToBuf,
floatToBuf,
doubleToBuf
} from '../number.utils.js';

import {
type HeaderValueRaw,
type HeaderValueString,
type HeaderValueBool,
type HeaderValueInt32,
type HeaderValueInt64,
type HeaderValueInt128,
type HeaderValueUint32,
type HeaderValueUint64,
type HeaderValueUint128,
type HeaderValueFloat,
type HeaderValueDouble,
HeaderKind
} from './header.type.js';


export type HeaderValue =
HeaderValueRaw |
HeaderValueString |
HeaderValueBool |
HeaderValueInt32 |
HeaderValueInt64 |
HeaderValueInt128 |
HeaderValueUint32 |
HeaderValueUint64 |
HeaderValueUint128 |
HeaderValueFloat |
HeaderValueDouble;

export type Headers = Record<string, HeaderValue>;


type BinaryHeaderValue = {
kind: HeaderKind,
value: Buffer
}

export const serializeHeaderValue = (header: HeaderValue) => {
const { kind, value } = header;
switch (kind) {
case HeaderKind.Raw: return value;
case HeaderKind.String: return Buffer.from(value);
case HeaderKind.Bool: return boolToBuf(value);
case HeaderKind.Int32: return int32ToBuf(value);
case HeaderKind.Int64: return int64ToBuf(value);
case HeaderKind.Int128: return value;
case HeaderKind.Uint32: return uint32ToBuf(value);
case HeaderKind.Uint64: return uint64ToBuf(value);
case HeaderKind.Uint128: return value;
case HeaderKind.Float: return floatToBuf(value);
case HeaderKind.Double: return doubleToBuf(value);
}
};

const createHeaderValue = (header: HeaderValue): BinaryHeaderValue => ({
kind: header.kind,
value: serializeHeaderValue(header)
});

export const serializeHeader = (key: string, v: BinaryHeaderValue) => {
const bKey = Buffer.from(key)

const b1 = Buffer.alloc(4);
b1.writeUInt32LE(bKey.length);

const b2 = Buffer.alloc(5);
b2.writeUInt8(v.kind);
b2.writeUInt8(v.value.length);

return Buffer.concat([
b1,
bKey,
b2,
v.value
]);
};

export const EMPTY_HEADERS = uint32ToBuf(0);

export const serializeHeaders = (headers?: Headers) => {
if (!headers)
return EMPTY_HEADERS;
return Object.keys(headers).reduce(
(ac: Buffer, c: string) => Buffer.concat([
ac, serializeHeader(c, createHeaderValue(headers[c]))]),
Buffer.alloc(0)
);
};



// export type InputHeaderValue = boolean | number | string | bigint | Buffer;
// export type InputHeaders = Record<string, InputHeaderValue>;

// const isFloat = (n: number) => n % 1 !== 0;

// export const createHeaderValueFloat = (v: number): HeaderValue =>
// ({ kind: HeaderKind.Float, value: floatToBuf(v) });

// export const createHeaderValueDouble = (v: number): HeaderValue =>
// ({ kind: HeaderKind.Double, value: doubleToBuf(v) });

// export const createHeaderValueInt32 = (v: number): HeaderValue =>
// ({ kind: HeaderKind.Int32, value: int32ToBuf(v) });

// export const createHeaderValueInt64 = (v: bigint): HeaderValue =>
// ({ kind: HeaderKind.Int64, value: int64ToBuf(v) });

// export const createHeaderValueUInt32 = (v: number): HeaderValue =>
// ({ kind: HeaderKind.Uint32, value: uint32ToBuf(v) });

// export const createHeaderValueUInt64 = (v: bigint): HeaderValue =>
// ({ kind: HeaderKind.Uint64, value: uint64ToBuf(v) });

// export const createHeaderValueBool = (v: boolean): HeaderValue =>
// ({ kind: HeaderKind.Bool, value: boolToBuf(v) });

// export const createHeaderValueBuffer = (v: Buffer): HeaderValue =>
// ({ kind: HeaderKind.Raw, value: v });

// export const createHeaderValueString = (v: string): HeaderValue =>
// ({ kind: HeaderKind.String, value: Buffer.from(v) });


// // guess wire type from js type ?
// const guessHeaderValue = (v: InputHeaderValue): HeaderValue => {
// if (typeof v === 'number') {
// if (isFloat(v))
// return createHeaderValueFloat(v);
// else
// return createHeaderValueInt32(v); // BAD KARMA
// }
// if (typeof v === 'bigint') {
// return createHeaderValueInt64(v); // BAD KARMA
// }
// if (typeof v === 'boolean')
// return createHeaderValueBool(v);
// if (typeof v === 'string')
// return createHeaderValueString(v);
// if (v instanceof Buffer)
// return createHeaderValueBuffer(v);

// throw new Error(`unable to serialize headers param ${v} - ${typeof v}`)
// }
Loading

0 comments on commit e1d39d8

Please sign in to comment.