Skip to content

Instantly share code, notes, and snippets.

Show Gist options
  • Save barisere/fa3c2d3be7b65511fb0915794682197f to your computer and use it in GitHub Desktop.
Save barisere/fa3c2d3be7b65511fb0915794682197f to your computer and use it in GitHub Desktop.
/**
 * Derives a deterministic job ID from a task description.
 *
 * The ID is the hex-encoded MD5 digest of the task's JSON serialization, so
 * two tasks that serialize to the same JSON text receive the same ID.
 * MD5 is presumably used here only as a fingerprint, not for security.
 *
 * @param {object} task The task to fingerprint; must be JSON-serializable.
 * @returns {string} The hex digest (32 hexadecimal characters).
 */
function makeJobId (task) {
  const serializedTask = JSON.stringify(task);
  return crypto
    .createHash("md5")
    .update(serializedTask, "utf8")
    .digest("hex");
}
/**
 * Registers `workerFn` in the worker registry and hands back a function that
 * enqueues jobs for it.
 *
 * Workers are keyed by `workerFn.name`, so pass a named function: anonymous
 * functions all share the empty name and would replace one another.
 *
 * @param {*} workerFn The function to register.
 * @returns {Function|null} An async `addJob(...args)` function, or `null`
 * when `workerFn` is not a function.
 */
function makeWorker (workerFn) {
  // `typeof` already rejects null and undefined, so one check is enough.
  if (typeof workerFn !== "function") {
    return null;
  }
  undoQueueWorkers.set(workerFn.name, workerFn);

  /**
   * Enqueues a job that will be handled by `workerFn`.
   *
   * The job ID is derived from the task contents, so enqueuing the same
   * worker with the same arguments produces the same job ID.
   *
   * @param {any[]} args Arguments to be passed to `workerFn`.
   * @returns {Promise<string|number>} The ID of the job reported by the queue.
   */
  return async function addJob (...args) {
    const task = { worker: workerFn.name, args };
    const { id } = await queue.add(task, { jobId: makeJobId(task) });
    return id;
  };
}
// Expose makeWorker as part of this module's exported API.
exports.makeWorker = makeWorker;
/**
 * queueWorker receives jobs from the queue and calls the worker functions
 * that registered them to process them.
 *
 * @param {object} job The job to process. `job.data.worker` names the
 * registered worker function and `job.data.args` holds its arguments.
 * @returns {Promise<any>} Resolves with the return value of the worker function.
 * @throws {Error} When no worker is registered under `job.data.worker`. The
 * error's `data` property holds the job payload serialized as JSON text.
 */
async function queueWorker (job) {
  const workerFn = undoQueueWorkers.get(job.data.worker);
  if (workerFn == null) {
    const error = new Error(`No worker registered for job ${job.id}.`);
    // Interpolating an object into a template literal always produces
    // "[object Object]"; serialize it so the payload can be inspected.
    error.data = JSON.stringify(job.data);
    throw error;
  }
  const { args } = job.data;
  return await workerFn(...args);
}
// Register queueWorker as the processor for jobs coming from the queue; it
// dispatches each job to the worker function named in `job.data.worker`.
queue.process(queueWorker);
/**
 * Cancels a job by removing it from the queue.
 *
 * Intended for jobs that are still waiting to be processed.
 * NOTE(review): how removal behaves for jobs that are being, or have already
 * been, processed depends on the queue library — confirm against its docs.
 *
 * @param {string|number} jobId The ID of the job to cancel
 * @returns {Promise<boolean>} `true` if a job with that ID was found and
 * removed, `false` if no such job exists.
 */
async function cancelJob (jobId) {
  const pendingJob = await queue.getJob(jobId);
  if (pendingJob == null) {
    return false;
  }
  await pendingJob.remove();
  return true;
}
// Expose cancelJob as part of this module's exported API.
exports.cancelJob = cancelJob;
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment