Created
May 5, 2012 11:42
-
-
Save mborho/2601766 to your computer and use it in GitHub Desktop.
qcs wrapper
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
function worker(msg) { | |
console.log('###### WORKER #######'); | |
console.log(msg.MessageId+' #### '); | |
console.log(msg.Body); | |
msg.del(function(error) { | |
// Message deletion never results in an error. If a message is successfully | |
// deleted, it will simply never appear in the queue again. | |
console.log('Message deleted!'); | |
// _receive(cb); | |
}); | |
} | |
var jobQueue = new lib.queue.JobQueue(nconf.get('db'), worker); | |
jobQueue.start() | |
jobQueue.send({bla:"fasel"}); | |
// A normal import. | |
var cqs = require('cqs'); | |
function JobQueue(conf, worker) { | |
var _conf = conf; | |
var _dns = conf.server.replace(/http:\/\//g, "http://"+_conf.user+":"+conf.password+"@")+":"+conf.port; | |
var _cqs = cqs.defaults({ "couch": _dns, "db": "test_silozippr"}); | |
var _queues = {} | |
var _worker = worker; | |
var _idleInterval = 9000;//00; | |
function _startQueues() { | |
_cqs.ListQueues(function(error, list) { | |
if(error) console.log(error); | |
else { | |
console.log("Found " + list.length + " queues:"); | |
list.forEach(function(queue) { | |
_queues[queue.name] = queue; | |
console.log(" * " + queue.name); | |
}) | |
} | |
if(_queues.pubsub == undefined) { | |
_cqs.CreateQueue("pubsub", function(error, queue) { | |
if(error) { | |
console.log("pubsub queue failed to create"); | |
} else { | |
console.log("pubsub queue created"); | |
_queues['pubsub'] = queue; | |
_receive(); | |
} | |
}) | |
} else { | |
console.log('pubsub queue exists'); | |
_receive(); | |
} | |
}) | |
} | |
function _receive() { | |
if(_queues.pubsub) { | |
_queues.pubsub.receive(function(error, messages) { | |
console.log('Message received: '+messages.length); | |
if(error) console.log(error) | |
else { | |
if(messages.length > 0) { | |
messages.forEach(function(msg) { | |
_worker(msg); | |
_receive(); | |
// msg.del(function(error) { | |
// // Message deletion never results in an error. If a message is successfully | |
// // deleted, it will simply never appear in the queue again. | |
// console.log('Message deleted!'); | |
// _receive(cb); | |
// }); | |
}); | |
} else { | |
console.log('next check in '+(_idleInterval/1000)+' secs'); | |
setTimeout(_receive, _idleInterval); | |
} | |
} | |
}); | |
} else { | |
console.log('queue receive: pubsub queue missing'); | |
} | |
} | |
function _send(data) { | |
if(_queues.pubsub) { | |
_queues.pubsub.send(data, function(error, message) { | |
if(error) console.log(error); | |
else { | |
console.log('queue: message sended ' + JSON.stringify(message.Body)); | |
} | |
}); | |
} else { | |
console.log('queue receive: pubsub queue missing'); | |
} | |
} | |
return { | |
start: function() { | |
_startQueues(); | |
}, | |
send: function (data) { | |
// _send(data); | |
var interval = Math.floor(Math.random() * (10000 - 6000 + 1) + 6000); | |
console.log('next message in '+interval); | |
var call = function() {_send(data)}; | |
setInterval(call, interval) | |
} | |
} | |
} | |
exports.JobQueue = JobQueue; |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment