Skip to content

Instantly share code, notes, and snippets.

@Sephi-Chan
Created May 2, 2011 21:21
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save Sephi-Chan/952396 to your computer and use it in GitHub Desktop.
Save Sephi-Chan/952396 to your computer and use it in GitHub Desktop.
ProductionQueue server
var settings = {
callback: {
hostname: '127.0.0.1',
port: 4567,
path: '/finish_production',
secretKey: 'my-super-secret-key'
}
}
var http = require('http'),
uuid = require('uuid'),
redis = require('redis'),
express = require('express');
var app = express.createServer(),
redisClient = redis.createClient();
app.use(express.bodyParser());
function addItemToQueue(queueIdentifier, item){
console.log("Start production of item " + item.uuid + ".");
var queueKey = "queues:" + queueIdentifier,
listKey = queueKey + ":items";
setTimeout(function(listKey){
redisClient.lpop(listKey, function(error, uuid){
redisClient.lindex(listKey, 0, function(error, nextItemUuid){
var oldItemKey = 'items:' + uuid;
redisClient.del(oldItemKey);
if(nextItemUuid){
var nextItemKey = 'items:' + nextItemUuid;
redisClient.hset(nextItemKey, 'status', 'active');
redisClient.hget(nextItemKey, 'duration', function(error, duration){
addItemToQueue(queueIdentifier, {
uuid: nextItemUuid,
duration: duration
});
});
}
else {
redisClient.hset(queueKey, 'status', 'idle');
}
});
});
var body = JSON.stringify({
uuid: item.uuid
}),
options = {
host: settings.callback.hostname,
port: settings.callback.port,
path: settings.callback.path,
method: 'POST',
headers: {
'X-PQS-Secret-Key': settings.callback.secretKey,
'Content-Type': 'application/json',
'Content-Length': body.length
}
},
request = http.request(options, function(){});
request.on('error', function(){});
request.end(body);
}, item.duration, listKey);
}
/**
* Create a queue.
* Request body must include a parameter.
* identifier: Identify the queue.
* TODO:
*/
app.post('/queues', function(request, response){
var queue = {
identifier: request.body.identifier,
status: 'idle'
},
queueKey = 'queues:' + queue.identifier;
redisClient.hmset(
queueKey,
'identifier', queue.identifier,
'status', queue.status
);
response.send(JSON.stringify(queue));
});
/**
* Add an item to the queue.
* Request body must include few parameters.
* identifier: Identify the item. Not used by the system.
* - duration: Exprimed in seconds.
* - callbackUrl: Callback URL where is sent the request when the item is finished.
*
* TODO: Create the queue on the fly.
*/
app.post('/queues/:identifier/items', function(request, response){
var queueIdentifier = request.param('identifier'),
queueKey = "queues:" + queueIdentifier;
redisClient.exists(queueKey, function(error, exists){
if(exists){
redisClient.hget(queueKey, 'status', function(error, queueStatus){
var itemUuid = uuid.generate().toLowerCase(),
itemKey = 'items:' + itemUuid,
listKey = queueKey + ":items",
duration = parseInt(request.body.duration * 1000);
if(queueStatus == 'idle'){
// Create the item as active, activate the queue and push the item to the list.
redisClient.hmset(
itemKey,
'status', 'active',
'identifier', request.body.identifier,
'duration', duration
);
redisClient.hset(queueKey, 'status', 'active');
redisClient.rpush(listKey, itemUuid);
addItemToQueue(queueIdentifier, {
uuid: itemUuid,
duration: duration
});
}
else {
// Create the item as idle and push the item to the list.
redisClient.hmset(
itemKey,
'status', 'idle',
'identifier', request.body.identifier,
'duration', request.body.duration * 1000
);
redisClient.rpush(listKey, itemUuid);
}
response.send(JSON.stringify({ queue_status: queueStatus }));
});
}
else {
response.send(JSON.stringify({
error: 'queues_does_not_exist'
}));
}
});
});
app.listen(1808, 'localhost');
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment