Created
August 17, 2022 21:54
-
-
Save jonshipman/bea61caea0cc6faef3a13bc4ea218564 to your computer and use it in GitHub Desktop.
Example of MongoDB Change Stream updating the same document.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
/**
 * I wanted to create an example of a change stream that updates the very documents it is watching.
 * I couldn't find any examples online that did more than a simple "console.log" in the change stream.
 * The secret sauce is the $match stage of the $update pipeline.
 * We only watch for changes that do not update searchKeywords (in this example).
 *
 * In practice, this collection has a text index on "searchKeywords", and createEdgeNGrams is a standard algorithm for splitting up words.
 * If you run LeadModel.updateOne({_id: anyId}, {name:'Hello World'}), the on('change') handler is triggered and updates searchKeywords.
 * Since this follow-up update changes searchKeywords, the filter prevents an infinite loop.
 */
import createEdgeNGrams from '../../utils/create-edge-ngrams.js'; | |
import Worker from '../../utils/worker.js'; | |
import { CustomerModel } from '../customer/index.js'; | |
import LeadModel from './model.js'; | |
// Change-stream pipeline: drop every update event that itself wrote
// `searchKeywords`. The handler below writes exactly that field, so this
// filter is what stops its own follow-up update from re-triggering it.
const $update = [
  {
    $match: {
      'updateDescription.updatedFields.searchKeywords': {
        $exists: false,
      },
    },
  },
];

// 'updateLookup' asks the server to attach the current version of the
// document to update events, so the handler can read all of its fields.
const $options = { fullDocument: 'updateLookup' };

/**
 * Rebuilds `searchKeywords` for a lead whenever the lead changes.
 *
 * Keywords are gathered from the lead's name, the first/last name of the
 * referenced customer contact, and each address entry; the joined,
 * upper-cased text is passed through createEdgeNGrams and written back
 * to the lead.
 */
LeadModel.watch($update, $options).on('change', (data) => {
  const lead = data.fullDocument;

  // The $match above only filters on `updatedFields`, so events that carry no
  // document (e.g. deletes, or an update whose lookup found the document
  // already gone) still arrive here. There is nothing to index for them.
  if (!lead) {
    return;
  }

  const leadId = data.documentKey._id;

  Worker.once('lead-search-keywords-update-' + leadId, async () => {
    const searchKeywords = [];

    if (lead.name) {
      searchKeywords.push(lead.name);
    }

    if (lead.contact) {
      const contact = await CustomerModel.findOne({
        _id: lead.contact,
      })
        .lean()
        .exec();

      if (contact) {
        if (contact.firstName) {
          searchKeywords.push(contact.firstName);
        }

        if (contact.lastName) {
          searchKeywords.push(contact.lastName);
        }
      }
    }

    if (Array.isArray(lead.address)) {
      for (const address of lead.address) {
        if (!address) {
          continue;
        }

        // On a plain object `toString` is the inherited function, which is
        // always truthy; only use it when it is an actual non-empty string
        // field, otherwise fall back to `address1`.
        const label =
          typeof address.toString === 'string' && address.toString
            ? address.toString
            : address.address1;

        // Skip entries with no usable text instead of pushing `undefined`.
        if (label) {
          searchKeywords.push(label);
        }
      }
    }

    let searchKeywordProperty;

    if (searchKeywords.length > 0) {
      searchKeywordProperty = createEdgeNGrams(
        searchKeywords.join(' ').toUpperCase()
      );
    }

    // NOTE(review): this is a strict-equality check, so it only avoids a
    // redundant write if createEdgeNGrams returns a primitive (e.g. a
    // string) — confirm against its implementation.
    if (
      searchKeywordProperty &&
      searchKeywordProperty !== lead.searchKeywords
    ) {
      await LeadModel.updateOne(
        { _id: leadId },
        {
          searchKeywords: searchKeywordProperty,
        }
      );
    }
  });
});
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
The Worker is a wrapper around Node.js cluster forking that prevents duplicate events (the class divvies up activities among the available processes).