Skip to content

Instantly share code, notes, and snippets.

@mkozjak
Created September 18, 2016 11:53
Show Gist options
  • Save mkozjak/9ee78c417d533bb0e1de6fa0ec92c0e0 to your computer and use it in GitHub Desktop.
Save mkozjak/9ee78c417d533bb0e1de6fa0ec92c0e0 to your computer and use it in GitHub Desktop.
"use strict"
const assertArgs = require("assert-args")
const nats = require("nats")
const url = require("url")
const handlers = require("./handlers")
const utils = require("../utils")
module.exports = class ServiceBus
{
constructor(env, config)
{
let args = assertArgs(arguments,
{
"env": "object",
"[config]": "object"
})
this.env = args.env
this.config = args.config
}
async connect()
{
switch (this.config.type)
{
case "nats":
try
{
this.connection = nats.connect(url.format(
{
protocol: this.config.protocol,
hostname: this.config.hostname,
port: this.config.port,
slashes: true
}))
return this.connection
}
catch (error)
{
throw error
}
break
case "rabbitmq":
try
{
this.connection = await amqp.connect(url.format(
{
protocol: this.config.protocol,
auth: this.config.username + ":" + this.config.password,
hostname: this.config.hostname,
port: this.config.port,
slashes: true
}))
}
catch (error)
{
throw error
}
this.channel = await this.connection.createChannel()
let queues = this.config.queues
for (let name in queues)
{
try
{
switch (queues[name].type)
{
// push -> only one consumer takes the job
case "push":
await this.channel.assertExchange(queues[name].exchange, "topic")
break
}
}
catch (error)
{
throw error
}
}
this.channel.on("error", (error) =>
{
throw error
})
this.channel.on("close", () =>
{
throw new Error("channel closed")
})
return this.channel
}
}
async handleRequests()
{
for (let name in handlers)
{
await handlers[name].call(this.env)
}
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment