Mongo watcher - based off christkv's implementation: https://github.com/christkv/realtime/blob/master/lib/app/dataproviders/flow_data_provider.js
{EventEmitter} = require 'events' | |
eventChannel = new EventEmitter() | |
eventChannel.on 'error', (args...) -> console.log "Event Channel received error:", args... | |
module.exports = eventChannel |
{Server, Db, Timestamp} = require 'mongodb' | |
client = new Db 'local', new Server('localhost', 27017, {native_parser: true}), {w: 0} | |
eventChannel = config.require 'load/eventChannel' | |
getTimestamp = (date) -> | |
date ||= new Date() | |
time = Math.floor(date.getTime() / 1000) | |
new Timestamp 0, time | |
getDate = (timestamp) -> | |
new Date timestamp.high_ * 1000 | |
mapOp = | |
n: 'noop' | |
i: 'insert' | |
u: 'update' | |
r: 'remove' | |
options = {} # raw | |
module.exports = | |
connect: (opts) -> | |
options.merge opts | |
watch: (collection) -> | |
# watch user model | |
client.open (err) -> | |
console.log 'Error connecting:', err if err | |
client.collection 'oplog.rs', (err, oplog) -> | |
options = | |
tailable: true | |
tailableRetryInterval: 1000 | |
numberOfRetries: 1000 | |
currentTime = getTimestamp() | |
cursor = oplog.find {ts: {$gte: currentTime}}, options | |
stream = cursor.stream() | |
stream.on 'data', (data) -> | |
if collection | |
return unless data.ns is collection | |
if options.raw | |
event = data | |
else | |
event = | |
timestamp: getDate data.ts | |
operation: mapOp[data.op] or data.op | |
namespace: data.ns | |
id: data.h.toString() | |
criteria: data.o2 | |
data: data.o | |
eventChannel.emit 'change', event |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment