Skip to content

Instantly share code, notes, and snippets.

@anantn
Created December 27, 2012 22:11
Show Gist options
  • Star 2 You must be signed in to star a gist
  • Fork 1 You must be signed in to fork a gist
  • Save anantn/4392522 to your computer and use it in GitHub Desktop.
Save anantn/4392522 to your computer and use it in GitHub Desktop.
Firebase: Implementing a worker queue pattern using firebase_queue_pop.js
var Firebase = require("./firebase-node.js");
// Queue constructor: keeps the reference it is given in this._ref.
// The prototype methods defined on Queue call startAt()/limit()/once()
// and push() on that reference, so `ref` has to provide them.
function Queue(ref) {
this._ref = ref;
}
// Asks for the first child of the queue and hands it, together with the
// caller's callback, to _pop. `cb` is passed through untouched; this method
// itself returns nothing.
Queue.prototype.pop = function(cb) {
  // Query restricted to a single child starting from the beginning.
  var headQuery = this._ref.startAt().limit(1);
  // Pre-bind the receiver and the callback so the event's own arguments
  // (the snapshot first) follow `cb` when _pop is invoked.
  var onHead = this._pop.bind(this, cb);
  headQuery.once("child_added", onHead);
};
// Tries to claim the queue entry described by `snapshot` and deliver its
// value to `cb`.
//
//   cb       - called with the claimed value, or with null when the
//              snapshot holds no data.
//   snapshot - snapshot of the current head of the queue; must provide
//              val() and ref().
//
// When the claim is lost to another worker, pop() is called again with the
// same callback, so `cb` fires only once a value has actually been claimed.
Queue.prototype._pop = function(cb, snapshot) {
  var val = null;
  var self = this;

  // If the current head is empty just return. Compare against null
  // explicitly: 0, false and "" are legitimate job payloads and must not be
  // mistaken for an empty head.
  if (snapshot.val() === null) {
    cb(null);
    return;
  }

  // The value returned by the update function will be the new value.
  // If we return undefined, then the transaction will be cancelled.
  snapshot.ref().transaction(function(data) {
    // Nothing has been stashed yet: remember what we saw and ask for the
    // entry to be deleted. The check is `=== null` rather than a truthiness
    // test so that a stashed falsy payload still counts as stashed, while an
    // attempt that only saw null leaves room for the next attempt to stash
    // the real value.
    if (val === null) {
      val = data;
      return null;
    }

    // The update function is being run again after a value was already
    // stashed (some other process grabbed the head of the queue first), so
    // cancel this transaction and try again with a fresh head.
    // The next call to pop is asynchronous as recommended by node.js.
    process.nextTick(function() {
      self.pop(cb);
    });
    return;
  }, function(success) {
    // NOTE(review): this relies on the completion callback receiving a
    // success flag as its first argument - confirm against the Firebase
    // client version in use.
    if (success) {
      // The transaction succeeded, just return the stashed value to process.
      // node.js recommends making all callbacks asynchronous.
      // This prevents any blocking operations from holding up our queue and
      // also removes the possibility of recursion exhausting the stack.
      process.nextTick(function() {
        cb(val);
      });
    }
  });
};
// Appends `val` to the queue by delegating to push() on the wrapped
// reference, and returns whatever that call returns.
Queue.prototype.push = function(val) {
  var queueRef = this._ref;
  var pushed = queueRef.push(val);
  return pushed;
};
// Job budget checked by doNextJob(): it pops another job only while this
// value is greater than zero.
var jobs = 10;
// The queue used by this script, wrapping the Firebase location below.
var queue = new Queue(new Firebase("https://anant.firebaseio.com/queue/"));
// Processes one job from the queue and schedules the next one, until the
// job budget in `jobs` is used up; then logs a message and exits the process.
// Takes no arguments and returns nothing.
function doNextJob() {
  if (jobs > 0) {
    // Wait for random time between 1 and 10 seconds for job to "finish".
    // Math.random() is in [0, 1), so scaling by 10 and adding 1 after
    // flooring gives an integer from 1 to 10 inclusive.
    var time = Math.floor(Math.random() * 10) + 1;
    queue.pop(function(val) {
      // Count the job as soon as it is handed to us; without this the
      // budget check above never fails and the worker never exits.
      jobs--;
      console.log("Processing job " + val + " for " + time + " seconds");
      setTimeout(doNextJob, time * 1000);
    });
  } else {
    // We've finished our 10 jobs.
    console.log("Finished 10 jobs, exiting.");
    process.exit();
  }
}
// Read the queue location once. If it holds no data, seed it with 100
// numbered jobs; then wait for input on stdin before processing starts.
queue._ref.once("value", function(snapshot) {
  // The callback receives a snapshot object, which is always truthy, so the
  // emptiness check has to look at the data returned by val(). Testing the
  // snapshot itself would never seed an empty queue.
  if (snapshot.val() === null) {
    for (var i = 1; i <= 100; i++) {
      queue.push(i);
    }
  }
  console.log("Press return to start processing jobs!");
  var stdin = process.openStdin();
  // Each chunk of input on stdin kicks off a round of job processing.
  stdin.on("data", function() {
    console.log("Fetching...");
    doNextJob();
  });
});
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment