Last active
October 11, 2019 16:32
-
-
Save crobinson42/3ffb66c62823abc5ad2546dbce8e3883 to your computer and use it in GitHub Desktop.
Watch mongodb and publish to kafka
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
/*
NOTE: this is a naive first attempt at tailing mongo collections to publish kafka messages.
Module imports and other setup code have been removed for brevity.
*/
/**
 * Builds a shallow copy of a MongoDB document in which the `_id` key is
 * exposed as `id` instead.
 *
 * The input object is left untouched. `id` is written first and the
 * document's remaining own enumerable properties are copied after it, so a
 * document that already carries its own `id` property keeps that value.
 *
 * @param {Object} document - document to copy; must not be null or undefined.
 * @returns {Object} copy of `document` without `_id` and with an `id` key.
 */
function normalizeMongoDocument(document) {
  // pull `_id` out and collect every other own enumerable property
  const { _id: mongoId, ...otherFields } = document
  return { id: mongoId, ...otherFields }
}
/**
 * Translates a MongoDB change-event operation type into the operation name
 * used in outgoing messages.
 *
 *   'delete'            -> 'destroy'
 *   'insert'            -> 'create'
 *   'replace', 'update' -> 'update'
 *
 * @param {*} mongoOp - operation type to translate.
 * @returns {*} the translated name, or `mongoOp` unchanged when it is not one
 *   of the values listed above.
 */
function normalizeMongoOp(mongoOp) {
  switch (mongoOp) {
    case 'delete':
      return 'destroy'
    case 'insert':
      return 'create'
    case 'replace':
    case 'update':
      return 'update'
    default:
      return mongoOp
  }
}
// Connect to MongoDB, watch the core database for changes and publish every
// create/update/destroy to Kafka together with the record's previous version,
// which is kept in a sibling "<db name>_clone" database.
MongoClient.connect(coreMongoUrl, {
  useNewUrlParser: true,
  useUnifiedTopology: true,
})
  .then(client => {
    debug('Connected to MongoDB server')

    const db = client.db(CoreModels.config.MONGO_DB_NAME)
    // a clone of the apps main db to track record diffs
    const dbDiff = client.db(`${CoreModels.config.MONGO_DB_NAME}_clone`)
    // we only care about crud ops https://docs.mongodb.com/manual/reference/change-events/
    const crudOperationTypes = ['delete', 'insert', 'replace', 'update']

    db.on('close', function dbOnClose() {
      console.log('DB connection closed...', new Date().toISOString())
    })

    // Define change stream
    const changeStream = db.watch(undefined, {
      // options: https://mongodb.github.io/node-mongodb-native/3.3/api/Collection.html#watch
      fullDocument: 'updateLookup',
    })

    /**
     * Handles one change-stream event: reads the previous version of the
     * record from the clone db, brings the clone up to date, then publishes a
     * kafka message of the shape `{ model, oldRecord, op, record }`.
     *
     * A failure while reading or writing the clone db is logged and the
     * message is still published (with an empty `oldRecord` if the read
     * failed). Any other failure rejects the returned promise.
     *
     * @param {Object} event - change event emitted by the change stream.
     * @returns {Promise<void>}
     */
    const handleChange = async event => {
      debug('changeStream.on(change) triggered')

      // Filter first: events such as drop/rename/invalidate carry neither
      // `fullDocument` nor `documentKey`, so they must not reach the code
      // below that reads `_id` from them.
      if (!crudOperationTypes.includes(event.operationType)) {
        debug(
          `changeStream.on(change) discarding operationType: ${event.operationType}`,
        )
        return
      }

      // fullDocument doesn't always exist, ie: delete op
      const _id = (event.fullDocument || event.documentKey || {})._id
      // only a missing id is rejected; 0, '' and false are legal _id values
      if (_id === undefined || _id === null) return

      const op = normalizeMongoOp(event.operationType)
      const isDestroy = op === 'destroy'
      const newRecord = isDestroy
        ? {}
        : normalizeMongoDocument(event.fullDocument || {})

      let oldRecord
      try {
        const diffCollection = dbDiff.collection(event.ns.coll)

        // get old record from diff db
        const previous = await diffCollection.findOne({ _id })
        oldRecord = previous && normalizeMongoDocument(previous)

        if (isDestroy) {
          // remove the record from diff db
          await diffCollection.deleteOne({ _id })
        } else if (event.fullDocument) {
          // Store the new version in diff db. The whole document is replaced
          // (rather than `$set`) so that fields removed from the source record
          // are removed from the clone as well.
          // `fullDocument` may be null with `updateLookup` when the document
          // was deleted before the lookup ran; the clone is then left as-is
          // and the delete event that follows cleans it up.
          await diffCollection.replaceOne({ _id }, event.fullDocument, {
            upsert: true,
          })
        }
      } catch (e) {
        console.log('failed to perform op on clone db', e)
        // todo: log error
      }

      const kafkaMsg = {
        model: event.ns.coll,
        oldRecord: oldRecord || {},
        op,
        record: newRecord,
      }

      debug(
        `changeStream.on(change) model: ${kafkaMsg.model}, op: ${kafkaMsg.op}`,
      )

      // awaited so that a failed publish surfaces as a rejection of this
      // handler instead of a floating promise
      await kafkaProducer.send({
        topic: ServerUtils.constants.kafkaTopics.DB_CORE_MODEL_CHANGE(
          kafkaMsg.model,
        ),
        messages: [{ value: JSON.stringify(kafkaMsg) }],
      })
    }

    // start listen to changes
    // NOTE(review): the emitter does not wait for the async handler, so the
    // handling of consecutive events can overlap — confirm whether per-record
    // ordering of clone updates / kafka messages matters.
    changeStream.on('change', event => {
      handleChange(event).catch(e => {
        console.log('failed to handle change event', e)
      })
    })

    changeStream.on('close', event => {
      console.log('')
      console.log('changeStream close', event)
      console.log('')
    })

    changeStream.on('end', event => {
      console.log('')
      console.log('changeStream end', event)
      console.log('')
    })

    changeStream.on('error', error => {
      console.log('changeStream error', new Date().toISOString())
      // log the error itself: JSON.stringify drops an Error's message and stack
      console.log(error)
    })

    changeStream.on('reconnect', event => {
      console.log('changeStream reconnect', new Date().toISOString())
      console.log(JSON.stringify(event, null, 2))
    })

    changeStream.on('timeout', event => {
      console.log('changeStream timeout', new Date().toISOString())
      console.log(JSON.stringify(event, null, 2))
    })

    // changeStream.on('resumeTokenChanged', event => {
    //   console.log('changeStream resumeTokenChanged', event)
    // })
  })
  .catch(e => {
    // covers both a failed connection and a failure while setting up the stream
    console.log('failed to start MongoDB change stream', e)
    // keep a non-zero exit status so a supervisor can tell startup failed
    process.exitCode = 1
  })
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment