Skip to content

Instantly share code, notes, and snippets.

Embed
What would you like to do?
Server-Sent Events nodejs example. This shows how to detect the client disconnection.
var express = require('express');
var app = express();
// Response headers for Server-Sent Events (SSE).
// - 'Content-Type: text/event-stream' marks the response as an event stream.
// - 'Connection: keep-alive' and 'Cache-Control: no-cache' ask for the connection
//   to stay open and for the stream not to be cached.
// - 'X-Accel-Buffering: no' is presumably there to stop an nginx-style reverse
//   proxy from buffering the stream -- TODO confirm against the deployment setup.
const SSE_RESPONSE_HEADER = {
'Connection': 'keep-alive',
'Content-Type': 'text/event-stream',
'Cache-Control': 'no-cache',
'X-Accel-Buffering': 'no'
};
// Connected users: maps a userId to the request object of that user's open connection.
// The map is created without a prototype because its keys come straight from the URL:
// on a plain `{}` an id such as "__proto__" would replace the object's prototype
// instead of adding an entry, and ids such as "constructor" would shadow inherited members.
const users = Object.create(null);
// SSE starting endpoint
// You can access url `http://localhost:3000/sse/<userId>`
//
// Caution:
// This example exposes <userId> as URI parameter for testing purpose.
// In reality, you should use one stored in req.session.
//
// Flow: register the connection, send the SSE response headers, push a data
// event every 3 seconds, and stop pushing as soon as the client disconnects.
app.get('/sse/:userId', function(req, res) {
  // Resolved once; the "close"/"end" handlers below reuse it through the closure.
  const userId = getUserId(req);

  // Stores this connection. A later connection with the same userId replaces this entry.
  users[userId] = req;

  // Writes response header.
  res.writeHead(200, SSE_RESPONSE_HEADER);

  // Interval loop: pushes one event every 3 seconds.
  const intervalId = setInterval(function() {
    console.log(`*** Interval loop. userId: "${userId}"`);

    // Creates sending data:
    const data = {
      userId,
      users: Object.keys(users).length,
      // memoryUsage: process.memoryUsage()
      time: new Date().getTime(),
    };

    // Note:
    // This loop always has data to send. If you adapt it so that a tick may have
    // nothing to send, write a heartbeat like ':\n\n' (means "comment") instead,
    // at least every 55 sec (30 sec for first time request), for avoidance of
    // client's request timeout.
    res.write(`data: ${JSON.stringify(data)}\n\n`);
  }, 3000);

  // Note: Heartbeat for avoidance of client's request timeout of first time (30 sec)
  res.write(`:\n\n`);

  req.on("close", function() {
    console.log(`*** Close. userId: "${userId}"`);
    // Breaks the interval loop on client disconnected
    clearInterval(intervalId);
    // Remove from connections, but only if the stored entry is still this request:
    // a newer connection opened with the same userId must stay registered.
    if (users[userId] === req) {
      delete users[userId];
    }
  });

  req.on("end", function() {
    console.log(`*** End. userId: "${userId}"`);
  });
});
/**
 * Returns the user id carried by the request.
 *
 * Note: this example reads it from the URI parameter for testing purposes;
 * in reality, you should use the userId stored in req.session instead.
 *
 * @param {object} req - request object exposing `params.userId`
 * @returns {*} the value of `req.params.userId`
 */
function getUserId(req) {
  const { userId } = req.params;
  return userId;
}
// Starts the HTTP server on port 3000 and logs a message once it is listening.
app.listen(3000, () => {
  console.log('Example app listening on port 3000!');
});
@DRSDavidSoft
Copy link

DRSDavidSoft commented Feb 8, 2019

Thanks for this simple and easy sse example, it is exactly what I needed.

P.S. you have a missing r in Heartbeat here:

// Note: Heatbeat for avoidance of client's request timeout of first time (30 sec) 

@arnaudambro
Copy link

arnaudambro commented Nov 12, 2019

Hi @akirattii, I am using your code because I like it very much, especially because it gave me an idea of how to store a connection, with var users = {}.
For my part, I am storing the Response res in this object, not the Request req as you did in your code, so that when I need to send a message through the connection opened by a userId, I can retrieve the res from the userId and use it to send the message with res.write(`data: ${JSON.stringify(message)}\n\n`).

But I think I am doing something wrong, storing the res in a global variable... I thought about storing it in DB (I am using MongoDB there), but I don't see how either.

What do you think ?

Here below is my code

// Response headers for Server-Sent Events (SSE).
// 'Content-Type: text/event-stream' marks the response as an event stream;
// 'Connection: keep-alive' and 'Cache-Control: no-cache' ask for the connection
// to stay open and for the stream not to be cached.
// 'X-Accel-Buffering: no' is presumably there to stop an nginx-style reverse
// proxy from buffering the stream -- TODO confirm against the deployment setup.
const SSE_RESPONSE_HEADER = {
  'Connection': 'keep-alive',
  'Content-Type': 'text/event-stream',
  'Cache-Control': 'no-cache',
  'X-Accel-Buffering': 'no'
};

/**
 * Resolves the user id of a request, preferring the request body over the
 * route parameters.
 *
 * @param {object} req - request object; may be missing
 * @param {string} [from] - label of the call site; only useful for debugging, not read here
 * @returns {*} the first truthy `userId` found in `req.body`, then `req.params`;
 *   `null` when there is none or when reading the request throws
 */
const getUserId = (req, from) => {
  try {
    if (!req) {
      return null;
    }
    // Sources are probed lazily and in priority order: body first, then params.
    for (const sourceName of ['body', 'params']) {
      const source = req[sourceName];
      if (source && source.userId) {
        return source.userId;
      }
    }
    return null;
  } catch (e) {
    console.log('getUserId error', e);
    return null;
  }
};

// Registry of open SSE streams, keyed by userId. Each entry is `{ res, lastInteraction }`.
// It has no prototype so that only ids that were actually registered resolve to an entry:
// on a plain `{}`, ids such as "constructor" or "toString" would resolve to inherited
// members and be mistaken for open streams.
global.usersStreams = Object.create(null);

exports.setupStream = (req, res, next) => {

  let userId = getUserId(req);
  if (!userId) {
    next({ message: 'stream.no-user' })
    return;
  }

  // Stores this connection
  global.usersStreams[userId] = {
    res,
    lastInteraction: null,
  }

  // Writes response header.
  res.writeHead(200, SSE_RESPONSE_HEADER);

  // Note: Heatbeat for avoidance of client's request timeout of first time (30 sec)
  const heartbeat = {type: 'heartbeat'}
  res.write(`data: ${JSON.stringify(heartbeat)}\n\n`);
  global.usersStreams[userId].lastInteraction = Date.now()

  // Interval loop
  const maxInterval = 55000;
  const interval = 3000;
  let intervalId = setInterval(function() {
    if (!global.usersStreams[userId]) return;
    if (Date.now() - global.usersStreams[userId].lastInteraction < maxInterval) return;
    res.write(`data: ${JSON.stringify(heartbeat)}\n\n`);
    global.usersStreams[userId].lastInteraction = Date.now()
  }, interval);


  req.on("close", function() {
    let userId = getUserId(req, 'setupStream on close');
    // Breaks the interval loop on client disconnected
    clearInterval(intervalId);
    // Remove from connections
    delete global.usersStreams[userId];
  });

  req.on("end", function() {
    let userId = getUserId(req, 'setupStream on end');
    clearInterval(intervalId);
    delete global.usersStreams[userId];
  });

};

exports.sendStream = async (userId, data) => {
  if (!userId) return;
  if (!global.usersStreams[userId]) return;
  if (!data) return;

  const { res } = global.usersStreams[userId];

  res.write(`data: ${JSON.stringify({ type: 'event', data })}\n\n`);
  global.usersStreams[userId].lastInteraction = Date.now();

};

@abhinavchawla13
Copy link

abhinavchawla13 commented Feb 20, 2020

Thanks for the example.
@arnaudambro, I have a similar requirement as yours as I need to push notifications to specific users on some of my API calls. How did you end up managing it? Did you store all the res in some data structure? Thanks :)

@arnaudambro
Copy link

arnaudambro commented Feb 21, 2020

@abhinavchawla13
Copy link

abhinavchawla13 commented Feb 21, 2020

@arnaudambro, thanks 😄

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment