Skip to content

Instantly share code, notes, and snippets.

@benmoss
Last active March 22, 2016 12:14
amqp = require('amqplib')
whence = require('when')
class MessageBus
constructor: (@amqp) ->
if config.rabbit?.url?.
@connect()
else
@createFakeChannel()
connect: ->
whence(@amqp.connect(config.rabbit.url))
.with(@)
.then(@createChannel)
.then(@createExchange)
.then(@handleUnrouteableMessages)
.then(@handleDisconnections)
.catch(@reconnect)
createChannel: (@connection) ->
@connection.createChannel()
createExchange: (@channel) ->
@channel.assertExchange "ex", "topic",
autoDelete: true
createFakeChannel: ->
@channel = publish: -> null
handleUnrouteableMessages: ->
@channel.on 'return', (msg) ->
log.error "Message returned to sender",
content: JSON.parse(msg.content)
attributes: msg.fields
handleDisconnections: ->
@connection.on 'error', (e) =>
@reconnect(e)
reconnect: (e) ->
log.error "MessageBus disconnected, attempting to reconnect",
error: e.toString()
@createFakeChannel()
setTimeout(@connect.bind(@), 3000)
publish: (routingKey, content) ->
@channel.publish "ex", routingKey, new Buffer(JSON.stringify(content)),
mandatory: true
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment