Skip to content

Commit

Permalink
feat(pipeline): readable, transform & writable with async pipeline
Browse files Browse the repository at this point in the history
  • Loading branch information
rickybustillos committed Mar 6, 2022
1 parent 7f3c720 commit f6ea094
Show file tree
Hide file tree
Showing 2 changed files with 80 additions and 0 deletions.
49 changes: 49 additions & 0 deletions pipelineTransformExample.mjs
Original file line number Diff line number Diff line change
@@ -0,0 1,49 @@
import { pipeline, Readable, Transform } from 'stream';
import { promisify } from 'util';
import { createWriteStream } from 'fs';

/*
o pipeline funciona do mesmo modo que o pipe,
porém ele resolve problemas de vazamento de memória
e possui um método de callback */
/* o promisify permite dar um await na pipeline */
const pipelineAsync = promisify(pipeline);

const readableStream = Readable({
read () {
for ( let index = 0; index < 1e2; index ) {
const person = { id: Date.now() index, name: `Ricky-${index}` };
const payload = JSON.stringify(person);
this.push(payload);
}
this.push(null);
}
})

const writableMapToCSV = Transform({
transform(chunk, enconding, callback) {
const payload = JSON.parse(chunk);
const result = `${payload.id},${payload.name.toUpperCase()}\n`;
callback(null, result);
}
})

const setHeader = Transform({
transform(chunk, enconding, callback) {
this.counter = this.counter ?? 0;
if (this.counter) {
return callback(null, chunk);
}

this.counter = 1;
callback(null, "id, name\n".concat(chunk));
}
})

await pipelineAsync(
readableStream,
writableMapToCSV,
setHeader,
// process.stdout,
createWriteStream('my.csv')
)
31 changes: 31 additions & 0 deletions pipelineWritableExample.mjs
Original file line number Diff line number Diff line change
@@ -0,0 1,31 @@
import { pipeline, Readable, Writable } from 'stream';
import { promisify } from 'util';

/*
o pipeline funciona do mesmo modo que o pipe,
porém ele resolve problemas de vazamento de memória
e possui um método de callback */
/* o promisify permite dar um await na pipeline */
const pipelineAsync = promisify(pipeline);

const readableStream = Readable({
read: function () {
this.push("Hello world! 0");
this.push("Hello world! 1");
this.push("Hello world! 2");
this.push(null);
}
})

const writableStream = Writable({
write(chunk, enconding, callback) {
console.log('msg', chunk.toString());
callback();
}
})

await pipelineAsync(
readableStream,
// process.stdout,
writableStream,
)

0 comments on commit f6ea094

Please sign in to comment.