Created
March 16, 2015 01:34
-
-
Save ameensol/2d5b640a3b6529f7a6ef to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
/* 3/13/2015 NH2
 * listMembers.js
 * For a given list, sync its stored members with the members Twitter reports
 */
var stream = require('readable-stream') | |
var util = require('util') | |
var after = require('after') | |
var through = require('through2') | |
var streamify = require('stream-array') | |
var Twitter = require('../twitter') | |
var utility = require('../utility') | |
/**
 * Object-mode Transform stream that receives list objects and syncs each
 * list's members (see `_transform`).
 *
 * @param {Object} [options] - optional settings
 * @param {number} [options.hwm] - highWaterMark for the stream; falls back to
 *   16 when missing or falsy
 * @param {Object} models - model registry, stored on the instance as
 *   `this.models` (`_transform` reads `models.Tweeter`)
 */
var ListMembers = module.exports = function (options, models) {
  var settings = options || {}
  this.models = models
  stream.Transform.call(this, {
    objectMode: true,
    highWaterMark: settings.hwm || 16
  })
}
util.inherits(ListMembers, stream.Transform)
/**
 * Syncs one list's authorities with what the Twitter API reports for it.
 *
 * The API result is diffed against `list.Authorities` by `id`. Entries in
 * `diff.create` are found-or-created as Tweeters and then associated with the
 * list; entries in `diff.remove` are disassociated from it. When every
 * operation has completed the list is passed downstream. If any operation
 * fails, the first error is reported through `callback` exactly once.
 *
 * @param {Object} list - list record; reads `id`, `auths` and `Authorities`
 *   and calls `addAuthority` / `removeAuthority` on it
 * @param {string} encoding - unused (the stream runs in object mode)
 * @param {Function} callback - invoked as `callback(err)` on failure or
 *   `callback(null, list)` on success
 */
ListMembers.prototype._transform = function (list, encoding, callback) {
  var Tweeter = this.models.Tweeter
  var dbAuthorities = list.Authorities

  // tokens are passed in from the stream on the list object
  var auths = list.auths

  // if no tokens are provided, exit with error
  if (!(auths && auths.length)) {
    var authError = new Error('No auth tokens provided')
    authError.code = 100
    return callback(authError)
  }

  Twitter.listMembers(list.id, auths, function (err, apiAuthorities) {
    if (err) return callback(err)

    // diff.create is authorities to create, diff.remove to remove
    var diff = utility.diffArrays(apiAuthorities, dbAuthorities, 'id')

    // the total operations to perform in streams
    var diffCount = diff.create.length + diff.remove.length

    // The transform callback must fire exactly once: either with the first
    // failure from any pipeline stage or, on success, with the list.
    var settled = false
    var finish = function (failure) {
      if (settled) return
      settled = true
      if (failure) return callback(failure)
      callback(null, list)
    }

    // nothing to sync: no item would ever reach a drain, so finish right away
    if (diffCount === 0) return finish()

    // after all operations are complete, pass the list on
    var next = after(diffCount, finish)

    // finds or creates an authority and passes the saved record on
    var foc = through.obj(function (authority, enc, cb) {
      Tweeter.findOrCreate({where: {id: authority.id}, defaults: authority})
        .spread(function (savedAuthority) {
          cb(null, savedAuthority)
        }, function (focErr) {
          cb(focErr)
        })
    })

    // associates the authority with the list
    var associate = through.obj(function (authority, enc, cb) {
      list.addAuthority(authority.id).then(function () {
        cb(null, authority)
      }, function (addErr) {
        cb(addErr)
      })
    })

    // disassociates the authority from the list
    var remove = through.obj(function (authority, enc, cb) {
      list.removeAuthority(authority.id).then(function () {
        cb(null, authority)
      }, function (removeErr) {
        cb(removeErr)
      })
    })

    // Without a listener, a stage calling cb(err) emits an unhandled 'error'
    // event (which throws) and the transform callback would never be called.
    // Route stage failures to `finish` instead.
    foc.on('error', finish)
    associate.on('error', finish)
    remove.on('error', finish)

    // sink for both creation and deletion pipelines, acts as a counter
    var Drain = function () {
      stream.Writable.call(this, {objectMode: true})
    }
    util.inherits(Drain, stream.Writable)

    // calls next to let 'after' know that another task has finished
    Drain.prototype._write = function (data, enc, cb) {
      next()
      cb()
    }

    // one drain instance per pipeline
    var createDrain = new Drain()
    var removeDrain = new Drain()

    // execute creation pipeline
    streamify(diff.create)
      .pipe(foc)
      .pipe(associate)
      .pipe(createDrain)

    // execute deletion pipeline
    streamify(diff.remove)
      .pipe(remove)
      .pipe(removeDrain)
  })
}
/**
 * Nothing is buffered between lists, so flushing only signals completion.
 *
 * @param {Function} done - invoked with no arguments
 */
ListMembers.prototype._flush = function (done) {
  done()
}
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment