Skip to content

Instantly share code, notes, and snippets.

@cjus
Created August 3, 2018 15:16
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save cjus/719d77dd9816115d94083234c293c18b to your computer and use it in GitHub Desktop.
Save cjus/719d77dd9816115d94083234c293c18b to your computer and use it in GitHub Desktop.
Sample job queue code
const redis = require('redis');
const moment = require('moment');
class JobQueue {
constructor() {
this.config = null;
this.redisdb = null;
this.redisKey;
}
/**
* @name init
* @summary initialize with config object
* @param {object} config - configurtion object containing specific keys/values
* @return {promise} promise - resolves once database is connected or failure
*/
init(config) {
this.config = config;
this.redisKey = this.config.jobQueueBaseName;
this.jobProcessorName = this.config.jobProcessorName;
return this._configAccessToRedis();
}
/**
* @name close
* @summary close the redis connection
*/
close() {
this.redisdb.quit();
}
/**
* @name _configAccessToRedis
* @summary Configure access to redis and monitor emitted events
* @private
* @return {promise} promise - resolves once database is connected or failure
*/
_configAccessToRedis() {
// from the node redis documentation site: https://github.com/NodeRedis/node_redis
// TODO: validate that these values are reasonable.
return new Promise((resolve, reject) => {
let config = this.config;
try {
let redisOptions = {
retry_strategy: (options) => {
if (options.error.code === 'ECONNREFUSED') {
// End reconnecting on a specific error and flush all commands with a individual error
return new Error('The server refused the connection');
}
if (options.total_retry_time > 1000 * 60 * 60) {
// End reconnecting after a specific timeout and flush all commands with a individual error
return new Error('Retry time exhausted');
}
if (options.times_connected > 10) {
// End reconnecting with built in error
return undefined;
}
// reconnect after
return Math.max(options.attempt * 100, 3000);
}
};
this.redisdb = redis.createClient(config.redis.port, config.redis.url, redisOptions);
this.redisdb
.on('connect', () => {
this.redisdb.select(config.redis.db, (err, data) => {
resolve();
});
});
} catch (e) {
reject();
}
});
}
/**
* @name _safeJSONStringify
* @summary Safe JSON stringify
* @private
* @param {object} obj - object to stringify
* @return {string} string - stringified object.
* Returns undefined if the object can't be stringified
*/
_safeJSONStringify(obj) {
let data;
try {
data = JSON.stringify(obj);
} catch (e) {
}
return data;
}
/**
* @name _safeJSONParse
* @summary Safe JSON parse
* @private
* @param {string} str - string which will be parsed
* @return {object} obj - parsed object
* Returns undefined if string can't be parsed into an object
*/
_safeJSONParse(str) {
let data;
try {
data = JSON.parse(str);
} catch (e) {
}
return data;
}
/**
* @name clearJobQueue
* @summary clears the queue in prep for new job processing
*/
clearJobQueue() {
this.redisdb.del(`${this.redisKey}:queued`);
}
/**
* @name clearProcessingQueue
* @summary clears the processing queue in prep for new job processing
*/
clearProcessingQueue() {
this.redisdb.del(`${this.redisKey}:${this.jobProcessorName}:processing`);
}
/**
* @name enqueue
* @summary Push a job into a job queue
* @param {object} jobObj - job object which will be queued
* @return {promise} promise - returns a promise
*/
enqueue(jobObj) {
return new Promise((resolve, reject) => {
let js = Object.assign({}, jobObj, {
_ts: moment().unix()
});
js = this._safeJSONStringify(js);
if (!js) {
reject(new Error('unable to stringify object'));
return;
}
this.redisdb.rpush(`${this.redisKey}:queued`, js, (err, data) => {
if (err) {
reject(err);
} else {
resolve(data);
}
});
});
}
/**
* @name dequeue
* @summary Removes a job from the job queue and moves it into the in-processing queue
* @return {promise} promise - returns a promise resolving to the dequeued job.
*/
dequeue() {
return new Promise((resolve, reject) => {
this.redisdb.rpoplpush(`${this.redisKey}:queued`, `${this.redisKey}:${this.jobProcessorName}:processing`, (err, data) => {
if (err) {
reject(err);
} else {
let job = this._safeJSONParse(data);
resolve(job);
}
});
});
}
/**
* @name completed
* @summary mark a job as completed, by removing it from the in-processing queue
* @param {object} job - job which will be marked as completed
* @return {promise} promise - returns a promise resolving to the completed job.
*/
completed(job) {
return new Promise((resolve, reject) => {
this.redisdb.lrem(`${this.redisKey}:${this.jobProcessorName}:processing`, -1, this._safeJSONStringify(job), (err, data) => {
if (err) {
reject(err);
} else {
let job = this._safeJSONParse(data);
resolve(job);
}
});
});
}
/**
* @name _recover
* @summary Called by the recover function
* @param {function} resolve - called to resolve a promise
* @param {function} reject - called to rejecx a promise
* @private
*/
_doRecover(resolve, reject) {
this.redisdb.rpoplpush(`${this.redisKey}:${this.jobProcessorName}:processing`, `${this.redisKey}:queued`, (err, data) => {
if (err) {
reject(err);
} else {
if (!data) {
resolve();
} else {
this._doRecover(resolve, reject);
}
}
});
}
/**
* @name recover
* @summary Used when client processor has stopped (crashed?) and is restarting.
* @description On restart recover() is called to move all of the clients jobs
* from its queue back to the main queue. The thinking here is that
* in a recovery suituation the client could use help resolving the jobs
* when there are other clients able to help. In either case, the client
* can resume dequeing jobs after this call.
* @return {promise} promise - resolves when there are are no more jobs in the clients queue.
*/
recover() {
return new Promise((resolve, reject) => {
this._doRecover(resolve, reject);
});
}
}
module.exports.createClient = function() {
let jobQueue = new JobQueue();
return jobQueue;
};
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment