Skip to content

Instantly share code, notes, and snippets.

@wakeupmh
Last active July 1, 2024 01:25
Show Gist options
  • Save wakeupmh/bebf8fa40ccb5e1010f2edc234e74583 to your computer and use it in GitHub Desktop.
Save wakeupmh/bebf8fa40ccb5e1010f2edc234e74583 to your computer and use it in GitHub Desktop.
Creating an SQS FIFO queue with high throughput
import { Context, SQSEvent } from "aws-lambda";
import 'reflect-metadata';
/**
 * Lambda entry point for SQS-triggered invocations.
 *
 * Parses each record body as JSON and logs the parsed value. Processing stops
 * at the first record that fails; the failure is logged and the handler still
 * resolves, so it never rejects because of a bad record.
 *
 * NOTE(review): because the error is swallowed, the invocation is reported as
 * successful even when a record could not be parsed — confirm this is intended
 * rather than rethrowing so that the batch is retried.
 *
 * @param event - SQS event whose `Records` are processed in order.
 * @param _ - Lambda context; unused.
 */
export async function handler(event: SQSEvent, _?: Context): Promise<void> {
  try {
    for (const record of event.Records) {
      // A missing body is treated as an empty JSON object.
      const params: unknown = JSON.parse(record.body ?? "{}");
      console.log(params);
    }
  } catch (e: unknown) {
    // Narrow before touching `.stack`: a thrown value is not guaranteed to be an Error.
    console.error("error at Get Queue Message", {
      error: e,
      stackTrace: e instanceof Error ? e.stack : undefined,
    });
  }
}
import { Stack, StackProps, Duration, Fn } from "aws-cdk-lib"
import { Construct } from "constructs"
import * as iam from 'aws-cdk-lib/aws-iam'
import * as lambda from 'aws-cdk-lib/aws-lambda'
import path from "path"
import { Queue } from "aws-cdk-lib/aws-sqs"
/**
 * Stack that wires a Lambda consumer to an existing FIFO queue.
 *
 * The queue itself is not created here; its ARN is read from the
 * CloudFormation export `infrastructure::queue::example::arn`.
 */
export class CdkStack extends Stack {
  constructor(scope: Construct, id: string, props?: StackProps) {
    super(scope, id, props)

    // The ARN is an unresolved token at synth time, so CDK cannot inspect the
    // queue name for a `.fifo` suffix. Importing via attributes with an explicit
    // `fifo: true` keeps the imported construct's `fifo` flag accurate.
    const fifoQueue = Queue.fromQueueAttributes(this, 'FifoQueue', {
      queueArn: Fn.importValue('infrastructure::queue::example::arn'),
      fifo: true,
    })

    // Execution role assumed by the Lambda service, with the AWS-managed SQS
    // queue execution and basic execution policies attached.
    const lambdaRole = new iam.Role(this, 'QueueConsumerFunctionRole', {
      assumedBy: new iam.ServicePrincipal('lambda.amazonaws.com'),
      managedPolicies: [
        iam.ManagedPolicy.fromAwsManagedPolicyName('service-role/AWSLambdaSQSQueueExecutionRole'),
        iam.ManagedPolicy.fromAwsManagedPolicyName('service-role/AWSLambdaBasicExecutionRole'),
      ],
    })

    // Consumer function; code is taken from the sibling `src` directory and
    // the entry point is `handler` exported from `app`.
    const lambdaFunction = new lambda.Function(this, 'QueueConsumerFunction', {
      code: lambda.Code.fromAsset(path.join(__dirname, '../src')),
      runtime: lambda.Runtime.NODEJS_20_X,
      handler: 'app.handler',
      timeout: Duration.seconds(30),
      memorySize: 128,
      architecture: lambda.Architecture.ARM_64,
      role: lambdaRole,
    })

    // Deliver queue messages to the function in batches of up to 10.
    new lambda.EventSourceMapping(this, 'QueueConsumerFunctionMySQSEvent', {
      target: lambdaFunction,
      batchSize: 10,
      eventSourceArn: fifoQueue.queueArn,
    })
  }
}
import { SendMessageCommand, SQSClient } from "@aws-sdk/client-sqs";
// Module-level SQS client, constructed with an empty configuration object.
const client = new SQSClient({});
// Placeholder queue URL used when the caller does not supply one.
const SQS_QUEUE_URL = "queue_url";

/**
 * Sends a single "Hey there!" message to the given queue.
 *
 * The message is sent in the "whatsapp" message group with a fixed
 * deduplication id and one string attribute, `Subject`.
 *
 * @param sqsQueueUrl - URL of the target queue; defaults to `SQS_QUEUE_URL`.
 */
export const main = async (
  sqsQueueUrl: string = SQS_QUEUE_URL
): Promise<void> => {
  const subject = { DataType: "String", StringValue: "Greetings" };

  const greeting = new SendMessageCommand({
    QueueUrl: sqsQueueUrl,
    MessageGroupId: "whatsapp",
    MessageDeduplicationId: "whatsapp#user_xpto",
    MessageBody: "Hey there!",
    MessageAttributes: { Subject: subject },
  });

  await client.send(greeting);
};
import { SendMessageCommand, SQSClient } from "@aws-sdk/client-sqs";
// Module-level SQS client, constructed with an empty configuration object.
const client = new SQSClient({});
// Placeholder queue URL used when the caller does not supply one.
const SQS_QUEUE_URL = "queue_url";

/**
 * Sends a single blood-test notification message to the given queue.
 *
 * The message is sent in the "blood-test-results" message group with a fixed
 * deduplication id and two string attributes, `Title` and `Type`.
 *
 * @param sqsQueueUrl - URL of the target queue; defaults to `SQS_QUEUE_URL`.
 */
export const main = async (
  sqsQueueUrl: string = SQS_QUEUE_URL
): Promise<void> => {
  const title = { DataType: "String", StringValue: "Blood Test" };
  const type = { DataType: "String", StringValue: "Checkup" };
  const body =
    "Your blood test results are in. Please schedule an appointment with your doctor to discuss the results.";

  const notification = new SendMessageCommand({
    QueueUrl: sqsQueueUrl,
    MessageGroupId: "blood-test-results",
    MessageDeduplicationId: "1234567890",
    MessageBody: body,
    MessageAttributes: { Title: title, Type: type },
  });

  await client.send(notification);
};
import { CfnOutput, Stack, StackProps } from "aws-cdk-lib"
import { DeduplicationScope, FifoThroughputLimit, Queue } from "aws-cdk-lib/aws-sqs"
import { Construct } from "constructs"
/**
 * Stack that creates the `example.fifo` queue and exports its ARN.
 *
 * The queue scopes both deduplication and the throughput limit to the
 * message group, and has content-based deduplication turned off.
 */
export class CdkStack extends Stack {
  constructor(scope: Construct, id: string, props?: StackProps) {
    super(scope, id, props);
    const name = 'example'
    const queue = new Queue(this, name, {
      queueName: `${name}.fifo`,
      fifo: true,
      deduplicationScope: DeduplicationScope.MESSAGE_GROUP,
      fifoThroughputLimit: FifoThroughputLimit.PER_MESSAGE_GROUP_ID,
      contentBasedDeduplication: false,
    })

    // Publish the ARN as a CloudFormation export so other stacks can look it
    // up with `Fn.importValue('infrastructure::queue::example::arn')`.
    new CfnOutput(this, 'QueueArn', {
      value: queue.queueArn,
      exportName: `infrastructure::queue::${name}::arn`,
    })
  }
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment