Skip to content

Instantly share code, notes, and snippets.

@mkozjak
Created September 18, 2016 11:55
Show Gist options
  • Save mkozjak/93ef7ca95d51dde6c628f31a5ad391d0 to your computer and use it in GitHub Desktop.
Save mkozjak/93ef7ca95d51dde6c628f31a5ad391d0 to your computer and use it in GitHub Desktop.
"use strict"
const assertArgs = require("assert-args")
const nats = require("nats")
const url = require("url")
const handlers = require("./handlers")
const utils = require("../utils")
module.exports = class ServiceBus {
constructor(env, config) {
let args = assertArgs(arguments, {
"env": "object",
"[config]": "object"
})
this.env = args.env
this.config = args.config
}
async connect() {
switch (this.config.type) {
case "nats":
try {
this.connection = nats.connect(url.format({
protocol: this.config.protocol,
hostname: this.config.hostname,
port: this.config.port,
slashes: true
}))
return this.connection
} catch (error) {
throw error
}
break
case "rabbitmq":
try {
this.connection = await amqp.connect(url.format({
protocol: this.config.protocol,
auth: this.config.username + ":" + this.config.password,
hostname: this.config.hostname,
port: this.config.port,
slashes: true
}))
} catch (error) {
throw error
}
this.channel = await this.connection.createChannel()
let queues = this.config.queues
for (let name in queues) {
try {
switch (queues[name].type) {
// push -> only one consumer takes the job
case "push":
await this.channel.assertExchange(queues[name].exchange, "topic")
break
}
} catch (error) {
throw error
}
}
this.channel.on("error", (error) => {
throw error
})
this.channel.on("close", () => {
throw new Error("channel closed")
})
return this.channel
}
}
async handleRequests() {
for (let name in handlers) {
await handlers[name].call(this.env)
}
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment