Skip to content

Instantly share code, notes, and snippets.

@xaiki
Forked from scttnlsn/README.md
Last active May 8, 2018 14:54
Show Gist options
  • Star 4 You must be signed in to star a gist
  • Fork 1 You must be signed in to fork a gist
  • Save xaiki/4722178 to your computer and use it in GitHub Desktop.
Save xaiki/4722178 to your computer and use it in GitHub Desktop.

Pub/sub with MongoDB and Node.js

Setup:

$ mongo
> use pubsub
> db.createCollection('messages', { capped: true, size: 100000 })
> db.messages.insert({})

$ npm install mongoskin

Subscribe:

$ node subscribe-test.js

Publish:

$ mongo
> use pubsub
> db.messages.insert({ message: 'Hello world', time: Date.now() })
var mubsub = require ('./subscribe.js');
var ms = new mubsub ('pubsub');
ms.subscribe ('messages', function (msg) {
console.log(msg);
});
console.log ('got here');
function mubsub (db) {
var mongo = require('mongoskin');
if (typeof db == "string") {
var db = mongo.db('localhost:27017/' + db + '?auto_reconnect', {safe:true});
} else if (! db instanceof mongo.Db) {
console.error ("db must be a mongo.Db or a string.");
return new Error("wrong db object");
}
mubsub.prototype.subscribe = function (col, fn) {
db.open(function(err, db) {
if (err) throw err;
db.collection(col, function(err, collection) {
if (err) throw err;
var latest = collection.find({}).sort({ $natural: -1 }).limit(1);
latest.nextObject(function(err, doc) {
if (err) throw err;
var query = { _id: { $gt: doc._id }};
var options = { tailable: true, awaitdata: true, numberOfRetries: -1 };
var cursor = collection.find(query, options).sort({ $natural: 1 });
(function next() {
cursor.nextObject(function(err, message) {
if (err) throw err;
fn (message);
next();
});
})();
});
});
});
}
}
exports = module.exports = function(db) {
return new mubsub(db);
};
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment