Skip to content

Instantly share code, notes, and snippets.

@filmaj
Created July 7, 2021 00:46
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save filmaj/60a75af4fe7f4a2a10b135c97cfc28a6 to your computer and use it in GitHub Desktop.
Save filmaj/60a75af4fe7f4a2a10b135c97cfc28a6 to your computer and use it in GitHub Desktop.
arc macro for customized queue
const queue = 'MyDopeassQueue';
const dlq = `${queue}DLQ`;
module.exports = function queueCustomizer (arc, cloudformation) {
cloudformation.Resources[queue] = {
Type: 'AWS::SQS::Queue',
DependsOn: dlq,
Properties: {
MessageRetentionPeriod: 1209600, // max (14 days)
ReceiveMessageWaitTimeSeconds: 20, // enable long polling when asking for messages
RedrivePolicy: { // if some messages can't get processed / error out, direct them to a dead-letter queue
deadLetterTargetArn: {"Fn::GetAtt": [dlq, 'Arn']},
maxReceiveCount: 10 // max number of times to try to process a message before 'redriving' them to a DLQ
},
VisibilityTimeout: 1800 // double the timeout of the worker (worker-event); we are ok with delays in repeated attempts at processing
}
}
cloudformation.Resources[dlq] = {
Type: 'AWS::SQS::Queue',
Properties: {
MessageRetentionPeriod: 1209600, // max (14 days)
ReceiveMessageWaitTimeSeconds: 20
}
}
cloudformation.Outputs[queue] = { Description: 'This is my sweet custom queue', Value: { Ref: queue } };
cloudformation.Outputs[dlq] = { Description: 'Dead Letter Queue for my sweet custom queue', Value: { Ref: dlq } };
// give queue access to the arc app
cloudformation.Resources.Role.Properties.Policies.push({
PolicyName: 'DopeassQueuePolicy',
PolicyDocument: {
Statement: [
{
Effect: 'Allow',
Action: ['sqs:SendMessageBatch', 'sqs:SendMessage', 'sqs:ReceiveMessage', 'sqs:DeleteMessage', 'sqs:GetQueueAttributes'],
Resource: '*'
}
]
}
});
// service discovery for the queue via SSM; allows us to arc.queues.publish()
// TODO: should update variable exporting the new plugin API 'variables' instead https://arc.codes/docs/en/guides/extend/plugins#variables
// but im lazy and i havent got to it, and this works for now
cloudformation.Resources[`${queue}Param`] = {
Type: 'AWS::SSM::Parameter',
Properties: {
Type: 'String',
Name: { 'Fn::Sub': ['/${AWS::StackName}/queues/${queue}', { queue }] },
Value: { Ref: queue }
}
}
return cloudformation;
}
const arc = require('@architect/functions');
const AWS = require('aws-sdk');
exports.handler = async function scheduled (event) {
console.log(JSON.stringify(event, null, 2));
const services = await arc.services();
// first let's check if there are messages to handle
const sqs = new AWS.SQS();
let QueueUrl = services.queues.MyDopeassQueue;
let queueStats = await sqs.getQueueAttributes({ QueueUrl, AttributeNames: ['All'] }).promise();
let numMsgs = parseInt(queueStats.Attributes.ApproximateNumberOfMessages, 10);
if (numMsgs === 0) {
console.log('No messages in the queue, aborting.');
return;
}
console.log(`Approximately ${numMsgs} messages in the queue`);
// ... moar logic was here, skipping my application-specific bits
console.log(`Retrieving ${maxMessages} messages from queue`);
let queueBatch = await sqs.receiveMessage({ QueueUrl, MaxNumberOfMessages: maxMessages }).promise();
if (queueBatch && queueBatch.Messages && queueBatch.Messages.length) {
console.log(`${token.owner} publishing ${queueBatch.Messages.length} msgs to queue`);
for (let msg of queueBatch.Messages) {
let body = JSON.parse(msg.Body);
let payload = {
users: body,
token: token.id,
receipt: msg.ReceiptHandle
}
await arc.events.publish({ name: 'worker', payload });
token.inflight += body.length;
}
} else {
console.log('No messages received, we done for now.');
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment