Skip to content

Instantly share code, notes, and snippets.

@christianguevara
Last active January 25, 2018 20:30
Show Gist options
  • Save christianguevara/347307fb0801444697099ccf32cdb81b to your computer and use it in GitHub Desktop.
Save christianguevara/347307fb0801444697099ccf32cdb81b to your computer and use it in GitHub Desktop.
Bull, cancelling an active job when a new job is added
var Queue = require('bull');
var _ = require('lodash');
// Bull queue named 'dev-1', connected to the Redis server at 127.0.0.1:6379.
var queue = new Queue('dev-1', {
  redis: {
    port: 6379,
    host: '127.0.0.1'
  }
});

// Sample job payloads. Both share itemId 12, so the second one targets the
// same item as the first.
var tests = [
  {
    audio: 'http://example.com/audio1.mp3',
    itemId: 12,
    id: 1
  },
  {
    video: 'http://example.com/audio1.mp3',
    itemId: 12,
    id: 2
  }
];

// In-memory status registry, keyed first by itemId (3rd party unique id) and
// then by Bull job id. Values are status strings written elsewhere in this
// script, e.g. { 12: { 1: 'processing', 2: 'added' } }.
var jobs = {};
// Enqueue every test payload, staggered 250 ms apart. Once a job has been
// added it is recorded as 'added' in the `jobs` registry, and every job that
// the registry currently lists as 'processing' for the same itemId is
// cancelled (lock released, then removed from the queue).
tests.forEach(function (value, i) {
  setTimeout(function () {
    queue.add('pre-approval', value, {
      removeOnComplete: true
    })
      .then(function (job) {
        var itemId = job.data.itemId;

        console.log('-1 added job ', job.id);
        if (!jobs[itemId]) {
          jobs[itemId] = {};
        }
        jobs[itemId][job.id] = 'added';
        console.log('--2 jobs ' + JSON.stringify(jobs));

        // invertBy groups the job ids by status, e.g.
        // { added: ['2'], processing: ['1'] }.
        var procArrays = _.invertBy(jobs[itemId]);

        // _.map yields [] when there is no 'processing' group.
        var cancellations = _.map(procArrays.processing, function (activeId) {
          console.log('Cancelling added job', JSON.stringify(activeId));
          return queue.getJob(activeId).then(function (activeJob) {
            if (!activeJob) {
              // The job no longer exists in the queue; nothing to cancel.
              return null;
            }
            // NOTE(review): releaseLock/lockKey are kept from the original;
            // they are presumably needed so that remove() is not rejected for
            // a job a worker still holds the lock on -- confirm against the
            // Bull version in use. The processor callback that is already
            // running for this job is presumably not interrupted by this.
            return activeJob
              .releaseLock(activeJob.lockKey())
              .then(function () {
                // Public Job API; the original called job.scripts.remove(),
                // which is not part of Bull's documented Job interface.
                return activeJob.remove();
              });
          });
        });

        // Return the combined promise so failures reach the catch below
        // instead of becoming unhandled rejections.
        return Promise.all(cancellations);
      })
      .catch(function (error) {
        console.log('Failed to add job or cancel active jobs', error);
      });
  }, 250 * i);
});
// Processor for 'pre-approval' jobs (concurrency 1). It simulates work with a
// timer: payload id 1 takes 15 s, payload id 2 takes 5 s. The job is marked
// 'processing' in the `jobs` registry as soon as it is picked up.
queue.process('pre-approval', 1, function (job, done) {
  var itemId = job.data.itemId;
  var delay;

  console.log('Procesando trabajo', job.id);

  // The registry entry may be missing, e.g. for a job that was left in Redis
  // by an earlier run of this script; create it instead of throwing.
  if (!jobs[itemId]) {
    jobs[itemId] = {};
  }
  jobs[itemId][job.id] = 'processing';

  if (job.data.id === 1) {
    delay = 15000;
  } else if (job.data.id === 2) {
    delay = 5000;
  }

  if (delay === undefined) {
    // Without this branch `done` would never be called for an unexpected
    // payload and the job would stay active indefinitely.
    done(new Error('No simulated duration for job data id ' + job.data.id));
  } else {
    setTimeout(function () {
      done(null, 'Job ' + job.id + ' completed');
    }, delay);
  }

  console.log('---3 jobs ' + JSON.stringify(jobs));
});
// Log every queue lifecycle event, and mirror completion into the `jobs`
// registry.
queue
  .on('error', function (error) {
    // An error occurred.
    console.log('Job error', error);
  })
  .on('active', function (job, jobPromise) {
    // A job has started. You can use `jobPromise.cancel()` to abort it.
    console.log('Job active', job.id);
  })
  .on('stalled', function (job) {
    // A job has been marked as stalled. This is useful for debugging job
    // workers that crash or pause the event loop.
    console.log('Job stalled', job.id);
  })
  .on('progress', function (job, progress) {
    // A job's progress was updated!
    console.log('Job progress', job.id);
  })
  .on('completed', function (job, result) {
    // A job successfully completed with a `result`.
    console.log('Job completed', job.id);
    // Create the registry entry if this process never recorded the job,
    // rather than throwing on an undefined lookup.
    if (!jobs[job.data.itemId]) {
      jobs[job.data.itemId] = {};
    }
    jobs[job.data.itemId][job.id] = 'completed';
  })
  .on('failed', function (job, err) {
    // A job failed with reason `err`!
    console.log('Job failed', job.id, err);
  })
  .on('paused', function () {
    // The queue has been paused. This handler receives no job, so there is
    // no id to log (the original referenced an undeclared `job` here).
    console.log('Queue paused');
  })
  .on('resumed', function () {
    // The queue has been resumed. This is a queue-level event;
    // NOTE(review): presumably no job is passed to it -- confirm against the
    // Bull version in use.
    console.log('Queue resumed');
  })
  .on('cleaned', function (cleanedJobs, type) {
    // Old jobs have been cleaned from the queue. `cleanedJobs` is an array of
    // cleaned jobs, and `type` is the type of jobs cleaned. The parameter is
    // deliberately not called `jobs`, to avoid shadowing the registry.
    console.log('Jobs cleaned', cleanedJobs ? cleanedJobs.length : 0, type);
  });
@christianguevara
Copy link
Author

This Gist accompanies a feature request; the code does not actually work yet, and it is related to #848.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment