Skip to content

Instantly share code, notes, and snippets.

@bernardmcmanus
Last active October 18, 2016 13:14
Show Gist options
  • Save bernardmcmanus/a0eb96d5b562f055a86c5f2a5202b1fc to your computer and use it in GitHub Desktop.
Save bernardmcmanus/a0eb96d5b562f055a86c5f2a5202b1fc to your computer and use it in GitHub Desktop.
Streaming lots of HTTP requests with highland and request
// https://github.com/caolan/highland
// https://github.com/request/request
'use strict';
const util = require('util');
const colors = require('colors'); // @1.1.2
const request = require('request'); // @2.69.0
const highland = require('highland'); // @2.7.4
const Bluebird = require('bluebird'); // @3.3.4
const baseUrl = 'http://md5.jsontest.com/?text=test';
const requestCount = 10;
const maxSockets = 5;
function getUrlGenerator(max) {
let index = 0;
return (push, next) => {
if (index < max) {
push(null, `${baseUrl}${index}`);
index++;
next();
} else {
push(null, highland.nil);
}
};
}
function wrappedRequest(url) {
const req = request.get(url);
req.on('response', () => console.log(`got response for ${url}`.cyan));
console.log(`opened request for ${url}`.gray);
return highland(req);
}
function streamToPromise(stream) {
return new Bluebird((resolve, reject) => {
stream.stopOnError(reject).toArray(resolve);
});
}
function printResult(result) {
console.log('\nDONE\n'.green);
console.log(util.inspect(result, { colors: true }));
console.log('\n\n\n');
}
function runImplementation(fn) {
console.log(`\nRUNNING ${fn.name.toUpperCase()} IMPLEMENTATION`.magenta);
console.log(`==================================================`.magenta);
console.time(`\n${fn.name}`);
return fn()
.tap(() => console.timeEnd(`\n${fn.name}`))
.then(printResult);
}
// ========== IMPLEMENTATIONS ==========
// guarantees order
function sequential() {
return highland(getUrlGenerator(requestCount))
.map(url => wrappedRequest(url))
.sequence()
.flatten()
.through(streamToPromise)
.map(buffer => JSON.parse(buffer));
}
// guarantees order
function batched() {
return highland(getUrlGenerator(requestCount))
.map(url => wrappedRequest(url))
.batch(maxSockets)
.flatten()
.through(streamToPromise)
.map(buffer => JSON.parse(buffer));
}
// guarantees order
function parallel() {
return highland(getUrlGenerator(requestCount))
.map(url => wrappedRequest(url))
.parallel(maxSockets)
.flatten()
.through(streamToPromise)
.map(buffer => JSON.parse(buffer));
}
// no guaranteed order
function limited() {
return highland(getUrlGenerator(requestCount))
.map(url => wrappedRequest(url))
.mergeWithLimit(maxSockets)
.through(streamToPromise)
.map(buffer => JSON.parse(buffer));
}
// no guaranteed order
function hammertime() {
return highland(getUrlGenerator(requestCount))
.map(url => wrappedRequest(url))
.merge()
.through(streamToPromise)
.map(buffer => JSON.parse(buffer));
}
// =====================================
Bluebird.mapSeries([
sequential,
batched,
parallel,
limited,
hammertime
], runImplementation);
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment