Skip to content

Instantly share code, notes, and snippets.

@scttnlsn
Created July 30, 2012 22:16
Show Gist options
  • Star 56 You must be signed in to star a gist
  • Fork 20 You must be signed in to fork a gist
  • Save scttnlsn/3210919 to your computer and use it in GitHub Desktop.
Save scttnlsn/3210919 to your computer and use it in GitHub Desktop.
Pub/sub with MongoDB and Node.js

Pub/sub with MongoDB and Node.js

Initialize the capped collection:

$ mongo
> use pubsub
> db.createCollection('messages', { capped: true, size: 100000 })
> db.messages.insert({})
> exit

Running:

$ npm install mongodb
$ node subscribe.js

In another terminal:

$ node publish.js
// publish.js
//
// Connects to the local `pubsub` database and inserts one message into the
// `messages` collection every two seconds, logging the id of each message
// once the insert has been acknowledged.
var mongo = require('mongodb');

var server = new mongo.Server('localhost', 27017);
var db = new mongo.Db('pubsub', server);

db.open(function(err) {
  if (err) throw err;

  db.collection('messages', function(err, collection) {
    if (err) throw err;

    setInterval(function() {
      // Keep a reference to the document being inserted. The insert callback
      // below is only relied on for `err`; the driver assigns a generated
      // `_id` to the document object it is given when none is present, so the
      // id can be read back from this same object after the insert completes.
      var message = { foo: 'bar', time: Date.now() };

      collection.insert(message, function(err) {
        if (err) throw err;
        console.log('published', message._id);
      });
    }, 2000);
  });
});
// subscribe.js
//
// Connects to the local `pubsub` database, looks up the most recently
// inserted document in `messages`, then opens a tailable cursor for documents
// with a greater `_id` and logs each one as it is read.
var mongo = require('mongodb');

var server = new mongo.Server('localhost', 27017);
var db = new mongo.Db('pubsub', server);

db.open(function(err) {
  if (err) throw err;

  db.collection('messages', function(err, collection) {
    if (err) throw err;

    // Reverse natural order + limit(1) yields the newest document; it marks
    // the point after which messages are delivered.
    var latest = collection.find({}).sort({ $natural: -1 }).limit(1);

    latest.nextObject(function(err, last) {
      if (err) throw err;

      // Without a starting document there is no `_id` to tail from. Fail with
      // an explicit message instead of a TypeError on `last._id`.
      if (!last) {
        throw new Error(
          'The messages collection is empty; insert a document first ' +
          '(e.g. db.messages.insert({})) so there is an _id to tail from.'
        );
      }

      var query = { _id: { $gt: last._id } };
      var options = { tailable: true, awaitdata: true, numberOfRetries: -1 };
      var cursor = collection.find(query, options).sort({ $natural: 1 });

      (function next() {
        cursor.nextObject(function(err, doc) {
          if (err) throw err;
          console.log(doc);

          // Schedule the next read on a fresh stack. If the driver ever
          // invoked this callback synchronously, calling next() directly
          // would grow the call stack without bound.
          setImmediate(next);
        });
      })();
    });
  });
});
@cjtylor
Copy link

cjtylor commented Jan 24, 2013

Hi, thanks very much for this post - this is very useful to me.
One question: is subscribe.js essentially polling MongoDB, or does it only query the database when new data arrives?

The reason I am asking:
I checked the MongoDB log, and I see activity every 2 seconds, even when I have not published any new data to the database.
Thanks very much!

below is the mongo log:

Thu Jan 24 08:18:23 [conn5] getmore pubsub.messages query: { _id: { $gt: ObjectId('5100e3dc47191fae38cd3664') } } cursorid:2982739666572129973 reslen:20 2098ms
Thu Jan 24 08:18:25 [conn5] getmore pubsub.messages query: { _id: { $gt: ObjectId('5100e3dc47191fae38cd3664') } } cursorid:2982739666572129973 reslen:20 2095ms
Thu Jan 24 08:18:27 [conn5] getmore pubsub.messages query: { _id: { $gt: ObjectId('5100e3dc47191fae38cd3664') } } cursorid:2982739666572129973 reslen:20 2100ms
Thu Jan 24 08:18:29 [conn5] getmore pubsub.messages query: { _id: { $gt: ObjectId('5100e3dc47191fae38cd3664') } } cursorid:2982739666572129973 reslen:20 2095ms

@xaiki
Copy link

xaiki commented Feb 6, 2013

hey, I've turned this gist into a module: https://gist.github.com/xaiki/4722178
would you consider uploading it to npm ?

@xaiki
Copy link

xaiki commented Feb 6, 2013

https://github.com/scttnlsn/mubsub
oh you already did that, sorry =)

@lcneves
Copy link

lcneves commented Feb 13, 2019

subscribe.js, line 26: We're trusting that cursor.nextObject() will put the callback in the event loop, which is a reasonable assumption for an asynchronous call, but it still relies on the implementation. If cursor.nextObject() calls the callback directly, we will eventually have a "Maximum call stack size exceeded" error. Just to be on the safe side, we could change this line to process.nextTick(() => next());.

@papnkukn
Copy link

Update for the 2019s - mongodb 3.3.4 or similar

// Subscriber using the MongoClient API (mongodb 3.3.x).
//
// Looks up the most recently inserted document in `pubsub.messages`, then
// opens a tailable cursor for documents with a greater `_id` and logs each
// one as it is read.
const MongoClient = require('mongodb').MongoClient;

const url = 'mongodb://localhost:27017';
const dbname = 'pubsub';

MongoClient.connect(url, { useUnifiedTopology: true }, function(err, client) {
  if (err) throw err;

  const db = client.db(dbname);

  db.collection('messages', function(err, collection) {
    if (err) throw err;

    // Reverse natural order is required here: without it, limit(1) returns
    // the OLDEST document and every message already stored after it would be
    // replayed to the subscriber on start-up.
    const latest = collection.find({}).sort({ $natural: -1 }).limit(1);

    latest.next(function(err, last) {
      if (err) throw err;

      // Without a starting document there is no `_id` to tail from. Fail with
      // an explicit message instead of a TypeError on `last._id`.
      if (!last) {
        throw new Error(
          'The messages collection is empty; insert a document first ' +
          '(e.g. db.messages.insert({})) so there is an _id to tail from.'
        );
      }

      const query = { _id: { $gt: last._id } };
      const options = { tailable: true, awaitdata: true, numberOfRetries: -1 };
      const cursor = collection.find(query, options);

      (function next() {
        cursor.next(function(err, message) {
          if (err) throw err;
          console.log(message);

          // Schedule the next read on a fresh stack so a synchronously
          // invoked callback can never recurse without bound.
          setImmediate(next);
        });
      })();
    });
  });

  // The client is deliberately never closed in this script: the read loop
  // above keeps using the connection for as long as the process runs.
});

@xtianus79
Copy link

is this now obsolete with streams?

@hugojerez
Copy link

An adaptation I made with Mongoose for a project that uses ESLint rules

      this.mongooseClient =  this.app.get('mongooseClient');
      this.Model  = this.mongooseClient.model('events');
subscribe(callback) {
      let lastMessageIDReceived; 
      // subscribe to message queue and emit data
      // Get the last registered item
      var latest = this.Model.find({ }).sort({ $natural: -1 }).limit(1); 
  
      latest.cursor().next((err, doc)=> {
        if (err) throw err;
  
        var query = { _id: { $gt: doc._id }};
        var options = { data:1, tailable: true, awaitdata: true, numberOfRetries: -1 };
        var cursor = this.Model.find(query, options).sort({ $natural: -1 }).limit(1);
      
        const cursorIterator =  () =>{
          cursor.cursor().next((err, message) =>{
            if (err) throw err;
            // Avoid nulls and Repeated messages
            if(message && lastMessageIDReceived != String(message._id)){
                // JSON Parse-Stringify to remove Getters and Setters
              const result =  JSON.parse(JSON.stringify(message));
                // String cast to remove Getters and Setters
                lastMessageIDReceived = String(message._id);
              // Execute our custom callback
              callback(result.data);
            }
            cursorIterator();
          });
        };
        // Here begins the iteration
        cursorIterator();
      });
  }

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment