Skip to content

Instantly share code, notes, and snippets.

Embed
What would you like to do?
{EventEmitter} = require 'events'
# Shared event bus: every module that requires this file gets the same emitter.
eventChannel = new EventEmitter()

# Prints whatever payload accompanied an 'error' event on the channel.
# Having a listener registered matters: an EventEmitter with no 'error'
# listener throws when 'error' is emitted.
reportChannelError = (details...) ->
  console.log "Event Channel received error:", details...

eventChannel.on 'error', reportChannelError

module.exports = eventChannel
{Server, Db, Timestamp} = require 'mongodb'
# Handle on the `local` database of the MongoDB server at localhost:27017;
# the `oplog.rs` collection is opened through this client further down.
# NOTE(review): `w: 0` presumably selects an unacknowledged write concern —
# confirm against the driver documentation for the version in use.
client = new Db 'local', new Server('localhost', 27017, {native_parser: true}), {w: 0}
# Emitter on which this module publishes 'change' events.
# NOTE(review): `config` is not defined in the visible code — presumably a
# project-level global that resolves application modules; verify.
eventChannel = config.require 'load/eventChannel'
# Builds a driver Timestamp for the given moment.
#
# date - optional Date; any falsy value means "now".
#
# Returns a Timestamp constructed with 0 as its first argument and the
# whole number of seconds since the Unix epoch as its second.
getTimestamp = (date) ->
  moment = date or new Date()
  epochSeconds = Math.floor(moment.getTime() / 1000)
  new Timestamp 0, epochSeconds
# Converts a driver Timestamp into a JavaScript Date.
#
# timestamp - object whose high 32 bits hold seconds since the Unix epoch
#             (this is how `getTimestamp` builds them).
#
# The public `getHighBits()` accessor is preferred; the private `high_`
# field is only read as a fallback for objects that lack the accessor, so
# the function does not depend on the driver's internal field naming.
#
# Returns a Date with one-second resolution.
getDate = (timestamp) ->
  seconds =
    if typeof timestamp.getHighBits is 'function'
      timestamp.getHighBits()
    else
      timestamp.high_
  new Date seconds * 1000
# Translates the one-letter oplog `op` code into a readable operation name.
# Codes missing from this table are passed through unchanged by the caller.
mapOp =
  n: 'noop'
  i: 'insert'
  u: 'update'
  # Deletes are recorded in the oplog with the code 'd'.
  d: 'remove'
  # Kept from the original table for backward compatibility.
  r: 'remove'
# Module-level settings, filled in through `connect`.
#   raw - when truthy, `watch` emits oplog documents unmodified.
options = {}

module.exports =
  # Copies the own properties of `opts` onto the module-level settings.
  #
  # opts - optional object of settings (currently only `raw` is read).
  #
  # Returns the settings object.
  connect: (opts) ->
    options[key] = value for own key, value of opts
    options

  # Tails the `oplog.rs` collection and emits a 'change' event on the event
  # channel for every entry recorded from now on.
  #
  # collection - optional namespace string; when given, only entries whose
  #              `ns` equals it are emitted.
  #
  # Connection, collection and stream failures are logged and stop the
  # watch instead of continuing with an unusable handle.
  watch: (collection) ->
    client.open (err) ->
      if err
        console.log 'Error connecting:', err
        return
      client.collection 'oplog.rs', (err, oplog) ->
        if err
          console.log 'Error opening oplog:', err
          return
        # Deliberately NOT named `options`: CoffeeScript has no shadowing, so
        # assigning to `options` here would overwrite the module-level
        # settings and silently discard `raw`.
        cursorOptions =
          tailable: true
          tailableRetryInterval: 1000
          numberOfRetries: 1000
        # Start at the current second so existing history is not replayed.
        currentTime = getTimestamp()
        cursor = oplog.find {ts: {$gte: currentTime}}, cursorOptions
        stream = cursor.stream()
        stream.on 'error', (err) ->
          console.log 'Oplog stream error:', err
        stream.on 'data', (data) ->
          return if collection and data.ns isnt collection
          if options.raw
            event = data
          else
            event =
              timestamp: getDate data.ts
              operation: mapOp[data.op] or data.op
              namespace: data.ns
              # Guarded so an entry without an `h` field does not throw.
              id: data.h?.toString()
              criteria: data.o2
              data: data.o
          eventChannel.emit 'change', event
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
You can’t perform that action at this time.