Skip to content

Instantly share code, notes, and snippets.

@skyjur
Created January 30, 2020 21:58
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save skyjur/3d63694c8d89665e7bad06ce2a95a5c1 to your computer and use it in GitHub Desktop.
Save skyjur/3d63694c8d89665e7bad06ce2a95a5c1 to your computer and use it in GitHub Desktop.
Change feed listener
const _log = require("../../utils/logger")
/**
* @typedef {{
* query: any,
* includeInitial?: boolean,
* onChange(oldVal: any, newVal: any, ctx?: Ork.Context): any,
* onState?: (state: string, ctx?: Ork.Context) => any,
* dataTimeoutInterval?: number,
* stateTimeoutInterval?: number,
* logPrefix?: string,
* ctx?: Ork.Context
* }} ChangeFeedArgs
*/
/**
 * Subscribe to a change feed on `params.query` and keep it running.
 *
 * Whenever the feed reports an error, delivers no row for
 * `dataTimeoutInterval` ms, or delivers nothing at all within
 * `stateTimeoutInterval` ms of being requested, the current cursor is closed
 * and a brand new feed is started (with the same `logPrefix`) after a
 * 3 second pause.
 *
 * The query is expected to expose a RethinkDB-style
 * `.changes(options).run(callback)` API whose cursor offers `each(callback)`
 * and `close()`.
 *
 * Errors thrown by `onChange` / `onState` are logged and do not stop the
 * feed. A failing `cursor.close()` (synchronous throw or rejected promise) is
 * logged as well and never prevents the restart from being scheduled.
 *
 * @param {ChangeFeedArgs} params
 * @returns {void}
 */
function runChangeFeedForever (params) {
  const {
    query,
    includeInitial = false,
    onChange,
    onState,
    ctx = {},
    dataTimeoutInterval = 10 * 60e3,
    stateTimeoutInterval = 10000,
    logPrefix = `[ChangeFeed ${Math.round((Date.now() + Math.random()) * 100) %
      1000000000}]`
  } = params
  const { log = _log } = ctx
  let activeCursor = null
  // Once true this invocation is finished; a restart has been (or is being)
  // scheduled and every late callback must only release its cursor.
  let closed = false
  let stateTimeoutTimer = null
  let dataTimeoutTimer = null

  log(logPrefix, "starting")

  // Close a cursor without letting a failure escape: a synchronous throw
  // would otherwise abort restart() before the next attempt is scheduled,
  // and a rejected promise would surface as an unhandled rejection.
  const closeCursor = cursor => {
    try {
      Promise.resolve(cursor.close()).catch(e => {
        log("Error:", logPrefix, "closing cursor", e)
      })
    } catch (e) {
      log("Error:", logPrefix, "closing cursor", e)
    }
  }

  // Tear down this attempt (at most once) and start a fresh feed after 3s.
  const restart = () => {
    if (closed) {
      return
    }
    closed = true
    clearTimeout(stateTimeoutTimer)
    clearTimeout(dataTimeoutTimer)
    if (activeCursor) {
      closeCursor(activeCursor)
    }
    setTimeout(() => {
      log(logPrefix, "restarting")
      // Re-use the prefix so all attempts of one feed share a log identity.
      runChangeFeedForever({ ...params, logPrefix })
    }, 3000)
  }

  const onStateTimeout = () => {
    log(logPrefix, "cursor timeout")
    restart()
  }
  const onDataTimeout = () => {
    log(logPrefix, "inactive")
    restart()
  }
  const onError = e => {
    log("Error:", logPrefix, e)
    restart()
  }

  // Restart if nothing at all arrives within stateTimeoutInterval ms.
  stateTimeoutTimer = setTimeout(onStateTimeout, stateTimeoutInterval)
  // Restart if no row arrives within dataTimeoutInterval ms (forces reconnect).
  dataTimeoutTimer = setTimeout(onDataTimeout, dataTimeoutInterval)

  query
    .changes({
      includeInitial,
      includeStates: true,
      includeTypes: false
    })
    .run((err, cursor) => {
      if (err) {
        return onError(err)
      }
      activeCursor = cursor
      if (closed) {
        // A timeout already triggered a restart while the cursor was pending.
        log(logPrefix, "received cursor, but already closed")
        closeCursor(cursor)
        return
      }
      log(logPrefix, "cursor received")
      cursor.each(async function (err, row) {
        clearTimeout(stateTimeoutTimer)
        clearTimeout(dataTimeoutTimer)
        if (closed) {
          closeCursor(cursor)
          return
        }
        if (err) {
          onError(err)
          return
        }
        // Re-arm the inactivity timer: every row proves the feed is alive.
        dataTimeoutTimer = setTimeout(onDataTimeout, dataTimeoutInterval)
        try {
          if ((row.new_val || row.old_val) && onChange) {
            await onChange(row.old_val, row.new_val, ctx)
          } else if (row.state) {
            log(logPrefix, "state", row.state)
            if (onState) {
              await onState(row.state, ctx)
            }
          }
        } catch (e) {
          // Handler failures are reported but must not break the feed.
          log(
            "Error:",
            logPrefix,
            `in handling row: ${JSON.stringify(row)}): ${e}`,
            e
          )
        }
      })
    })
}
// Public API of this module: the self-restarting change feed runner.
module.exports = { runChangeFeedForever: runChangeFeedForever }
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment