Skip to content

Instantly share code, notes, and snippets.

@efossas
Created July 18, 2019 05:33
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save efossas/6fddd02fd9fa92cba6df33233ac7b1a2 to your computer and use it in GitHub Desktop.
Save efossas/6fddd02fd9fa92cba6df33233ac7b1a2 to your computer and use it in GitHub Desktop.
SocketIO Server That Listens To Mongo Change Streams
'use strict';
const config = require('./config.json');
const mongo = require('mongodb');
const MUUID = require('uuid-mongodb');
const http = require('http').createServer();
const io = require('socket.io')(http);
const redisAdapter = require('socket.io-redis');
const events = require('./events.js');
let database = () => {
return new Promise((resolve, reject) => {
mongo.MongoClient.connect(
config.mongo.url,
{ useNewUrlParser: true },
(err, connection) => {
if (err) {
reject(err);
return;
}
resolve(connection);
}
);
});
};
let sockets = () => {
return new Promise((resolve, reject) => {
io.adapter(redisAdapter({ host: 'redis', port: 6379 }));
io.on('connect', (socket) => {
socket.on('disconnect', () => {
// clean up code goes here
});
events.Check(socket);
events.JoinRoom(socket);
events.LeaveRoom(socket);
});
http.listen(80);
resolve(io);
});
};
let watchChangeStreams = (connection, io) => {
config.mongo.db.forEach(dbName => {
connection.db(dbName).watch([], {
fullDocument: 'updateLookup'
}).on('change', (change) => {
/* only emit updated values */
let cobj;
if (change.operationType === 'update') {
cobj = change.updateDescription.updatedFields;
} else {
cobj = change.fullDocument;
}
/* mongo converts UUID to binary, so let's change it back to a string */
try {
if (!change.fullDocument.uuid) {
return;
} else if (change.fullDocument.uuid.constructor.name !== "Binary") {
throw "constructor.name must be Binary";
} else if (change.fullDocument.uuid.sub_type !== 4) { // http://bsonspec.org/spec.html
throw "Binary object's subtype is not standard UUID";
}
cobj.uuid = MUUID.from(change.fullDocument.uuid).toString();
} catch (err) {
console.error(err);
return;
}
/* emit the change to anyone who has subcribed to the UUID */
console.info('emitting to: ' + cobj.uuid + ' ' + change.ns.coll);
io.to(cobj.uuid).emit(change.ns.coll, cobj);
});
});
console.log(JSON.stringify({ watching: config.mongo.db }));
};
Promise.all([database(), sockets()])
.then(([db, io]) => {
watchChangeStreams(db, io);
})
.catch(err => {
console.error(err);
process.exit(1);
});
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment