/**
 * Configuration for the MongoWatcher class.
 */
interface MongoWatcherConfig {
  /**
   * The MongoDB client object.
   * The watcher calls `client.db().collection(collectionName).watch(...)` on it and
   * subscribes to its `serverDescriptionChanged` event.
   */
  client: any;

  /**
   * The name of the MongoDB collection to watch.
   */
  collectionName: string;

  /**
   * Determines whether to re-watch on error events. Defaults to `true`.
   */
  reWatchOnError?: boolean;

  /**
   * Determines whether to re-watch on end events. Defaults to `true`.
   */
  reWatchOnEnd?: boolean;

  /**
   * Determines whether to re-watch on close events (if `changeStream.close()` was called explicitly). Defaults to `false`.
   */
  reWatchOnClose?: boolean;

  /**
   * Determines whether to re-watch on server election events. Defaults to `true`.
   */
  reWatchOnServerElection?: boolean;

  /**
   * Determines whether to use the last resume token when re-watching. Defaults to `true`.
   */
  useResumeToken?: boolean;

  /**
   * An array of aggregation pipeline stages through which to pass change stream documents.
   * This allows for filtering and manipulating the change stream documents.
   */
  pipeline?: any[];

  /**
   * Optional event listener callbacks.
   */
  listeners?: {
    /**
     * Callback function to be called on "change" events; receives the change event.
     */
    onChange?: (change: any) => void;

    /**
     * Callback function to be called on "close" events.
     */
    onClose?: (data: any) => void;

    /**
     * Callback function to be called on "error" events; receives the emitted error data.
     */
    onError?: (data: any) => void;

    /**
     * Callback function to be called on "end" events.
     */
    onEnd?: (data: any) => void;
  };

  /**
   * Options for the MongoDB change stream. They are merged over the watcher's
   * default of `{ fullDocument: 'updateLookup' }`. When `useResumeToken` is enabled
   * the watcher sets `resumeAfter` to the latest resume token it has seen.
   */
  watchOptions?: any;
}

/**
 * Metadata for the MongoWatcher class.
 * The whole object is spread into the payload of the watcher's log calls.
 */
interface MongoWatcherMeta {
  /**
   * The origin of the registered watcher to be used with logging.
   * Defaults to "MongoWatcher-collection-name-here".
   */
  origin?: any;

  /**
   * Any additional values that should be printed alongside the watcher's logs.
   */
  [key: string]: unknown;
}

// Module-level registry of the change streams opened by MongoWatcher instances.
// MongoWatcher pushes every stream it opens; closeAllChangeStreams() walks the list.
const allChangeStreams: any[] = [];

/**
 * Wrapper utility around MongoDB change streams.
 *
 * It opens a change stream on one collection, forwards the stream's events to
 * the optional listeners and re-opens ("re-watches") the stream after errors,
 * end/close events and replica-set primary elections, optionally resuming from
 * the last resume token it has seen.
 */
class MongoWatcher {
  /** Delay before re-watching after an error/end/close, leaving time for the connection to recover. */
  private static readonly REWATCH_DELAY_MS = 5000;

  /** Shorter delay used when the re-watch is caused by a server election. */
  private static readonly SERVER_ELECTION_REWATCH_DELAY_MS = 1000;

  private config: MongoWatcherConfig;

  private meta: MongoWatcherMeta;

  private changeStream: any | undefined;

  /** Handle of the single pending re-watch timer, if one is scheduled. */
  private reWatchTimer: ReturnType<typeof setTimeout> | undefined;

  /** Guards against subscribing to `serverDescriptionChanged` more than once. */
  private serverElectionListenerAttached = false;

  /**
   * @param config - {@link MongoWatcherConfig | MongoWatcher configuration} object containing different options.
   * @param meta - {@link MongoWatcherMeta | optional MongoWatcher meta data} - pass object containing anything that needs to be printed with logs
   */
  constructor(config: MongoWatcherConfig, meta?: MongoWatcherMeta) {
    // Defaults are applied per key, so an option passed explicitly as `undefined`
    // still receives its documented default instead of overriding it.
    this.config = {
      ...config,
      reWatchOnError: config.reWatchOnError ?? true,
      reWatchOnEnd: config.reWatchOnEnd ?? true,
      reWatchOnClose: config.reWatchOnClose ?? false,
      reWatchOnServerElection: config.reWatchOnServerElection ?? true,
      useResumeToken: config.useResumeToken ?? true,
      pipeline: config.pipeline ?? [],
      listeners: config.listeners ?? {},
      // Private copy: the resume token is written into this object, and the
      // caller's options object must not be mutated (it may be shared).
      watchOptions: { ...(config.watchOptions ?? {}) },
    };

    this.meta = {
      ...meta,
      origin: meta?.origin ?? `MongoWatcher-${this.config.collectionName}`,
    };

    this.changeStream = undefined;
  }

  /**
   * Start watching changes via MongoDB Change streams considering provided config
   * & with encapsulated rewatch & resuming logic.
   *
   * Any stream this watcher still has open and any pending re-watch are replaced,
   * so the watcher never owns more than one live stream. A synchronous failure
   * while opening the stream is logged and answered with a delayed re-watch; it
   * is not thrown to the caller.
   *
   * @param isReWatch - `true` when invoked by the internal re-watch timer; it is only reported in the start-up log.
   */
  public watch(isReWatch = false): void {
    const { client, listeners = {}, ...restOfTheConfig } = this.config;
    const {
      collectionName,
      pipeline = [],
      watchOptions = {},
      useResumeToken,
      reWatchOnError,
      reWatchOnEnd,
      reWatchOnClose,
      reWatchOnServerElection,
    } = restOfTheConfig;
    const { onChange, onError, onEnd, onClose } = listeners;

    const defaultWatchOptions: any = { fullDocument: 'updateLookup' };

    // This call supersedes whatever was scheduled or still open.
    this.cancelPendingReWatch();
    this.disposeChangeStream();

    try {
      // Subscribed before the stream is opened so that the subscription also
      // exists when the very first attempt to open the stream fails.
      if (reWatchOnServerElection) {
        this.attachServerElectionListener(client);
      }

      const stream: any = client.db().collection(collectionName).watch(pipeline, {
        ...defaultWatchOptions,
        ...watchOptions,
      });
      this.changeStream = stream;

      console.info(
        `π π π π π ${this.meta.origin}: Started watching change stream events π π π π π`,
        { config: restOfTheConfig, isReWatch, ...this.meta }
      );

      allChangeStreams.push(stream);

      stream.on('change', (change: any) => {
        // track resume token if resuming is configured
        const resumeToken = change._id;
        if (useResumeToken) {
          this.config.watchOptions.resumeAfter = resumeToken;
        }

        console.info(
          `π π π π π ${this.meta.origin}: Received new change stream event π π π π π `,
          { resumeToken, ...this.meta }
        );

        console.debug(`[${this.meta.origin}]: change event`, { ...this.meta, change });

        // call custom callback (if provided)
        if (onChange) {
          onChange(change);
        }
      });

      stream.on('error', (data: any) => {
        if (data?.codeName === 'ChangeStreamHistoryLost') {
          // to avoid getting the same error infinitely, we need to discard resume token
          delete this.config.watchOptions.resumeAfter;
        }

        console.error({
          title: `β β Change stream errored! NO ACTION NEEDED - IT WILL BE RESUMED SHORTLY!`,
          origin: this.meta.origin,
          error: new Error(data?.codeName),
          meta: { ...this.meta, config: restOfTheConfig, data },
        });

        this.notifyThenReWatch(onError, data, reWatchOnError);
      });

      stream.on('end', (data: any) => {
        console.warn(
          `π π π π π ${this.meta.origin}: Change stream ended! π π π π π `,
          { ...this.meta, config: restOfTheConfig, data }
        );

        this.notifyThenReWatch(onEnd, data, reWatchOnEnd);
      });

      stream.on('close', (data: any) => {
        console.info(
          `π π π π π ${this.meta.origin}: Change stream closed! π π π π π `,
          { ...this.meta, config: restOfTheConfig, data }
        );

        this.notifyThenReWatch(onClose, data, reWatchOnClose);
      });
    } catch (error) {
      console.error({
        title: `MongoWatcher: Error inside ${this.meta.origin}. Rewatch will be triggered!`,
        origin: this.meta.origin,
        error,
        meta: { meta: this.meta, config: restOfTheConfig },
      });
      this.reWatch();
    }
  }

  /**
   * Close the current stream (if any) and schedule a fresh `watch()` after a
   * short delay. Scheduling again while a re-watch is pending replaces the
   * pending one, so at most one new stream is opened.
   *
   * @param isServerElection - `true` when triggered by a newly elected primary; uses the shorter delay.
   */
  private reWatch(isServerElection = false): void {
    const delayDuration = isServerElection
      ? MongoWatcher.SERVER_ELECTION_REWATCH_DELAY_MS
      : MongoWatcher.REWATCH_DELAY_MS;

    // cleanup existing watchers & stream
    this.disposeChangeStream();

    console.warn(
      `β β β β β ${this.meta.origin}: Rewatch will be triggered shortly! β β β β β `,
      { ...this.meta }
    );

    // Two overlapping triggers (e.g. a stream error followed by an election)
    // must not both fire, otherwise two streams would be opened and one leaked.
    this.cancelPendingReWatch();

    // add some delay for recovery (if connection issue) & then trigger watch
    this.reWatchTimer = setTimeout(() => {
      this.reWatchTimer = undefined;
      this.watch(true);

      // eslint-disable-next-line @typescript-eslint/no-unused-vars
      const { client, listeners = {}, ...restOfTheConfig } = this.config;

      if (isServerElection) {
        console.warn(
          `π π π π π ${this.meta.origin}: Re-initialized the watcher on server election π π π π π`,
          { ...this.meta, config: restOfTheConfig }
        );
      } else {
        console.warn(
          `✅ ✅ ✅ ✅ ✅ ${this.meta.origin}: Re-initialized the watcher on detection of absence/closure of stream ✅ ✅ ✅ ✅ ✅`,
          { ...this.meta, config: restOfTheConfig }
        );
      }
    }, delayDuration);
  }

  /** Invoke the optional listener, then re-watch when configured, even if the listener throws. */
  private notifyThenReWatch(
    listener: ((data: any) => void) | undefined,
    data: any,
    shouldReWatch: boolean | undefined
  ): void {
    try {
      // call custom callback (if provided)
      if (listener) {
        listener(data);
      }
    } finally {
      if (shouldReWatch) {
        this.reWatch();
      }
    }
  }

  /** Subscribe once to topology changes and re-watch whenever a server is reported as `RSPrimary`. */
  private attachServerElectionListener(client: any): void {
    if (this.serverElectionListenerAttached) {
      return;
    }

    client.on('serverDescriptionChanged', (event: any) => {
      if (event?.newDescription?.type === 'RSPrimary') {
        console.info(`π‘π‘π‘π‘π‘π‘ Server election event: New primary elected: ${event?.address}`, {
          event,
        });
        this.reWatch(true);
      }
    });

    this.serverElectionListenerAttached = true;
  }

  /** Cancel the pending re-watch timer, if there is one. */
  private cancelPendingReWatch(): void {
    if (this.reWatchTimer !== undefined) {
      clearTimeout(this.reWatchTimer);
      this.reWatchTimer = undefined;
    }
  }

  /** Detach listeners from the current stream, close it and drop it from the module-level registry. */
  private disposeChangeStream(): void {
    const stream = this.changeStream;
    if (!stream) {
      return;
    }
    this.changeStream = undefined;

    // Forget the stream globally so the registry does not grow with every re-watch.
    const registryIndex = allChangeStreams.indexOf(stream);
    if (registryIndex !== -1) {
      allChangeStreams.splice(registryIndex, 1);
    }

    try {
      stream.removeAllListeners();
      // close() may return a promise; a failure to close an already broken
      // stream must not surface as an unhandled rejection.
      Promise.resolve(stream.close()).catch((error: unknown) => {
        console.warn(`${this.meta.origin}: Failed to close change stream`, { ...this.meta, error });
      });
    } catch (error) {
      console.warn(`${this.meta.origin}: Failed to close change stream`, { ...this.meta, error });
    }
  }
}

/**
 * Close all registered change streams: useful for global cleanup (if required).
 *
 * Every stream in the module-level registry has its listeners removed and is
 * closed; the registry is emptied so the closed streams are neither kept
 * referenced nor closed again by a later call. A stream that fails to clean up
 * is logged and skipped so the remaining streams are still closed.
 *
 * NOTE(review): this does not cancel a re-watch that a MongoWatcher instance has
 * already scheduled with its own timer.
 */
function closeAllChangeStreams(): void {
  // Take the current entries out of the registry in one step.
  const streamsToClose = allChangeStreams.splice(0, allChangeStreams.length);

  for (const changeStream of streamsToClose) {
    if (!changeStream) {
      continue;
    }

    try {
      // cleanup existing watchers & stream
      changeStream.removeAllListeners();
      // close() may return a promise; do not let a rejection go unhandled
      Promise.resolve(changeStream.close()).catch((error: unknown) => {
        console.warn('closeAllChangeStreams: failed to close a change stream', { error });
      });
    } catch (error) {
      console.warn('closeAllChangeStreams: failed to close a change stream', { error });
    }
  }
}

export { closeAllChangeStreams, MongoWatcher };