Skip to content

Instantly share code, notes, and snippets.

@jonshipman
Created August 17, 2022 21:54
Show Gist options
  • Save jonshipman/bea61caea0cc6faef3a13bc4ea218564 to your computer and use it in GitHub Desktop.
Save jonshipman/bea61caea0cc6faef3a13bc4ea218564 to your computer and use it in GitHub Desktop.
Example of MongoDB Change Stream updating the same document.
/**
* I wanted to create an example of a change stream that updates itself.
* I couldn't find any examples online that weren't a simple "console.log" in the change stream.
* The secret sauce is in the $update.$match.
* We are only watching for changes that do not update the searchKeywords (in this example).
*
* In execution this collection has a text index on "searchKeywords" and createEdgeNGrams is a standard algorithm for splitting up words.
* If you ran LeadModel.updateOne({_id: anyId}, {name:'Hello World'}), the on('change') will be triggered and update the searchKeywords.
* Since this followup update changes searchKeywords, the filter will prevent an infinite loop.
*/
import createEdgeNGrams from '../../utils/create-edge-ngrams.js';
import Worker from '../../utils/worker.js';
import { CustomerModel } from '../customer/index.js';
import LeadModel from './model.js';
const $update = [
{
$match: {
'updateDescription.updatedFields.searchKeywords': {
$exists: false,
},
},
},
];
const $options = { fullDocument: 'updateLookup' };
LeadModel.watch($update, $options).on('change', (data) => {
const leadId = data.documentKey._id;
const lead = data.fullDocument;
Worker.once('lead-search-keywords-update-' + leadId, async () => {
const searchKeywords = [];
if (lead.name) {
searchKeywords.push(lead.name);
}
if (lead.contact) {
const contact = await CustomerModel.findOne({
_id: lead.contact,
})
.lean()
.exec();
if (contact) {
if (contact.firstName) {
searchKeywords.push(contact.firstName);
}
if (contact.lastName) {
searchKeywords.push(contact.lastName);
}
}
}
if (lead.address && lead.address.length > 0) {
for (const address of lead.address) {
const a = address.toString || address.address1;
searchKeywords.push(a);
}
}
let searchKeywordProperty;
if (searchKeywords.length > 0) {
searchKeywordProperty = createEdgeNGrams(
searchKeywords.join(' ').toUpperCase()
);
}
if (
searchKeywordProperty &&
searchKeywordProperty !== lead.searchKeywords
) {
await LeadModel.updateOne(
{ _id: leadId },
{
searchKeywords: searchKeywordProperty,
}
);
}
});
});
@jonshipman
Copy link
Author

The Worker is a wrapper around cluster.forking to prevent duplicate events (the class will divvy up activities to available processes).

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment