Skip to content

Instantly share code, notes, and snippets.

@thakursaurabh1998
Created June 27, 2022 19:40
Show Gist options
  • Save thakursaurabh1998/e3256a01427a54cbc42589fd8e19eca6 to your computer and use it in GitHub Desktop.
Save thakursaurabh1998/e3256a01427a54cbc42589fd8e19eca6 to your computer and use it in GitHub Desktop.
class Poll {
/**
* @typedef options
* @property {number} interval Interval between each poll in `seconds`
* @property {number} maxPollCount Max number of times to poll
*
* @param {options} options
* @param {(Object) => Promise<boolean>} pollingFunction
* @param {QueueClientInterface} queueClient
*/
constructor(
{ interval, maxPollCount },
pollingFunction,
queueClient,
logger = console
) {
this.logger = logger;
this.interval = interval;
this.queueClient = queueClient;
this.maxPollCount = maxPollCount;
this.pollingFunction = pollingFunction;
// The COLLECTOR_QUEUE is the queue to which our consumer subscribes
// to as the DLQ pushes all the expired messages to this queue.
this.mq.listen("COLLECTOR_QUEUE", options);
this.mq.on("dequeue", (data) => {
this.fetchAndValidate(data);
});
}
/**
* @param {(dequeuedData: object) => Promise<boolean>} pollingFunction if this function returns `true`, polling is continued
*/
async fetchAndValidate(dequeuedData) {
// the polling function calls the external service to check the status
const pollContinue = await this.pollingFunction(dequeuedData);
// We are saving the polls left count with the poll data when
// it is being pushed to the queue so we can check if polling
// can be done or not, this count gets altered everytime poll
// happens
const pollsLeft = dequeuedData.pollCount - 1;
if (pollContinue && pollsLeft > 0) {
// Here we push the data back to the queue with the
// updated pollCount
this.queueClient.push(this.interval, {
...dequeuedData,
pollCount: pollsLeft,
});
} else {
// if poll limit is reached or the polling function returns
// false, we stop the polling, this can be logged if you want
// or not
this.logger.warn({
MQ: {
msg: "polling limit reached for the following batch",
details: dequeuedData,
},
});
}
}
/**
* Start polling
* @param {object} data dequeued data
* @param {boolean} prePoll poll before putting to queue
*/
async start(data) {
const pollCount = this.maxPollCount;
this.queueClient.push(this.interval, {
...data,
pollCount,
});
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment