Last active
August 13, 2021 02:39
-
-
Save jackphilippi/c656023cdb622b5a2aa4758e041d41e8 to your computer and use it in GitHub Desktop.
Message cache for handling queues with sqs
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
const logger = require('../../logger'); | |
/** | |
* A cache for handling the delay of SQS messages | |
* @param {Object} opts The options object | |
*/ | |
function MessageCache(opts) { | |
// Store the messages that we receive, so that we can | |
// keep track of the visibility timeout for these messages | |
this.messageCache = {}; | |
// Obtain the default timeout options from the passed | |
// opts. If there's none defined, use 30 seconds as | |
// default. | |
this.defaultVisibilityTimeout = opts.defaultVisibilityTimeout || 30; | |
this.maxDelayOnErrorInSeconds = opts.maxDelayOnErrorInSeconds; | |
this.queueUrl = opts.queueUrl; | |
// This is the SQS consumer | |
this.sqs = opts.sqs; | |
/** | |
* Clean up any completed objects in the messageCache | |
*/ | |
this.purgeFinished = () => { | |
const result = {}; | |
let count = 0; | |
Object.keys(this.messageCache).forEach((key) => { | |
// If the message hasn't been processed yet, we keep it | |
// in the cache. Otherwise, we dispose of it. | |
if ( | |
Object.prototype.hasOwnProperty.call(this.messageCache, key) | |
&& this.messageCache[key].Finished !== true | |
) { | |
result[key] = this.messageCache[key]; | |
} else { | |
count += 1; | |
} | |
}); | |
this.messageCache = result; | |
return count; | |
}; | |
/** | |
* Set the 'finish' property on the given message to true, so | |
* that it will be deleted upon the next call to purgeFinished() | |
* @param {Object} message The message to be flagged as finished | |
*/ | |
this.finish = (message) => { | |
const { MessageId } = message; | |
if (typeof this.messageCache[MessageId] === 'undefined') { | |
logger.error(`Message with id ${MessageId} does not exist. Can not set finished.`); | |
} else { | |
this.messageCache[MessageId].Finished = true; | |
} | |
}; | |
/** | |
* If the consumer stops for any reason, we can use this method to | |
* reset the visibility timeouts of the messages we've stored so that | |
* other consumers can receive them. | |
*/ | |
this.resetVisibilityTimeouts = () => { | |
this.messageCache.forEach((message) => { | |
const visibilityChange = { | |
QueueUrl: this.queueUrl, | |
ReceiptHandle: message.ReceiptHandle, | |
VisibilityTimeout: 0, | |
}; | |
this.sqs.changeMessageVisibility(visibilityChange, (err) => { | |
if (err) logger.error('Couldn\'t reset visibility timeout for message...', err, message); | |
}); | |
}); | |
}; | |
/** | |
* Add a message to the cache and append a delay to | |
* it if there's not one already. | |
* @param {Object} message The message to be added to cache | |
*/ | |
this.increaseTimeout = (message) => { | |
const { MessageId, Attributes } = message; | |
const attribReceivedCount = (Attributes && Attributes.ApproximateReceiveCount); | |
const receivedCount = parseInt(attribReceivedCount - 1 || 0, 10); | |
/** | |
* Calculate the next timeout to be applied to the message. | |
* We use a left-shift to double the sum of the previous | |
* visibility timeouts. | |
* e.g. 15, 30, 60, 120, 240, ... | |
*/ | |
const newVisibilityTimeout = Math.min( | |
(this.defaultVisibilityTimeout << receivedCount), // eslint-disable-line no-bitwise | |
this.maxDelayOnErrorInSeconds, | |
); | |
logger.info(`Backing off. Updating visibilityTimeout for message ${MessageId} to ${newVisibilityTimeout}s`); | |
const visibilityChange = { | |
QueueUrl: this.queueUrl, | |
ReceiptHandle: message.ReceiptHandle, | |
VisibilityTimeout: newVisibilityTimeout, | |
}; | |
this.sqs.changeMessageVisibility(visibilityChange, (err) => { | |
if (err) logger.error('Encountered error when updating visibilityTimeout for message.', err, message); | |
}); | |
}; | |
} | |
module.exports = MessageCache; |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
https://stackoverflow.com/questions/32465104/aws-sqs-delay-making-available-a-message-that-failed-to-process