Skip to content

Instantly share code, notes, and snippets.

@vladgovor77771
Last active November 23, 2020 17:25
Show Gist options
  • Save vladgovor77771/1e182c87b67acf5b9f6ff0fe6b7152b6 to your computer and use it in GitHub Desktop.
Save vladgovor77771/1e182c87b67acf5b9f6ff0fe6b7152b6 to your computer and use it in GitHub Desktop.
Queue arena based on bullmq. Allows chaining jobs.
const { Queue, QueueEvents } = require('bullmq');
const { EventEmitter } = require('events');
const config = require('../config');
const createUid = require('uid');
class QueueArena extends EventEmitter {
constructor({ redisDb = 1 }) {
super();
this.redisDb = redisDb;
this.redisConfig = {
host: config.redis.host,
port: config.redis.port,
password: config.redis.password,
db: 1
}
this.queueNames = new Set();
this.queues = new Map();
this.queueEvents = new Map();
this.chainIds = new Map();
this.chainSteps = new Map();
this.chains = new Map();
this.chainResults = new Map();
}
use(queueNames) {
if (!Array.isArray(queueNames)) queueNames = [ queueNames ];
queueNames.filter(name => !this.queueNames.has(name)).map(name => {
this.queueNames.add(name);
let queue = new Queue(name, { connection: this.redisConfig });
this.queues.set(name, queue);
let queueEvents = new QueueEvents(name, { connection: this.redisConfig });
this.queueEvents.set(name, queueEvents);
queueEvents.on('completed', (async ({ jobId, returnvalue }) => {
let chainId = this.chainIds.get(`${name}:${jobId}`);
if (!chainId) return;
let chainResult = this.chainResults.get(chainId);
chainResult.push({ queueName: name, jobId, returnvalue });
let chainStep = this.chainSteps.get(chainId);
let chain = this.chains.get(chainId);
chainStep++;
this.chainSteps.set(chainId, chainStep);
this.emit(`chain:${chainId}:progress`, { chainStep, stepReturnValue: returnvalue });
this.emit(`${name}:${jobId}:completed`, returnvalue);
if (chainStep >= chain.length) this.emit(`chain:${chainId}:completed`, chainResult);
else {
let nextJob = chain[chainStep];
if (chain[chainStep - 1].passResultAsJobData) nextJob.data = Object.assign(returnvalue, nextJob.data);
nextJob.chainId = chainId;
await this.addJob(nextJob);
}
}).bind(this));
queueEvents.on('failed', (async ({ jobId, failedReason }) => {
let chainId = this.chainIds.get(`${name}:${jobId}`);
if (!chainId) return;
this.emit(`chain:${chainId}:failed`, { queueName: name, jobId, failedReason });
this.emit(`${name}:${jobId}:failed`, failedReason);
}).bind(this));
});
}
async addJob(_job = {}) {
let {
queueName,
data,
options,
chainId,
jobName = '__default__',
} = _job;
let queue = this.queues.get(queueName);
if (!queue) throw new Error('Queue not using!');
let job = await queue.add(jobName, data, options);
if (!chainId) {
chainId = createUid(16);
this.chainResults.set(chainId, []);
this.chainSteps.set(chainId, 0);
this.chains.set(chainId, [ _job ]);
}
this.chainIds.set(`${queueName}:${job.id}`, chainId);
job.awaitResult = () => new Promise((resolve, reject) => {
this.once(`${queueName}:${job.id}:completed`, returnvalue => resolve(returnvalue));
this.once(`${queueName}:${job.id}:failed`, err => reject(new Error(err)));
});
return job;
}
async addChain(jobs) {
if (!jobs) return null;
if (!Array.isArray(jobs)) jobs = [ jobs ];
let chainId = createUid(16);
this.chains.set(chainId, jobs);
this.chainSteps.set(chainId, 0);
this.chainResults.set(chainId, []);
let firstJob = jobs[0];
firstJob.chainId = chainId;
const job = await this.addJob(firstJob);
const chain = {
firstJob: job,
jobs,
awaitResult: () => new Promise((resolve, reject) => {
this.once(`chain:${chainId}:completed`, chainResult => resolve(chainResult));
this.once(`chain:${chainId}:failed`, ({ queueName, jobId, failedReason }) => {
let err = new Error(failedReason);
err.queueName = queueName;
err.jobId = jobId;
reject(err)
});
})
}
return chain;
}
}
module.exports = QueueArena;
const { Worker } = require('bullmq');
const config = require('../config');
const sleep = require('sleep-promise');
const redisConfig = {
host: config.redis.host,
port: config.redis.port,
password: config.redis.password,
db: 1
}
const worker1 = new Worker('test1', async (job) => {
let { counter } = job.data;
counter++;
console.log('incremented to', counter);
await sleep(1000);
return { counter };
}, { connection: redisConfig });
const worker2 = new Worker('test2', async (job) => {
let { counter } = job.data;
counter++;
console.log('incremented to', counter);
await sleep(1000);
return { counter };
}, { connection: redisConfig });
const worker3 = new Worker('test3', async (job) => {
let { counter } = job.data;
counter++;
console.log('incremented to', counter);
await sleep(1000);
return { counter };
}, { connection: redisConfig });
const worker4 = new Worker('test4', async (job) => {
let { counter } = job.data;
counter++;
console.log('incremented to', counter);
await sleep(1000);
return { counter };
}, { connection: redisConfig });
const QueueArena = require('./index');
const test = async () => {
let arena = new QueueArena({ redisDb: 1 });
arena.use([ 'test1', 'test2', 'test3', 'test4' ]);
let chain = await arena.addChain([{
queueName: 'test1',
data: { counter: 1 },
passResultAsJobData: true
}, {
queueName: 'test2',
passResultAsJobData: true
}, {
queueName: 'test3',
passResultAsJobData: true
}, {
queueName: 'test4',
}]);
try {
let chainResult = await chain.awaitResult();
console.log(chainResult);
} catch (err) {
// throwing Error in workers will stop chain and emit `chain:${chainId}:failed` => { queueName, jobId, failedReason }
console.warn(err);
}
}
test();
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment