Skip to content

Instantly share code, notes, and snippets.

@emeraldsanto
Created December 25, 2021 18:29
Show Gist options
  • Save emeraldsanto/0ac98e6bca5288748e3b9299ee04cace to your computer and use it in GitHub Desktop.
Save emeraldsanto/0ac98e6bca5288748e3b9299ee04cace to your computer and use it in GitHub Desktop.
Type-safe MongoDB change stream
import { EventEmitter } from 'events';
import { getInstance } from './core-util/mongo';
import { ChangeEvent, ChangeEventCR, ChangeEventDelete, ChangeEventUpdate, ChangeStream, ObjectId } from 'mongodb';
/** Callback signature for listeners: receives the event payload and returns nothing. */
type ChangeListenerFunction<T> = (payload: T) => void;
/** Example document shape used to demonstrate the typed change listener. */
interface User {
  /** Postal location fields. */
  address: {
    city: string;
    country: string;
  };
  /** Name fields. */
  person: {
    firstName: string;
    lastName: string;
  };
}
/**
 * Typed event facade over a MongoDB change stream for a single collection.
 *
 * Constructing an instance starts watching the collection in the background.
 * Change events are re-emitted as:
 *  - `created`  — payload is the inserted document,
 *  - `updated`  — payload is the looked-up full document,
 *  - `deleted`  — payload is the `_id` of the removed document,
 *  - `<field>Changed` — payload is the new value of an updated field, or
 *    `null` when the field was removed.
 *
 * NOTE(review): `<field>Changed` event names are built from the raw keys of
 * `updateDescription.updatedFields` / `removedFields`. If the server reports
 * dotted paths for nested updates (e.g. `person.firstName`), the event name is
 * `person.firstNameChanged`, which the typed overloads cannot subscribe to —
 * confirm whether top-level events should be emitted in that case.
 */
export class ChangeListener<TModel extends object> {
  private readonly emitter: EventEmitter;
  private readonly collection: string;
  private changeStream: ChangeStream<TModel> | null = null;

  /**
   * @param collection Name of the collection to watch.
   */
  constructor(collection: string) {
    this.emitter = new EventEmitter();
    this.collection = collection;
    // Fire-and-forget: startWatching handles and logs its own failures.
    void this.startWatching();
  }

  /** Registers `fn` to be called every time the named event is emitted. */
  on(name: 'created', fn: ChangeListenerFunction<TModel>): void;
  on(name: 'updated', fn: ChangeListenerFunction<TModel>): void;
  on(name: 'deleted', fn: ChangeListenerFunction<ObjectId>): void;
  on<TKey extends string & keyof TModel>(name: `${TKey}Changed`, fn: ChangeListenerFunction<TModel[TKey] | null>): void;
  on(name: string, fn: ChangeListenerFunction<any>): void {
    this.emitter.on(name, fn);
  }

  /** Registers `fn` to be called at most once, on the next emission of the named event. */
  once(name: 'created', fn: ChangeListenerFunction<TModel>): void;
  once(name: 'updated', fn: ChangeListenerFunction<TModel>): void;
  once(name: 'deleted', fn: ChangeListenerFunction<ObjectId>): void;
  once<TKey extends string & keyof TModel>(name: `${TKey}Changed`, fn: ChangeListenerFunction<TModel[TKey] | null>): void;
  once(name: string, fn: ChangeListenerFunction<any>): void {
    this.emitter.once(name, fn);
  }

  /** Removes a listener previously registered for the named event. */
  off(name: 'created', fn: ChangeListenerFunction<TModel>): void;
  off(name: 'updated', fn: ChangeListenerFunction<TModel>): void;
  off(name: 'deleted', fn: ChangeListenerFunction<ObjectId>): void;
  off<TKey extends string & keyof TModel>(name: `${TKey}Changed`, fn: ChangeListenerFunction<TModel[TKey] | null>): void;
  off(name: string, fn: ChangeListenerFunction<any>): void {
    this.emitter.off(name, fn);
  }

  /** Emits the named event with `payload` to all registered listeners. */
  emit(name: 'created', payload: TModel): void;
  emit(name: 'updated', payload: TModel): void;
  emit(name: 'deleted', payload: ObjectId): void;
  emit<TKey extends string & keyof TModel>(name: `${TKey}Changed`, payload: TModel[TKey] | null): void;
  emit(name: string, payload: unknown): void {
    this.emitter.emit(name, payload);
  }

  /**
   * Opens the change stream (if not already open) and wires its `change`
   * events to this listener. Failures are logged and the stream is closed and
   * cleared, so this method never rejects.
   */
  private async startWatching(): Promise<void> {
    if (this.changeStream) return;

    try {
      const db = await getInstance();
      // Pass an empty pipeline rather than `null`: a non-array first argument
      // can be misread as the options object, dropping `fullDocument`.
      this.changeStream = db.collection(this.collection).watch([], { fullDocument: 'updateLookup' });
      // Wrap in an arrow function so `this` inside onChange is this instance,
      // not the change stream that invokes the callback.
      this.changeStream?.on('change', (event: ChangeEvent<TModel>) => this.onChange(event));
    } catch (error: unknown) {
      console.log(error);

      // Clear the reference first so the guard above does not block a retry.
      const failedStream = this.changeStream;
      this.changeStream = null;

      if (failedStream) {
        try {
          await failedStream.close();
        } catch (closeError: unknown) {
          console.log(closeError);
        }
      }
    }
  }

  /** Routes a raw change event to the handler for its operation type. */
  private onChange(event: ChangeEvent<TModel>): void {
    switch (event.operationType) {
      case 'insert':
        this.onCreate(event);
        break;
      case 'update':
        this.onUpdate(event);
        break;
      case 'delete':
        this.onDelete(event);
        break;
      default:
        console.log(`Operation not supported: ${event.operationType}`);
        break;
    }
  }

  /** Handles an insert: logs it and emits `created` with the new document. */
  private onCreate(event: ChangeEventCR<TModel>): void {
    console.log(`Created a document in the ${this.collection} collection with ID ${event.documentKey._id}`);

    const document = event.fullDocument;
    if (document) {
      this.emit('created', document);
    }
  }

  /**
   * Handles an update: logs it, emits `updated` with the looked-up document
   * (when available), then one `<field>Changed` event per updated or removed
   * field.
   */
  private onUpdate(event: ChangeEventUpdate<TModel>): void {
    console.log(`Updated a document in the ${this.collection} collection with ID ${event.documentKey._id}`);

    const document = event.fullDocument;
    if (document) {
      this.emit('updated', document);
    } else {
      // The lookup can come back empty (e.g. the document no longer exists);
      // do not hand `undefined` to listeners typed as receiving TModel.
      console.log(`No full document available for update with ID ${event.documentKey._id}`);
    }

    for (const [key, value] of Object.entries(event.updateDescription.updatedFields ?? {})) {
      this.emitFieldChanged(key, value);
    }

    for (const key of event.updateDescription.removedFields ?? []) {
      this.emitFieldChanged(key, null);
    }
  }

  /** Handles a delete: logs it and emits `deleted` with the document's `_id`. */
  private onDelete(event: ChangeEventDelete<TModel>): void {
    console.log(`Deleted a document in the ${this.collection} collection with ID ${event.documentKey._id}`);
    this.emit('deleted', event.documentKey._id);
  }

  /**
   * Emits `<key>Changed` with `value`. Goes straight to the underlying emitter
   * because `key` is a runtime string that the typed `emit` overloads cannot
   * accept without a cast.
   */
  private emitFieldChanged(key: string, value: unknown): void {
    this.emitter.emit(`${key}Changed`, value);
  }
}
// Example usage: listener payloads are inferred from the event name.
const userEvents = new ChangeListener<User>('users');

userEvents.on('deleted', (deletedId) => {
  // deletedId is inferred as ObjectId
});

userEvents.on('created', (createdUser) => {
  // createdUser is inferred as User
});

userEvents.on('personChanged', (changedPerson) => {
  // changedPerson is inferred as User['person'] | null
});
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment