Skip to content

Instantly share code, notes, and snippets.

@bertrandom
Created November 18, 2011 21:32
Show Gist options
  • Star 3 You must be signed in to star a gist
  • Fork 1 You must be signed in to fork a gist
  • Save bertrandom/1377828 to your computer and use it in GitHub Desktop.
Save bertrandom/1377828 to your computer and use it in GitHub Desktop.
Daemon for queueing tasks for the future for Gearman without using SUBMIT_JOB_EPOCH
require('js-yaml');
/**
 * Print one timestamped log line to stdout.
 *
 * Output format: "<UTC date string> - [<jobs>] - <line>", where <jobs> is the
 * current value of the module-level `jobs` counter.
 *
 * @param {string} line - Message text appended after the prefix.
 */
function writeLn(line) {
    var now = new Date();
    // toUTCString() is the standard spelling of the deprecated toGMTString()
    // and produces the same text.
    console.log(now.toUTCString() + ' - [' + jobs + '] - ' + line);
}
/**
 * Hand a queued job to Gearman as a background job.
 *
 * While `connection_working` is false the call does nothing except re-arm
 * itself one second later, so the job stays pending (and stays counted in
 * `jobs`) until the connection is back.
 *
 * If `submitJob` throws, the connection is marked as down, a reconnect is
 * started, the `jobs` counter is restored and the job is re-queued through
 * this same function.
 *
 * @param {Object} data - Parsed request payload.
 * @param {Object} data.job - Job descriptor; `name` and `workload` are passed to `submitJob`.
 * @param {*} data.uniqid - Forwarded as the `uniqid` submit option.
 */
function executeJob(data) {
    if (!connection_working) {
        // No usable connection right now: look again in one second.
        setTimeout(executeJob, 1000, data);
        return;
    }

    // Must be a local: assigning to an undeclared `job` would create an
    // implicit global (and is a ReferenceError in strict mode).
    var job = data.job;

    jobs--;
    writeLn('Executing Job: ' + job.name + ' - ' + job.workload);

    try {
        gearman_client.submitJob(job.name, job.workload, {
            background: true,
            priority: 'normal',
            encoding: 'utf8',
            uniqid: data.uniqid
        });
    } catch (err) {
        connection_working = false;

        // Probably lost connection to Gearman, let's kick it
        writeLn('Submit failed: ' + err.message);
        writeLn('Lost connection to Gearman, reconnecting...');
        reconnect();

        // Undo the decrement above; the job is pending again.
        jobs++;

        // connection_working is false here, so this call only schedules a
        // retry one second out instead of recursing into submitJob again.
        executeJob(data);
    }
}
/**
 * Drop the current Gearman client and replace it with a freshly created one.
 *
 * The new client's connection gets the same two listeners as the initial
 * one: "connect" marks the connection usable, "error" marks it unusable and
 * schedules another reconnect attempt one second later.
 */
function reconnect() {
    var markConnected = function () {
        writeLn('Connection successful.');
        connection_working = true;
    };

    var retryLater = function () {
        connection_working = false;
        writeLn('Connection failed, trying to reconnect in one second...');
        setTimeout(reconnect, 1000);
    };

    // Close the old client before replacing the module-level reference.
    gearman_client.end();
    gearman_client = gearman.createClient(
        params.gearman_tasks_1_port,
        params.gearman_tasks_1_host
    );

    gearman_client.getConnection().on("connect", markConnected);
    gearman_client.getConnection().on("error", retryLater);
}
// Jobs received over HTTP that have not been handed to Gearman yet
// (incremented on receipt, decremented in executeJob); shown in every log line.
var jobs = 0;

// Set by the "connect" listener, cleared by the "error" listener and by a
// failed submit; executeJob will not submit while this is false.
var connection_working = false;

// NOTE(review): loading a .yml file through require() presumably relies on the
// js-yaml module required at the top of the file — confirm.
var config = require(__dirname + '/config/parameters.yml');
var params = config[0].parameters;

var gearman = require("gearman");
var gearman_client = gearman.createClient(
    params.gearman_tasks_1_port,
    params.gearman_tasks_1_host
);

// Initial connection: same listeners that reconnect() installs on later clients.
gearman_client.getConnection().on("connect", function onInitialConnect() {
    writeLn('Connection successful.');
    connection_working = true;
});

gearman_client.getConnection().on("error", function onInitialError() {
    connection_working = false;
    writeLn('Connection failed, trying to reconnect in one second...');
    setTimeout(reconnect, 1000);
});

var http = require('http');

writeLn('Starting server on port ' + params.westernunion_port);
/**
 * HTTP endpoint that accepts a job as a JSON request body and queues it.
 *
 * The body is read in full, parsed, and must be an object with a `job`
 * object inside it; `job.name`, `job.workload`, `uniqid` and `when_to_run`
 * are the fields this file reads. `when_to_run` is compared against the
 * current Unix time in seconds.
 *
 * Responses:
 *   200 "SUCCESS\n"  - the job was accepted (run now or scheduled).
 *   400 "ERROR: ..." - the body was not valid JSON or had no `job` object.
 *
 * The body is parsed BEFORE answering so that a malformed request is
 * rejected instead of throwing inside the 'end' handler and taking the
 * whole daemon down after "SUCCESS" has already been sent.
 */
http.createServer(function (req, res) {
    req.setEncoding('utf8');

    var fullBody = '';

    req.on('data', function (chunk) {
        // setEncoding('utf8') above means chunks already arrive as strings.
        fullBody += chunk;
    });

    req.on('end', function () {
        var data;

        try {
            data = JSON.parse(fullBody);
        } catch (err) {
            writeLn('Rejected request: invalid JSON - ' + err.message);
            res.writeHead(400, {'Content-Type': 'text/plain'});
            res.end('ERROR: invalid JSON\n');
            return;
        }

        // JSON.parse can legitimately return null, numbers, strings, ...
        if (data === null || typeof data !== 'object' ||
                data.job === null || typeof data.job !== 'object') {
            writeLn('Rejected request: missing job description');
            res.writeHead(400, {'Content-Type': 'text/plain'});
            res.end('ERROR: missing job\n');
            return;
        }

        res.writeHead(200, {'Content-Type': 'text/plain'});
        res.end('SUCCESS\n');

        var recv_string = 'Received Job: ' + data.job.name + ' - ' + data.job.workload;
        var now = Math.round(new Date().getTime() / 1000);
        var secondsUntilRun = data.when_to_run - now;

        jobs++;

        // `!(x > 0)` rather than `x <= 0` so that a missing or non-numeric
        // when_to_run (NaN) is treated as "run now" instead of being passed
        // to setTimeout as a NaN delay.
        if (!(secondsUntilRun > 0)) {
            writeLn(recv_string + ' - executing now!');
            executeJob(data);
        } else {
            writeLn(recv_string + ' - executing in ' + secondsUntilRun + ' seconds.');
            // NOTE(review): Node clamps timer delays above 2^31-1 ms (~24.8 days)
            // to 1 ms, so jobs scheduled further out than that would fire
            // almost immediately.
            setTimeout(executeJob, secondsUntilRun * 1000, data);
        }
    });
}).listen(params.westernunion_port);
@bertrandom
Copy link
Author

Eventually I will turn this into a full repository. Writeup is here

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment