Skip to content

Instantly share code, notes, and snippets.

@ameensol
Created March 16, 2015 20:46
Show Gist options
  • Save ameensol/b241ed3225b421fc0d33 to your computer and use it in GitHub Desktop.
Save ameensol/b241ed3225b421fc0d33 to your computer and use it in GitHub Desktop.
/* 3/13/2015 NH2
* listMembers.js
* Transform stream: for each list written to it, sync the list's stored
* members to the membership Twitter currently reports, then pass the list on
*/
var stream = require('readable-stream')
var util = require('util')
var after = require('after')
var through = require('through2')
var streamify = require('stream-array')
var parallel = require('concurrent-transform')
var log = require('../logs')()
var Twitter = require('../twitter')
var utility = require('../utility')
/**
 * Object-mode Transform stream that receives list objects and emits them
 * again once their members have been synced (see `_transform`).
 *
 * @param {Object} [options]
 * @param {number} [options.hwm=16] - highWaterMark handed to the Transform
 * @param {number} [options.conc=1] - how many concurrent db
 *   create/associate/delete ops to run
 * @param {Object} models - model registry; `_transform` reads `models.Tweeter`
 */
var ListMembers = module.exports = function (options, models) {
  var settings = options || {}
  var highWaterMark = settings.hwm || 16

  this.models = models
  this.conc = settings.conc || 1

  stream.Transform.call(this, {
    objectMode: true,
    highWaterMark: highWaterMark
  })
}
util.inherits(ListMembers, stream.Transform)
/**
 * Syncs one list's membership with what Twitter currently reports.
 *
 * Fetches the list's members through `Twitter.listMembers`, diffs them
 * against the authorities already on the list (`list.Authorities`), then
 *   - finds-or-creates and associates every authority in `diff.create`, and
 *   - disassociates every authority in `diff.remove`.
 * The list is passed downstream once every diffed authority has been
 * accounted for, whether its db operation succeeded or failed (failures are
 * logged). Lists without auth tokens, or that the Twitter call rejects, are
 * logged and skipped: `callback()` is invoked without pushing anything.
 *
 * NOTE(review): `Qretry` is used below but is not required anywhere in this
 * file -- confirm it is provided elsewhere or add the missing require.
 *
 * @param {Object} list - list record; reads `id`, `auths` and `Authorities`,
 *   and calls `addAuthority` / `removeAuthority` on it
 * @param {string} encoding - unused (object mode)
 * @param {Function} callback - called as `callback()` to skip the list, or
 *   `callback(null, list)` once the sync has finished
 */
ListMembers.prototype._transform = function (list, encoding, callback) {
  var Tweeter = this.models.Tweeter
  // Captured up front: inside the Twitter callback below `this` is no longer
  // the stream, so `this.conc` would not be the configured concurrency there.
  var concurrency = this.conc
  var dbAuthorities = list.Authorities
  // tokens are passed in from the stream on the list object
  var auths = list.auths

  // if no tokens are provided, log error and skip this list
  if (!(auths && auths.length)) {
    var authErr = new Error('[Stream.ListMembers] No auth tokens provided')
    authErr.list = list
    log.error(authErr)
    return callback()
  }

  Twitter.listMembers(list.id, auths, function (err, apiAuthorities) {
    // if twitter api doesnt like this list, log error and skip
    if (err) {
      log.error('[Twitter.listMembers] Unable to retrieve list')
      log.error(err)
      return callback()
    }

    // diff.create is authorities to create, diff.remove to remove
    var diff = utility.diffArrays(apiAuthorities, dbAuthorities, 'id')
    // the total operations to perform in streams
    var diffCount = diff.create.length + diff.remove.length

    // nothing to create or remove: pass the list straight on
    if (diffCount === 0) {
      return callback(null, list)
    }

    // after all operations are accounted for, pass the list on
    var next = after(diffCount, callback.bind(null, null, list))

    // retry to avoid trivial db errors (fresh options object per call)
    function retry (operation) {
      return Qretry(operation, {
        maxRetry: 3,
        interval: 200,
        intervalMultiplicator: 2
      })
    }

    // Logs a failed operation and counts the item as done. A failed item is
    // not forwarded, so it never reaches a drain; without counting it here
    // `next` would never reach zero and the stream would stall on this list.
    function fail (label, error, cb) {
      log.error(label)
      log.error(error)
      next()
      cb()
    }

    // finds or creates an authority, returns it
    var foc = through.obj(function (authority, enc, cb) {
      retry(function () {
        return Tweeter.findOrCreate(
          {where: {id: authority.id}, defaults: authority}
        )
      }).spread(function (savedAuthority) {
        cb(null, savedAuthority)
      }, function (error) {
        fail('[Stream.ListMembers][Tweeter.findOrCreate]', error, cb)
      })
    })

    // associates the authority with the list
    var associate = through.obj(function (authority, enc, cb) {
      retry(function () {
        return list.addAuthority(authority.id)
      }).then(function () {
        // The resolved value is not needed. Forward the incoming authority so
        // the drain always gets a chunk to count; a null/undefined chunk
        // would not be pushed on and the item would go uncounted.
        cb(null, authority)
      }, function (error) {
        fail('[Stream.ListMembers][List.addAuthority]', error, cb)
      })
    })

    // disassociates the authority from the list
    var remove = through.obj(function (authority, enc, cb) {
      retry(function () {
        return list.removeAuthority(authority.id)
      }).then(function () {
        // same as above: always hand the drain something to count
        cb(null, authority)
      }, function (error) {
        fail('[Stream.ListMembers][List.removeAuthority]', error, cb)
      })
    })

    // sink for both creation and deletion pipelines, acts as a counter
    var Drain = function () {
      stream.Writable.call(this, {objectMode: true})
    }
    util.inherits(Drain, stream.Writable)

    // calls next to let 'after' know that another task has finished
    Drain.prototype._write = function (data, enc, cb) {
      next()
      cb()
    }

    // one drain per pipeline
    var createDrain = new Drain()
    var removeDrain = new Drain()

    // execute creation pipeline
    streamify(diff.create)
      .pipe(parallel(foc, concurrency))
      .pipe(parallel(associate, concurrency))
      .pipe(createDrain)

    // execute deletion pipeline
    streamify(diff.remove)
      .pipe(parallel(remove, concurrency))
      .pipe(removeDrain)
  })
}
/**
 * Flush hook of the Transform. No extra output is produced here; it only
 * acknowledges the flush.
 *
 * @param {Function} callback - called with no arguments
 */
ListMembers.prototype._flush = function flush (callback) {
  callback()
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment