Created
July 7, 2021 00:46
-
-
Save filmaj/60a75af4fe7f4a2a10b135c97cfc28a6 to your computer and use it in GitHub Desktop.
arc macro for customized queue
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
const queue = 'MyDopeassQueue'; | |
const dlq = `${queue}DLQ`; | |
module.exports = function queueCustomizer (arc, cloudformation) { | |
cloudformation.Resources[queue] = { | |
Type: 'AWS::SQS::Queue', | |
DependsOn: dlq, | |
Properties: { | |
MessageRetentionPeriod: 1209600, // max (14 days) | |
ReceiveMessageWaitTimeSeconds: 20, // enable long polling when asking for messages | |
RedrivePolicy: { // if some messages can't get processed / error out, direct them to a dead-letter queue | |
deadLetterTargetArn: {"Fn::GetAtt": [dlq, 'Arn']}, | |
maxReceiveCount: 10 // max number of times to try to process a message before 'redriving' them to a DLQ | |
}, | |
VisibilityTimeout: 1800 // double the timeout of the worker (worker-event); we are ok with delays in repeated attempts at processing | |
} | |
} | |
cloudformation.Resources[dlq] = { | |
Type: 'AWS::SQS::Queue', | |
Properties: { | |
MessageRetentionPeriod: 1209600, // max (14 days) | |
ReceiveMessageWaitTimeSeconds: 20 | |
} | |
} | |
cloudformation.Outputs[queue] = { Description: 'This is my sweet custom queue', Value: { Ref: queue } }; | |
cloudformation.Outputs[dlq] = { Description: 'Dead Letter Queue for my sweet custom queue', Value: { Ref: dlq } }; | |
// give queue access to the arc app | |
cloudformation.Resources.Role.Properties.Policies.push({ | |
PolicyName: 'DopeassQueuePolicy', | |
PolicyDocument: { | |
Statement: [ | |
{ | |
Effect: 'Allow', | |
Action: ['sqs:SendMessageBatch', 'sqs:SendMessage', 'sqs:ReceiveMessage', 'sqs:DeleteMessage', 'sqs:GetQueueAttributes'], | |
Resource: '*' | |
} | |
] | |
} | |
}); | |
// service discovery for the queue via SSM; allows us to arc.queues.publish() | |
// TODO: should update variable exporting the new plugin API 'variables' instead https://arc.codes/docs/en/guides/extend/plugins#variables | |
// but im lazy and i havent got to it, and this works for now | |
cloudformation.Resources[`${queue}Param`] = { | |
Type: 'AWS::SSM::Parameter', | |
Properties: { | |
Type: 'String', | |
Name: { 'Fn::Sub': ['/${AWS::StackName}/queues/${queue}', { queue }] }, | |
Value: { Ref: queue } | |
} | |
} | |
return cloudformation; | |
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
const arc = require('@architect/functions'); | |
const AWS = require('aws-sdk'); | |
exports.handler = async function scheduled (event) { | |
console.log(JSON.stringify(event, null, 2)); | |
const services = await arc.services(); | |
// first let's check if there are messages to handle | |
const sqs = new AWS.SQS(); | |
let QueueUrl = services.queues.MyDopeassQueue; | |
let queueStats = await sqs.getQueueAttributes({ QueueUrl, AttributeNames: ['All'] }).promise(); | |
let numMsgs = parseInt(queueStats.Attributes.ApproximateNumberOfMessages, 10); | |
if (numMsgs === 0) { | |
console.log('No messages in the queue, aborting.'); | |
return; | |
} | |
console.log(`Approximately ${numMsgs} messages in the queue`); | |
// ... moar logic was here, skipping my application-specific bits | |
console.log(`Retrieving ${maxMessages} messages from queue`); | |
let queueBatch = await sqs.receiveMessage({ QueueUrl, MaxNumberOfMessages: maxMessages }).promise(); | |
if (queueBatch && queueBatch.Messages && queueBatch.Messages.length) { | |
console.log(`${token.owner} publishing ${queueBatch.Messages.length} msgs to queue`); | |
for (let msg of queueBatch.Messages) { | |
let body = JSON.parse(msg.Body); | |
let payload = { | |
users: body, | |
token: token.id, | |
receipt: msg.ReceiptHandle | |
} | |
await arc.events.publish({ name: 'worker', payload }); | |
token.inflight += body.length; | |
} | |
} else { | |
console.log('No messages received, we done for now.'); | |
} | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment