Created
January 30, 2020 21:58
-
-
Save skyjur/3d63694c8d89665e7bad06ce2a95a5c1 to your computer and use it in GitHub Desktop.
Change feed listener
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
const _log = require("../../utils/logger") | |
/**
 * @typedef {{
 *   query: any,
 *   includeInitial?: boolean,
 *   onChange(oldVal: any, newVal: any, ctx?: Ork.Context): any,
 *   onState?: (state: string, ctx?: Ork.Context) => any,
 *   dataTimeoutInterval?: number,
 *   stateTimeoutInterval?: number,
 *   logPrefix?: string,
 *   ctx?: Ork.Context
 * }} ChangeFeedArgs
 */
/**
 * Subscribes to a change feed on `params.query` and re-subscribes whenever
 * the feed fails or goes quiet.
 *
 * The current subscription is torn down and a fresh one is started 3 seconds
 * later when any of the following happens:
 *  - running the query throws, or reports an error through its callback,
 *  - the cursor reports an error,
 *  - no row at all arrives within `stateTimeoutInterval` ms (default 10 s),
 *  - no row arrives within `dataTimeoutInterval` ms of the start or of the
 *    previous row (default 10 min).
 *
 * Rows carrying a non-null `new_val` or `old_val` are passed to
 * `onChange(old_val, new_val, ctx)`; rows carrying `state` are passed to
 * `onState(state, ctx)`. Errors thrown or rejected by either handler are
 * logged and do not interrupt the feed.
 *
 * NOTE(review): the query/cursor calls used here (`changes`, `run`, `each`,
 * `close`) look like the RethinkDB driver API - confirm.
 *
 * @param {ChangeFeedArgs} params
 * @returns {void}
 */
function runChangeFeedForever (params) {
  const {
    query,
    includeInitial = false,
    onChange,
    onState,
    ctx = {},
    dataTimeoutInterval = 10 * 60e3,
    stateTimeoutInterval = 10000,
    logPrefix = `[ChangeFeed ${Math.round((Date.now() + Math.random()) * 100) %
      1000000000}]`
  } = params
  const { log = _log } = ctx
  let activeCursor = null
  // Set once this subscription has been abandoned; rows that still arrive on
  // the old cursor afterwards are ignored.
  let closed = false

  log(logPrefix, "starting")

  // Closes a cursor without ever throwing: a failing close() must not stop
  // the restart from being scheduled, nor surface as an unhandled rejection.
  const closeCursor = cursor => {
    try {
      const pending = cursor.close()
      if (pending && typeof pending.catch === "function") {
        pending.catch(e => log("Error:", logPrefix, "closing cursor", e))
      }
    } catch (e) {
      log("Error:", logPrefix, "closing cursor", e)
    }
  }

  const onStateTimeout = () => {
    log(logPrefix, "cursor timeout")
    restart()
  }
  const onDataTimeout = () => {
    log(logPrefix, "inactive")
    restart()
  }
  const onError = e => {
    log("Error:", logPrefix, e)
    restart()
  }

  // Restart if not a single row (state or data) arrives within
  // stateTimeoutInterval ms.
  const stateTimeoutTimer = setTimeout(onStateTimeout, stateTimeoutInterval)
  // Restart if the feed stays silent for dataTimeoutInterval ms; re-armed on
  // every row received.
  let dataTimeoutTimer = setTimeout(onDataTimeout, dataTimeoutInterval)

  // Abandons this subscription (at most once) and schedules a new one that
  // keeps the same log prefix.
  const restart = () => {
    if (closed) {
      return
    }
    closed = true
    clearTimeout(stateTimeoutTimer)
    clearTimeout(dataTimeoutTimer)
    if (activeCursor) {
      closeCursor(activeCursor)
    }
    setTimeout(() => {
      log(logPrefix, "restarting")
      runChangeFeedForever({ ...params, logPrefix })
    }, 3000)
  }

  try {
    query
      .changes({
        includeInitial,
        includeStates: true,
        includeTypes: false
      })
      .run((err, cursor) => {
        if (err) {
          onError(err)
          return
        }
        activeCursor = cursor
        if (closed) {
          log(logPrefix, "received cursor, but already closed")
          closeCursor(cursor)
          return
        }
        log(logPrefix, "cursor received")
        // NOTE(review): each() presumably does not wait for the promise this
        // async callback returns, so handlers of consecutive rows may overlap
        // - confirm against the driver.
        cursor.each(async (err, row) => {
          clearTimeout(stateTimeoutTimer)
          clearTimeout(dataTimeoutTimer)
          if (closed) {
            closeCursor(cursor)
            return
          }
          if (err) {
            onError(err)
            return
          }
          dataTimeoutTimer = setTimeout(onDataTimeout, dataTimeoutInterval)
          try {
            // Null check rather than truthiness, so changes to or between
            // falsy values (0, "", false) are still delivered.
            const hasValue = row.new_val != null || row.old_val != null
            if (hasValue && onChange) {
              await onChange(row.old_val, row.new_val, ctx)
            } else if (row.state) {
              log(logPrefix, "state", row.state)
              if (onState) {
                await onState(row.state, ctx)
              }
            }
          } catch (e) {
            log(
              "Error:",
              logPrefix,
              `in handling row: ${JSON.stringify(row)}): ${e}`,
              e
            )
          }
        })
      })
  } catch (e) {
    // A synchronous failure to start the query is handled like any other
    // feed error; otherwise the armed timers would outlive the exception.
    onError(e)
  }
}
module.exports = { runChangeFeedForever }
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment