Skip to content

Instantly share code, notes, and snippets.

@rahulbhooteshwar
Created July 31, 2024 05:53
Show Gist options
  • Save rahulbhooteshwar/8e72c9f5c17e27cf69fd1fb6c2e015a4 to your computer and use it in GitHub Desktop.
Save rahulbhooteshwar/8e72c9f5c17e27cf69fd1fb6c2e015a4 to your computer and use it in GitHub Desktop.
Working with MongoDB Change Streams

MongoWatcher

  • This a MongoDB Watcher utility - warpper around Change Streams to abstract & reuse the resuming logic.
  • It also adds capability to act on error/close/end events of change streams.
  • It is fully configurable - if you want to resume or not in case of errors or need custom handler for those events
  • Enables detailed logging for various critical events
  • Rewatch & Resume on server election - when primary node is changed!

TS & JS both versions are provided

  • MongoWatcher.js
  • MongoWatcher.ts

Usage

const config = {
  client: "<YOUR Native JS Mongo DB CLIENT>", // if using mongosse, extract the client & provide it
  collectionName: "<YOUR_COLLECTION_NAME>",
  pipeline: [], // specify specific watch conditions
  listeners: {
    onChange: (data) => {
      const { fullDocument } = data;
      logger.info('=========Case change event processed=========', { _id: fullDocument?._id });
    },
  },
};
const myWatcher = new MongoWatcher(config, { from: 'SampleUsageDemo' }); // from is for better tracking in logs
myWatcher.watch();

// keep track of all change streams
const allChangeStreams = []
/**
* @class MongoWatcher
* @classdesc Class representing a wrapper utility around MongoDB change streams
*
* @param {MongoWatcherConfig} config - {@link MongoWatcherConfig | MongoWatcher configuration} object containing different options.
* @param {MongoWatcherMeta} [meta] - {@link MongoWatcherMeta | optional MongoWatcher meta data} - pass object containing anything that needs to be printed with logs
*/
class MongoWatcher {
constructor(config, meta) {
const defaultConfig = {
reWatchOnError: true,
reWatchOnEnd: true,
reWatchOnClose: false,
reWatchOnServerElection: true,
useResumeToken: true,
pipeline: [],
listeners: {},
watchOptions: {}
}
this.config = { ...defaultConfig, ...config }
this.meta = {
origin: `MongoWatcher-${this.config.collectionName}`,
...meta
}
this.changeStream = undefined
}
/**
* Start watching changes via MongoDB Change streams considering provided config
* & with encapsulated rewatch & resuming logic
*/
watch(isReWatch = false) {
const { client, listeners = {}, ...restOfTheConfig } = this.config
const {
collectionName,
pipeline = [],
watchOptions = {},
useResumeToken,
reWatchOnError,
reWatchOnEnd,
reWatchOnClose,
reWatchOnServerElection
} = restOfTheConfig
const { onChange, onError, onEnd, onClose } = listeners
const defaultWatchOptions = { fullDocument: "updateLookup" }
try {
const collectionObj = client.db().collection(collectionName)
this.changeStream = collectionObj.watch(pipeline, {
...defaultWatchOptions,
...watchOptions
})
console.info(
`πŸš€ πŸš€ πŸš€ πŸš€ πŸš€ ${this.meta.origin}: Started watching change stream events πŸš€ πŸš€ πŸš€ πŸš€ πŸš€`,
{ config: restOfTheConfig, isReWatch, ...this.meta }
)
allChangeStreams.push(this.changeStream)
this.changeStream.on("change", change => {
// track resume token if resuming is configured
const resumeToken = change._id
if (useResumeToken) {
this.config.watchOptions.resumeAfter = resumeToken
}
console.info(
`🚚 🚚 🚚 🚚 🚚 ${this.meta.origin}: Received new change stream event 🚚 🚚 🚚 🚚 🚚 `,
{ resumeToken, ...this.meta }
)
console.debug(`[${this.meta.origin}]: change event`, {
...this.meta,
change
})
// call custom callback (if provided)
if (onChange) {
onChange(change)
}
})
this.changeStream.on("error", data => {
if (data?.codeName === "ChangeStreamHistoryLost") {
// to avoid getting the same error infinitely, we need to discard resume token
delete this.config.watchOptions.resumeAfter
}
console.error({
title: `❌ ❌ Change stream errored! NO ACTION NEEDED - IT WILL BE RESUMED SHORTLY!`,
origin: this.meta.origin,
error: new Error(data?.codeName),
meta: { ...this.meta, config: restOfTheConfig, data }
})
// call custom callback (if provided)
if (onError) {
onError(data)
}
if (reWatchOnError) {
this.reWatch()
}
})
this.changeStream.on("end", data => {
console.warn(
`πŸ‘‹ πŸ‘‹ πŸ‘‹ πŸ‘‹ πŸ‘‹ ${this.meta.origin}: Change stream ended! πŸ‘‹ πŸ‘‹ πŸ‘‹ πŸ‘‹ πŸ‘‹ `,
{ ...this.meta, config: restOfTheConfig, data }
)
// call custom callback (if provided)
if (onEnd) {
onEnd(data)
}
if (reWatchOnEnd) {
this.reWatch()
}
})
this.changeStream.on("close", data => {
console.info(
`πŸ”Œ πŸ”Œ πŸ”Œ πŸ”Œ πŸ”Œ ${this.meta.origin}: Change stream closed! πŸ”Œ πŸ”Œ πŸ”Œ πŸ”Œ πŸ”Œ `,
{ ...this.meta, config: restOfTheConfig, data }
)
// call custom callback (if provided)
if (onClose) {
onClose(data)
}
if (reWatchOnClose) {
this.reWatch()
}
})
if (reWatchOnServerElection && !isReWatch) {
client.on("serverDescriptionChanged", event => {
if (event?.newDescription?.type === "RSPrimary") {
console.info(
`πŸ’‘πŸ’‘πŸ’‘πŸ’‘πŸ’‘πŸ’‘ Server election event: New primary elected: ${event?.address}`,
{
event
}
)
this.reWatch(true)
}
})
}
} catch (error) {
console.error({
title: `MongoWatcher: Error inside ${this.meta.origin}. Rewatch will be triggered!`,
origin: this.meta.origin,
error,
meta: { meta: this.meta, config: restOfTheConfig }
})
this.reWatch()
}
}
// private method
reWatch(isServerElection = false) {
const delayDuration = isServerElection ? 1000 : 5000
if (this.changeStream) {
// cleanup existing watchers & stream
this.changeStream.removeAllListeners()
this.changeStream.close()
}
console.warn(
`βŒ› βŒ› βŒ› βŒ› βŒ› ${this.meta.origin}: Rewatch will be triggered shortly! βŒ› βŒ› βŒ› βŒ› βŒ› `,
{ ...this.meta }
)
// add some delay for recovery (if connection issue) & then trigger watch
setTimeout(() => {
this.watch(true)
// eslint-disable-next-line @typescript-eslint/no-unused-vars
const { client, listeners = {}, ...restOfTheConfig } = this.config
if (isServerElection) {
console.warn(
`πŸ” πŸ” πŸ” πŸ” πŸ” ${this.meta.origin}: Re-initialized the watcher on server election πŸ” πŸ” πŸ” πŸ” πŸ”`,
{ ...this.meta, config: restOfTheConfig }
)
} else {
console.warn(
`βœ… βœ… βœ… βœ… βœ… ${this.meta.origin}: Re-initialized the watcher on detection of absence/closure of stream βœ… βœ… βœ… βœ… βœ…`,
{ ...this.meta, config: restOfTheConfig }
)
}
}, delayDuration)
}
}
/** close all registered change streams : This will be useful for global cleanup (if required) */
function closeAllChangeStreams() {
allChangeStreams?.forEach(changeStream => {
if (changeStream) {
// cleanup existing watchers & stream
changeStream.removeAllListeners()
changeStream.close()
}
})
}
export { closeAllChangeStreams, MongoWatcher }
/**
* Configuration for the MongoWatcher class.
*/
interface MongoWatcherConfig {
/**
* The MongoDB client object.
*/
client: any;
/**
* The name of the MongoDB collection to watch.
*/
collectionName: string;
/**
* Determines whether to re-watch on error events. Defaults to `true`.
*/
reWatchOnError?: boolean;
/**
* Determines whether to re-watch on end events. Defaults to `true`.
*/
reWatchOnEnd?: boolean;
/**
* Determines whether to re-watch on close events (if `changeStream.close()` was called explicitly). Defaults to `false`.
*/
reWatchOnClose?: boolean;
/**
* Determines whether to re-watch on server election events. Defaults to `true`.
*/
reWatchOnServerElection?: boolean;
/**
* Determines whether to use the last resume token when re-watching. Defaults to `true`.
*/
useResumeToken?: boolean;
/**
* An array of aggregation pipeline stages through which to pass change stream documents.
* This allows for filtering and manipulating the change stream documents.
*/
pipeline?: any[];
/**
* Optional event listener callbacks.
*/
listeners?: {
/**
* Callback function to be called on "change" events.
*/
onChange?: (change: any) => void;
/**
* Callback function to be called on "close" events.
*/
onClose?: (data: any) => void;
/**
* Callback function to be called on "error" events.
*/
onError?: (data: any) => void;
/**
* Callback function to be called on "end" events.
*/
onEnd?: (data: any) => void;
};
/**
* Options for the MongoDB change stream.
*/
watchOptions?: any;
}
/**
* Metadata for the MongoWatcher class.
*/
interface MongoWatcherMeta {
/**
* The origin of the registered watcher to be used with logging.
* Defaults to "MongoWatcher-collection-name-here".
*/
origin?: any;
}
// keep track of all change streams
const allChangeStreams: any[] = [];
/**
* @class MongoWatcher
* @classdesc Class representing a wrapper utility around MongoDB change streams
*
* @param {MongoWatcherConfig} config - {@link MongoWatcherConfig | MongoWatcher configuration} object containing different options.
* @param {MongoWatcherMeta} [meta] - {@link MongoWatcherMeta | optional MongoWatcher meta data} - pass object containing anything that needs to be printed with logs
*/
class MongoWatcher {
private config: MongoWatcherConfig;
private meta: MongoWatcherMeta;
private changeStream: any | undefined;
constructor(config: MongoWatcherConfig, meta?: MongoWatcherMeta) {
const defaultConfig: any = {
reWatchOnError: true,
reWatchOnEnd: true,
reWatchOnClose: false,
reWatchOnServerElection: true,
useResumeToken: true,
pipeline: [],
listeners: {},
watchOptions: {},
};
this.config = { ...defaultConfig, ...config };
this.meta = { origin: `MongoWatcher-${this.config.collectionName}`, ...meta };
this.changeStream = undefined;
}
/**
* Start watching changes via MongoDB Change streams considering provided config
* & with encapsulated rewatch & resuming logic
*/
public watch(isReWatch = false): void {
const { client, listeners = {}, ...restOfTheConfig } = this.config;
const {
collectionName,
pipeline = [],
watchOptions = {},
useResumeToken,
reWatchOnError,
reWatchOnEnd,
reWatchOnClose,
reWatchOnServerElection,
} = restOfTheConfig;
const { onChange, onError, onEnd, onClose } = listeners;
const defaultWatchOptions: any = { fullDocument: 'updateLookup' };
try {
const collectionObj: any = client.db().collection(collectionName);
this.changeStream = collectionObj.watch(pipeline, {
...defaultWatchOptions,
...watchOptions,
});
console.info(
`πŸš€ πŸš€ πŸš€ πŸš€ πŸš€ ${this.meta.origin}: Started watching change stream events πŸš€ πŸš€ πŸš€ πŸš€ πŸš€`,
{ config: restOfTheConfig, isReWatch, ...this.meta }
);
allChangeStreams.push(this.changeStream);
this.changeStream.on('change', (change) => {
// track resume token if resuming is configured
const resumeToken = change._id;
if (useResumeToken) {
this.config.watchOptions.resumeAfter = resumeToken;
}
console.info(
`🚚 🚚 🚚 🚚 🚚 ${this.meta.origin}: Received new change stream event 🚚 🚚 🚚 🚚 🚚 `,
{ resumeToken, ...this.meta }
);
console.debug(`[${this.meta.origin}]: change event`, { ...this.meta, change });
// call custom callback (if provided)
if (onChange) {
onChange(change);
}
});
this.changeStream.on('error', (data) => {
if (data?.codeName === "ChangeStreamHistoryLost") {
// to avoid getting the same error infinitely, we need to discard resume token
delete this.config.watchOptions.resumeAfter
}
console.error({
title: `❌ ❌ Change stream errored! NO ACTION NEEDED - IT WILL BE RESUMED SHORTLY!`,
origin: this.meta.origin,
error: new Error(data?.codeName),
meta: { ...this.meta, config: restOfTheConfig, data },
});
// call custom callback (if provided)
if (onError) {
onError(data);
}
if (reWatchOnError) {
this.reWatch();
}
});
this.changeStream.on('end', (data) => {
console.warn(
`πŸ‘‹ πŸ‘‹ πŸ‘‹ πŸ‘‹ πŸ‘‹ ${this.meta.origin}: Change stream ended! πŸ‘‹ πŸ‘‹ πŸ‘‹ πŸ‘‹ πŸ‘‹ `,
{ ...this.meta, config: restOfTheConfig, data }
);
// call custom callback (if provided)
if (onEnd) {
onEnd(data);
}
if (reWatchOnEnd) {
this.reWatch();
}
});
this.changeStream.on('close', (data) => {
console.info(
`πŸ”Œ πŸ”Œ πŸ”Œ πŸ”Œ πŸ”Œ ${this.meta.origin}: Change stream closed! πŸ”Œ πŸ”Œ πŸ”Œ πŸ”Œ πŸ”Œ `,
{ ...this.meta, config: restOfTheConfig, data }
);
// call custom callback (if provided)
if (onClose) {
onClose(data);
}
if (reWatchOnClose) {
this.reWatch();
}
});
if (reWatchOnServerElection && !isReWatch) {
client.on('serverDescriptionChanged', (event) => {
if (event?.newDescription?.type === 'RSPrimary') {
console.info(`πŸ’‘πŸ’‘πŸ’‘πŸ’‘πŸ’‘πŸ’‘ Server election event: New primary elected: ${event?.address}`, {
event,
});
this.reWatch(true);
}
});
}
} catch (error) {
console.error({
title: `MongoWatcher: Error inside ${this.meta.origin}. Rewatch will be triggered!`,
origin: this.meta.origin,
error,
meta: { meta: this.meta, config: restOfTheConfig },
});
this.reWatch();
}
}
// private method
private reWatch(isServerElection = false): void {
const delayDuration = isServerElection ? 1000 : 5000;
if (this.changeStream) {
// cleanup existing watchers & stream
this.changeStream.removeAllListeners();
this.changeStream.close();
}
console.warn(
`βŒ› βŒ› βŒ› βŒ› βŒ› ${this.meta.origin}: Rewatch will be triggered shortly! βŒ› βŒ› βŒ› βŒ› βŒ› `,
{ ...this.meta }
);
// add some delay for recovery (if connection issue) & then trigger watch
setTimeout(() => {
this.watch(true);
// eslint-disable-next-line @typescript-eslint/no-unused-vars
const { client, listeners = {}, ...restOfTheConfig } = this.config;
if (isServerElection) {
console.warn(
`πŸ” πŸ” πŸ” πŸ” πŸ” ${this.meta.origin}: Re-initialized the watcher on server election πŸ” πŸ” πŸ” πŸ” πŸ”`,
{ ...this.meta, config: restOfTheConfig }
);
} else {
console.warn(
`βœ… βœ… βœ… βœ… βœ… ${this.meta.origin}: Re-initialized the watcher on detection of absence/closure of stream βœ… βœ… βœ… βœ… βœ…`,
{ ...this.meta, config: restOfTheConfig }
);
}
}, delayDuration);
}
}
/** close all registered change streams : This will be useful for global cleanup (if required) */
function closeAllChangeStreams(): void {
allChangeStreams?.forEach((changeStream) => {
if (changeStream) {
// cleanup existing watchers & stream
changeStream.removeAllListeners();
changeStream.close();
}
});
}
export { closeAllChangeStreams, MongoWatcher };
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment