Skip to content

Instantly share code, notes, and snippets.

@hoangsetup
Created April 15, 2019 17:00
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save hoangsetup/51694cc7672308aa53a08ff475409c2a to your computer and use it in GitHub Desktop.
Save hoangsetup/51694cc7672308aa53a08ff475409c2a to your computer and use it in GitHub Desktop.
Integrate SQS and Lambda
// Consumer: Lambda triggered by the SQS queue; forwards each record to the worker Lambda
import {
SQSEvent,
SQSHandler, SQSRecord,
} from "aws-lambda";
import * as Lambda from "aws-sdk/clients/lambda";
import { InvocationRequest, InvocationResponse } from "aws-sdk/clients/lambda";
/**
 * Invokes the worker Lambda with a single SQS record as its payload.
 *
 * The request uses `InvocationType: "Event"` (asynchronous invocation), so the
 * response describes the invoke request itself and does not carry the
 * worker's result.
 *
 * @param record - The SQS record to forward; it is serialized to JSON unchanged.
 * @returns The response of the Lambda `invoke` call.
 * @throws Error if the `WORKER_LAMBDA_NAME` environment variable is not set.
 */
async function invokeWorkerLambda(record: SQSRecord): Promise<InvocationResponse> {
  const functionName = process.env.WORKER_LAMBDA_NAME;
  if (!functionName) {
    // Fail with an explicit message instead of sending `undefined` as FunctionName.
    throw new Error("WORKER_LAMBDA_NAME environment variable is not set");
  }

  const lambda = new Lambda({ region: process.env.AWS_REGION });
  const params: InvocationRequest = {
    FunctionName: functionName,
    InvocationType: "Event",
    Payload: JSON.stringify(record),
  };
  return lambda.invoke(params).promise();
}
/**
 * SQS-triggered entry point: forwards every record of the batch to the worker Lambda.
 *
 * All invocations are started concurrently and awaited together. A failure is
 * logged and then rethrown so that this invocation is reported as failed;
 * returning normally would signal that the whole batch was handled even
 * though at least one record never reached a worker.
 *
 * @param event - The SQS batch delivered to this function.
 * @throws Rethrows the first error raised while invoking the worker, after logging it.
 */
export const handler: SQSHandler = async (event: SQSEvent): Promise<void> => {
  try {
    // A missing or empty Records array is treated as "nothing to do".
    const records = event.Records || [];
    await Promise.all(records.map((record) => invokeWorkerLambda(record)));
  } catch (err) {
    // Only Error instances are guaranteed to carry a stack trace.
    const stack = err instanceof Error ? err.stack : undefined;
    console.error(err, stack);
    throw err;
  }
};
// Worker: Lambda invoked by the consumer with a single SQS record; processes it and deletes the message
import {
SQSRecord,
} from "aws-lambda";
import * as SQS from "aws-sdk/clients/sqs";
/**
 * Deletes a message from the queue identified by the `TASK_QUEUE_URL`
 * environment variable.
 *
 * @param receiptHandle - Receipt handle of the message to delete.
 * @returns The result of the SQS `deleteMessage` request.
 * @throws Error if the `TASK_QUEUE_URL` environment variable is not set.
 */
async function deleteMessage(receiptHandle: string): Promise<{}> {
  const queueUrl = process.env.TASK_QUEUE_URL;
  if (!queueUrl) {
    // Fail with an explicit message instead of sending `undefined` as QueueUrl.
    throw new Error("TASK_QUEUE_URL environment variable is not set");
  }

  const sqs = new SQS({ region: process.env.AWS_REGION });
  return sqs
    .deleteMessage({
      QueueUrl: queueUrl,
      ReceiptHandle: receiptHandle,
    })
    .promise();
}
/**
 * Placeholder for the actual task processing; currently it only logs the record.
 *
 * @param task - The SQS record describing the task.
 * @returns A promise that resolves with no value once the record has been logged.
 */
async function work(task: SQSRecord): Promise<void> {
  // TODO implement
  console.log(task);
}
/**
 * Worker entry point: processes one SQS record and, when processing succeeds,
 * deletes the corresponding message from the queue.
 *
 * Errors from either step are logged and not propagated, so the returned
 * promise always resolves.
 *
 * @param event - The SQS record received as the invocation payload.
 */
export const handler = async (event: SQSRecord): Promise<void> => {
  try {
    await work(event);

    // Reached only when work() resolved; deletion is skipped if it rejected.
    const { receiptHandle } = event;
    await deleteMessage(receiptHandle);
  } catch (failure) {
    console.error(failure, failure.stack);
  }
};
# Task queue. Its redrive policy names DeadLetterQueue as the dead-letter
# target with a maxReceiveCount of 10.
MyTaskQueue:
  Type: AWS::SQS::Queue
  Properties:
    VisibilityTimeout: 90
    RedrivePolicy:
      deadLetterTargetArn: !Sub ${DeadLetterQueue.Arn}
      maxReceiveCount: 10
# Dead-letter target referenced by MyTaskQueue's RedrivePolicy.
DeadLetterQueue:
  Type: AWS::SQS::Queue
# Execution role for ConsumerLambda: CloudWatch Logs access, permission to
# invoke WorkerLambda, and the SQS permissions needed to consume MyTaskQueue
# through the event source mapping.
ConsumerLambdaRole:
  Type: AWS::IAM::Role
  Properties:
    AssumeRolePolicyDocument:
      Version: '2012-10-17'
      Statement:
        - Effect: Allow
          Principal:
            Service: lambda.amazonaws.com
          Action:
            - sts:AssumeRole
    Path: '/'
    Policies:
      - PolicyName: logs
        PolicyDocument:
          Statement:
            - Effect: Allow
              Action:
                - logs:CreateLogGroup
                - logs:CreateLogStream
                - logs:PutLogEvents
              Resource: arn:aws:logs:*:*:*
      - PolicyName: lambda
        PolicyDocument:
          Statement:
            - Effect: Allow
              Action:
                - lambda:InvokeFunction
              Resource: !Sub ${WorkerLambda.Arn}
      # Required for the SQS event source mapping to poll and acknowledge messages.
      - PolicyName: sqs
        PolicyDocument:
          Statement:
            - Effect: Allow
              Action:
                - sqs:ReceiveMessage
                - sqs:DeleteMessage
                - sqs:GetQueueAttributes
              Resource: !Sub ${MyTaskQueue.Arn}
# Consumer function built from ./consumer; it receives the worker function's
# name and the task queue URL through environment variables.
ConsumerLambda:
  Type: AWS::Serverless::Function
  Properties:
    CodeUri: ./consumer
    Handler: index.handler
    MemorySize: 128
    Role: !Sub ${ConsumerLambdaRole.Arn}
    # NOTE(review): nodejs8.10 has been retired by AWS Lambda — confirm the
    # target runtime (and the availability of the aws-sdk package) before deploying.
    Runtime: nodejs8.10
    Timeout: 90
    Environment:
      Variables:
        TASK_QUEUE_URL: !Ref MyTaskQueue
        WORKER_LAMBDA_NAME: !Ref WorkerLambda
# Connects MyTaskQueue to ConsumerLambda with a batch size of 10.
LambdaFunctionEventSourceMapping:
  Type: AWS::Lambda::EventSourceMapping
  Properties:
    BatchSize: 10
    Enabled: true
    EventSourceArn: !GetAtt MyTaskQueue.Arn
    FunctionName: !GetAtt ConsumerLambda.Arn
# Execution role for WorkerLambda: CloudWatch Logs access plus permission to
# delete messages from MyTaskQueue.
WorkerLambdaRole:
  Type: AWS::IAM::Role
  Properties:
    AssumeRolePolicyDocument:
      Version: '2012-10-17'
      Statement:
        - Effect: Allow
          Principal:
            Service: lambda.amazonaws.com
          Action:
            - sts:AssumeRole
    Path: '/'
    Policies:
      - PolicyName: logs
        PolicyDocument:
          Statement:
            - Effect: Allow
              Action:
                - logs:CreateLogGroup
                - logs:CreateLogStream
                - logs:PutLogEvents
              Resource: arn:aws:logs:*:*:*
      - PolicyName: sqs
        PolicyDocument:
          Statement:
            - Effect: Allow
              Action:
                - sqs:DeleteMessage
              # The queue's logical ID in this template is MyTaskQueue.
              Resource: !Sub ${MyTaskQueue.Arn}
# Worker function built from ./worker; it receives the task queue URL through
# the TASK_QUEUE_URL environment variable.
WorkerLambda:
  Type: AWS::Serverless::Function
  Properties:
    CodeUri: ./worker
    Handler: index.handler
    MemorySize: 128
    Role: !Sub ${WorkerLambdaRole.Arn}
    # NOTE(review): nodejs8.10 has been retired by AWS Lambda — confirm the
    # target runtime (and the availability of the aws-sdk package) before deploying.
    Runtime: nodejs8.10
    Timeout: 90
    Environment:
      Variables:
        # The queue's logical ID in this template is MyTaskQueue.
        TASK_QUEUE_URL: !Ref MyTaskQueue
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment