Skip to content

Instantly share code, notes, and snippets.

Show Gist options
  • Save nahidakbar/c9a5ff1e743fa048d80e3b61298e23ad to your computer and use it in GitHub Desktop.
Save nahidakbar/c9a5ff1e743fa048d80e3b61298e23ad to your computer and use it in GitHub Desktop.
const EVENTS_AT_A_TIME = 10;
const MAX_CHECK_INTERVAL = 500;
const WAIT_PER_EVENT = MAX_CHECK_INTERVAL / EVENTS_AT_A_TIME;
export function listenToEvents(knex, table, name, callback) {
let lastEventId;
const cursor = table + "::" + name;
async function getCursorPosition(): Promise<number> {
const positions = await knex("EventCursor").where({
cursor
});
if (positions.length) {
return positions[0].position;
} else {
await knex("EventCursor").insert({
cursor,
position: 0
});
}
return 0;
}
async function deliverEventsFrom(eventId): Promise<number> {
const events = await knex(table)
.where("eventId", ">", eventId)
.orderBy("eventId", "asc")
.limit(EVENTS_AT_A_TIME);
let latestEvent = eventId;
if (events.length) {
for (const event of events) {
callback(event);
latestEvent = event.eventId;
}
}
return latestEvent;
}
async function setCursorPosition(position: number) {
await knex("EventCursor")
.update({
position
})
.where({
cursor
});
}
async function step() {
const position = await getCursorPosition();
const newPosition = await deliverEventsFrom(cursor);
if (newPosition > position) {
await setCursorPosition(newPosition);
}
setTimeout(
step,
(EVENTS_AT_A_TIME - (newPosition - position)) * WAIT_PER_EVENT
);
}
setImmediate(step);
}
@nahidakbar
Copy link
Author

exports.up = function(knex, Promise) {
return knex.schema.createTable("EventCursor", function(table) {
table.string("cursor", 50).primary();
table
.bigInteger("position")
.unsigned()
.notNull();
});
};

exports.down = function(knex, Promise) {
return knex.schema.dropTable("EventCursor");
};

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment