Skip to content

Instantly share code, notes, and snippets.

@freidamachoi
Forked from katowulf/app.js
Created June 3, 2014 19:45
Show Gist options
  • Save freidamachoi/9bc08033921f981f9a5f to your computer and use it in GitHub Desktop.
Save freidamachoi/9bc08033921f981f9a5f to your computer and use it in GitHub Desktop.
Kato's queue service --- we use this.
var Firebase = require('firebase');
var Request = require('./request.js');
var Reply = require('./reply.js');
var Tasks = require('./tasks');
var Queue = require('./queue.js');
// Reference to the `queue` child of the database named by FIREBASE_URL.
// The rest of this script addresses its `in` and `out` children through it.
var fb = new Firebase(process.env.FIREBASE_URL).child('queue');

// Authenticate with FIREBASE_TOKEN; only once that succeeds is a Queue
// attached to `queue/in`, with processRequest as its handler.
fb.auth(process.env.FIREBASE_TOKEN, function onAuthComplete(authError) {
  if (authError) {
    // Nothing can be processed without authentication, so fail loudly.
    throw authError;
  }
  new Queue(fb.child('in'), processRequest);
});
/**
 * Handles one queue item: wraps the raw payload in a Request, builds a Reply
 * whose callback is replyToRequest bound to that request, and hands both to
 * the task router.
 *
 * @param {Object} requestData - Raw payload passed in by the queue.
 */
function processRequest(requestData) {
  var req = new Request(requestData);
  // The reply type imported by this script is `Reply` (./reply.js); it is the
  // object that carries the yes()/no() methods tasks call.
  var res = new Reply(replyToRequest.bind(null, req));
  // NOTE(review): this relies on the Request object exposing `task`;
  // confirm the Request constructor populates it.
  if (req.uid && req.task) {
    // assumptions: all incoming tasks have a `uid` and a `task`
    // and may also contain an optional `data` object.
    // If any requirements are missing, then the request is ignored
    Tasks.route(req, res);
  }
}
/**
 * Pushes a reply's payload under `out/<uid>` of the queue reference.
 *
 * @param {Object} request - The request being answered; its `uid` selects the
 *   output location.
 * @param {Object} reply - The reply whose payload is pushed.
 */
function replyToRequest(request, reply) {
  // Read the payload through getData() when the reply provides one; otherwise
  // fall back to its `data` field, which yes()/no() populate.
  var payload = typeof reply.getData === 'function' ? reply.getData() : reply.data;
  fb.child('out/' + request.uid).push(payload);
}
/**
 * Modified from Firebase-work-queue (https://github.com/firebase/firebase-work-queue)
 *
 * Subscribes to `child_added` on `queueRef` and tries to claim every item
 * that appears; each claimed item's data is passed to `processingCallback`.
 *
 * @param {Object} queueRef - Reference whose `on("child_added", ...)` events
 *   deliver the queued items.
 * @param {Function} processingCallback - Called with the data of each item
 *   this worker manages to claim.
 */
function WorkQueue(queueRef, processingCallback) {
  this.processingCallback = processingCallback;
  queueRef.on("child_added", function(snap) {
    this.tryToProcess(snap);
  }, this);
}

/**
 * Attempts to claim a single queue item through a transaction on its
 * reference, and runs the processing callback if the claim is committed.
 *
 * @param {Object} snap - Snapshot of the queue item; `snap.ref()` must expose
 *   `transaction(updateFn, onComplete)`.
 */
WorkQueue.prototype.tryToProcess = function(snap) {
  // The completion callback is not invoked with this instance as `this`,
  // so keep an explicit handle on it.
  var self = this;
  // Local to this attempt; holds the item's value as last seen by the
  // transaction's update function.
  var dataToProcess = null;
  // claims the queue item by deleting it, if successful, it
  // belongs to us. In my queues, I generally don't delete these,
  // but instead write them to a "processing" queue until they
  // are done. In this way, if the server should crash due to a bug
  // or issue, then I can find the in-process items that would
  // otherwise be lost
  snap.ref().transaction(function(theItem) {
    dataToProcess = theItem;
    if (theItem) {
      return null;
    }
    return; // abort by returning undefined
  }, function(error, committed) {
    if (error) {
      console.error(error);
    }
    if (committed) {
      console.log("Claimed a job.");
      self.processingCallback(dataToProcess);
    } else {
      console.log("Another worker beat me to the job.");
    }
  });
};
module.exports = WorkQueue;
/**
 * Records the outcome of a task and hands itself to `callback` as soon as
 * the task reports success or failure.
 *
 * @param {Function} callback - Invoked with this Reply after yes(), no() or
 *   err() has stored the outcome.
 */
function Reply(callback) {
  this.callback = callback;
  this.data = null;
}

// The methods belong on Reply's own prototype; instances created with
// `new Reply(...)` look them up there.
Reply.prototype = {
  /**
   * Reports success.
   * @param {*} data - Result payload stored under `data`.
   */
  yes: function(data) {
    this.data = {
      success: true,
      error: null,
      data: data
    };
    this.callback(this);
  },

  /**
   * Reports failure.
   * @param {*} error - Error description stored under `error`.
   */
  no: function(error) {
    this.data = {
      success: false,
      error: error,
      data: {}
    };
    this.callback(this);
  },

  /**
   * Alias of no(); the task router in this file reports failures through
   * `reply.err(...)`.
   * @param {*} error - Error description stored under `error`.
   */
  err: function(error) {
    this.no(error);
  },

  /**
   * Returns the stored outcome, or null when neither yes() nor no() has
   * been called yet.
   * @returns {Object|null}
   */
  getData: function() {
    return this.data;
  }
};
module.exports = Reply;
/**
 * Normalized view of an incoming queue item.
 *
 * @param {Object} request - Raw payload; `uid`, `action`, `task` and `data`
 *   are copied from it.
 */
function Request(request) {
  this.action = request.action;
  // Consumers in this file read `task` (the uid/task guard and the task
  // router), so expose it too; payloads that only carry `action` fall back
  // to that value.
  this.task = request.task !== undefined ? request.task : request.action;
  this.uid = request.uid;
  this.data = request.data;
}
module.exports = Request;
var util = require('util');
exports.run = function(request, reply) {
if( request.data.isGeorgeCostanza ) {
reply.no({message: 'No soup for you!'});
}
else {
reply.yes({message: 'How can I accept any less from my customers?'});
}
}
var fs = require('fs');
// Registry of loaded task modules, keyed by the <name> part of the file name.
var Tasks = {};
// reads the tasks/ directory where this file is located and
// creates a Task out of each task.<name>.js file
fs.readdirSync(__dirname).forEach(function(route) {
  // The dots are escaped so that only literal `task.<name>.js` file names
  // match; an unescaped `.` would accept any character in those positions.
  var m = route.match(/^task\.([a-z-]+)\.js$/);
  if (m) {
    Tasks[m[1]] = require('./' + route);
  }
});
exports.route = function(request, reply) {
if(Tasks.hasOwnProperty(request.task) {
try {
var Task = Tasks[request.task];
Task.run(request, reply);
}
catch(e) {
reply.err(e);
}
}
else {
reply.err('Invalid task: '+request.task);
}
};
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment