Skip to content

Instantly share code, notes, and snippets.

@langpavel
Created October 26, 2017 23:21
Show Gist options
  • Save langpavel/bc02d9ca024760aa2acdc354bfcaf7f6 to your computer and use it in GitHub Desktop.
Save langpavel/bc02d9ca024760aa2acdc354bfcaf7f6 to your computer and use it in GitHub Desktop.
Node Streams: HashThroughStream, ObjectStreamToJSON and CsvTransformStream
import { Transform } from 'stream';
export default class CsvTransformStream extends Transform {
constructor(options) {
super({
decodeStrings: false,
readableObjectMode: true,
});
this.rowFlushed = true;
this.currentColumn = [];
this.currentRow = [];
this.columns = options && options.columns;
this.ignoreUnnamed = options && options.ignoreUnnamed;
this.emptyAsNull = options && options.emptyAsNull;
}
flushRow() {
if (!this.rowFlushed) {
this.currentRow.push(this.currentColumn.join(''));
this.currentColumn = [];
if (this.columns) {
this.push(
this.currentRow.reduce((result, value, index) => {
const name = this.columns[index];
if (!name && this.ignoreUnnamed) {
return result;
}
if (this.emptyAsNull && value === '') {
// eslint-disable-next-line no-param-reassign
result[name || `column${index}`] = null;
} else {
// eslint-disable-next-line no-param-reassign
result[name || `column${index}`] = value;
}
return result;
}, {}),
);
} else {
this.push(this.currentRow);
}
this.currentRow = [];
this.rowFlushed = true;
}
}
_transform(data, encoding, callback) {
if (encoding !== 'utf8') {
throw new Error('Use utf8 encoding please');
}
const length = data.length;
for (let i = 0; i < length; i += 1) {
const char = data[i];
switch (char) {
case '\uFEFF': {
break;
}
case ';': {
this.rowFlushed = false;
this.currentRow.push(this.currentColumn.join(''));
this.currentColumn = [];
break;
}
case '\r':
case '\n': {
this.flushRow();
break;
}
default: {
this.rowFlushed = false;
this.currentColumn.push(char);
}
}
}
callback();
}
_flush(callback) {
this.flushRow();
callback();
}
}
/* eslint-disable no-underscore-dangle */
import { Transform } from 'stream';
import { createHash } from 'crypto';
export default class HashThroughStream extends Transform {
constructor(options) {
const { hash, digest, ...opts } = options || {};
super(opts);
this.digest = null;
this._hash = createHash(hash || 'sha512');
this._digest = digest || 'hex';
this._size = 0;
}
_transform(chunk, encoding, cb) {
this._hash.update(chunk);
this._size += chunk.length;
cb(null, chunk);
}
_flush(cb) {
this.digest = this._hash.digest(this._digest);
cb();
}
getHash() {
if (this.digest === null) {
throw new Error('Stream is not flushed');
}
return this.digest;
}
getSize() {
if (this.digest === null) {
throw new Error('Stream is not flushed');
}
return this._size;
}
}
import { Transform } from 'stream';
export default class ObjectStreamToJSON extends Transform {
constructor(options) {
super({
writableObjectMode: true,
});
this.push('[\n');
this.pretty = options && options.pretty;
}
_transform(data, encoding, callback) {
this.push(JSON.stringify(data, null, this.pretty ? 2 : null));
this.push(',\n');
callback();
}
_flush(callback) {
this.push('\n]');
callback();
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment