`streamify` is a microframework for designing microservices in an input->transform->output fashion. It can be used as a global application or a Docker image, and the neat thing is that entire pipelines and any of their parts can be easily reused, modified and shared.
`streamify` harnesses the power of native Node streams and Transforms to build services with well-defined, configurable throughput. Read up on the `highWaterMark` and backpressure concepts to get the most out of this rendition of the Pipes & Filters design pattern.
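If those concepts are new to you, here is a minimal, self-contained Node sketch (plain Node.js, not streamify code) of a `Transform` with an explicit `highWaterMark`; `pipeline()` propagates backpressure from a slow destination back to the source:

```js
// Plain Node.js illustration of Transform + highWaterMark, not streamify code.
const { Transform, pipeline } = require('node:stream');
const { createReadStream, createWriteStream } = require('node:fs');

// Upper-cases each chunk; the small highWaterMark caps internal buffering,
// so a slow writable exerts backpressure all the way back to the file read.
const upperCase = new Transform({
  highWaterMark: 1024, // buffer at most ~1 KiB here before backpressure kicks in
  transform(chunk, _encoding, callback) {
    callback(null, chunk.toString().toUpperCase());
  },
});

pipeline(
  createReadStream('input.txt'), // hypothetical input file
  upperCase,
  createWriteStream('output.txt'),
  (err) => {
    if (err) console.error('pipeline failed:', err);
  },
);
```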
The project is initially demonstrated with NATS for inter-service communication, but its architecture allows for scaling out and integrating with other transport systems such as PostgreSQL, Kafka, and gRPC.
See the example below of configuring a microservice that generates a song from a CSV file.
Install with `npm i -g node-streamify`, then call `streamify` from the command line.
To see that it works, try `streamify FILENAME COPY`, which should behave just like `cp` or `copy`.
If you have NATS up and running (listening on, say, port 4222), try `streamify FILENAME nats:4222/unique_topic` to make a file available for download. You can then download it with `streamify nats:4222/unique_topic copy.txt` on another machine or in another terminal window, wherever the NATS instance is reachable, using a configuration file with authorization credentials if you so please.
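streamify's own configuration file format isn't documented in this section, so treat the shape below as an assumption: whatever the file contains would ultimately feed the underlying NATS client, and (assuming the transport is built on the standard nats.js client) the relevant connection options look like this:

```js
// Sketch only: servers/user/pass are nats.js ConnectionOptions fields;
// how streamify maps its config file onto them is an assumption here.
const { connect } = require('nats');

async function connectWithCredentials() {
  return connect({
    servers: 'nats://localhost:4222',
    user: 'reader', // hypothetical credentials
    pass: 'secret',
  });
}
```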
General usage: `<run> [[-w worker]] <source> [target]`

`<run>` is your command of choice:

- `streamify` if you're using a global `npm i -g node-streamify` install
- `node dist/main` if you're building from source
- `docker run --rm IMAGE` if you have a Docker image configured
`-w`, `--worker`: a (chain of) workload(s) implementing the `Transform` interface from `node:stream`. Optional, may be given multiple times, order matters. Built-in workers include:

- `-w from:PACKAGE:EXPORTED` or `-w from:FILENAME`: custom worker syntax; see the command and Docker usage examples, and the hypothetical worker module sketched after this usage reference
- `-w row2obj`: (object mode) converts incoming arrays to objects with properties described in the first entry (the header)
- `-w aggregate`: aggregates input up to an (optional) threshold. A call without parameters aggregates input until the source runs out, then passes it to the next worker. `-w aggregate:4` splits the input into a stream of arrays of length 4.
- `-w extract:PROPERTY`: (object mode) passes the specified property to the next worker
- `-w toJSON`: useful for switching out of object mode
- `-w gzip`: maps to `require('node:zlib').createGzip()`
- `-w slow [(N,n-N)]`: slows down execution by the specified N ms using `setTimeout` (see `-w slow:100-5000` in the NATS example below)
`<source> [target]`: if only one option is given, it is considered to be the source. The syntax is `PROTOCOL:OPTIONS`. If no colon is present, the protocol defaults to `file`; an empty value defaults to `std` (stdin for source, stdout for target). Built-in protocols are:

- `std` and `file`: duh
- `nats`: an open-source data layer for microservices architectures; run `docker compose -f "./usecases/nats/docker.compose.yml" up -d --build` to see an example
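To illustrate the `-w from:FILENAME` syntax above, a custom worker is essentially a module exposing a `node:stream` Transform. Whether streamify expects the module to export a ready-made instance or a factory function (as `csv-parse` does with `parse`) isn't stated here, so the export shape below is an assumption; only the Transform contract itself comes from the usage reference above:

```js
// my-worker.js — hypothetical module for `-w from:./my-worker.js`.
// Assumption: streamify loads the module's export and pipes data through it.
const { Transform } = require('node:stream');

// Object-mode Transform that stamps each record flowing through the pipeline.
module.exports = new Transform({
  objectMode: true,
  transform(record, _encoding, callback) {
    callback(null, { ...record, processedAt: new Date().toISOString() });
  },
});
```

Chained after `-w row2obj`, such a worker would receive one object per CSV row.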
Examples:

- a song-generation prompt microservice one-liner: `streamify --verbose ./temp/chord-progressions.csv -w from:csv-parse:parse row2obj aggregate generate-a-song extract:prompt ../temp/last-prompt.txt`
- file transfer and transform via NATS: `docker compose -f "./usecases/nats/docker.compose.yml" up -d --build`
  - playground:
    - bring up Writer & NATS: `docker compose -f "docker.compose.yml" up writer nats`
    - fine-tune `highWaterMark` until you hit the max payload configured in NATS:
      - `docker run --rm -e NATS_OUT_HWM=800 -v ${PWD}/temp:/home/node/temp --net=host nats-reader ../temp/sample.txt nats:4222/file-transfer`
      - or, if you have streamify globally installed: `NATS_OUT_HWM=800 streamify temp/sample.txt nats:4222/file-transfer -w slow:100-5000 --verbose`
    - check file integrity when the transfer is complete: `md5sum temp/sample.txt temp/copy-over-nats.txt`
    - add `-w gzip` and check that the resulting file is a valid gzip archive
- homemade `cp`: `streamify FILE COPY`
  - `docker run --rm -v ${PWD}/temp:/home/node/temp reader ../temp/sample.txt ../temp/copy2.txt`
- homemade `cat`: `streamify FILE`
  - `docker run --rm -v ${PWD}/temp:/home/node/temp reader ../temp/sample.txt`
- `sed` with A LOT of extra hoops (add `-w`'s as you please): `FILE | streamify std RESULTFILE`
  - `tr -dc 'a-zA-Z0-9' </dev/urandom | head -c 10K | docker run -i --rm -v ${PWD}/temp:/home/node/temp reader std ../temp/doc2`
- local: `cd streamify && npm run start:dev -- -- [[-w worker]] <source> [target]`
  - `npm run start:dev -- -- ../temp/sample.txt nats:4222/file-transfer`
- reader
  - docker: `docker compose -f "docker.compose.debug.yml" up -d --build`
- create a big file (random binary, or printable text with the second variant):
  - `head -c 50M /dev/urandom > temp/sample.txt`
  - `tr -dc 'a-zA-Z0-9\n' </dev/urandom | head -c 50M > temp/sample.txt`
The next milestone is integrating CSV capabilities and PostgreSQL (`pg`) queries and writes.