Skip to content

Instantly share code, notes, and snippets.

@intech
Created May 2, 2021 15:33
Show Gist options
  • Save intech/cef1c814341565ca5543dcd10828857d to your computer and use it in GitHub Desktop.
Save intech/cef1c814341565ca5543dcd10828857d to your computer and use it in GitHub Desktop.
Moleculer Middleware redis-smq
const { callbackify } = require("util");
const { GracefulStopTimeoutError } = require("moleculer").Errors;
const { Message, Producer, Consumer } = require("redis-smq");
const events = require("redis-smq/src/events");
module.exports = function QueueMiddleware() {
const producers = new Map();
const consumers = new Map();
function gracefulShutdown(broker, items) {
if (!items || !items.size) return broker.Promise.resolve();
return new broker.Promise(resolve => {
let timedOut = false;
const timeout = setTimeout(() => {
timedOut = true;
broker.logger.error(new GracefulStopTimeoutError({
items: items.keys()
}));
resolve();
}, 5e3);
let first = true;
const checkForDown = () => {
broker.logger.info(`🦄 Queue items down ${items.size}`);
if (items.size) {
clearTimeout(timeout);
resolve();
} else {
if (first) {
broker.logger.warn(
`🦄 Queue waiting for ${items.size} running item(s)...`
);
first = false;
}
if (!timedOut) setTimeout(checkForDown, 100);
}
};
setImmediate(checkForDown);
});
}
function getQueueName(schema) {
if (schema.version != null && !(schema.settings || {
}).$noVersionPrefix) {
return formatQueueName(`v${schema.version}.${schema.name}`);
}
return formatQueueName(schema.name);
}
function formatQueueName(queue) {
return queue.toLowerCase().replace(/[^a-z0-9_-]/g, "-");
}
return {
name: "Queue",
serviceCreating(service, schema) {
if (schema.queues) {
for(let [name, handler] of Object.entries(schema.queues)) {
let options = {
};
if(typeof handler === "object") {
handler = handler.handler;
options = handler.options;
} else if(typeof handler !== "function") throw new Error(`Required handler for queue '${name}'`);
Consumer.queueName = `${getQueueName(schema)}-${name}`;
const consumer = new Consumer(service.broker.options.queue, options);
consumer.consume = callbackify(handler).bind(service);
consumer.on(events.DOWN, () => consumers.delete(name));
consumers.set(Consumer.queueName, consumer);
consumer.run();
service.broker.logger.info(
`🦄 Queue consumer ${name} in service ${Consumer.queueName}`
);
}
service.broker.logger.info(
`🦄 Queue consumer registers ${Object.keys(schema.queues).length}`
);
}
},
// After the broker created
created(broker) {
class Job extends Message {
constructor(queue) {
super();
this.queue = formatQueueName(queue);
}
async produce() {
return new broker.Promise((resolve, reject) => {
let producer = null;
if (!producers.has(this.queue)) {
producer = new Producer(this.queue, broker.options.queue);
producer.on(events.DOWN, () => producers.delete(this.queue));
producers.set(this.queue, producer);
} else producer = producers.get(this.queue);
producer.produceMessage(this, err => (err ? reject(err) : resolve()));
});
}
}
if(!broker.options.queue) broker.options.queue = {
};
broker.options.queue.namespace = broker.options.namespace;
if(!Array.isArray(broker.options.replCommands)) broker.options.replCommands = [];
broker.options.replCommands.push({
command: "queue",
alias: "job",
description: "Add new job",
options: [
{
option: "-n, --queue <queueName>", description: "Queue name"
},
{
option: "-b, --body <body>", description: "Job body"
}
],
types: {
//number: ["service"]
},
async action(broker, args) {
const { options } = args;
console.log(options);
const job = broker.queue.newJob(options.queue);
let payload = options.body;
if (typeof(payload) == "string") payload = JSON.parse(payload);
if(payload) job.setBody(payload);
const startTime = process.hrtime();
const result = await job.produce();
const diff = process.hrtime(startTime);
const duration = (diff[0] + diff[1] / 1e9) * 1000;
broker.logger.info(`Done ${options.queue}:${job.getId()}, duration: ${duration}ms`);
broker.logger.info(job.toString(), result);
}
});
broker.logger.info("🦄 Queue middleware ready");
broker.queue = {
newJob: (queue) => new Job(queue)
};
},
started(broker) {
this.logger.info(broker.options.queue);
},
// Before broker stopping
stopping(broker) {
return Promise.all([
gracefulShutdown(broker, producers),
gracefulShutdown(broker, consumers)
]);
}
};
};
@intech
Copy link
Author

intech commented May 2, 2021

Simple integrate with REPL and magic layer for consumer&producer =)
Снимок экрана от 2021-05-02 18-35-40
Снимок экрана от 2021-05-02 18-35-25

@intech
Copy link
Author

intech commented May 2, 2021

const job = broker.queue.newJob("v1.batch.test"); // template queues: v1.service.handler or service.handler
job.setBody({ test: 123 });
// in docs more

Job instance of Message: https://github.com/weyoss/redis-smq/blob/master/docs/api/message.md

@weyoss
Copy link

weyoss commented Feb 3, 2022

Hey @intech, I have found your gist while googling and decided to leave you a comment.

Many design and API changes have been made since the last time we were discussing RedisSMQ.

Now the MQ is more suitable for micro-services. Please take a look at https://github.com/weyoss/redis-smq for an overview.

Also the number of Redis connections used by the MQ, and the number of queries to Redis server, were optimized and significantly reduced while maintaining the performance.

So, I hope you try it again. Your feedbacks are welcome )

@intech
Copy link
Author

intech commented Feb 3, 2022

@weyoss hey! Thanks for the reply. I'll check the new version!

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment