Skip to content

Instantly share code, notes, and snippets.

@mattvv
Created June 8, 2016 22:02
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save mattvv/101828cd335b92992b513bf57549d229 to your computer and use it in GitHub Desktop.
Save mattvv/101828cd335b92992b513bf57549d229 to your computer and use it in GitHub Desktop.
var async = require('async');
var _ = require('underscore');
module.exports = {
initialize: function(api, next){
// Tells the callers whether a redis reply leaves them with nothing to process.
// - err: the error handed to the redis callback, if any
// - result: the reply value; only its `length` is inspected
// Returns true when err is truthy, when result is null or undefined, or when
// result.length is 0; returns false otherwise.
function nullOrErrorResult(err, result) {
  // `== null` also matches undefined, which would otherwise throw on `.length`
  return Boolean(err) || result == null || result.length === 0;
}
//returns a function which takes a callback(err, data)
//callback:
// - err will be null or contain an error if one occurs; this includes a
//   stored job entry that cannot be parsed as JSON
// - data will contain an object with:
// - 'timestamp': [ parsedJob, ... ] (one array per entry of the delayed
//   schedule; jobs are expected to look like { class, queue, args })
//   data is {} when the schedule lookup fails or returns nothing
function getDelayedTaskSchedule(api) {
  return function(done) {
    var scheduler = api.resque.scheduler;
    var redis = scheduler.connection.redis;
    var scheduleKey = scheduler.connection.key('delayed_queue_schedule');
    // '-inf'..'+inf' asks for every member of the schedule, whatever its score
    redis.zrangebyscore(scheduleKey, '-inf', '+inf', function(err, items) {
      if (nullOrErrorResult(err, items)) {
        done(err, {});
        return;
      }
      async.reduce(items, {}, function(memo, timestamp, callback) {
        var key = scheduler.connection.key('delayed:' + timestamp);
        redis.lrange(key, 0, -1, function(err, jobs) {
          if (err) {
            callback(err);
            return;
          }
          var jobList;
          try {
            // explicit wrapper so map's index argument is never passed on as
            // JSON.parse's reviver
            jobList = jobs.map(function(raw) {
              return JSON.parse(raw);
            });
          } catch (parseErr) {
            // a throw here would escape the redis callback uncaught, so
            // report it through the callback instead
            callback(parseErr);
            return;
          }
          memo[timestamp] = jobList;
          callback(null, memo);
        });
      }, done);
    });
  };
}
//returns a function which takes a callback(err, data)
//callback:
// - err will be null or contain an error if one occurs; this includes a
//   queued entry that cannot be parsed as JSON
// - data will contain an object with:
// - 'queuename': [{ class, args }, ... ]
//   data is {} whenever err is set or no queues are registered
function getQueuedTasks(api) {
  return function(done) {
    var scheduler = api.resque.scheduler;
    var redis = scheduler.connection.redis;
    redis.smembers(scheduler.connection.key('queues'), function(err, queues) {
      if (nullOrErrorResult(err, queues)) {
        done(err, {});
        return;
      }
      async.map(queues, function(queue, callback) {
        redis.lrange(scheduler.connection.key('queue:' + queue), 0, -1, function(err, items) {
          if (err) {
            callback(err);
            return;
          }
          var parsed = [];
          try {
            // a missing reply is treated as an empty queue
            for (var i = 0; items != null && i < items.length; i++) {
              parsed.push(JSON.parse(items[i]));
            }
          } catch (parseErr) {
            // a throw here would escape the redis callback uncaught
            callback(parseErr);
            return;
          }
          callback(null, { name: queue, items: parsed });
        });
      }, function(err, queueObjects) {
        if (err) {
          // on failure the mapped array may be incomplete, so do not read it;
          // answer the same way as the smembers failure path above
          done(err, {});
          return;
        }
        var result = {};
        for (var i = 0; i < queueObjects.length; i++) {
          var queueItem = queueObjects[i];
          result[queueItem.name] = queueItem.items;
        }
        done(null, result);
      });
    });
  };
}
//returns a function which takes a callback(err, data)
//callback:
// - err will be null or contain an error if one occurs; this includes a
//   worker status value that cannot be parsed as JSON
// - data will contain an object with:
// - 'workername': 'idle' | { class, args }
//   ('idle' when the worker has no stored status, otherwise the `payload`
//   field of the stored status)
//   data is {} whenever err is set or no workers are registered
function getWorkerStatus(api) {
  return function(done) {
    var scheduler = api.resque.scheduler;
    var redis = scheduler.connection.redis;
    redis.smembers(scheduler.connection.key('workers'), function(err, workers) {
      if (nullOrErrorResult(err, workers)) {
        done(err, {});
        return;
      }
      async.map(workers, function(workerName, callback) {
        redis.get(scheduler.connection.key('worker:' + workerName), function(err, statusString) {
          if (err) {
            callback(err);
            return;
          }
          var result = {name: workerName};
          if (statusString == null) {
            result.status = 'idle';
          } else {
            try {
              result.status = JSON.parse(statusString).payload;
            } catch (parseErr) {
              // a throw here would escape the redis callback uncaught
              callback(parseErr);
              return;
            }
          }
          callback(null, result);
        });
      }, function(err, workerItems) {
        if (err) {
          // the failed lookup contributed no item, so the mapped array can
          // hold undefined entries; do not read it
          done(err, {});
          return;
        }
        var result = {};
        for (var i = 0; i < workerItems.length; i++) {
          var workerItem = workerItems[i];
          result[workerItem.name] = workerItem.status;
        }
        done(null, result);
      });
    });
  };
}
//returns a function which takes a callback(err, data)
//callback:
// - err will be null or contain an error if one occurs (data is then {})
// - data will contain one entry per job name, holding one of:
// - {status: 'scheduled', at: timestamp, in: timeToTimestamp}
// - {status: 'running', running: 'workerName'}
// - {status: 'enqueued', queue: 'queueName'}
// - {status: 'dequeued'}
//every task known to api.tasks.tasks starts as 'dequeued'; scheduled, then
//enqueued, then running information overwrites it, in that order
function getStatusByJob(api) {
  return function(done) {
    var statusByJob = {};
    Object.keys(api.tasks.tasks).forEach(function(taskName) {
      statusByJob[taskName] = { status: 'dequeued' };
    });

    var lookups = {
      schedule: getDelayedTaskSchedule(api),
      queuedTasks: getQueuedTasks(api),
      workers: getWorkerStatus(api)
    };

    async.parallel(lookups, function(err, results) {
      if (err) {
        done(err, {});
        return;
      }

      _.each(results.schedule, function(jobs, timestamp) {
        // seconds from now until the scheduled timestamp
        var nowInSeconds = Math.round(new Date().getTime() / 1000);
        var timeToTimestamp = timestamp - nowInSeconds;
        _.each(jobs, function(job) {
          statusByJob[job.class] = {status: 'scheduled', at: timestamp, in: timeToTimestamp};
        });
      });

      _.each(results.queuedTasks, function(jobs, queue) {
        _.each(jobs, function(job) {
          statusByJob[job.class] = {status: 'enqueued', queue: queue};
        });
      });

      _.each(results.workers, function(job, worker) {
        // idle workers contribute nothing
        if (job !== 'idle') {
          statusByJob[job.class] = {status: 'running', running: worker};
        }
      });

      done(null, statusByJob);
    });
  };
}
// Publish the status helpers on the api object. Every entry is a factory:
// call it with api to get a function(done) that performs the lookup.
var resqueHelper = {};
resqueHelper.delayedSchedule = getDelayedTaskSchedule;
resqueHelper.queuedTasks = getQueuedTasks;
resqueHelper.workerStatus = getWorkerStatus;
resqueHelper.jobs = getStatusByJob;
api.resqueHelper = resqueHelper;
// hand control back through the next callback given to initialize
next();
}
};
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment