Skip to content

Instantly share code, notes, and snippets.

@takali
Created September 14, 2019 21:13
Show Gist options
  • Save takali/5a1ff202662f92922f754b37bbfaf587 to your computer and use it in GitHub Desktop.
Save takali/5a1ff202662f92922f754b37bbfaf587 to your computer and use it in GitHub Desktop.
Listen to a RabbitMQ queue and relay its messages to clients via Server-Sent Events (SSE)
// Express HTTP server; PORT falls back to 5000 when the PORT env var is unset or empty
const express = require('express');
const app = express();
const PORT = process.env.PORT || 5000
// RabbitMQ client (callback flavour of amqplib); the connection URL and the
// queue name are both read from the environment
const amqp = require('amqplib/callback_api');
const url = process.env.CLOUDAMQP_URL;
const queue = process.env.QUEUE_NAME;
// Server-Sent Events (SSE) channel implementation
const SseChannel = require('sse-channel');
// Settings handed to the SSE channel shared by every /stream client.
// NOTE(review): the numeric values look like milliseconds / message counts —
// confirm against the sse-channel documentation.
const sseChannelOptions = {
  jsonEncode: true,
  historySize: 300,
  retryTimeout: 250,
  pingInterval: 15000,
  cors: {
    // Accept connections from any origin (the library default is [])
    origins: ['*'],
  },
};

let sysInfoChannel = new SseChannel(sseChannelOptions);
// Root route: answers with a fixed greeting.
app.get('/', (request, response) => {
  response.send('Hello World');
});

// SSE endpoint: registers the incoming request/response pair as a client of
// the shared channel.
app.get('/stream', (request, response) => {
  sysInfoChannel.addClient(request, response);
});

// Start the HTTP server and log the port once it is listening.
app.listen(PORT, () => {
  console.log(`Listening on ${PORT}`);
});
// Connect to RabbitMQ, make sure `queue` exists, and relay the `data` field of
// every JSON message received on it to all connected SSE clients.
amqp.connect(url, function(error0, connection) {
  if (error0) {
    // Fail fast: without a broker connection there is nothing to stream.
    throw error0;
  }
  connection.createChannel(function(error1, channel) {
    if (error1) {
      throw error1;
    }
    channel.assertQueue(queue, {
      durable: false
    });
    console.log(" [*] Waiting for messages in %s. To exit press CTRL+C", queue);
    channel.consume(queue, function(msg) {
      // amqplib invokes the callback with null when the broker cancels the
      // consumer (e.g. the queue was deleted); there is no content to read.
      if (msg === null) {
        console.error(" [!] Consumer for %s was cancelled by the broker", queue);
        return;
      }

      // Messages are expected to be JSON objects carrying a `data` field.
      // A malformed message (invalid JSON, or a literal JSON null) is logged
      // and dropped so that one bad producer cannot crash the whole server.
      let payload;
      try {
        payload = JSON.parse(msg.content.toString()).data;
      } catch (parseError) {
        console.error(" [!] Dropping malformed message from %s: %s", queue, parseError.message);
        return;
      }

      sysInfoChannel.send({
        id: Date.now(),
        data: {
          message: payload,
        }
      });
    }, {
      // Messages are auto-acknowledged, so dropped ones need no explicit nack.
      noAck: true
    });
  });
});
{
"name": "message",
"version": "1.0.0",
"description": "",
"main": "dispatcher.js",
"scripts": {
"test": "echo \"Error: no test specified\" && exit 1"
},
"author": "takali",
"license": "ISC",
"dependencies": {
"amqplib": "^0.5.5",
"express": "^4.17.1",
"sse-channel": "^3.1.1"
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment