Skip to content

Instantly share code, notes, and snippets.

@chetandhembre
Created May 22, 2014 13:20
Show Gist options
  • Save chetandhembre/a3658f9f37f9091052f9 to your computer and use it in GitHub Desktop.
Save chetandhembre/a3658f9f37f9091052f9 to your computer and use it in GitHub Desktop.
Bug in Kue when shutting down the workers for a particular type of job
[ { queue:
{ client: [Object],
promoter: null,
workers: [Circular],
_events: {} },
type: 'abc',
client:
{ stream: [Object],
options: [Object],
connection_id: 1,
connected: true,
ready: true,
connections: 1,
should_buffer: false,
command_queue_high_water: 1000,
command_queue_low_water: 0,
max_attempts: null,
command_queue: [Object],
offline_queue: [Object],
commands_sent: 139,
connect_timeout: false,
enable_offline_queue: true,
retry_max_delay: null,
retry_timer: null,
retry_totaltime: 0,
retry_delay: 150,
retry_backoff: 1.7,
attempts: 1,
pub_sub_mode: false,
subscription_set: {},
monitoring: false,
closing: false,
server_info: [Object],
auth_pass: null,
parser_module: [Object],
selected_db: null,
old_state: null,
domain: null,
_events: {},
_maxListeners: 10,
port: 6379,
host: '127.0.0.1',
emitted_end: false,
reply_parser: [Object],
send_anyway: false },
running: false,
job: null,
_events: {} },
{ queue:
{ client: [Object],
promoter: null,
workers: [Circular],
_events: {} },
type: 'abcd',
client:
{ stream: [Object],
options: [Object],
connection_id: 1,
connected: true,
ready: true,
connections: 1,
should_buffer: false,
command_queue_high_water: 1000,
command_queue_low_water: 0,
max_attempts: null,
command_queue: [Object],
offline_queue: [Object],
commands_sent: 139,
connect_timeout: false,
enable_offline_queue: true,
retry_max_delay: null,
retry_timer: null,
retry_totaltime: 0,
retry_delay: 150,
retry_backoff: 1.7,
attempts: 1,
pub_sub_mode: false,
subscription_set: {},
monitoring: false,
closing: false,
server_info: [Object],
auth_pass: null,
parser_module: [Object],
selected_db: null,
old_state: null,
domain: null,
_events: {},
_maxListeners: 10,
port: 6379,
host: '127.0.0.1',
emitted_end: false,
reply_parser: [Object],
send_anyway: false },
running: false,
job: null,
_events: {} } ]
[ { queue:
{ client: [Object],
promoter: null,
workers: [Circular],
_events: {} },
type: 'abc',
client:
{ stream: [Object],
options: [Object],
connection_id: 1,
connected: true,
ready: true,
connections: 1,
should_buffer: false,
command_queue_high_water: 1000,
command_queue_low_water: 0,
max_attempts: null,
command_queue: [Object],
offline_queue: [Object],
commands_sent: 137,
connect_timeout: false,
enable_offline_queue: true,
retry_max_delay: null,
retry_timer: null,
retry_totaltime: 0,
retry_delay: 150,
retry_backoff: 1.7,
attempts: 1,
pub_sub_mode: false,
subscription_set: {},
monitoring: false,
closing: false,
server_info: [Object],
auth_pass: null,
parser_module: [Object],
selected_db: null,
old_state: null,
domain: null,
_events: {},
_maxListeners: 10,
port: 6379,
host: '127.0.0.1',
emitted_end: false,
reply_parser: [Object],
send_anyway: false },
running: true,
job: null,
_events: { error: [Function], 'job complete': [Function] } },
{ queue:
{ client: [Object],
promoter: null,
workers: [Circular],
_events: {} },
type: 'abcd',
client:
{ stream: [Object],
options: [Object],
connection_id: 1,
connected: true,
ready: true,
connections: 1,
should_buffer: false,
command_queue_high_water: 1000,
command_queue_low_water: 0,
max_attempts: null,
command_queue: [Object],
offline_queue: [Object],
commands_sent: 137,
connect_timeout: false,
enable_offline_queue: true,
retry_max_delay: null,
retry_timer: null,
retry_totaltime: 0,
retry_delay: 150,
retry_backoff: 1.7,
attempts: 1,
pub_sub_mode: false,
subscription_set: {},
monitoring: false,
closing: false,
server_info: [Object],
auth_pass: null,
parser_module: [Object],
selected_db: null,
old_state: null,
domain: null,
_events: {},
_maxListeners: 10,
port: 6379,
host: '127.0.0.1',
emitted_end: false,
reply_parser: [Object],
send_anyway: false },
running: true,
job: null,
_events: { error: [Function], 'job complete': [Function] } } ]
/**
* Created by ichetandhembre on 21/5/14.
*/
// Load the kue library; both the queue and the UI app used below come from it.
var kue = require('kue');
// create our job queue (shared by the producer and both workers in this script)
var jobs = kue.createQueue();
// NOTE: Redis must already be running; start it with `$ redis-server`.
// create some jobs at random,
// usually you would create these
// in your http processes upon
// user input etc.
/**
 * Enqueue one demo job of type 'abc' for a randomly chosen name.
 *
 * Once the job emits 'complete', jobs.shutdown is called with the
 * arguments (callback, 1, 'abc'); the callback prints jobs.workers so the
 * worker state after the shutdown request can be inspected.
 * NOTE(review): per the gist title this is meant to shut down only the
 * workers of one job type - confirm against the installed kue version.
 */
function create() {
  var candidates = ['tobi', 'loki', 'jane', 'manny'];
  var picked = candidates[Math.floor(Math.random() * 4)];
  var payload = {
    title: "converting " + picked + "'s to avi",
    user: 1,
    frames: 200
  };

  // Invoked by jobs.shutdown; dumps the queue's worker list.
  function onShutdown(err, response) {
    console.log(jobs.workers);
  }

  // Invoked when the job reports completion; requests the shutdown.
  function onComplete(id) {
    jobs.shutdown(onShutdown, 1, 'abc');
  }

  var job = jobs.create('abc', payload).attempts(0);
  job.on('complete', onComplete).save();
}
// Enqueue the single demo job right away.
create();
// Worker for jobs of type 'abc': processes video conversion jobs, 1 at a time.
jobs.process('abc', 1, function (job, done) {
  var total = job.data.frames;

  // Pretend to convert the frame at `position`, then schedule the next one
  // until the position reaches the total frame count.
  function step(position) {
    convertFrame(position, function (err) {
      if (err) return done(err);

      // report progress, position/total complete
      job.progress(position, total);

      if (position >= total) {
        done();
        return;
      }
      // advance by a random, possibly fractional, amount
      step(position + Math.random() * 10);
    });
  }

  step(0);
});
// Worker for jobs of type 'abcd', handled one at a time.
jobs.process('abcd', 1, function (job, done) {
  var frameCount = job.data.frames;

  // Fake the conversion of one frame, then continue with a later frame
  // until frameCount has been reached.
  function advance(current) {
    convertFrame(current, function (err) {
      // to simulate a failing job, set err = 'demo' here
      if (err) return done(err);

      // report progress, current/frameCount complete
      job.progress(current, frameCount);

      if (current >= frameCount) {
        done();
        return;
      }
      advance(current + Math.random() * 10);
    });
  }

  advance(0);
});
/**
 * Simulate converting a single frame by waiting a short random time.
 *
 * @param {number} i - frame position; not used by the simulation itself.
 * @param {Function} fn - callback scheduled via setTimeout after a random
 *   delay below 50 ms; it is invoked without arguments.
 */
function convertFrame(i, fn) {
  var delayMs = Math.random() * 50;
  setTimeout(fn, delayMs);
}
// start the UI: have kue.app listen on port 3000 and log that it is up
kue.app.listen(3000);
console.log('UI started on port 3000');
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment