Skip to content

Instantly share code, notes, and snippets.

@julienXX
Created February 7, 2011 13:50
Show Gist options
  • Save julienXX/814383 to your computer and use it in GitHub Desktop.
Save julienXX/814383 to your computer and use it in GitHub Desktop.
#!/usr/local/bin/node
var sys = require('sys'),
fs = require('fs'),
spawn = require('child_process').spawn,
Log = require('log'),
log = new Log(Log.DEBUG, fs.createWriteStream('poller.log', {'flags': 'a'})),
outQueue = require('./resque').connect({host: 'localhost', port: 6379});
var child = '',
remainingJobs = new Array();
/* JOBS */
var fetcherJobs = {
WallRun: function(args){
var page = args.wall_run;
child = spawn('node', ['fetcher.js', '--pageid', page.page_id, '--apikey', page.api_key, '--accesstoken', page.access_token, '--since', page.last_fetch||0]);
child.stderr.on('data', function (data) {
log.error('Page: ' + page.page_id + ' since: ' + page.last_fetch + ' ' + data);
});
child.on('exit', function(code) {
if(code!=0){
log.error("Fetch failed for " + page.page_id + " since: " + page.last_fetch);
outQueue.enqueue(page.page_id, 'FetchError', page);
}
else{
outQueue.enqueue(page.page_id, 'FetchSuccess', page);
}
remainingJobs.shift();
});
child.on("is_finished_yet", function(){
if(remainingJobs.length < 1){
process.emit("childs_finished");
} else{
console.log(remainingJobs.length + " childs to wait for.");
process.emit("waiting_for_childs");
}
});
}
}
/* WORKER */
var worker = require('./resque').connect({host: 'localhost', port: 6379}).worker('walls', fetcherJobs);
// Triggered every time the Worker polls
worker.on('poll', function(worker, queue) {
log.info('Started polling for new jobs...');
})
// Triggered before a Job is attempted
worker.on('job', function(worker, queue, job) {
var page = job.args[0].wall_run;
remainingJobs.push(page.page_id);
log.info("Processing page: " + page.page_id + " since " + page.last_fetch);
});
// Triggered every time a Job errors
worker.on('error', function(err, worker, queue, job) {log.error(err);});
// Triggered on every successful Job run
worker.on('success', function(worker, queue, job) { log.info('Job finished.'); });
/* SIGNALS HANDLING */
process.on('SIGINT', function () {
this.emit("waiting_for_childs");
});
process.on("waiting_for_childs", function(){
if(remainingJobs.length < 1){
console.log(' Got SIGINT. Terminating poller.');
this.emit('childs_finished');
} else{
console.log('Waiting for childs to die...');
console.log(remainingJobs.length + " childs to wait for.");
child.emit("is_finished_yet");
}
});
process.on("childs_finished", function(){
console.log("\nAll childs finished working.");
process.exit(0);
break;
});
/* RUN */
worker.start();
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment