Skip to content

Instantly share code, notes, and snippets.

@bencooling
Last active October 30, 2016 11:16
Show Gist options
  • Save bencooling/0fec79c8d0aaec4e2361 to your computer and use it in GitHub Desktop.
Save bencooling/0fec79c8d0aaec4e2361 to your computer and use it in GitHub Desktop.
Node, basic read, transform & write streams
var stream = require('stream');

// Source: read() is a no-op because data is supplied manually through push().
var a = new stream.Readable({
  read: function () {}
});

// Transform: appends 'bah' to every chunk that flows through.
var b = new stream.Transform({
  transform: function (chunk, encoding, done) {
    var str = chunk.toString() + 'bah';
    this.push(str);
    done();
  }
});

// Sink: logs every chunk it receives.
var c = new stream.Writable({
  write: function (chunk, encoding, done) {
    console.log(chunk.toString());
    // The callback must be invoked to signal that this chunk has been handled;
    // otherwise the writable never accepts a second chunk and never emits 'finish'.
    done();
  }
});

a.push('foo');
a.pipe(b).pipe(c);
var stream = require('stream');

// Same pipeline as the simplified-constructor variant, but built by assigning
// the _read / _transform / _write hooks on already constructed stream objects.
var a = new stream.Readable();
var b = new stream.Transform({ objectMode: true });
var c = new stream.Writable();

// No-op: data is supplied manually through push().
a._read = function () {};

// Appends 'bah' to every chunk that flows through.
b._transform = function (chunk, encoding, done) {
  var str = chunk.toString() + 'bah';
  this.push(str);
  done();
};

// Logs every chunk it receives.
c._write = function (chunk, encoding, done) {
  console.log(chunk.toString());
  // The callback must be invoked to signal that this chunk has been handled;
  // otherwise the writable never accepts a second chunk and never emits 'finish'.
  done();
};

a.push('foo');
a.pipe(b).pipe(c);
const stream = require('stream');
const fs = require('fs');

const CHUNK_SIZE = 10; // 10 byte chunk
const dest = fs.createWriteStream('./dest2.txt');

/**
 * Build a Readable that serves `buf` piece by piece.
 *
 * Every read(size) call logs the slice it is looking at, pushes that slice
 * while the cursor is still inside the buffer, and ends the stream (push(null))
 * as soon as the cursor has reached or passed the end of the buffer.
 *
 * @param {Buffer} buf - Data to serve.
 * @param {number} [offset=0] - Byte position to start serving from.
 * @returns {stream.Readable} Readable created with a highWaterMark of 1.
 */
const dataRead = (buf, offset = 0) => {
  let cursor = offset;
  const options = {
    highWaterMark: 1,
    read(size) {
      const end = cursor + size;
      const piece = buf.slice(cursor, end);
      console.log(piece);
      if (cursor < buf.length) {
        this.push(piece);
        cursor = end;
      }
      if (cursor >= buf.length) {
        this.push(null);
      }
    },
  };
  return new stream.Readable(options);
};

// Author's note: zero filled buffers are not written to file
// (alternative input tried by the author: fill the whole buffer with 'abcdefghijklmonp').
const buf = Buffer.alloc(CHUNK_SIZE);
buf.fill('abc', 0, 2); // only bytes 0 and 1 are filled; the rest stay zero
console.log(buf);

const source = dataRead(buf);
source.pipe(dest);
// chunkBuffer module!
module.export = ({ size, cb }) => {
const cup = Buffer.alloc(size);
let level = 0
let i = 1;
const drinkCup = water => {
cb(cup.slice(0, level), i);
i++;
if (water){ // on('data')
const remaining = cup.length - level; // bytes left to fill cup to top
const outCup = water.slice(remaining); // Get spilled water
cup.fill(outCup, 0); // Start from beginning of cup buffer
level = outCup.length; // Set the level back down to how much had spilled
}
}
// chunkBuffer
const fillCup = water => {
const newLevel = level + water.length; // byte level after water added
cup.fill(water, level);
if (newLevel >= cup.length){ // does it spill over?
return drinkCup(water);
}
level = newLevel;
}
return {
chunkBuffer: fillCup, // on('data')
completeChunk: drinkCup, // on('end')
}
}
const stream = require('stream');
const fs = require('fs');
const dest = fs.createWriteStream('./dest2.txt');
const CHUNK_SIZE = 10; // 10 byte chunk

/**
 * Called every time the cup has been filled to the top.
 *
 * @param {Buffer} cup - The full chunk (CHUNK_SIZE bytes).
 * @param {number} i - 1-based index of the chunk.
 */
// TODO: s3.put
const onChunkFull = (cup, i) => {
  console.log(`chunk-${i} `, cup.toString());
};

/**
 * Fill our CHUNK_SIZE byte cup.
 *
 * NOTE: cup is a Buffer, water is a Buffer, level is an integer.
 * Example: cupSize = 10, level = 8, water.length = 4 -> the first 2 bytes top
 * up the cup, the chunk is emitted, and the 2 spilled bytes start the next cup.
 *
 * The cup, its fill level and the chunk counter live in the closure created by
 * the immediately invoked outer function. Bytes that do not complete a cup stay
 * buffered until more water arrives.
 *
 * @param {Buffer} water - Incoming data of any length.
 */
const fillCup = ((cup, level, cupSize, i) => water => {
  let offset = 0;
  while (offset < water.length) {
    const remaining = cupSize - level; // bytes left to fill cup to top
    const end = Math.min(water.length, offset + remaining);
    const poured = water.copy(cup, level, offset, end);
    level += poured;
    offset += poured;
    if (level === cupSize) {
      // Hand out a copy because the cup is overwritten by the spilled bytes.
      onChunkFull(Buffer.from(cup), i);
      i++;
      level = 0;
    }
  }
})(Buffer.alloc(CHUNK_SIZE), 0, CHUNK_SIZE, 1);
// Expose an in-memory buffer as a stream: the buffer is handed to end(), so it
// is written to the PassThrough as its only chunk and the stream is then ended.
// NOTE(review): `stream` and `buffer` are not defined in this snippet —
// presumably they come from the surrounding code; confirm before running.
var bufferStream = new stream.PassThrough();
bufferStream.end( buffer );
// bufferStream.pipe( process.stdout );
const stream = require('stream');
const fs = require('fs');
const destination = fs.createWriteStream('./dest.txt');

// Pass-through transform: logs every chunk and forwards it unchanged.
const dataRead = new stream.Transform({
  highWaterMark: 32 * 1024, // 32KB
  transform: function (chunk, encoding, done) {
    console.log('dataRead', chunk);
    this.push(chunk);
    done();
  }
});

const dataStream = fs.createReadStream('./data.txt', {
  highWaterMark: 16 * 1024 // 16KB per read (the author also tried 100 bytes)
});
dataStream.on('data', function (chunk) {
  console.log('dataStream', chunk);
});

// pipe() does not forward errors from one stream to the next, and an 'error'
// event without a listener is thrown (e.g. when ./data.txt does not exist).
// Log errors on every stage, the same way `src` below is handled.
[dataStream, dataRead, destination].forEach(function (stage) {
  stage.on('error', console.error);
});

dataStream.pipe(dataRead).pipe(destination);

// // // 201 101 10901 485604
const src = fs.createReadStream('./data.txt', {
  highWaterMark: 2,
});
src.on('error', console.error);
// // Simplified Constructor
// const src = new Readable({
// highWaterMark: 1,
// read() {}, // calls this.push
// });
// src.push('your text here'); // skips highWaterMark?
// src.push(null);
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment