Skip to content

Instantly share code, notes, and snippets.

@chainhead
Last active March 24, 2020 13:00
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save chainhead/715e48a2f157e2bfe3f012f587f8bf5b to your computer and use it in GitHub Desktop.
Save chainhead/715e48a2f157e2bfe3f012f587f8bf5b to your computer and use it in GitHub Desktop.
MQTT and Server Sent Events with NodeJs

Introduction

This gist describes an MQTT client writing sensor data to an Server Sent Events (SSE) end-point.

Getting started

git clone https://gist.github.com/chainhead/715e48a2f157e2bfe3f012f587f8bf5b
npm i express mqtt
node server.js

Testing

Receiving SSE

In a terminal, run either of these commands.

git clone https://gist.github.com/chainhead/715e48a2f157e2bfe3f012f587f8bf5b
npm i express mqtt
node client.js
curl -N http://localhost:3000/events

You can also see the events in a browser by pointing the browser to http://localhost:3000/events. However, please also the notes on browser support in server.js below.

The terminal will show the MQTT messages published from another terminal. See below.

Publishing SSE

In a separate terminal, publish MQTT messages to test.mosquitto.org on port 1883. The following code snippet shows the usage of mqtt command as the client. It can be installed with npm i -g mqtt.

mqtt publish -h test.mosquitto.org -t mqtt/sse/demo -m '{"a":"A"}'

The server.js will receive the MQTT payload and publish a SSE for the same.

const http = require('http')
//
http.get({
agent: false,
path: '/events',
host: 'localhost',
port: 3000
}, (res) => {
res.on('data', (data) => {
console.log(data.toString())
})
})
const http = require('http')
const stream = require('stream')
const express = require('express')
const mqtt = require('mqtt')
//
var sse = {}
//
const streamR = new stream.Readable({
objectMode: true,
read() { }
})
//
const streamW = new stream.Writable({
objectMode: true,
write: (data, _, done) => {
if (Object.keys(sse).length > 0) {
// Browser support
// sse.write(JSON.stringify(data) + '\n\n')
sse.write(JSON.stringify(data))
}
done()
}
})
//
const client = mqtt.connect('mqtt://test.mosquitto.org')
client.on('connect', function () {
// Subscription is set-up for a topic
client.subscribe('mqtt/sse/demo', function (err) {
console.log('Topic subscribed.')
})
})
//
client.on('message', function (topic, message) {
console.log(message.toString())
streamR.push(JSON.parse(message.toString()))
streamR.pipe(streamW)
})
//
const app = express()
app.get('/events', (req, res) => {
res.status(200).set({
"connection": "keep-alive",
"cache-control": "no-cache",
// Browser support
// "content-type": "text/event-stream"
"content-type": "application/json"
})
sse = res
})
//
http.createServer(app).listen(3000, () => {
console.log('Server up')
})
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment