Skip to content

Instantly share code, notes, and snippets.

@jasonphillips
Last active April 18, 2017 15:37
Show Gist options
  • Save jasonphillips/81f2d73ba9d76b1de4cdcdf3528bbe04 to your computer and use it in GitHub Desktop.
Save jasonphillips/81f2d73ba9d76b1de4cdcdf3528bbe04 to your computer and use it in GitHub Desktop.
RabbitMQ Router (Express-like) for Topic Exchanges
const amqp = require('amqplib');
class JackRabbit {
constructor(url, opts) {
this.channel = this.makeConnection(url, opts);
this.errorCB = (error) => console.log(error);
this.exchanges = {};
this.queued = {};
this.channel.catch((error) => this.errorCB(error));
}
exchange(ex, opts) {
return new JackRabbitExchange(this, ex, opts);
}
makeConnection(url, opts) {
return new Promise((resolve, reject) => {
amqp.connect(url, opts).then((conn) => {
process.once('SIGINT', () => conn.close() );
conn.createChannel().then((ch) => resolve(ch));
});
});
}
createHandler(ex, route, opts, cb) {
this.channel.then((ch) => {
ch.assertQueue(route, opts)
.then(() => ch.bindQueue(route, ex, route))
.then(() => ch.prefetch(1))
.then(() => ch.consume(route, this.wrapHandler(ch, ex, cb)));
});
}
wrapHandler(ch, ex, cb) {
return (msg) => {
ch.pub = (route, msg) => ch.publish(
ex, route, new Buffer(JSON.stringify(msg))
);
msg.json = JSON.parse(msg.content);
const response = cb(msg, ch);
if (response===true) ch.ack(msg);
if (response===false) ch.nack(msg);
}
}
addExchange(exchange, name) {
this.exchanges[name] = exchange;
}
publish(ex, route, json) {
this.channel.then((ch) => {
this.exchanges[ex].then(() => {
ch.publish(ex, route, new Buffer(JSON.stringify(json)));
});
}, (e) => this.errorCB(e));
}
onError(cb) {
this.errorCB = cb;
}
}
class JackRabbitExchange {
constructor(rabbit, ex, opts) {
this.ex = ex;
this.rabbit = rabbit;
this.exchange = this.makeExchange(ex, opts);
rabbit.addExchange(this.exchange, ex);
}
handle(route, opts, cb) {
this.exchange.then(() => {
this.rabbit.createHandler(this.ex, route, opts, cb);
});
return this;
}
makeExchange(ex, opts) {
return new Promise((resolve, reject) => {
this.rabbit.channel.then((ch) =>
ch.assertExchange(ex, 'topic', opts).then((exchange) => resolve(exchange))
);
});
}
}
export default JackRabbit;
const JackRabbit = require('./JackRabbit');
const app = new JackRabbit('amqp://localhost')
app.exchange('coolExchange')
.handle('cool.tasks.#', null, (msg, ch) => {
console.log('I got a task message', msg.json);
ch.pub('cool.events.forYou', {hey: 'over to you, other route'});
return true;
})
.handle('cool.events.#', {noack:true}, (msg, ch) => {
console.log('And I received an event message:', msg.json);
})
const publishTask = (coolTask) => app.publish('coolExchange', 'cool.tasks.newTasks', coolTask);
publishTask({look: 'I sent this immediately and it waited for instantiation'});
I got a task message {"look":"I sent this immediately and it waited for instantiation"}
And I received an event message: {"hey":"over to you, other route"}
@jasonphillips
Copy link
Author

A router-like interface for RabbitMQ, when using a 'topic' exchange. See usage.js above.

Advantages:

  • simple, chainable syntax for handling routes (which are queue routing patterns)
  • bootstraps your needed sequence automatically in background:
    1. creates connection,
    2. creates a channel,
    3. asserts your exchanges,
    4. asserts your queues,
    5. binds your route patterns to queues,
    6. binds your handlers to queues
  • adds helpers to handler:
    1. msg.json containing JSON-decoded message.contents
    2. passes channel as ch, with convenience method ch.pub for publishing to same exchange as route
    3. will ack(msg) or nack for you if route simply returns true/false (you can still call ch.ack if desired)
    4. allows immediate creation of functions for publishing, to be exported elsewhere in your app without waiting (see publishTask method created in example); will be queued up via promises and fired when connections all ready.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment