Last active
January 16, 2017 03:45
-
-
Save ORESoftware/6fe442796fddb9b09db26670e7b7589c to your computer and use it in GitHub Desktop.
Comparing Observables with Streams
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
const stream = require('stream'); | |
function getReadableStream(fn) { | |
return new stream.Readable({ | |
objectMode: true, | |
read: function (n) { | |
// when read fires the stream API is telling us | |
// consumers (writables) want more data | |
// the stream API should take a callback | |
// but instead we need to call this.push | |
// to push more data onto the readable's buffer | |
fn(n, (err, chunk) => { | |
this.push(chunk); | |
}) | |
} | |
}); | |
} | |
let items = Buffer.from('abcdefg'); | |
function readLocalBuffer(x) { | |
const b = items.slice(0, x); | |
//side effect city | |
items = items.slice(x, items.length); | |
return b; | |
} | |
let strm = getReadableStream(function (n, cb) { | |
// verbosity is for clarity | |
// normally we should be reading data from some source outside | |
// our program but here we just use a buffer in memory | |
process.nextTick(function () { | |
const bytes = readLocalBuffer(1); | |
const bytesRead = bytes.length; | |
cb(null, bytesRead > 0 ? bytes : null); | |
}); | |
}); | |
strm | |
.on('data', (d) => { | |
console.log(' => readable next data => ', String(d)); | |
}) | |
.on('readable', () => { | |
console.log(' readable stream is readable.'); | |
}) | |
.on('close', (e) => { | |
console.log(' => readable is closed'); | |
}) | |
.on('error', (e) => { | |
console.log(' => readable error => ', e.stack || e); | |
}) | |
.on('end', () => { | |
console.log(' readable stream all ended here.'); | |
}); | |
function getTransformStream() { | |
return new stream.Transform({ | |
transform: function (chunk, encoding, cb) { | |
setTimeout(function () { | |
cb(null, String(chunk) + '-transformed'); | |
}, 100); | |
} | |
}); | |
} | |
strm = strm | |
.pipe(getTransformStream()); | |
strm | |
.on('data', (d) => { | |
console.log(' => transform next data => ', String(d)); | |
}) | |
.on('readable', () => { | |
console.log(' transform stream is readable.'); | |
}) | |
.on('close', (e) => { | |
console.log(' => transform is closed'); | |
}) | |
.on('error', (e) => { | |
console.log(' => transform error => ', e.stack || e); | |
}) | |
.on('end', () => { | |
console.log(' transform stream all ended here.'); | |
}) | |
.on('finish', () => { | |
console.log(' transform is all finished here.'); | |
}); | |
function getWritable() { | |
return new stream.Transform({ | |
write: function (chunk, encoding, cb) { | |
// we don't actually write anything out | |
// but this is where you would write some destination | |
// and fire the callback when you are done writing | |
setTimeout(cb, 100); | |
} | |
}); | |
} | |
strm = strm | |
.pipe(getWritable()); | |
strm | |
.on('drain', (d) => { | |
console.log(' => writable is drained => ', String(d)); | |
}) | |
.on('close', (d) => { | |
console.log(' => writable is closed => ', String(d)); | |
}) | |
.on('error', (e) => { | |
console.log(' => writable error => ', e.stack || e); | |
}) | |
.on('finish', () => { | |
console.log(' writable is all finished here.'); | |
}); | |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment