Skip to content

Instantly share code, notes, and snippets.

@khankuan
Last active August 29, 2015 14:08
Show Gist options
  • Save khankuan/e59126b24fae42a1d59e to your computer and use it in GitHub Desktop.
Save khankuan/e59126b24fae42a1d59e to your computer and use it in GitHub Desktop.
/* Exports */
module.exports = function(opts){
var processFunction = opts.processFunction; // Returns a promise
var delay = opts.delay || 100;
var concurrency = opts.concurrency || 10;
var deleteIfFail = opts.deleteIfFail || false;
var queue = [];
var processing = false;
function process(){
// Ignore if nothing left to process
if (queue.length == 0)
return;
processing = true;
// Grab next 10
var toSend = queue.slice(0, concurrency);
processFunction(toSend)
// If success, remove items
.then(function(data){
queue.splice(0, toSend.length);
}, function(){
if (deleteIfFail)
queue.splice(0, toSend.length);
})
// Continue next batch
.finally(function(){
processing = false;
process();
});
}
return {
queue: function(item){
queue.push(item);
// Ignore if already running
if (processing)
return;
processing = true;
// Start processing after some delay
setTimeout(function(){
process();
}, delay);
}
}
};
var batchQueue = require('./BatchQueue')({
processFunction: function(items){
return new Promise(function(resolve, reject){
var params = {
Entries: items.map(function(data, i){
return {
Id: i+"",
MessageBody: JSON.stringify(data),
}
}),
QueueUrl: queueUrl,
};
// Send
sqsQueue.sendMessageBatch(params, function(err, data) {
if (err)
reject(err);
else
resolve();
});
});
}
});
helper.queue = function(item){
batchQueue.queue(item);
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment