- Read records from database.
- For each record, issue an HTTP GET to a web service and receive a response.
- Store the responses back to the corresponding documents.
-
-
Save alexbeletsky/58239003a4852a2a4a99 to your computer and use it in GitHub Desktop.
Async.js vs Rx
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
// Dependencies: control-flow helpers, MongoDB client and HTTP client.
var async = require('async');
var mongodb = require('mongojs');
var request = require('request');

// Database handle; the `accounts` collection is accessed as `mongo.accounts`.
var mongo = mongodb('mongodb://localhost:27017/db', ['accounts']);

// Web service endpoint that is queried once per account.
var url = 'http://httpbin.org/get';
// Pipeline: load accounts -> query the web service for each -> persist results.
// Each step hands its result to the next one; the final callback receives the
// first error reported by any step, if there was one.
async.waterfall([
    fetchAccounts,
    resolveAccounts,
    // The persisting step is declared as `storeResults` in this file.
    storeResults
], function (err) {
    if (err) {
        // `console.err` does not exist; `console.error` is the stderr logger.
        return console.error('failed', err);
    }

    console.log('completed');
});
/**
 * Loads every document of the `accounts` collection.
 *
 * @param {function(Error, Array)} callback - node-style callback passed
 *     straight to the cursor's `toArray`.
 */
function fetchAccounts(callback) {
    var cursor = mongo.accounts.find({});

    cursor.toArray(callback);
}
/**
 * Issues one HTTP GET per account and collects the responses.
 *
 * @param {Array} accounts - account documents; each one's `_id` is sent as
 *     the `id` query-string parameter.
 * @param {function(Error, Array)} callback - called once by `async.map` with
 *     either the first error or an array of `{id, result}` objects.
 */
function resolveAccounts(accounts, callback) {
    var resolve = function (account, next) {
        request({url: url, qs: {id: account._id}}, function (err, response, body) {
            if (err) {
                return next(err);
            }

            // Report to the per-item callback: calling the outer `callback`
            // here would bypass `async.map` and fire it once per account.
            next(null, {id: account._id, result: body});
        });
    };

    async.map(accounts, resolve, callback);
}
/**
 * Writes each resolved result back to its account document.
 *
 * @param {Array} results - `{id, result}` objects; `id` selects the account.
 * @param {function(Error, Array)} callback - called once by `async.map` with
 *     either the first error or the values reported by `findAndModify`.
 */
function storeResults(results, callback) {
    // The iteratee receives ONE item, so the parameter is named `result`
    // to match what the body reads.
    var store = function (result, next) {
        // `findAndModify` lives on the collection, not on the db handle.
        mongo.accounts.findAndModify({
            query: {_id: new mongo.ObjectId(result.id)},
            // NOTE(review): this stores the whole `{id, result}` wrapper under
            // `result`, not just the response body - confirm this is intended.
            update: {$set: {result: result}},
            'new': true
        }, next);
    };

    async.map(results, store, callback);
}
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
// Dependencies: reactive extensions, MongoDB client and HTTP client.
var rx = require('rx');
var mongodb = require('mongojs');
var request = require('request');

// Database handle; the `accounts` collection is accessed as `mongo.accounts`.
var mongo = mongodb('mongodb://localhost:27017/db', ['accounts']);

// Web service endpoint that is queried once per account.
var url = 'http://httpbin.org/get';
/**
 * Builds an observable over the `accounts` collection.
 *
 * The result of `find({})` is consumed through its `data` / `error` / `end`
 * events, which are forwarded to the observer as
 * `onNext` / `onError` / `onCompleted` respectively.
 *
 * @returns {Object} whatever `rx.Observable.create` returns for the
 *     subscribe function below.
 */
var accountsStream = function () {
    return rx.Observable.create(function (observer) {
        var accounts = mongo.accounts.find({});

        accounts.on('data', function (doc) {
            observer.onNext(doc);
        });

        accounts.on('error', function (err) {
            observer.onError(err);
        });

        accounts.on('end', function () {
            observer.onCompleted();
        });
    });
};
// Stream of `{id, result}` objects: every account emitted by `accountsStream`
// is mapped to an inner observable wrapping one HTTP GET (JSON-parsed body).
var resolveAccounts = accountsStream()
    .flatMap(function (account) {
        return rx.Observable.create(function (observer) {
            request({url: url, qs: {id: account._id}, json: true}, function (err, response, body) {
                if (err) {
                    return observer.onError(err);
                }

                observer.onNext({id: account._id, result: body});
                // One request yields exactly one value; signal completion so
                // the flatMapped stream is able to complete as well.
                observer.onCompleted();
            });
        });
    });
// Stream of updated documents: every resolved result is written back to its
// account through an inner observable wrapping one `findAndModify` call.
var storeStream = resolveAccounts
    .flatMap(function (result) {
        return rx.Observable.create(function (observer) {
            mongo.accounts.findAndModify({
                query: {_id: new mongo.ObjectId(result.id)},
                // NOTE(review): this stores the whole `{id, result}` wrapper
                // under `result`, not just the response body - confirm intent.
                update: {$set: {result: result}},
                'new': true
            }, function (err, updated) {
                if (err) {
                    return observer.onError(err);
                }

                observer.onNext(updated);
                // One update yields exactly one value; signal completion so
                // the flatMapped stream is able to complete as well.
                observer.onCompleted();
            });
        });
    });
// Start the pipeline: log every stored document, or the error that stopped it.
storeStream.subscribe(function (result) {
    console.log('completed', result);
}, function (err) {
    // `console.err` does not exist; use `console.error` and include the error
    // itself so the failure can be diagnosed.
    console.error('failed', err);
});
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment