Skip to content

Instantly share code, notes, and snippets.

@mborho
Created May 5, 2012 11:42
Show Gist options
  • Save mborho/2601766 to your computer and use it in GitHub Desktop.
Save mborho/2601766 to your computer and use it in GitHub Desktop.
qcs wrapper
function worker(msg) {
console.log('###### WORKER #######');
console.log(msg.MessageId+' #### ');
console.log(msg.Body);
msg.del(function(error) {
// Message deletion never results in an error. If a message is successfully
// deleted, it will simply never appear in the queue again.
console.log('Message deleted!');
// _receive(cb);
});
}
var jobQueue = new lib.queue.JobQueue(nconf.get('db'), worker);
jobQueue.start()
jobQueue.send({bla:"fasel"});
// A normal import.
var cqs = require('cqs');
function JobQueue(conf, worker) {
var _conf = conf;
var _dns = conf.server.replace(/http:\/\//g, "http://"+_conf.user+":"+conf.password+"@")+":"+conf.port;
var _cqs = cqs.defaults({ "couch": _dns, "db": "test_silozippr"});
var _queues = {}
var _worker = worker;
var _idleInterval = 9000;//00;
function _startQueues() {
_cqs.ListQueues(function(error, list) {
if(error) console.log(error);
else {
console.log("Found " + list.length + " queues:");
list.forEach(function(queue) {
_queues[queue.name] = queue;
console.log(" * " + queue.name);
})
}
if(_queues.pubsub == undefined) {
_cqs.CreateQueue("pubsub", function(error, queue) {
if(error) {
console.log("pubsub queue failed to create");
} else {
console.log("pubsub queue created");
_queues['pubsub'] = queue;
_receive();
}
})
} else {
console.log('pubsub queue exists');
_receive();
}
})
}
function _receive() {
if(_queues.pubsub) {
_queues.pubsub.receive(function(error, messages) {
console.log('Message received: '+messages.length);
if(error) console.log(error)
else {
if(messages.length > 0) {
messages.forEach(function(msg) {
_worker(msg);
_receive();
// msg.del(function(error) {
// // Message deletion never results in an error. If a message is successfully
// // deleted, it will simply never appear in the queue again.
// console.log('Message deleted!');
// _receive(cb);
// });
});
} else {
console.log('next check in '+(_idleInterval/1000)+' secs');
setTimeout(_receive, _idleInterval);
}
}
});
} else {
console.log('queue receive: pubsub queue missing');
}
}
function _send(data) {
if(_queues.pubsub) {
_queues.pubsub.send(data, function(error, message) {
if(error) console.log(error);
else {
console.log('queue: message sended ' + JSON.stringify(message.Body));
}
});
} else {
console.log('queue receive: pubsub queue missing');
}
}
return {
start: function() {
_startQueues();
},
send: function (data) {
// _send(data);
var interval = Math.floor(Math.random() * (10000 - 6000 + 1) + 6000);
console.log('next message in '+interval);
var call = function() {_send(data)};
setInterval(call, interval)
}
}
}
exports.JobQueue = JobQueue;
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment