Skip to content

Instantly share code, notes, and snippets.

@taxilian
Created September 4, 2022 04:21
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save taxilian/55b840efd4c074a5968392497fd938d8 to your computer and use it in GitHub Desktop.
Save taxilian/55b840efd4c074a5968392497fd938d8 to your computer and use it in GitHub Desktop.
Simple node.js typescript script to use change streams to mirror all CRUD operations from one mongodb connection to another
import {MongoClient, ResumeToken, Timestamp, Logger} from 'mongodb';
import path from 'path';
import fs from 'fs';
import {serialize as bsonSerialize, deserialize as bsonDeserialize} from 'bson';
const MONGO_URL_SRC = process.env.MONGO_URL_SRC || 'mongodb://localhost:27017';
const MONGO_DB_SRC = process.env.MONGO_DB_SRC || void 0;
const MONGO_URL_DEST = process.env.MONGO_URL_DEST || 'mongodb://localhost:27018';
const MONGO_DB_DEST = process.env.MONGO_DB_DEST || void 0;
const MONGO_STATEFILE = process.env.MONGO_STATEFILE || path.resolve(__dirname, '..', 'state.json');
// Must be a valid UNIX timestamp, e.g. Math.floor(Date.now() / 1000)
const START_TIMESTAMP = process.env.MONGO_START_TIMESTAMP ? Number(process.env.MONGO_START_TIMESTAMP) : void 0;
Logger.setLevel('debug');
function encodeToken(token: ResumeToken) {
return bsonSerialize(token as any).toString('base64');
}
function decodeToken(token: string): ResumeToken {
return bsonDeserialize(Buffer.from(token, 'base64'));
}
interface ResumeState {
resumeToken: string;
}
async function saveResumeToken(resumeToken: ResumeToken) {
if (!MONGO_STATEFILE) {
return;
}
const curState: ResumeState = {
resumeToken: encodeToken(resumeToken),
};
await fs.promises.writeFile(MONGO_STATEFILE, JSON.stringify(curState, null, 2));
}
async function connectDb(url: string, logName: string) {
const client = await MongoClient.connect(url);
console.log(`Connected to ${logName} at ${url}`);
return client;
}
export async function main() {
let resumeToken: ResumeToken;
if (MONGO_STATEFILE) {
try {
const stateData: ResumeState = JSON.parse(await fs.promises.readFile(MONGO_STATEFILE, 'utf8'));
resumeToken = decodeToken(stateData.resumeToken);
} catch {
console.warn("Could not read resume token from state file");
}
}
const [clientSrc, clientDest] = await Promise.all([
connectDb(MONGO_URL_SRC, 'Source DB'),
connectDb(MONGO_URL_DEST, 'Destination DB'),
]);
console.log("Connected to MongoDB both servers");
const srcDb = clientSrc.db(MONGO_DB_SRC);
const destDb = clientDest.db(MONGO_DB_DEST);
const watcher = srcDb.watch([], {
fullDocument: 'updateLookup',
allowDiskUse: true,
...(
// If we have a resume token then use that
resumeToken ? {startAfter: resumeToken} :
// If we have no resume token but do have a start timestamp then use that
START_TIMESTAMP ? {startAtOperationTime: new Timestamp({i: START_TIMESTAMP, t: 1})} :
{}),
});
watcher.on('change', async (change) => {
// console.log("Change", change);
switch(change.operationType) {
case 'insert':
case 'update':
case 'replace':
console.warn(`Processing ${change.operationType}: ${change.ns.coll} ${JSON.stringify(change.documentKey)}`);
if (!change.fullDocument) {
console.warn("Could not update document! No full document available.", change);
break;
}
await destDb.collection(change.ns.coll).replaceOne(change.documentKey, change.fullDocument, {upsert: true});
break;
case 'delete':
console.warn(`Processing delete: ${change.ns.coll} ${JSON.stringify(change.documentKey)}`);
await destDb.collection(change.ns.coll).deleteOne(change.documentKey);
break;
default:
// ignore
console.warn(`Ignoring operation: ${change.operationType}`);
break;
}
await saveResumeToken(change._id);
});
watcher.on('error', (err) => {
console.error("Watcher error", err);
});
watcher.on('close', () => {
console.log("Database connection closed!");
});
}
if (require.main === module) {
// This is being called directly, not required as a module.
main().catch(err => {
console.error(`Unexpected error:`, err);
process.exit(1);
});
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment