Skip to content

Instantly share code, notes, and snippets.

@ameensol
Created March 16, 2015 01:34
Show Gist options
  • Save ameensol/2d5b640a3b6529f7a6ef to your computer and use it in GitHub Desktop.
Save ameensol/2d5b640a3b6529f7a6ef to your computer and use it in GitHub Desktop.
/* 3/13/2015 NH2
* listMembers.js
* For a given list, sync its members to Twitter's values
*/
var stream = require('readable-stream')
var util = require('util')
var after = require('after')
var through = require('through2')
var streamify = require('stream-array')
var Twitter = require('../twitter')
var utility = require('../utility')
/**
 * Object-mode Transform stream that receives list objects and, for each one,
 * runs the member sync implemented in `_transform`.
 *
 * @param {Object} [options] - optional settings
 * @param {number} [options.hwm=16] - highWaterMark handed to the Transform
 * @param {Object} models - stored on `this.models`; `_transform` reads
 *   `models.Tweeter` from it
 */
var ListMembers = module.exports = function (options, models) {
  var settings = options ? options : {}

  this.models = models

  stream.Transform.call(this, {
    objectMode: true,
    // any falsy hwm (including 0) falls back to 16
    highWaterMark: settings.hwm || 16
  })
}

util.inherits(ListMembers, stream.Transform)
/**
 * Syncs one list's members with the values returned by Twitter.
 *
 * Reads the list's current members through `Twitter.listMembers`, diffs them
 * (by `id`) against `list.Authorities`, then:
 *   - for every entry in `diff.create`: find-or-create the Tweeter row and
 *     associate it with the list
 *   - for every entry in `diff.remove`: disassociate it from the list
 * When every operation has finished, the list is passed downstream.
 *
 * Any failure (missing tokens, Twitter error, database error in either
 * pipeline) is reported through `callback(err)` exactly once.
 *
 * @param {Object} list - list record; this method reads `id`, `auths`,
 *   `Authorities` and calls `addAuthority` / `removeAuthority` on it
 * @param {string} encoding - unused (object mode)
 * @param {Function} callback - `callback(err)` on failure,
 *   `callback(null, list)` on success
 */
ListMembers.prototype._transform = function (list, encoding, callback) {
  var Tweeter = this.models.Tweeter
  var dbAuthorities = list.Authorities
  // tokens are passed in from the stream on the list object
  var auths = list.auths

  // if no tokens are provided, exit with error
  if (!(auths && auths.length)) {
    var authError = new Error('No auth tokens provided')
    authError.code = 100
    return callback(authError)
  }

  Twitter.listMembers(list.id, auths, function (err, apiAuthorities) {
    if (err) return callback(err)

    // diff.create is authorities to create, diff.remove to remove
    var diff = utility.diffArrays(apiAuthorities, dbAuthorities, 'id')
    // the total operations to perform in streams
    var diffCount = diff.create.length + diff.remove.length

    // Single exit point: guarantees the transform callback fires only once,
    // whether the pipelines complete or one of their stages fails.
    var settled = false
    var finish = function (failure) {
      if (settled) return
      settled = true
      if (failure) return callback(failure)
      callback(null, list)
    }

    // nothing to create or remove: pass the list on without building streams
    if (diffCount === 0) return finish()

    // after all operations are complete, pass the list on
    var next = after(diffCount, finish)

    // finds or creates an authority and passes the saved record on
    var foc = through.obj(function (authority, enc, cb) {
      Tweeter.findOrCreate({where: {id: authority.id}, defaults: authority})
        .spread(function (savedAuthority) {
          cb(null, savedAuthority)
        }, function (err) {
          cb(err)
        })
    })

    // associates the authority with the list
    var associate = through.obj(function (authority, enc, cb) {
      list.addAuthority(authority.id).then(function () {
        cb(null, authority)
      }, function (err) {
        cb(err)
      })
    })

    // disassociates the authority from the list
    var remove = through.obj(function (authority, enc, cb) {
      list.removeAuthority(authority.id).then(function () {
        cb(null, authority)
      }, function (err) {
        cb(err)
      })
    })

    // sink for both creation and deletion pipelines, acts as a counter
    var Drain = function () {
      stream.Writable.call(this, {objectMode: true})
    }
    util.inherits(Drain, stream.Writable)

    // calls next to let 'after' know that another task has finished
    Drain.prototype._write = function (data, enc, cb) {
      next()
      cb()
    }

    // one drain per pipeline
    var createDrain = new Drain()
    var removeDrain = new Drain()

    // A stage that calls cb(err) emits 'error' on itself. Without a listener
    // that event would throw and the transform callback would never run, so
    // route every stage's error to finish().
    var stages = [foc, associate, createDrain, remove, removeDrain]
    stages.forEach(function (stage) {
      stage.on('error', finish)
    })

    // execute creation pipeline
    streamify(diff.create)
      .pipe(foc)
      .pipe(associate)
      .pipe(createDrain)

    // execute deletion pipeline
    streamify(diff.remove)
      .pipe(remove)
      .pipe(removeDrain)
  })
}
/**
 * Flush hook: this stream buffers nothing, so it signals completion
 * immediately.
 *
 * @param {Function} done - invoked with no arguments
 */
ListMembers.prototype._flush = function (done) {
  done()
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment