Created
May 2, 2011 21:21
-
-
Save Sephi-Chan/952396 to your computer and use it in GitHub Desktop.
ProductionQueue server
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
// Server configuration.
// `callback` describes the HTTP endpoint that addItemToQueue() notifies with a
// POST request each time an item finishes; `secretKey` is sent along in the
// `X-PQS-Secret-Key` request header.
// NOTE(review): the secret key is committed in source — consider loading it
// from the environment before deploying.
var settings = {
  callback: {
    hostname: '127.0.0.1',
    port: 4567,
    path: '/finish_production',
    secretKey: 'my-super-secret-key'
  }
};
// Modules used by the server: Node's core HTTP client plus the third-party
// uuid, redis and express packages.
var http = require('http'),
    uuid = require('uuid'),
    redis = require('redis'),
    express = require('express');

// The HTTP application and the single Redis connection shared by every
// handler in this file.
var app = express.createServer(),
    redisClient = redis.createClient();

// An 'error' event without a listener is thrown by the EventEmitter, which
// would take the whole process (and every pending production timer) down.
// Log it instead.
redisClient.on('error', function(error){
  console.error("Redis error: " + error);
});

// Parse request bodies so that the handlers can read `request.body`.
app.use(express.bodyParser());
/**
 * Start the production of the item sitting at the head of a queue.
 *
 * Once `item.duration` milliseconds have elapsed:
 * - the head of the Redis list "queues:<identifier>:items" is popped and its
 *   "items:<uuid>" hash is deleted;
 * - the next item of the list (if any) is flagged as active and started,
 *   otherwise the queue hash is flagged as idle;
 * - the endpoint described by `settings.callback` receives a POST request
 *   whose JSON body holds the uuid of the finished item.
 *
 * @param {String} queueIdentifier Identifier of the queue.
 * @param {Object} item
 * @param {String} item.uuid Sent to the callback endpoint when the item is done.
 * @param {Number} item.duration Production time, in milliseconds.
 */
function addItemToQueue(queueIdentifier, item){
  console.log("Start production of item " + item.uuid + ".");

  var queueKey = "queues:" + queueIdentifier;
  var listKey = queueKey + ":items";

  setTimeout(function(){
    advanceQueue(queueIdentifier, queueKey, listKey);
    notifyItemFinished(item.uuid);
  }, item.duration);
}

// Remove the finished head item from Redis, then start the next item or mark the queue idle.
function advanceQueue(queueIdentifier, queueKey, listKey){
  redisClient.lpop(listKey, function(error, finishedUuid){
    if(error){
      console.error("Unable to pop the finished item of " + listKey + ": " + error);
      return;
    }

    // Nothing was popped when the list is empty: there is no item hash to delete.
    if(finishedUuid){
      redisClient.del('items:' + finishedUuid);
    }

    redisClient.lindex(listKey, 0, function(error, nextItemUuid){
      if(error){
        console.error("Unable to read the head of " + listKey + ": " + error);
        return;
      }

      if(!nextItemUuid){
        redisClient.hset(queueKey, 'status', 'idle');
        return;
      }

      var nextItemKey = 'items:' + nextItemUuid;

      redisClient.hset(nextItemKey, 'status', 'active');
      redisClient.hget(nextItemKey, 'duration', function(error, duration){
        if(error){
          console.error("Unable to read the duration of " + nextItemKey + ": " + error);
          return;
        }

        addItemToQueue(queueIdentifier, {
          uuid: nextItemUuid,
          // Redis hands the stored duration back as a string.
          duration: Number(duration)
        });
      });
    });
  });
}

// POST the uuid of a finished item to the endpoint configured in settings.callback.
function notifyItemFinished(itemUuid){
  var body = JSON.stringify({
    uuid: itemUuid
  });

  var options = {
    host: settings.callback.hostname,
    port: settings.callback.port,
    path: settings.callback.path,
    method: 'POST',
    headers: {
      'X-PQS-Secret-Key': settings.callback.secretKey,
      'Content-Type': 'application/json',
      // Content-Length is a number of bytes, not of UTF-16 code units.
      'Content-Length': Buffer.byteLength(body)
    }
  };

  var request = http.request(options, function(response){
    // The response body is of no interest, but it must be consumed.
    response.resume();
  });

  request.on('error', function(error){
    console.error("Unable to notify the end of item " + itemUuid + ": " + error);
  });

  request.end(body);
}
/**
 * Create a queue.
 * Request body must include a parameter:
 * - identifier: Identify the queue.
 *
 * Responds with the JSON-encoded queue ({ identifier, status }).
 * When the identifier is missing or Redis fails, responds with a JSON object
 * holding an `error` key, like the other handlers of this file do.
 */
app.post('/queues', function(request, response){
  // `request.body` is not guaranteed to be set when the body was not parsed.
  var identifier = (request.body || {}).identifier;

  if(identifier === undefined || identifier === null || identifier === ''){
    response.send(JSON.stringify({ error: 'identifier_missing' }));
    return;
  }

  var queueKey = 'queues:' + identifier;

  redisClient.hset(queueKey, 'identifier', identifier);

  // HSETNX only writes the status of a queue that has none yet. Forcing an
  // existing, active queue back to 'idle' would let the next posted item
  // start a second timer while another item is still in production.
  redisClient.hsetnx(queueKey, 'status', 'idle', function(error){
    if(error){
      console.error("Unable to create " + queueKey + ": " + error);
      response.send(JSON.stringify({ error: 'redis_error' }));
      return;
    }

    // Report the stored status: 'idle' for a new queue, the current one otherwise.
    redisClient.hget(queueKey, 'status', function(error, status){
      if(error){
        console.error("Unable to read the status of " + queueKey + ": " + error);
        response.send(JSON.stringify({ error: 'redis_error' }));
        return;
      }

      response.send(JSON.stringify({
        identifier: identifier,
        status: status
      }));
    });
  });
});
/**
 * Add an item to the queue.
 * Request body must include a few parameters:
 * - identifier: Identify the item. Not used by the system.
 * - duration: Expressed in seconds (stored and scheduled in milliseconds).
 *
 * The item starts right away when the queue is idle; otherwise it waits in
 * the queue's list. The end of an item is notified to the endpoint configured
 * in `settings.callback`.
 *
 * Responds with { queue_status: <status of the queue before the addition> },
 * or with a JSON object holding an `error` key.
 *
 * TODO: Create the queue on the fly.
 * NOTE(review): reading the status and activating the queue are two separate
 * Redis commands, so two simultaneous requests may both see an idle queue.
 */
app.post('/queues/:identifier/items', function(request, response){
  // Largest delay accepted by setTimeout; above it the timer fires at once.
  var MAX_TIMER_DELAY = 2147483647;

  var queueIdentifier = request.param('identifier');
  var queueKey = "queues:" + queueIdentifier;

  // `request.body` is not guaranteed to be set when the body was not parsed.
  var body = request.body || {};
  var duration = Math.round(Number(body.duration) * 1000);

  // A missing or non-numeric duration yields NaN, which would be stored as
  // is and would make the item finish immediately.
  if(!isFinite(duration) || duration < 0 || duration > MAX_TIMER_DELAY){
    response.send(JSON.stringify({ error: 'invalid_duration' }));
    return;
  }

  redisClient.exists(queueKey, function(error, exists){
    if(error){
      console.error("Unable to look up " + queueKey + ": " + error);
      response.send(JSON.stringify({ error: 'redis_error' }));
      return;
    }

    if(!exists){
      response.send(JSON.stringify({
        error: 'queues_does_not_exist'
      }));
      return;
    }

    redisClient.hget(queueKey, 'status', function(error, queueStatus){
      if(error){
        console.error("Unable to read the status of " + queueKey + ": " + error);
        response.send(JSON.stringify({ error: 'redis_error' }));
        return;
      }

      var itemUuid = uuid.generate().toLowerCase();
      var itemKey = 'items:' + itemUuid;
      var listKey = queueKey + ":items";
      var startsNow = queueStatus === 'idle';

      // The item is created as active when it starts right away, as idle
      // when it has to wait behind other items.
      redisClient.hmset(
        itemKey,
        'status', startsNow ? 'active' : 'idle',
        'identifier', body.identifier,
        'duration', duration
      );

      if(startsNow){
        redisClient.hset(queueKey, 'status', 'active');
      }

      redisClient.rpush(listKey, itemUuid);

      if(startsNow){
        addItemToQueue(queueIdentifier, {
          uuid: itemUuid,
          duration: duration
        });
      }

      response.send(JSON.stringify({ queue_status: queueStatus }));
    });
  });
});
// Accept HTTP connections on port 1808 of the 'localhost' interface.
app.listen(1808, 'localhost');
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment