Skip to content

Instantly share code, notes, and snippets.

@alexbeletsky
Last active May 17, 2017 07:32
Show Gist options
  • Star 2 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save alexbeletsky/58239003a4852a2a4a99 to your computer and use it in GitHub Desktop.
Save alexbeletsky/58239003a4852a2a4a99 to your computer and use it in GitHub Desktop.
Async.js vs Rx

Workflow Description

  1. Read records from database.
  2. For each record, issue an HTTP GET to a web service and receive a response.
  3. Store each response back in its corresponding document.
var async = require('async');
var mongodb = require('mongojs');
var request = require('request');
var mongo = mongodb('mongodb://localhost:27017/db', ['accounts']);
var url = 'http://httpbin.org/get';
// Run the workflow steps in order: each step hands its result to the next one,
// and the final handler reports either the first error or overall success.
async.waterfall([
  fetchAccounts,
  resolveAccounts,
  // The storing step is declared as `storeResults`; the previous reference to
  // `storeAccounts` named a function that does not exist.
  storeResults
], function (err) {
  if (err) {
    // `console.err` is not a function; `console.error` is the stderr logger.
    return console.error('failed', err);
  }
  console.log('completed');
});
/**
 * Loads every document of the `accounts` collection.
 *
 * @param {Function} callback - handed straight to the cursor's `toArray`,
 *   so it receives whatever `toArray` reports.
 */
function fetchAccounts(callback) {
  var matchEverything = {};
  var cursor = mongo.accounts.find(matchEverything);
  cursor.toArray(callback);
}
/**
 * Issues one HTTP GET per account and gathers the responses.
 *
 * @param {Object[]} accounts - account documents; each `_id` is sent as the
 *   `id` query-string parameter.
 * @param {Function} callback - Node-style `(err, results)` callback, invoked
 *   by `async.map` once; on success `results` holds one `{id, result}` entry
 *   per account, where `result` is the response body.
 */
function resolveAccounts(accounts, callback) {
  var resolve = function (account, next) {
    request({url: url, qs: {id: account._id}}, function (err, response, body) {
      if (err) {
        return next(err);
      }
      // Report through the per-item `next`. Calling the outer `callback` here
      // would fire it once per account with a single object and leave
      // `async.map` waiting forever for its items to finish.
      next(null, {id: account._id, result: body});
    });
  };
  async.map(accounts, resolve, callback);
}
/**
 * Writes each resolved result back onto its account document.
 *
 * @param {Object[]} results - `{id, result}` entries; `id` identifies the
 *   account document to update.
 * @param {Function} callback - Node-style `(err, updated)` callback, invoked
 *   by `async.map` once every update has reported back.
 */
function storeResults(results, callback) {
  // The iteratee parameter must be the single `result`; naming it `results`
  // left `result` undefined inside the body.
  var store = function (result, next) {
    // `findAndModify` is called on the `accounts` collection, not on the
    // database handle itself.
    mongo.accounts.findAndModify({
      query: {_id: new mongo.ObjectId(result.id)},
      // NOTE(review): this stores the whole `{id, result}` pair under
      // `result`; confirm whether only `result.result` was intended.
      update: {$set: {result: result}},
      'new': true
    }, next);
  };
  async.map(results, store, callback);
}
var rx = require('rx');
var mongodb = require('mongojs');
var request = require('request');
var mongo = mongodb('mongodb://localhost:27017/db', ['accounts']);
var url = 'http://httpbin.org/get';
/**
 * Builds an observable over the `accounts` collection.
 *
 * On subscription it runs `find({})` and relays the cursor's events to the
 * observer: `data` -> `onNext`, `error` -> `onError`, `end` -> `onCompleted`.
 *
 * @returns {Object} whatever `rx.Observable.create` returns for the
 *   subscribe function defined here.
 */
var accountsStream = function () {
  function subscribe(observer) {
    var cursor = mongo.accounts.find({});

    function forwardDocument(account) {
      observer.onNext(account);
    }

    function forwardFailure(failure) {
      observer.onError(failure);
    }

    function forwardEnd() {
      observer.onCompleted();
    }

    cursor.on('data', forwardDocument);
    cursor.on('error', forwardFailure);
    cursor.on('end', forwardEnd);
  }

  return rx.Observable.create(subscribe);
};
// Stream of `{id, result}` pairs: every account emitted by `accountsStream()`
// is mapped to a one-shot observable that issues an HTTP GET carrying the
// account `_id` as the `id` query parameter and emits the response body.
var resolveAccounts = accountsStream()
  .flatMap(function (account) {
    return rx.Observable.create(function (observer) {
      request({url: url, qs: {id: account._id}, json: true}, function (err, response, body) {
        if (err) {
          return observer.onError(err);
        }
        observer.onNext({id: account._id, result: body});
        // Each request yields exactly one value, so finish the inner
        // observable; otherwise the merged stream can never complete.
        observer.onCompleted();
      });
    });
  });
// Stream of updated documents: every `{id, result}` pair from
// `resolveAccounts` is written back to the matching `accounts` document via
// `findAndModify`, and the value reported by that call is emitted.
var storeStream = resolveAccounts
  .flatMap(function (result) {
    return rx.Observable.create(function (observer) {
      mongo.accounts.findAndModify({
        query: {_id: new mongo.ObjectId(result.id)},
        // NOTE(review): this stores the whole `{id, result}` pair under
        // `result`; confirm whether only `result.result` was intended.
        update: {$set: {result: result}},
        'new': true
      }, function (err, updated) {
        if (err) {
          return observer.onError(err);
        }
        observer.onNext(updated);
        // One update produces one value; complete the inner observable so
        // the merged stream is able to complete as well.
        observer.onCompleted();
      });
    });
  });
// Start the pipeline: log every stored document as it arrives, and report
// the first error that reaches the subscriber.
storeStream.subscribe(function (result) {
  console.log('completed', result);
}, function (err) {
  // `console.err` is not a function; use `console.error` and include the
  // error itself so the failure can be diagnosed.
  console.error('failed', err);
});
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment