Last active
October 30, 2016 11:16
-
-
Save bencooling/0fec79c8d0aaec4e2361 to your computer and use it in GitHub Desktop.
Node, basic read, transform & write streams
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
var stream = require('stream'); | |
var a = new stream.Readable({ | |
read: function(){} | |
}); | |
var b = new stream.Transform({ | |
transform: function (chunk, encoding, done) { | |
var str = chunk.toString() + 'bah'; | |
this.push(str); | |
done(); | |
} | |
}); | |
var c = new stream.Writable({ | |
write: function(chunk, encoding, done){ | |
console.log(chunk.toString()); | |
} | |
}); | |
a.push('foo'); | |
a.pipe(b).pipe(c); |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
var stream = require('stream') | |
, a = new stream.Readable() | |
, b = new stream.Transform( { objectMode: true } ) | |
, c = new stream.Writable() | |
; | |
a._read = function(){} | |
b._transform = function (chunk, encoding, done) { | |
var str = chunk.toString() + 'bah'; | |
this.push(str); | |
done(); | |
}; | |
c._write = function(chunk, encoding, done){ | |
console.log(chunk.toString()); | |
} | |
a.push('foo'); | |
a.pipe(b).pipe(c); |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
const stream = require('stream'); | |
const fs = require('fs'); | |
const dest = fs.createWriteStream('./dest2.txt'); | |
const CHUNK_SIZE = 10; // 10 byte chunk | |
const dataRead = (buf, offset = 0) => new stream.Readable({ | |
highWaterMark: 1, | |
read(size){ | |
console.log( buf.slice( offset, ( offset + size ) ) ); | |
if ( offset < buf.length ) { | |
this.push( buf.slice( offset, ( offset + size ) ) ); | |
offset += size; | |
} | |
if ( offset >= buf.length ) { | |
this.push( null ); | |
} | |
} | |
}); | |
// Zero filled buffers are not written to file | |
const buf = Buffer.alloc(CHUNK_SIZE).fill('abc', 0, 2); | |
// const buf = Buffer.alloc(CHUNK_SIZE).fill('abcdefghijklmonp'); | |
console.log(buf); | |
dataRead(buf).pipe(dest); |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
// chunkBuffer module! | |
module.export = ({ size, cb }) => { | |
const cup = Buffer.alloc(size); | |
let level = 0 | |
let i = 1; | |
const drinkCup = water => { | |
cb(cup.slice(0, level), i); | |
i++; | |
if (water){ // on('data') | |
const remaining = cup.length - level; // bytes left to fill cup to top | |
const outCup = water.slice(remaining); // Get spilled water | |
cup.fill(outCup, 0); // Start from beginning of cup buffer | |
level = outCup.length; // Set the level back down to how much had spilled | |
} | |
} | |
// chunkBuffer | |
const fillCup = water => { | |
const newLevel = level + water.length; // byte level after water added | |
cup.fill(water, level); | |
if (newLevel >= cup.length){ // does it spill over? | |
return drinkCup(water); | |
} | |
level = newLevel; | |
} | |
return { | |
chunkBuffer: fillCup, // on('data') | |
completeChunk: drinkCup, // on('end') | |
} | |
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
const stream = require('stream'); | |
const fs = require('fs'); | |
const dest = fs.createWriteStream('./dest2.txt'); | |
const CHUNK_SIZE = 10; // 10 byte chunk | |
// Fill our 1MB cup | |
// NOTE: cup is a Buffer, water is a Buffer, level is an integer | |
// cupsize = 10, level = 8, newLevel = 12, water = 4 | |
const fillCup = (cup, level, cupSize, i) => water => { | |
const remaining = cupSize - level; | |
const newLevel = level + water.length; | |
const isComplete = (newLevel >= cupSize); | |
const inCup = water.splice(0, remaining); | |
cup.fill(inCup, level); | |
level = newLevel; | |
if (isComplete){ | |
onChunkFull(cup, i); | |
i++; | |
const outCup = water.splice(remaining); | |
cup.fill(outCup, 0) | |
level = outCup.length; | |
} | |
})(Buffer.alloc(CHUNK_SIZE), 0, CHUNK_SIZE, 1); | |
// TODO: s3.put | |
const onChunkFull = (cup, i) => { | |
console.log(`chunk-${i} `, cup.toString()); | |
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
var bufferStream = new stream.PassThrough(); | |
bufferStream.end( buffer ); | |
// bufferStream.pipe( process.stdout ); |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
const stream = require('stream'); | |
const fs = require('fs'); | |
const destination = fs.createWriteStream('./dest.txt'); | |
const dataRead = new stream.Transform({ | |
highWaterMark: 32 * 1024, | |
transform: function (chunk, encoding, done) { | |
console.log('dataRead', chunk); | |
this.push(chunk); | |
done(); | |
} | |
}); | |
const dataStream = fs.createReadStream('./data.txt', { | |
highWaterMark: 16 * 1024 // (32KB) // 100 // (100 bytes) | |
}); | |
dataStream.on('data', function(chunk) { | |
console.log('dataStream', chunk); | |
}); | |
dataStream.pipe(dataRead).pipe(destination);; | |
// // // 201 101 10901 485604 |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
const src = fs.createReadStream('./data.txt', { | |
highWaterMark: 2, | |
}); | |
src.on('error', console.error); | |
// // Simplified Constructor | |
// const src = new Readable({ | |
// highWaterMark: 1, | |
// read() {}, // calls this.push | |
// }); | |
// src.push('your text here'); // skips highWaterMark? | |
// src.push(null); |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment