Created
September 4, 2022 04:21
-
-
Save taxilian/55b840efd4c074a5968392497fd938d8 to your computer and use it in GitHub Desktop.
Simple node.js typescript script to use change streams to mirror all CRUD operations from one mongodb connection to another
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import {MongoClient, ResumeToken, Timestamp, Logger} from 'mongodb'; | |
import path from 'path'; | |
import fs from 'fs'; | |
import {serialize as bsonSerialize, deserialize as bsonDeserialize} from 'bson'; | |
const MONGO_URL_SRC = process.env.MONGO_URL_SRC || 'mongodb://localhost:27017'; | |
const MONGO_DB_SRC = process.env.MONGO_DB_SRC || void 0; | |
const MONGO_URL_DEST = process.env.MONGO_URL_DEST || 'mongodb://localhost:27018'; | |
const MONGO_DB_DEST = process.env.MONGO_DB_DEST || void 0; | |
const MONGO_STATEFILE = process.env.MONGO_STATEFILE || path.resolve(__dirname, '..', 'state.json'); | |
// Must be a valid UNIX timestamp, e.g. Math.floor(Date.now() / 1000) | |
const START_TIMESTAMP = process.env.MONGO_START_TIMESTAMP ? Number(process.env.MONGO_START_TIMESTAMP) : void 0; | |
Logger.setLevel('debug'); | |
function encodeToken(token: ResumeToken) { | |
return bsonSerialize(token as any).toString('base64'); | |
} | |
function decodeToken(token: string): ResumeToken { | |
return bsonDeserialize(Buffer.from(token, 'base64')); | |
} | |
interface ResumeState { | |
resumeToken: string; | |
} | |
async function saveResumeToken(resumeToken: ResumeToken) { | |
if (!MONGO_STATEFILE) { | |
return; | |
} | |
const curState: ResumeState = { | |
resumeToken: encodeToken(resumeToken), | |
}; | |
await fs.promises.writeFile(MONGO_STATEFILE, JSON.stringify(curState, null, 2)); | |
} | |
async function connectDb(url: string, logName: string) { | |
const client = await MongoClient.connect(url); | |
console.log(`Connected to ${logName} at ${url}`); | |
return client; | |
} | |
export async function main() { | |
let resumeToken: ResumeToken; | |
if (MONGO_STATEFILE) { | |
try { | |
const stateData: ResumeState = JSON.parse(await fs.promises.readFile(MONGO_STATEFILE, 'utf8')); | |
resumeToken = decodeToken(stateData.resumeToken); | |
} catch { | |
console.warn("Could not read resume token from state file"); | |
} | |
} | |
const [clientSrc, clientDest] = await Promise.all([ | |
connectDb(MONGO_URL_SRC, 'Source DB'), | |
connectDb(MONGO_URL_DEST, 'Destination DB'), | |
]); | |
console.log("Connected to MongoDB both servers"); | |
const srcDb = clientSrc.db(MONGO_DB_SRC); | |
const destDb = clientDest.db(MONGO_DB_DEST); | |
const watcher = srcDb.watch([], { | |
fullDocument: 'updateLookup', | |
allowDiskUse: true, | |
...( | |
// If we have a resume token then use that | |
resumeToken ? {startAfter: resumeToken} : | |
// If we have no resume token but do have a start timestamp then use that | |
START_TIMESTAMP ? {startAtOperationTime: new Timestamp({i: START_TIMESTAMP, t: 1})} : | |
{}), | |
}); | |
watcher.on('change', async (change) => { | |
// console.log("Change", change); | |
switch(change.operationType) { | |
case 'insert': | |
case 'update': | |
case 'replace': | |
console.warn(`Processing ${change.operationType}: ${change.ns.coll} ${JSON.stringify(change.documentKey)}`); | |
if (!change.fullDocument) { | |
console.warn("Could not update document! No full document available.", change); | |
break; | |
} | |
await destDb.collection(change.ns.coll).replaceOne(change.documentKey, change.fullDocument, {upsert: true}); | |
break; | |
case 'delete': | |
console.warn(`Processing delete: ${change.ns.coll} ${JSON.stringify(change.documentKey)}`); | |
await destDb.collection(change.ns.coll).deleteOne(change.documentKey); | |
break; | |
default: | |
// ignore | |
console.warn(`Ignoring operation: ${change.operationType}`); | |
break; | |
} | |
await saveResumeToken(change._id); | |
}); | |
watcher.on('error', (err) => { | |
console.error("Watcher error", err); | |
}); | |
watcher.on('close', () => { | |
console.log("Database connection closed!"); | |
}); | |
} | |
if (require.main === module) { | |
// This is being called directly, not required as a module. | |
main().catch(err => { | |
console.error(`Unexpected error:`, err); | |
process.exit(1); | |
}); | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment