Created
March 16, 2015 20:46
-
-
Save ameensol/b241ed3225b421fc0d33 to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
/* 3/13/2015 NH2
 * listMembers.js
 * For a given list, sync its members to Twitter's values
 */
var stream = require('readable-stream') | |
var util = require('util') | |
var after = require('after') | |
var through = require('through2') | |
var streamify = require('stream-array') | |
var parallel = require('concurrent-transform') | |
var log = require('../logs')() | |
var Twitter = require('../twitter') | |
var utility = require('../utility') | |
/**
 * Object-mode Transform stream. Each incoming list object is handled by
 * `_transform`, which syncs the list's members against Twitter.
 *
 * @param {Object} [options]
 * @param {number} [options.hwm=16] highWaterMark handed to stream.Transform
 * @param {number} [options.conc=1] how many concurrent db
 *   create/associate/delete ops to run
 * @param {Object} models model registry; `_transform` reads `models.Tweeter`
 */
var ListMembers = module.exports = function (options, models) {
  // Tolerate being invoked without `new` instead of writing onto the
  // wrong `this`.
  if (!(this instanceof ListMembers)) {
    return new ListMembers(options, models)
  }

  options || (options = {})

  this.models = models

  // how many concurrent db create/associate/delete ops to run
  this.conc = options.conc || 1

  stream.Transform.call(this, {
    objectMode: true,
    highWaterMark: options.hwm || 16
  })
}

util.inherits(ListMembers, stream.Transform)
/**
 * Sync one list's members with what Twitter reports for it.
 *
 * Fetches the members via Twitter.listMembers, diffs them against
 * `list.Authorities` by `id`, then runs two small pipelines:
 *   - create:  findOrCreate the Tweeter row, then associate it with the list
 *   - remove:  disassociate the authority from the list
 * Once every diff entry has been accounted for (succeeded or given up on),
 * the list object is passed downstream.
 *
 * Lists without auth tokens, or that Twitter rejects, are logged and
 * skipped (nothing is pushed downstream for them).
 *
 * NOTE(review): `Qretry` is not required anywhere in the visible file --
 * confirm it is provided as a global or add the missing require.
 *
 * @param {Object} list list record; reads `id`, `auths`, `Authorities`,
 *   and calls `addAuthority` / `removeAuthority` on it
 * @param {string} encoding unused (object mode)
 * @param {Function} callback stream callback, invoked exactly once per list
 */
ListMembers.prototype._transform = function (list, encoding, callback) {
  var Tweeter = this.models.Tweeter
  var dbAuthorities = list.Authorities

  // Capture the concurrency up front: inside the Twitter callback below
  // `this` is no longer the stream instance.
  var conc = this.conc

  // tokens are passed in from the stream on the list object
  var auths = list.auths

  // if no tokens are provided, log error and skip this list
  if (!(auths && auths.length)) {
    var authErr = new Error('[Stream.ListMembers] No auth tokens provided')
    authErr.list = list
    log.error(authErr)
    return callback()
  }

  Twitter.listMembers(list.id, auths, function (err, apiAuthorities) {
    // if twitter api doesnt like this list, log error and skip
    if (err) {
      log.error('[Twitter.listMembers] Unable to retrieve list')
      log.error(err)
      return callback()
    }

    // diff.create is authorities to create, diff.remove to remove
    var diff = utility.diffArrays(apiAuthorities, dbAuthorities, 'id')

    // the total operations to perform in streams
    var diffCount = diff.create.length + diff.remove.length

    // after all operations are complete, pass the list on
    var next = after(diffCount, callback.bind(null, null, list))

    // runs a promise-returning db operation with retries, to avoid
    // trivial db errors
    function withRetry(operation) {
      return Qretry(operation, {
        maxRetry: 3,
        interval: 200,
        intervalMultiplicator: 2
      })
    }

    // builds the rejection handler for a pipeline step: log, then count
    // the entry as finished. The item is dropped here and will never
    // reach a drain, so without this next() the counter would never
    // reach zero and the outer callback would never fire.
    function giveUp(label, cb) {
      return function (opErr) {
        log.error(label)
        log.error(opErr)
        next()
        cb()
      }
    }

    // finds or creates an authority, returns it
    var foc = through.obj(function (authority, enc, cb) {
      withRetry(function () {
        return Tweeter.findOrCreate(
          {where: {id: authority.id}, defaults: authority}
        )
      })
      .spread(function (savedAuthority, created) {
        cb(null, savedAuthority)
      }, giveUp('[Stream.ListMembers][Tweeter.findOrCreate]', cb))
    })

    // associates the authority with the list
    var associate = through.obj(function (authority, enc, cb) {
      withRetry(function () {
        return list.addAuthority(authority.id)
      })
      .then(function () {
        // Forward the input item rather than the resolved value: the
        // drain only counts items, and an empty resolved value would
        // push nothing and leave this entry uncounted.
        cb(null, authority)
      }, giveUp('[Stream.ListMembers][List.addAuthority]', cb))
    })

    // disassociates the authority from the list
    var remove = through.obj(function (authority, enc, cb) {
      withRetry(function () {
        return list.removeAuthority(authority.id)
      })
      .then(function () {
        // same reasoning as in `associate`: always hand the drain an item
        cb(null, authority)
      }, giveUp('[Stream.ListMembers][List.removeAuthority]', cb))
    })

    // sink for both creation and deletion pipelines, acts as a counter
    var Drain = function () {
      stream.Writable.call(this, {objectMode: true})
    }
    util.inherits(Drain, stream.Writable)

    // calls next to let 'after' know that another task has finished
    Drain.prototype._write = function (data, enc, cb) {
      next()
      cb()
    }

    // one drain instance per pipeline
    var createDrain = new Drain()
    var removeDrain = new Drain()

    // execute creation pipeline
    streamify(diff.create)
      .pipe(parallel(foc, conc))
      .pipe(parallel(associate, conc))
      .pipe(createDrain)

    // execute deletion pipeline
    streamify(diff.remove)
      .pipe(parallel(remove, conc))
      .pipe(removeDrain)
  })
}
/**
 * Called when the upstream has ended. This stream keeps nothing buffered
 * between lists, so it just signals completion.
 *
 * @param {Function} callback invoked immediately, with no arguments
 */
ListMembers.prototype._flush = function (callback) {
  callback()
}
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment