Skip to content

Instantly share code, notes, and snippets.

@crobinson42
Last active October 11, 2019 16:32
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save crobinson42/3ffb66c62823abc5ad2546dbce8e3883 to your computer and use it in GitHub Desktop.
Save crobinson42/3ffb66c62823abc5ad2546dbce8e3883 to your computer and use it in GitHub Desktop.
Watch mongodb and publish to kafka
/*
NOTE: this is a naive first attempt at tailing mongo collections to publish kafka messages
and removed module imports and other stuff for brevity....
*/
function normalizeMongoDocument(document) {
// convert "_id" to "id"
const normalized = { id: document._id, ...document }
delete normalized._id
return normalized
}
function normalizeMongoOp(mongoOp) {
if (mongoOp === 'delete') return 'destroy'
if (mongoOp === 'insert') return 'create'
if (mongoOp === 'replace') return 'update'
if (mongoOp === 'update') return 'update'
return mongoOp
}
// Use connect method to connect to the server
MongoClient.connect(coreMongoUrl, {
useNewUrlParser: true,
useUnifiedTopology: true,
}).then(client => {
debug('Connected to MongoDB server')
const db = client.db(CoreModels.config.MONGO_DB_NAME)
// a clone of the apps main db to track record diffs
const dbDiff = client.db(`${CoreModels.config.MONGO_DB_NAME}_clone`)
db.on('close', function dbOnClose() {
console.log('DB connection closed...', new Date().toISOString())
})
// Define change stream
const changeStream = db.watch(undefined, {
// options: https://mongodb.github.io/node-mongodb-native/3.3/api/Collection.html#watch
fullDocument: 'updateLookup',
})
// start listen to changes
changeStream.on('change', async event => {
debug('changeStream.on(change) triggered')
let _id
try {
_id = event.fullDocument._id
} catch (e) {
_id = event.documentKey._id
}
if (!_id) return
// fullDocument doesn't always exist, ie: delete op
const newRecord =
normalizeMongoOp(event.operationType) === 'destroy'
? {}
: normalizeMongoDocument(event.fullDocument || {})
let oldRecord
try {
// get old record from diff db
oldRecord = await dbDiff
.collection(event.ns.coll)
.findOne({ _id })
.then(record => record && normalizeMongoDocument(record))
if (normalizeMongoOp(event.operationType) === 'destroy') {
// add new record to diff db
await dbDiff.collection(event.ns.coll).deleteOne({ _id })
} else {
// add new record to diff db
await dbDiff.collection(event.ns.coll).findOneAndUpdate(
{ _id },
{ $set: event.fullDocument },
{
upsert: true,
},
)
}
} catch (e) {
console.log('failed to perform op on clone db', e)
// todo: log error
}
if (
!['delete', 'insert', 'replace', 'update'].includes(event.operationType)
) {
// we only care about crud ops https://docs.mongodb.com/manual/reference/change-events/
debug(
`changeStream.on(change) discarding operationType: ${
event.operationType
}`,
)
return
}
const kafkaMsg = {
model: event.ns.coll,
oldRecord: oldRecord || {},
op: normalizeMongoOp(event.operationType),
record: newRecord,
}
debug(
`changeStream.on(change) model: ${kafkaMsg.model}, op: ${kafkaMsg.op}`,
)
kafkaProducer.send({
topic: ServerUtils.constants.kafkaTopics.DB_CORE_MODEL_CHANGE(
kafkaMsg.model,
),
messages: [{ value: JSON.stringify(kafkaMsg) }],
})
})
changeStream.on('close', event => {
console.log('')
console.log('changeStream close', event)
console.log('')
})
changeStream.on('end', event => {
console.log('')
console.log('changeStream end', event)
console.log('')
})
changeStream.on('error', event => {
console.log('changeStream error', new Date().toISOString())
console.log(JSON.stringify(event, null, 2))
})
changeStream.on('reconnect', event => {
console.log('changeStream reconnect', new Date().toISOString())
console.log(JSON.stringify(event, null, 2))
})
changeStream.on('timeout', event => {
console.log('changeStream timeout', new Date().toISOString())
console.log(JSON.stringify(event, null, 2))
})
// changeStream.on('resumeTokenChanged', event => {
// console.log('changeStream resumeTokenChanged', event)
// })
})
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment