Last active
January 25, 2018 20:30
-
-
Save christianguevara/347307fb0801444697099ccf32cdb81b to your computer and use it in GitHub Desktop.
Bull, cancelling an active job when a new job is added
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
var Queue = require('bull'); | |
var _ = require('lodash'); | |
// Bull queue named "dev-1", connected to Redis at 127.0.0.1:6379.
var queue = new Queue('dev-1', {
  redis: {
    host: '127.0.0.1',
    port: 6379
  }
});

// Sample payloads submitted to the queue. Both share itemId 12, so the second
// submission is treated as a newer job for the same item.
var tests = [
  {audio: 'http://example.com/audio1.mp3', itemId: 12, id: 1},
  {video: 'http://example.com/audio1.mp3', itemId: 12, id: 2}
];

// In-memory status registry, keyed by itemId and then by Bull job id.
// Example shape:
//   {
//     1234: {        // itemId (3rd party unique id)
//       123: '123'   // jobId -> status string
//     }
//   }
var jobs = {};
// Submit each sample payload to the queue, staggered by 250 ms per index.
// After a job is added, every other job for the same itemId that is currently
// recorded as 'processing' is looked up and an attempt is made to cancel it.
tests.forEach(function (value, i) {
  setTimeout(function () {
    queue
      .add('pre-approval', value, {
        removeOnComplete: true
      })
      .then(function (job) {
        console.log('-1 added job ', job.id);

        var itemId = job.data.itemId;
        var statuses = jobs[itemId] || (jobs[itemId] = {});
        // The worker may have picked the job up before this callback ran;
        // do not overwrite a status it has already recorded.
        if (!statuses[job.id]) {
          statuses[job.id] = 'added';
        }
        console.log('--2 jobs ' + JSON.stringify(jobs));

        // Group job ids by status, e.g. {added: ['2'], processing: ['1']}.
        var idsByStatus = _.invertBy(statuses);
        // Never cancel the job that was just added, even if it is already
        // being processed.
        var newJobId = String(job.id);
        var activeIds = _.filter(idsByStatus.processing, function (id) {
          return id !== newJobId;
        });

        var cancellations = _.map(activeIds, function (activeId) {
          console.log('Cancelling added job', JSON.stringify(activeId));
          return queue.getJob(activeId).then(function (activeJob) {
            if (!activeJob) {
              // The job no longer exists in the queue; nothing to cancel.
              return null;
            }
            // NOTE(review): this release-lock-then-remove sequence is the
            // author's experimental approach and is reported as not working;
            // confirm `releaseLock`, `lockKey` and `scripts.remove` against
            // the installed Bull version. Alternatives that were tried:
            // job.remove(), job.cancel(),
            // job.moveToFailed(new Error('REMOVIDO'), true).
            return activeJob
              .releaseLock(activeJob.lockKey())
              .then(function () {
                return activeJob.scripts.remove();
              });
          });
        });

        // Return the combined promise so failures reach the catch below.
        return Promise.all(cancellations);
      })
      .catch(function (err) {
        console.log('Failed to add job or cancel active jobs', err);
      });
  }, 250 * i);
});
// Worker for 'pre-approval' jobs (concurrency 1). It simulates work with a
// timer whose length depends on the payload's `id`, and records the job as
// 'processing' in the `jobs` registry.
queue.process('pre-approval', 1, function (job, done) {
  console.log('Procesando trabajo', job.id);

  // The registry entry may not exist yet: the worker can receive the job
  // before the add callback has run, or the job may come from an earlier run.
  var itemId = job.data.itemId;
  if (!jobs[itemId]) {
    jobs[itemId] = {};
  }
  jobs[itemId][job.id] = 'processing';

  // Simulated processing time in milliseconds, keyed by payload id.
  var delaysById = {1: 15000, 2: 5000};
  var delay = delaysById[job.data.id];

  if (typeof delay === 'number') {
    setTimeout(function () {
      done(null, 'Job ' + job.id + ' completed');
    }, delay);
  } else {
    // Without this branch `done` would never be called and the job would
    // stay active indefinitely.
    done(new Error('Unknown test job id: ' + job.data.id));
  }

  console.log('---3 jobs ' + JSON.stringify(jobs));
});
// Log every queue lifecycle event and keep the `jobs` registry in sync when a
// job completes.
queue
  .on('error', function (error) {
    // An error occurred.
    console.log('Job error', error);
  })
  .on('active', function (job, jobPromise) {
    // A job has started. You can use `jobPromise.cancel()` to abort it.
    console.log('Job active', job.id);
  })
  .on('stalled', function (job) {
    // A job has been marked as stalled. This is useful for debugging job
    // workers that crash or pause the event loop.
    console.log('Job stalled', job.id);
  })
  .on('progress', function (job, progress) {
    // A job's progress was updated!
    console.log('Job progress', job.id);
  })
  .on('completed', function (job, result) {
    // A job successfully completed with a `result`.
    console.log('Job completed', job.id);
    // Create the registry entry if it is missing so that a job unknown to
    // this process does not throw here.
    var itemId = job.data.itemId;
    if (!jobs[itemId]) {
      jobs[itemId] = {};
    }
    jobs[itemId][job.id] = 'completed';
  })
  .on('failed', function (job, err) {
    // A job failed with reason `err`!
    console.log('Job failed', job.id, err);
  })
  .on('paused', function () {
    // The queue has been paused. This handler receives no job, so there is
    // no id to log.
    console.log('Queue paused');
  })
  .on('resumed', function (job) {
    // The queue has been resumed. Guard the id lookup in case no job is
    // passed to this handler.
    console.log('Job resumed', job && job.id);
  })
  .on('cleaned', function (cleanedJobs, type) {
    // Old jobs have been cleaned from the queue. `cleanedJobs` is an array of
    // cleaned jobs, and `type` is the type of jobs cleaned. The parameter is
    // deliberately not named `jobs` so it does not shadow the registry.
    console.log('Jobs cleaned', cleanedJobs ? cleanedJobs.length : 0, type);
  });
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
This Gist accompanies a feature request; the code does not actually work yet. It is related to: #848