Skip to content

Instantly share code, notes, and snippets.

Show Gist options
  • Save 4xle/e6e47081382c5af90bdd48d040317263 to your computer and use it in GitHub Desktop.
Save 4xle/e6e47081382c5af90bdd48d040317263 to your computer and use it in GitHub Desktop.
Working example of Node.js readable and writable stream implementation
'use strict';
// This is a working example of a readable stream sending rows to insert into
// SQL Server.
const Readable = require('stream').Readable;
const mssql = require('mssql');
const MsSqlStreamInsert = require('../../lib/source/mssql-stream-insert');
const showProgress = require('../../lib/transform/show-progress')();
const options = require('../../.config');
const AnySourceReadable = require('../../lib/source/any-source-readable');
// Point the shared config at the reporting database used by this example.
options.mssql.database = 'RE_Reporter';

// Settings handed to MsSqlStreamInsert: the target table name and its column
// layout. Each column entry is [name, mssql type, column options].
Object.assign(options, {
  verbose: false,
  tableName: 'sf_contact',
  tableDefinition: [
    ['re_legacy_id', mssql.Int, { nullable: false }],
    ['sf_id', mssql.VarChar(18), { nullable: false }],
    ['re_constiuent_id', mssql.VarChar(20), { nullable: true }]
  ]
});

// Writable end of the pipeline; it is built from the options configured above.
const msSqlStreamInsert = new MsSqlStreamInsert(options);
/**
 * Trivial implementation of a source that sends rows until `max` is reached.
 *
 * The class itself never defines `onData(row)` or `onEnd()`; it only calls
 * them, so whoever drives this source must assign both before the first
 * `readStart()` (presumably AnySourceReadable does this - confirm there).
 */
class Source {
  /**
   * @param {number} max - Number of rows to emit before calling `onEnd()`.
   * @param {number} [intervalMs=2] - Delay between emit attempts, in
   *   milliseconds. Defaults to the 2 ms this example has always used.
   */
  constructor(max, intervalMs = 2) {
    this.i = 0;
    this.max = max;
    this.intervalMs = intervalMs;
    this.stopped = false;
    this.intervalId = null;
  }

  /**
   * Resume (or begin) sending rows.
   *
   * This is expected to be called on every downstream read, potentially
   * millions of times, so it only clears the `stopped` flag. The data
   * generating timer is created on the very first call and reused afterwards.
   */
  readStart() {
    this.stopped = false;
    if (this.intervalId) {
      return;
    }

    this.intervalId = setInterval(() => {
      // Paused by readStop(): skip this tick without losing our position.
      if (this.stopped) {
        return;
      }

      // All rows sent: stop the timer and tell the downstream we are done.
      // intervalId is deliberately left set so that a late readStart() cannot
      // start a second timer and call onEnd() again.
      if (this.i >= this.max) {
        clearInterval(this.intervalId);
        this.onEnd();
        return;
      }

      // Row ids are 1-based, so advance the counter before building the row.
      this.i += 1;
      showProgress();
      this.onData({
        re_legacy_id: this.i,
        sf_id: `SF_${this.i}`,
        re_constiuent_id: `RE_${this.i}`
      });
    }, this.intervalMs);
  }

  /**
   * Pause sending rows. Intended to be called when the downstream cannot take
   * any more data; the timer keeps ticking but does nothing until
   * `readStart()` is called again.
   */
  readStop() {
    console.log('readStop');
    this.stopped = true;
  }
}
// The following shows that we correctly get back pressure on the stream and
// are able to start pushing again when the downstream is ready.
// jstein@VORLPC105 MINGW64 ~/sfloader (master)
// $ node spottest/john/mssql-stream-insert-10000000.js
// .readStop
// .readStop
// .readStop
// .readStop
// .readStop
// Look Ma! No callbacks! Just pure node streaming joy.
// MsSqlStreamInsert logs into SQL Server, drops the table, creates it,
// and when it's all good and ready starts accepting input from AnySourceReadable.
// Re-throw any error emitted on the piped stream so the failure is not
// silently swallowed.
function rethrowStreamError(err) {
  throw err;
}

// Runs once the piped stream reports 'finish'.
function closeConnectionOnFinish() {
  console.log('finish');
  // TODO must be something else i need to close because we aren't quitting
  // immediately.
  mssql.close();
}

// Wrap a 3333-row demo source in an object-mode readable with a small
// highWaterMark, then pipe it into the SQL Server insert stream.
const demoRowSource = new Source(3333);
const demoRowStream = new AnySourceReadable(demoRowSource, {
  objectMode: true,
  highWaterMark: 3
});

// The listeners go on whatever pipe() returns, exactly as in the chained
// form; with a standard Readable that is the destination stream.
demoRowStream
  .pipe(msSqlStreamInsert)
  .on('error', rethrowStreamError)
  .on('finish', closeConnectionOnFinish);
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment