@zerobias
Created February 16, 2021 12:54
Server-sent events with effector
const express = require('express')
const {createStore, createEvent, sample} = require('effector')
const clientConnected = createEvent()
const statusRequested = createEvent()
const updateReceived = createEvent()
const clientClosed = createEvent()
const pushUpdate = createEvent()
const clients$ = createStore([])
  .on(clientConnected, (list, client) => [...list, client])
  .on(clientClosed, (list, id) => list.filter(c => c.id !== id))
clients$.watch(statusRequested, (clients, res) => {
  res.json({clients: clients.length})
})
clients$.watch(updateReceived, (clients, data) => {
  const content = `data: ${JSON.stringify(data)}\n\n`
  for (const {res} of clients) {
    res.write(content)
  }
})
clientClosed.watch(id => {
  console.log(`${id} Connection closed`)
})
const port = process.env.port || 8080
const app = express()
app.post('/items', require('body-parser').json(), (req, res) => {
  const newData = req.body
  items.push(newData)
  res.json(newData)
  updateReceived(newData)
})
app.get('/subscribe', (req, res) => {
  // Mandatory headers and HTTP status to keep the connection open
  res.writeHead(200, {
    'Content-Type': 'text/event-stream',
    Connection: 'keep-alive',
    'Cache-Control': 'no-cache'
  })
  // After the client opens the connection, send all items as a JSON string
  res.write(`data: ${JSON.stringify(items)}\n\n`)
  // Generate an id based on the timestamp and save the res
  // object of the client connection in the clients list.
  // Later we'll iterate over it and send updates to each client
  const clientId = Date.now()
  clientConnected({
    id: clientId,
    res
  })
  // When the client closes the connection, update the clients list,
  // dropping the disconnected one
  req.on('close', () => {
    clientClosed(clientId)
  })
})
app.get('/status', (req, res) => statusRequested(res))
let items = []
app.listen(port, () => {
  console.log('server listening on port: %d', port)
})
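
The server above writes every update as a `data:` field followed by a blank line, which is the framing the `text/event-stream` format expects. As a rough sketch of that wire format, the two helpers below serialize a value the same way and parse a chunk back. `formatSseMessage` and `parseSseChunk` are hypothetical names that do not appear in the gist, and the parser assumes single-line JSON payloads like the ones this server sends.

```javascript
// Hypothetical helpers, not part of the gist: they only illustrate the wire format.

// Serialize a value as one SSE message: a "data:" field plus a blank line.
function formatSseMessage(data) {
  return `data: ${JSON.stringify(data)}\n\n`
}

// Parse a chunk of SSE text back into values.
// Assumption: every message is a single "data: <json>" line, as sent above.
function parseSseChunk(chunk) {
  return chunk
    .split('\n\n')
    .filter(block => block.startsWith('data: '))
    .map(block => JSON.parse(block.slice('data: '.length)))
}

// Two messages back to back, the way a subscriber might receive them.
const wire = formatSseMessage({id: 1}) + formatSseMessage([1, 2])
console.log(parseSseChunk(wire))
```

In a browser none of this parsing is needed by hand: `new EventSource('/subscribe')` reads the stream and delivers each payload as `event.data` in its `onmessage` handler, where `JSON.parse` recovers the value.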