Skip to content

Instantly share code, notes, and snippets.

@victusfate
Last active December 16, 2016 08:35
Show Gist options
  • Save victusfate/1e2ce9eb73de32b78d2690d660f0f9c8 to your computer and use it in GitHub Desktop.
Save victusfate/1e2ce9eb73de32b78d2690d660f0f9c8 to your computer and use it in GitHub Desktop.
graceful queue and worker shutdown take 1
'use strict';
const kue = require('kue');
const url = require('url');
const redis = require('redis');
/**
 * Creates a Redis client from a Redis URL; passed to Kue as its
 * `createClientFactory`.
 *
 * The URL comes from the LOCAL_REDIS_URL environment variable, falling back to
 * `config.Redis.RedisUrl`.
 * NOTE(review): `config` is not defined or required anywhere in the visible
 * file, so the fallback throws a ReferenceError unless it is supplied
 * elsewhere — confirm before relying on it.
 *
 * @returns {object} the client returned by `redis.createClient(port, hostname)`,
 *   with `auth()` already called when the URL carries credentials.
 */
const getRedis = () => {
  const redisUrl = url.parse(process.env.LOCAL_REDIS_URL || config.Redis.RedisUrl);
  const client = redis.createClient(redisUrl.port, redisUrl.hostname);

  if (redisUrl.auth) {
    // `auth` has the form "user:password". Everything after the FIRST colon is
    // the password, so passwords that themselves contain ':' are kept intact.
    // An auth segment without any colon is used as the password as a whole
    // (indexOf returns -1, so slice(0) keeps the full string).
    const password = redisUrl.auth.slice(redisUrl.auth.indexOf(':') + 1);
    client.auth(password);
  }

  return client;
};
// Tuning knobs for the demo. All durations are in milliseconds.

// Interval handed to queue.watchStuckJobs().
const watchStuckJobsTime = 1000;

// Timeout handed to ctx.pause() when a shutdown signal arrives.
const workerShutdownTime = 3000;

// Delay before an interrupted job is flipped back to inactive: slightly longer
// than the pause timeout above.
const jobInactiveTime = 500 + workerShutdownTime;

// allow all workers in this process time to gracefully shutdown
const killDelayTime = 5000;

// Concurrency argument passed to queue.process('email', ...).
const concurrency = 5;
// Create the Kue queue; Redis clients are produced by getRedis() through the
// `createClientFactory` option.
var queue = kue.createQueue({ redis: { createClientFactory: getRedis } });

// Queue-level errors are only logged.
queue.on('error', function onQueueError(err) {
  console.log({ action: 'rawBatchJob.queue.err', err });
});

// Start Kue's stuck-job watcher with the configured interval.
queue.watchStuckJobs(watchStuckJobsTime);
// The queue application process also needs a graceful shutdown. That is already covered by the
// worker signal handlers below, which also work when workers run in several different processes.
// process.on( 'SIGTERM', function ( sig ) {
// queue.shutdown( killDelayTime, function(err) {
// console.log({ action: 'Kue shutdown SIGTERM: ', err: err||'' });
// process.exit( 0 );
// });
// });
// process.on( 'SIGINT', function ( sig ) {
// queue.shutdown( killDelayTime, function(err) {
// console.log({ action: 'Kue shutdown SIGINT: ', err: err||'' });
// process.exit( 0 );
// });
// });
// quick spam of jobs: enqueue 10 'email' jobs that are removed once complete
for (let i = 0; i < 10; i++) {
  const job = queue
    .create('email', {
      title: 'Marky Mark Job ' + Math.random(),
      to: 'Mr. Universe',
      template: 'blah blah blah'
    })
    .removeOnComplete(true);

  job.save((err) => {
    if (err) {
      // A failed save used to be dropped silently; surface it instead.
      console.error({ action: 'queue.create.save.err', err: err });
      return;
    }
    console.log(job.id);
  });
}
/**
 * Simulated unit of work for a job: reports progress 1..100 on the job, one
 * step per timer tick, waiting a random delay below 100 ms between steps.
 *
 * @param {object} oJob  job object; must provide `toJSON()` (id and type are
 *                       logged) and `progress(completed, total)`.
 * @param {*}      oData payload of the job; only logged here.
 * @returns {Promise<undefined>} resolves after the last progress step; rejects
 *   with the thrown error if `oJob.progress` throws.
 */
const fMessageWorker = (oJob, oData) => {
  const { id, type } = oJob.toJSON();
  console.log({ id: id, job: type, oData: oData });

  return new Promise((resolve, reject) => {
    const N = 100;

    const iterate = (i) => {
      setTimeout(() => {
        // Anything thrown inside a timer callback would otherwise become an
        // uncaught exception and leave this promise pending forever.
        try {
          const completed = i + 1;
          oJob.progress(completed, N);
          if (completed === N) {
            resolve();
          } else {
            iterate(completed);
          }
        } catch (err) {
          reject(err);
        }
      }, Math.random() * 100);
    };

    iterate(0);
  });
};
/**
 * Reduces a job to the three fields used in log lines.
 *
 * @param {object} job object exposing `toJSON()`.
 * @returns {{id: *, type: *, state: *}} fields are undefined when `toJSON()`
 *   returns a falsy value.
 */
const jobInfo = (job) => {
  const { id, type, state } = job.toJSON() || {};
  return { id, type, state };
};
/**
 * After `delayMS` milliseconds, looks the job up again and, if it is still
 * 'active' or 'failed', moves it back to the inactive state via
 * `job.inactive()`.
 *
 * @param {number|string} jobId   id passed to `kue.Job.get`.
 * @param {string}        jobType type passed to `kue.Job.get`.
 * @param {number}        delayMS delay before the lookup, in milliseconds.
 * @returns {Promise<undefined>} resolves when the job was set inactive, was in
 *   another state, or could not be found; rejects with the error reported by
 *   `job.inactive()`.
 */
const setJobInactive = (jobId, jobType, delayMS) => {
  const sAction = 'setJobInactive';

  return new Promise((resolve, reject) => {
    setTimeout(() => {
      kue.Job.get(jobId, jobType, (err, job) => {
        if (!job) {
          // job may be complete/removed; keep the lookup error in the log
          // instead of discarding it
          console.log({
            action: sAction + '.skipping.job.done',
            id: jobId,
            type: jobType,
            err: err || ''
          });
          resolve();
          return;
        }

        // Only a lookup hit at this point — nothing has been changed yet, so
        // this must not be reported as '.success'.
        console.log({ action: sAction + '.job.found', job: jobInfo(job) });

        const state = job.toJSON().state;
        if (state !== 'active' && state !== 'failed') {
          console.log({ action: sAction + '.skipping', job: jobInfo(job) });
          resolve();
          return;
        }

        job.inactive((inactiveErr) => {
          if (inactiveErr) {
            console.error({ action: sAction + '.err', job: jobInfo(job), err: inactiveErr });
            reject(inactiveErr);
            return;
          }
          console.log({ action: sAction + '.success', job: jobInfo(job) });
          resolve();
        });
      });
    }, delayMS);
  });
};
// Worker for 'email' jobs.
//
// While a job is running, SIGINT and SIGTERM each:
//   1. schedule setJobInactive() for this job after jobInactiveTime,
//   2. call ctx.pause(workerShutdownTime, ...), and
//   3. once pause calls back, exit the process after killDelayTime.
//
// The listeners are removed again as soon as the job finishes. Registering them
// per job without removing them leaked two listeners (and their closures) for
// every processed job, and made stale handlers run for long-finished jobs when
// a signal finally arrived.
// NOTE(review): with no job in flight there is now no custom signal handler, so
// Node's default signal handling applies — confirm that is acceptable for idle
// worker processes.
queue.process('email', concurrency, (job, ctx, done) => {
  const { id: jobId, type: jobType } = job.toJSON();

  // Builds the handler for one signal; `sAction` is the signal name used as the
  // prefix of every log entry.
  const makeSignalHandler = (sAction) => () => {
    setJobInactive(jobId, jobType, jobInactiveTime)
      .then(() => {
        console.error({ action: sAction + '.setJobInactive.success' });
      })
      .catch((err) => {
        console.error({ action: sAction + '.setJobInactive.err', err: err });
      });

    ctx.pause(workerShutdownTime, (err) => {
      if (err) {
        console.error({ action: sAction + '.err', err: err });
      } else {
        console.error({ action: sAction + '.gracefully.shutdown' });
      }
      // Exit regardless of the pause outcome, after a grace period.
      setTimeout(() => {
        console.info({ action: sAction + '.process.exit' });
        process.exit();
      }, killDelayTime);
    });
  };

  const onSigint = makeSignalHandler('SIGINT');
  const onSigterm = makeSignalHandler('SIGTERM');

  // process.once: the listener is removed automatically the first time the
  // signal fires, so each handler runs at most once per job.
  process.once('SIGINT', onSigint);
  process.once('SIGTERM', onSigterm);

  // Detaches this job's signal handlers, then reports the result to Kue.
  const finish = (err) => {
    process.removeListener('SIGINT', onSigint);
    process.removeListener('SIGTERM', onSigterm);
    done(err);
  };

  fMessageWorker(job, job.data)
    .then(() => {
      finish();
    })
    .catch((err) => {
      finish(err);
    });
});
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment