public

My Kue worker callback system.

  • Download Gist
Kue Worker Process Callback System.md
Markdown

Update 2013-06-30

DEFINITELY DON'T USE THIS. It looks like I probably reinvented the wheel here. I could have just shoved the new jobs into redis and monitored them... See here.

When you're using Kue to process jobs in a separate process, you can't simply execute a callback when the job is finished. This is an example of communication between the two processes.

I would have liked to use the id that Kue assigns to each job automatically (which I believe is the same id the job gets in Redis), but app.js needs to know the job's id before the job is sent to the worker so that it can match that id when a message comes back.

Please let me know if you have a better idea for solving this issue, or if you can improve my code.

app.js
JavaScript
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59
var child = require('child_process');
var async = require('async');
 
// Fork worker.js as a separate Node process. Jobs are handed to it with
// worker.send() and its replies arrive through the 'message' event.
var worker = child.fork("./worker.js");
 
// When the worker reports a job, find it in activeJobs, forget it, and invoke
// its finished callback with the outcome.
worker.on('message', function(m) {
  var i;
  var job;
  var err;

  // Nothing to match against without a payload.
  if (!m) {
    return;
  }

  // worker.js reports failures under the `error` key; `err` is still accepted
  // so that messages using the older key name keep working.
  err = m.error !== undefined ? m.error : m.err;

  for (i = 0; i < activeJobs.length; i++) {
    if (activeJobs[i].jobId === m.jobId) {
      job = activeJobs[i];
      // Remove the entry before calling back so that a callback which throws
      // does not leave a stale job behind.
      activeJobs.splice(i, 1);
      job.finished(err, m.results);
      break;
    }
  }
});
 
// Local job system: tracks work handed to the worker until it reports back.
var newJobId = 0;      // id assigned to the next Job; incremented on every construction
var activeJobs = [];   // Jobs that have been sent to the worker and not yet finished

/**
 * Creates a job, registers it in activeJobs and sends it to the worker.
 *
 * The message sent to the worker is a shallow copy of `input` with a `jobId`
 * property added; the caller's `input` object is left untouched.
 *
 * @param {Object} input - Job description forwarded to the worker.
 * @param {Function} [callback] - Called as callback(err, results) when
 *   `finished` is invoked for this job. Optional.
 */
function Job(input, callback) {
  var message = {};
  var key;

  this.jobId = newJobId;
  newJobId++;

  // Assigned before the message goes out so the job is complete by the time
  // any reply could be dispatched to it.
  this.finished = function(err, results) {
    if (typeof callback === 'function') {
      callback(err, results);
    }
  };

  // Copy instead of writing jobId onto the caller's object.
  for (key in input) {
    if (Object.prototype.hasOwnProperty.call(input, key)) {
      message[key] = input[key];
    }
  }
  message.jobId = this.jobId;

  activeJobs.push(this);
  worker.send(message);
}
 
 
/**
 * Example handler that runs two steps in order via async.series:
 * first a filesystem delete delegated to the worker through a Job, then
 * removal of the matching record. Any error is only logged.
 *
 * @param {Object} req - Not used by this example.
 * @param {Object} res - Not used by this example.
 */
var deleteIt = function(req, res) {
  // An *EXAMPLE* asynchronous task that is passed off to the worker to be
  // processed; it needs a completion callback because async.series waits on it.
  function removeDirectory(next) {
    var payload = {
      jobType:'filesystem',
      title:'delete project directory',
      operation: 'delete',
      path: '/deleteMe'
    };

    new Job(payload, function(jobErr) {
      next(jobErr);
    });
  }

  // Delete it from the database.
  // NOTE(review): someObject is not defined in this snippet; presumably a model instance.
  function removeRecord(next) {
    someObject.remove(function(dbErr) {
      next(dbErr);
    });
  }

  // Final callback for the series: report a failure, otherwise do nothing.
  function reportOutcome(err) {
    if (err) {
      console.log(err);
    }
  }

  async.series([removeDirectory, removeRecord], reportOutcome);
};
worker.js
JavaScript
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27
var kue = require('kue');
var fs = require('fs-extra');
 
// Kue queue for this process: incoming messages are enqueued on it and the
// 'filesystem' processor registered on it executes them.
var jobs = kue.createQueue();
 
// Jobs sent by the parent process arrive here as IPC messages and are queued in Kue.
process.on('message', function(message) {
  if (message && message.jobType) {
    // The whole message (including the jobId set by the parent) becomes the
    // Kue job data, so the processor can report back with the same id.
    jobs.create(message.jobType, message).save(function(err) {
      // Surface enqueue failures instead of dropping them silently.
      if (err) {
        console.log(err);
      }
    });
  } else {
    console.log(new Error("Worker: No jobType specified, message ignored"));
  }
});
 
/**
 * Kue processor for 'filesystem' jobs.
 *
 * Supports the 'delete' operation, which removes job.data.path. Every job,
 * including one with an unsupported operation, ends by notifying the parent
 * process and calling done(), so neither Kue nor the parent is left waiting.
 */
jobs.process('filesystem', function(job, done) {
  var operation = job.data.operation;
  var unsupported;

  if (operation === 'delete') {
    // fs-extra's remove(); `delete` was only an alias for it in old releases.
    fs.remove(job.data.path, function(removeErr) {
      notifyFinished(job.data.jobId, removeErr);
      done(removeErr);
    });
    return;
  }

  // Unknown operation: fail the job explicitly rather than never completing it.
  unsupported = new Error('Worker: Unsupported filesystem operation: ' + operation);
  notifyFinished(job.data.jobId, unsupported);
  done(unsupported);
});
 
/**
 * Tells the parent process that a job has finished.
 *
 * @param {*} id - The jobId the parent assigned to the job.
 * @param {*} [error] - Failure, if any. Error instances are converted to a
 *   plain object ({name, message, stack[, code]}) because an Error serialised
 *   as JSON becomes `{}` and would lose its message on the way to the parent.
 * @param {*} [results] - Result data to hand back to the parent.
 */
function notifyFinished(id, error, results) {
  var reported = error;

  if (error instanceof Error) {
    reported = {
      name: error.name,
      message: error.message,
      stack: error.stack
    };
    if (error.code !== undefined) {
      reported.code = error.code;
    }
  }

  process.send({jobId: id, status: 'finished', error: reported, results: results});
}

Please sign in to comment on this gist.

Something went wrong with that request. Please try again.