Skip to content

Instantly share code, notes, and snippets.

@mieszko4
Last active April 20, 2016 15:22
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save mieszko4/5595b5c18a7ca70a62c4cb2d35aa87e0 to your computer and use it in GitHub Desktop.
Save mieszko4/5595b5c18a7ca70a62c4cb2d35aa87e0 to your computer and use it in GitHub Desktop.
JsMeetup - Streaming in JavaScript - Stream approach
const util = require('util');
const Transform = require('stream').Transform;
const intercomDrinks = require('intercom-drinks').default;
/**
 * Transform stream that converts customer objects into text lines.
 *
 * Only the writable side is put into object mode; the readable side keeps
 * the stream default, so the formatted strings leave as ordinary chunks.
 * May be invoked with or without `new`.
 *
 * @constructor
 */
function Stream() {
  const calledWithNew = this instanceof Stream;
  if (!calledWithNew) {
    return new Stream();
  }

  const options = { writableObjectMode: true };
  Transform.call(this, options);
}

util.inherits(Stream, Transform);

/**
 * Formats a single customer as `<user_id>:<name>` followed by a line feed
 * (pipeline steps 7 "pick name and id" and 8 "join LF").
 *
 * @param {Object} customer - record providing `user_id` and `name`
 * @param {string} encoding - not used by this transform
 * @param {Function} done - invoked once the record has been emitted
 */
Stream.prototype._transform = function (customer, encoding, done) {
  const line = `${customer.user_id}:${customer.name}\n`;
  this.push(line);
  done();
};
module.exports = Stream;
const util = require('util');
const Transform = require('stream').Transform;
const intercomDrinks = require('intercom-drinks').default;
/**
 * Object-mode transform stream that lets through only the customers that
 * `intercomDrinks.isNearbyCustomer` reports as nearby.
 *
 * May be invoked with or without `new`; in both cases the three arguments
 * configure the filter.
 *
 * @constructor
 * @param {number} latitude - latitude of the reference point
 * @param {number} longitude - longitude of the reference point
 * @param {number} radius - maximum distance from the reference point; it is
 *   forwarded unchanged to `intercomDrinks.isNearbyCustomer`
 */
function Stream(latitude, longitude, radius) {
  if (!(this instanceof Stream)) {
    // Forward the arguments; otherwise a `new`-less call would build a
    // filter whose latitude/longitude/radius are all undefined.
    return new Stream(latitude, longitude, radius);
  }
  Transform.call(this, {
    readableObjectMode: true,
    writableObjectMode: true
  });
  this._latitude = latitude;
  this._longitude = longitude;
  this._radius = radius;
}
util.inherits(Stream, Transform);

/**
 * Pipeline step 5: calculate distance and filter.
 * Re-emits the customer when it is nearby, drops it otherwise.
 *
 * @param {Object} customer - customer record to test
 * @param {string} encoding - not used (object mode)
 * @param {Function} done - invoked once the record has been handled
 */
Stream.prototype._transform = function (customer, encoding, done) {
  if (intercomDrinks.isNearbyCustomer(this._latitude, this._longitude, this._radius, customer)) {
    this.push(customer);
  }
  done();
};
module.exports = Stream;
const SortedArray = require('sorted-array');
const ThrottleStream = require('m-throttle-stream');
const JsonStream = require('m-json-stream');
const intercomDrinks = require('intercom-drinks').default;
const request = require('request');
const fs = require('fs');
const ValidateStream = require('./validate.stream.js');
const FilterStream = require('./filter.stream.js');
const CustomerToStringStream = require('./customer-to-string.stream.js');
// ---- Search parameters ----
const radius = 100000; // in meters
const latitude = 53.3381985;
const longitude = -6.2592576;

// ---- Input sources ----
const onlineSourceUrl = 'https://gist.githubusercontent.com/brianw/19896c50afa89ad4dec3/raw/6c11047887a03483c50017c1d451667fd62a53ca/gistfile1.txt';
const offlineSourceFilename = 'input.jsonl';

// 1. read (over HTTP)
const onlineSource = request(onlineSourceUrl);
// Offline alternative:
// const offlineSource = fs.createReadStream(offlineSourceFilename);

/* THROTTLE (disabled): to enable, pipe the source through this stage before JsonStream
.pipe(new ThrottleStream({
  maxTimeout: 100,
  bufferSize: 10000 // should be higher than the chunks coming in
}))
*/

// 2. split and 3. parse customer
const parsedCustomers = onlineSource.pipe(new JsonStream({strict: false}));
// 4. validate
const validCustomers = parsedCustomers.pipe(new ValidateStream());
// 5. calculate distance and filter
const destination = validCustomers.pipe(new FilterStream(latitude, longitude, radius));

// ---- Output, NO-SORT variant (active) ----
// 7. pick name and id, 8. join LF
const customerLines = destination.pipe(new CustomerToStringStream());
customerLines.pipe(process.stdout);

/* ---- Output, SORT variant (disabled) ----
// result array kept ordered by insert sort
const nearbyCustomers = new SortedArray([], (a, b) => {
  return a.user_id - b.user_id; // ascending
});
// each 'data' event carries one customer object
destination.on('data', (customer) => {
  nearbyCustomers.insert(customer); // insert into the sorted array
});
destination.on('end', () => {
  // console.log('total pushed', nearbyCustomers.array.length);
  console.log(nearbyCustomers.array.map(intercomDrinks.printCustomer).join('\n'));
});
*/
const util = require('util');
const Transform = require('stream').Transform;
const intercomDrinks = require('intercom-drinks').default;
/**
 * Object-mode transform stream that forwards only the records accepted by
 * `intercomDrinks.validateCustomer`.
 *
 * May be invoked with or without `new`.
 *
 * @constructor
 */
function Stream() {
  if (!(this instanceof Stream)) {
    return new Stream();
  }
  Transform.call(this, {
    readableObjectMode: true,
    writableObjectMode: true
  });
}
util.inherits(Stream, Transform);

/**
 * Pipeline step 4: validate.
 * Emits whatever object `intercomDrinks.validateCustomer` returns for the
 * incoming record; records that make it throw, or for which it returns a
 * non-object (or null), are dropped.
 *
 * @param {*} object - parsed record to validate
 * @param {string} encoding - not used (object mode)
 * @param {Function} done - invoked exactly once per record, including
 *   rejected ones, so the stream keeps flowing
 */
Stream.prototype._transform = function (object, encoding, done) {
  let customer;
  try {
    customer = intercomDrinks.validateCustomer(object);
  } catch (e) {
    // Invalid record: skip it deliberately, but still signal completion;
    // without this call the stream would never request the next chunk.
    done();
    return;
  }
  // `typeof null` is 'object', and pushing null would signal end-of-stream,
  // so null has to be excluded explicitly.
  if (customer !== null && typeof customer === 'object') {
    this.push(customer);
  }
  done();
};
module.exports = Stream;
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment