Skip to content

Instantly share code, notes, and snippets.

@dolphin278
Last active February 12, 2022 16:30
Show Gist options
  • Star 10 You must be signed in to star a gist
  • Fork 3 You must be signed in to fork a gist
  • Save dolphin278/5445957 to your computer and use it in GitHub Desktop.
Save dolphin278/5445957 to your computer and use it in GitHub Desktop.
Pub/sub example for nodejs using mongodb

Uses capped collection, tailable cursors and streams.

What's here?

  • init.js recreates collection capped collection 'queue' on mongodb.
  • writer.js spams queue with new messages
  • worker.js processes all messages saved to queue,
  • onceWorker.js processes only unprocessed messages, so you can spawn several of them and each of your messages will be processed by only one worker.

Steps

  1. Create database named captest on your locally installed mongodb, or change connection strings in all *.js files
  2. Start node ./init.js — it will recreate capped collection for messages. Note: it drops collection named 'queue' so don't run it on your database with valuable data.
  3. Start node ./writer.js — it will start to push data to your collection

Notes

  • Don't use in production — it's just example — it does not handle any errors.
  • You can support your workers with queries during stream creation — it will allow you to pick only subset of all messages.
  • Try to stop onceWorker while writer is still working, wait for some time, then start it again — you'll see how onceWorker will process all messages that were added in the time of it's "downtime"
  • Try to start several instances of onceWorker.js to see how they balance workload
  • You can implement worker.js as a Stream itself, and re-implement onceWorker as a Stream that takes event objects from worker.js Stream. Do it as a homework :)

Links

var mongodb = require('mongodb');
MongoClient = mongodb.MongoClient;
function onCreated(err, collection) {
console.log('capped collection recreated');
process.exit();
}
function onDropped(db) {
db.createCollection('queue', {
capped: true,
size: 1000000,
max: 100
}, onCreated);
}
function onConnected(err, db) {
if (err) { process.exit(); }
if (db) {
console.log('connected to mongodb');
}
db.dropCollection('queue', onDropped.bind(null, db))
}
MongoClient.connect('mongodb://localhost/captest', onConnected);
var mongodb = require('mongodb');
MongoClient = mongodb.MongoClient;
function onCollection(err, collection) {
var cursor = collection.find({}, { tailable: true }),
cursorStream = cursor.stream(),
itemsProcessed = 0;
cursorStream.on('data', function () {
collection.findAndModify(
{ processed: false },
['_id'],
{ $set: { processed: true }},
function (err, obj) {
if (obj) {
console.log(obj.value)
itemsProcessed++;
}
}
);
});
setInterval(function () {
console.log('itemsProcessed', itemsProcessed);
}, 1000);
}
function onConnected(err, db) {
db.collection('queue', onCollection);
}
MongoClient.connect('mongodb://localhost/captest', onConnected);
var mongodb = require('mongodb');
MongoClient = mongodb.MongoClient;
function onCollection(err, collection) {
var cursor = collection.find({}, { tailable: true, awaitdata: true }),
cursorStream = cursor.stream(),
itemsProcessed = 0;
cursorStream.on('data', function (data) {
console.log(data.value);
itemsProcessed++;
});
setInterval(function () {
console.log('itemsProcessed', itemsProcessed);
}, 1000);
}
function onConnected(err, db) {
db.collection('queue', onCollection);
}
MongoClient.connect('mongodb://localhost/captest', onConnected);
var mongodb = require('mongodb');
MongoClient = mongodb.MongoClient;
function onCollection(err, collection) {
var itemsSent = 0;
setInterval(function () {
collection.insert({
value: Math.round(Math.random() * 99999999),
processed: false
}, function (err, doc) {
itemsSent++;
});
}, 750);
setInterval(function () {
console.log('items sent', itemsSent);
}, 1000);
}
function onConnected(err, db) {
db.collection('queue', onCollection);
}
MongoClient.connect('mongodb://localhost/captest', onConnected);
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment