Skip to content

Instantly share code, notes, and snippets.

@jackphilippi
Last active August 13, 2021 02:39
Show Gist options
  • Save jackphilippi/c656023cdb622b5a2aa4758e041d41e8 to your computer and use it in GitHub Desktop.
Save jackphilippi/c656023cdb622b5a2aa4758e041d41e8 to your computer and use it in GitHub Desktop.
Message cache for handling queues with SQS
const logger = require('../../logger');
/**
 * A cache for handling the delay (visibility timeout) of SQS messages.
 *
 * Messages handed to `increaseTimeout()` are remembered by MessageId so
 * they can later be flagged with `finish()`, dropped with
 * `purgeFinished()`, or released back to the queue with
 * `resetVisibilityTimeouts()`.
 *
 * @param {Object} opts The options object
 * @param {Object} opts.sqs SQS client; must expose
 *   `changeMessageVisibility(params, callback)`
 * @param {string} opts.queueUrl URL of the queue the messages belong to
 * @param {number} [opts.defaultVisibilityTimeout=30] Base visibility
 *   timeout in seconds; it is doubled for every previous receive
 * @param {number} [opts.maxDelayOnErrorInSeconds=43200] Upper bound, in
 *   seconds, for the computed visibility timeout
 */
function MessageCache(opts) {
  // Largest visibility timeout SQS accepts (12 hours). Used as the cap
  // when the caller does not provide one, so that the computed timeout
  // is never NaN.
  const SQS_MAX_VISIBILITY_TIMEOUT_SECONDS = 43200;

  // Store the messages that we receive, keyed by MessageId, so that we
  // can keep track of the visibility timeout for these messages.
  this.messageCache = {};

  // Obtain the default timeout options from the passed opts. If there's
  // none defined, use 30 seconds as default.
  this.defaultVisibilityTimeout = opts.defaultVisibilityTimeout || 30;

  const configuredMaxDelay = opts.maxDelayOnErrorInSeconds;
  this.maxDelayOnErrorInSeconds = (configuredMaxDelay === undefined || configuredMaxDelay === null)
    ? SQS_MAX_VISIBILITY_TIMEOUT_SECONDS
    : configuredMaxDelay;

  this.queueUrl = opts.queueUrl;

  // This is the SQS consumer
  this.sqs = opts.sqs;

  /**
   * Clean up any completed objects in the messageCache.
   * @returns {number} How many finished messages were removed
   */
  this.purgeFinished = () => {
    const remaining = {};
    let purgedCount = 0;
    Object.keys(this.messageCache).forEach((key) => {
      // If the message hasn't been processed yet, we keep it
      // in the cache. Otherwise, we dispose of it.
      if (this.messageCache[key].Finished !== true) {
        remaining[key] = this.messageCache[key];
      } else {
        purgedCount += 1;
      }
    });
    this.messageCache = remaining;
    return purgedCount;
  };

  /**
   * Set the 'Finished' property on the cached entry of the given message
   * to true, so that it will be deleted upon the next call to
   * purgeFinished(). Logs an error if the message is not in the cache.
   * @param {Object} message The message to be flagged as finished
   */
  this.finish = (message) => {
    const { MessageId } = message;
    if (typeof this.messageCache[MessageId] === 'undefined') {
      logger.error(`Message with id ${MessageId} does not exist. Can not set finished.`);
    } else {
      this.messageCache[MessageId].Finished = true;
    }
  };

  /**
   * If the consumer stops for any reason, we can use this method to
   * reset the visibility timeouts of the messages we've stored so that
   * other consumers can receive them.
   */
  this.resetVisibilityTimeouts = () => {
    // messageCache is a plain object keyed by MessageId, so it has no
    // forEach of its own; walk its keys instead.
    Object.keys(this.messageCache).forEach((key) => {
      const message = this.messageCache[key];
      const visibilityChange = {
        QueueUrl: this.queueUrl,
        ReceiptHandle: message.ReceiptHandle,
        VisibilityTimeout: 0,
      };
      this.sqs.changeMessageVisibility(visibilityChange, (err) => {
        if (err) logger.error('Couldn\'t reset visibility timeout for message...', err, message);
      });
    });
  };

  /**
   * Add a message to the cache and push its visibility timeout back,
   * doubling the delay for every time the message was received before.
   * @param {Object} message The message to be added to cache
   */
  this.increaseTimeout = (message) => {
    const { MessageId, Attributes } = message;

    // Remember the message so finish(), purgeFinished() and
    // resetVisibilityTimeouts() have something to act on. A shallow copy
    // is stored so that flagging the cached entry as finished does not
    // modify the caller's object. A redelivery with the same MessageId
    // replaces the old entry (and its stale ReceiptHandle).
    this.messageCache[MessageId] = Object.assign({}, message);

    const parsedReceiveCount = Number.parseInt(
      Attributes && Attributes.ApproximateReceiveCount,
      10,
    );
    // Number of receives before this one; a missing, unparsable or
    // non-positive count is treated as "first delivery".
    const previousReceives = Number.isNaN(parsedReceiveCount)
      ? 0
      : Math.max(parsedReceiveCount - 1, 0);

    /**
     * Calculate the next timeout to be applied to the message by
     * doubling the default timeout once per previous receive,
     * e.g. 30, 60, 120, 240, ... for the default of 30 seconds.
     * Plain multiplication is used instead of a bit shift because the
     * shift works on 32-bit integers and turns negative / wraps around
     * for large receive counts.
     */
    const newVisibilityTimeout = Math.min(
      this.defaultVisibilityTimeout * Math.pow(2, previousReceives),
      this.maxDelayOnErrorInSeconds,
    );

    logger.info(`Backing off. Updating visibilityTimeout for message ${MessageId} to ${newVisibilityTimeout}s`);

    const visibilityChange = {
      QueueUrl: this.queueUrl,
      ReceiptHandle: message.ReceiptHandle,
      VisibilityTimeout: newVisibilityTimeout,
    };
    this.sqs.changeMessageVisibility(visibilityChange, (err) => {
      if (err) logger.error('Encountered error when updating visibilityTimeout for message.', err, message);
    });
  };
}
// Export the MessageCache constructor as this module's only export.
module.exports = MessageCache;
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment