Skip to content

Instantly share code, notes, and snippets.

@nicolasdao
Last active September 26, 2023 17:17
Show Gist options
  • Star 5 You must be signed in to star a gist
  • Fork 3 You must be signed in to fork a gist
  • Save nicolasdao/f7ff9ff0eccec4905df6e664d04d851c to your computer and use it in GitHub Desktop.
Save nicolasdao/f7ff9ff0eccec4905df6e664d04d851c to your computer and use it in GitHub Desktop.
Recipes for AWS Lambda with Serverless Framework. A series of recipes to get shit done using the Serverless Framework. Keywords: serverless recipe code recipes lambda lambdas function

AWS SERVERLESS RECIPES

Table of contents

Lambda

Scheduled Lambda

functions:
  crawl:
    handler: index.handler
    events:
      - schedule: rate(2 hours)
      - schedule: cron(0 12 * * ? *)

As you can see, it supports both a DSL format and a standard CRON format.

WARNINGS:

  • The smallest unit is 1 minute.
  • The rate function only accepts the following units: minute, minutes, hour, hours, day, days (as you can see, you have to use the singular version if the value is equal to 1).

Invoking a Lambda from another Lambda

The following lambda handler is sending a hello world message to the OTHER_FUNC lambda. The value passed to the FunctionName is supposed to be the other function's ARN. What's important in this example is the answer to how we got that ARN in the process.env.OTHER_FUNC environment variable. That answer will be given in the next section about the serverless.yml.

index.js

const AWS = require('aws-sdk')
const lambda = new AWS.Lambda({apiVersion: '2015-03-31'})

exports.handler =  function(event, context, callback) {
  lambda.invoke({ FunctionName:process.env.OTHER_FUNC, body:JSON.stringify({ hello:'world' }) }, (err, data) => {
    return callback(null, 'records processed')
  })
}

serverless.yml

service: lambda_invoking_another_lambda

custom:
  stage: ${opt:stage, 'dev'}

plugins:
  - serverless-iam-roles-per-function

provider:
  name: aws
  runtime: nodejs10.x
  profile: neap
  stage: ${self:custom.stage}
  region: ap-southeast-2

functions:
  mainFunc:
    handler: mainFunc.handler
    events:
      - http:
          path: /lambda_01
          method: ANY
    iamRoleStatementsInherit: true
    iamRoleStatements:
      - Effect: 'Allow'
        Action:
          - lambda:InvokeFunction
        Resource: 
          Fn::GetAtt: 
            - OtherFuncLambdaFunction
            - Arn
    environment:
      OTHER_FUNC:
        Fn::GetAtt: 
          - OtherFuncLambdaFunction
          - Arn
  otherFunc:
    handler: otherFunc.handler

As you can see, there is this hidden convention that the CloudFormation resource name for our functions is:

<function name starting with capital letter>LambdaFunction

In our case, the CloudFormation resource for our otherFunc function is named OtherFuncLambdaFunction.

Websocket

The principle is quite simple. Deploy an API Gateway in Websocket mode and an AWS Lambda that is triggered each time a client connects and disconnects. Each time a connection is made, a new connectionId is issued for that that specific client. The Lambda's responsibility is to store that connectionId somewhere so that other clients can POST messages back to that client using a simple HTTP POST to the API Gateway (done via the nodeJS AWS-SDK). This recipe contains three parts:

  1. API Gateway and Lambda setup
  2. Client's code to connect to the API Gateway Websocket
  3. Server code that can POST messages back to the client using its associated connectionId

1. API Gateway and Lambda setup

serverless.yml:

service: websocket-test

plugins:
  - serverless-iam-roles-per-function

provider:
  name: aws
  runtime: nodejs10.x
  region: ap-southeast-2
  websocketsApiName: websocket-test-api
  websocketsApiRouteSelectionExpression: $request.body.action # custom routes are selecte

functions:
  connectionHandler:
    handler: index.handler
    events:
      - websocket:
          route: $connect
      - websocket:
          route: $disconnect
      - websocket:
          route: onMessage

index.js:

const { app } = require('@neap/funky')

app.all('/', (req,res) => {
  const payload = req.params._awsParams || { message: 'No AWS data' }
  console.log(JSON.stringify(payload, null, ' '))
  return res.status(200).send('done')
})

eval(app.listen({ port:3000, host:'aws' }))

where the payload structure is similar to this:

{
  "headers": {
    "Host": "abcd12345.execute-api.ap-southeast-2.amazonaws.com",
    "x-api-key": "",
    "X-Forwarded-For": "",
    "x-restapi": ""
  },
  "multiValueHeaders": {
    "Host": [
      "abcd12345.execute-api.ap-southeast-2.amazonaws.com"
    ],
    "x-api-key": [
      ""
    ],
    "X-Forwarded-For": [
      ""
    ],
    "x-restapi": [
      ""
    ]
  },
  "requestContext": {
    "routeKey": "$disconnect",
    "messageId": null,
    "eventType": "DISCONNECT",
    "extendedRequestId": "DBI0XHL_ywMFatg=",
    "requestTime": "11/Nov/2019:23:28:02 +0000",
    "messageDirection": "IN",
    "stage": "dev",
    "connectedAt": 1573514872512,
    "requestTimeEpoch": 1573514882208,
    "identity": {
      "cognitoIdentityPoolId": null,
      "cognitoIdentityId": null,
      "principalOrgId": null,
      "cognitoAuthenticationType": null,
      "userArn": null,
      "userAgent": "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_13_6) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/77.0.3865.120 Safari/537.36",
      "accountId": null,
      "caller": null,
      "sourceIp": "49.195.105.136",
      "accessKey": null,
      "cognitoAuthenticationProvider": null,
      "user": null
    },
    "requestId": "DBI0XHL_ywMFatg=",
    "domainName": "abcd12345.execute-api.ap-southeast-2.amazonaws.com",
    "connectionId": "DBIy2dg0SwMCERQ=",
    "apiId": "abcd12345"
  },
  "isBase64Encoded": false
}

Use this payload to determine:

  • The connectionId
  • The type, i.e., $connect vs $disconnect

2. Client's code

<script type="text/javascript">
  let socket = new WebSocket("wss://abcd12345.execute-api.ap-southeast-2.amazonaws.com/dev")

  socket.onopen = function(e) {
    alert("[open] Connection established")
    alert("Sending to server")
    // The following request will fire back 'socket.onmessage' below. The value 
    // of the 'event.data' will be similar to '{"message": "Forbidden", "connectionId":"DBNjmfOmSwMCJiA=", "requestId":"DBNlXFDjSwMF3FA="}'
    socket.send("My name is John") 
  }

  /**
   * Callback function fired when the server sends a message.
   * 
   * @param  {String} event.data  Server's message
   * @return {Void}
   */
  socket.onmessage = function(event) {
    alert(`[message] Data received from server: ${event.data}`)
  }

  socket.onclose = function(event) {
    if (event.wasClean) {
      alert(`[close] Connection closed cleanly, code=${event.code} reason=${event.reason}`)
    } else {
      // e.g. server process killed or network down
      // event.code is usually 1006 in this case
      alert('[close] Connection died')
    }
  }

  socket.onerror = function(error) {
    alert(`[error] ${error.message}`)
  }


</script>

3. Server code that can POST messages back to the client

const { co } = require('core-async')

let api_gateways = {}
const getApiGateway = endpoint => {
  if (!endpoint)
    throw new Error('Missing required argument \'endpoint\'.')

  let api_gw = api_gateways[endpoint]
  if (!api_gw) {
    const AWS = require('aws-sdk')
    // apply the patch
    require('./patch_apigateway.js') 
    api_gw = new AWS.ApiGatewayManagementApi({ apiVersion: '2018-11-29', endpoint })
    api_gateways[endpoint] = api_gw
  }
  return api_gw
}

/**
 * Posts a message to an AWS API Gateway.
 * 
 * @param {String}  endpoint    API Gateway endpoint (without the protocol).
 * @param {String}  connectionId  Websocket connection ID.
 * @param {Object}  data      Any payload.
 * @yield {Object}          Empty object
 */
const post = ({ endpoint, connectionId, data }) => co(function *(){
  if (!connectionId)
    throw new Error('Missing required argument \'connectionId\'.')

  if (!data)
    return 

  const api_gw = getApiGateway(endpoint)
  const t = typeof(data)
  const payload = t == 'string' || (data instanceof Buffer) 
    ? data 
    : (data instanceof Date)
      ? data.toISOString()
      : t == 'object' 
        ? JSON.stringify(data) : `${data}`

  return yield api_gw.postToConnection({ ConnectionId: connectionId, Data: payload }).promise()
})

Triggered by SQS

serverless.yml

service: sqs-trigger-demo

plugins:
  - serverless-iam-roles-per-function

provider:
  name: aws
  runtime: nodejs10.x
  
functions:
  sendMessageToSQS:
    handler: index.handler
    timeout: 300
    events:
      - http:
          path: /send
          method: POST
    iamRoleStatementsInherit: true
    iamRoleStatements:
      - Effect: 'Allow'
        Action:
          - sqs:SendMessage
        Resource: 
          Fn::GetAtt: 
            - YourQueue
            - Arn
    environment:
      YOUR_QUEUE: !Ref YourQueue

  processSQS:
    handler: index.handler
    events:
      - sqs: 
          arn:
            Fn::GetAtt: 
              - YourQueue
              - Arn
          batchSize: 1

resources:
  Resources:
    YourQueue:
      Type: AWS::SQS::Queue

NOTICE: The optional batchSize property in the SQS event. If you don't specify it, the default is 10 (which is also the max). This means that a single lambda can be triggered by up to 10 messages at the same time.

processSQS/index.js:

exports.handler =  function(event, context, callback) {
  const records = event.Records
  console.log(JSON.stringify(records, null, ' '))
  return callback(null, 'records processed')
}

or, if you're using @neap/funky:

const { app } = require('@neap/funky')

app.all('/', (req,res) => {
    const payload = req.params._awsParams
    console.log(payload)
    return res.status(200).send('done')
})

eval(app.listen({ port:3000, host:'aws' }))

where payload:

{
  Records: [{
    messageId: '20b6436d-d8d2-415d-970e-0068e90f5df6',
    receiptHandle: 'AQEBw2N73wSb+3srF3QvjuTzkd/tQGmRqxJvcCinTadcE....',
    body: '{"id":13482,"device_uuid":"70B3D570500046FE"}',
    attributes: [Object],
    messageAttributes: {},
    md5OfBody: '75b13cf93658767850fe30d5ecb4a25b',
    eventSource: 'aws:sqs',
    eventSourceARN: 'arn:this-is-the-arn-of-the-resource-that-triggered-this-lambda',
    awsRegion: 'ap-southeast-2'
  }]
}

VPC config

The reason and consequences of this setup are not trivial. To know more about it, please refer to this document: https://gist.github.com/nicolasdao/2693912322fab8b4be19cca8920c603e#lambda-connected-to-a-vpc

To connect a Lambda to a VPC, you must provide two additional pieces of configuration:

  • VPC's Subnet IDs the lambda must have access to.
  • At least one VPC security groups (the simplest one gives allows all inboud traffic in).

If any of those two settings is omitted, then the lambda is not associated to that VPC.

service: your-func-name

provider:
  name: aws
  runtime: nodejs10.x
  memorySize: 512
  region: ap-southeast-2

functions:
  firstFunc:
    handler: index.handler
    vpc:
      securityGroupIds:
        - sg-1234
      subnetIds:
        - subnet-1234
        - subnet-4567
    events:
      - http:
          path: /
          method: ANY
      - http:
          path: /{any+}
          method: ANY

Tagging

To tag, you need the serverless-iam-roles-per-function plugin.

npm i serverless-iam-roles-per-function -D
plugins:
  - serverless-plugin-resource-tagging

provider:
  name: XXX
  stackTags:
    Tag1: "Tag1 value"
    Tag2: "Tag2 value"

RDS

Access an RDS DB

The trickiest part is to add a new inbound rule in the DB's security group to allow the Lambda to query the DB.

service: some-poc

custom:
  stage: ${opt:stage, 'dev'}

plugins:
  - serverless-iam-roles-per-function
  - serverless-plugin-resource-tagging

provider:
  name: aws
  stackTags:
    project: some-poc
  runtime: nodejs10.x
  memorySize: 512
  profile: your-profile
  stage: ${self:custom.stage}
  region: ap-southeast-2

functions:
  main:
    handler: index.handler
    timeout: 30
    vpc:
      securityGroupIds:
        - sg-1234-lambda
      subnetIds:
        - subnet-1234-a-private-subnet
        - subnet-1234-a-private-subnet
    events:
      - http:
          path: /
          method: ANY
      - http:
          path: /{any+}
          method: ANY
    iamRoleStatementsInherit: true

resources:
  Resources:
    MSSQLInboundRule:
      Type: AWS::EC2::SecurityGroupIngress
      Properties: 
        Description: Some PoC - Access from lambda to MSSQL
        IpProtocol: tcp
        FromPort: 1433
        ToPort: 1433
        SourceSecurityGroupId: sg-1234-lambda
        GroupId: sg-1234-db

DynamoDB

Lambda triggered by DynamoDB Stream?

index.js

exports.handler =  function(event, context, callback) {
  const records = event.Records
  console.log(JSON.stringify(records, null, ' '))
  return callback(null, 'records processed')
}

Where a Record is similar to this:

{
    "eventID": "771198431cedb052fc164d24a48f62c7",
    "eventName": "INSERT",
    "eventVersion": "1.1",
    "eventSource": "aws:dynamodb",
    "awsRegion": "ap-southeast-2",
    "dynamodb": {
        "ApproximateCreationDateTime": 1567139975,
        "Keys": {
            "device_id": {
                "N": "1"
            },
            "timestamp": {
                "S": "2019-08-30T04:39:35.405Z"
            }
        },
        "NewImage": {
            "device_id": {
                "N": "1"
            },
            "value": {
                "S": "2"
            },
            "timestamp": {
                "S": "2019-08-30T04:39:35.405Z"
            }
        },
        "SequenceNumber": "548300000000007249139847",
        "SizeBytes": 94,
        "StreamViewType": "NEW_IMAGE"
    },
    "eventSourceARN": "arn:aws:dynamodb:ap-southeast-2:123456221:table/number_dev/stream/2019-08-30T04:35:12.392"
}

serverless.yml

service: lambda_with_dynamodb_stream

custom:
  stage: ${opt:stage, 'dev'}
  table: 
    number: number_${self:custom.stage}

provider:
  name: aws
  runtime: nodejs10.x
  profile: neap
  stage: ${self:custom.stage}
  region: ap-southeast-2
  iamRoleStatements:
    - Effect: 'Allow'
      Action:
        - dynamodb:DescribeStream
        - dynamodb:GetRecords
        - dynamodb:GetShardIterator
        - dynamodb:ListStreams
      Resource:  
        Fn::GetAtt: 
          - NumberTable
          - Arn

functions:
  stream:
    handler: index.handler
    events:
      - stream:
          type: dynamodb
          arn: 
            Fn::GetAtt: 
              - NumberTable
              - StreamArn

resources:
  Resources:
    NumberTable:
      Type: AWS::DynamoDB::Table
      Properties:
        TableName: ${self:custom.table.number}
        AttributeDefinitions:
          - AttributeName: device_id
            AttributeType: N
          - AttributeName: timestamp
            AttributeType: S
        KeySchema:
          - AttributeName: device_id
            KeyType: HASH
          - AttributeName: timestamp
            KeyType: RANGE
        ProvisionedThroughput:
          ReadCapacityUnits: 1
          WriteCapacityUnits: 1
        StreamSpecification: 
          StreamViewType: NEW_IMAGE

Where:

  • ProvisionedThroughput could have been replaced with BillingMode: PAY_PER_REQUEST if you do not want to deal with provsioning DynamoDB.
  • StreamSpecification is what toggles the DynamoDB stream and where StreamViewType can take 4 different values:
    1. KEYS_ONLY - Only the key attributes of the modified item are written to the stream.
    2. NEW_IMAGE - The entire item, as it appears after it was modified, is written to the stream.
    3. OLD_IMAGE - The entire item, as it appeared before it was modified, is written to the stream.
    4. NEW_AND_OLD_IMAGES - Both the new and the old item images of the item are written to the stream.

SQS

Polling an SQS queue from a Lambda

Provision the SQS queue as follow in the serverless.yml:

service: simpleingest

custom:
  stage: ${opt:stage, 'dev'}

plugins:
  - serverless-iam-roles-per-function

provider:
  name: aws
  runtime: nodejs10.x
  profile: neap
  stage: ${self:custom.stage}
  region: ap-southeast-2

functions:
  dequeue:
    handler: lambdas/dequeue.handler
    events:
      - schedule: rate(1 minute)
    iamRoleStatements:
      - Effect: 'Allow'
        Action:
          - sqs:ReceiveMessage
          - sqs:DeleteMessage
        Resource: 
          Fn::GetAtt: 
            - YourQueue
            - Arn
    environment:
      YOUR_QUEUE: !Ref YourQueue

resources:
  Resources:
    YourQueue:
      Type: AWS::SQS::Queue

As you can see, the environment variable YOUR_QUEUE is set to !Ref YourQueue, where ref returns the queue's url. We'll need this to use AWS SDK.

const AWS = require('aws-sdk')
const sqs = new AWS.SQS({apiVersion: '2012-11-05'})

sqs.sendMessage({ QueueUrl:process.env.YOUR_QUEUE, MessageBody:JSON.stringify({ hello: 'world' }) }, (err,data) => {
  if (err)
    console.log('Error - ${err.stack}')
  else
    console.log(data)
})

// MaxNumberOfMessages: Max is 10. 10 does not mean you'll get 10, it means you'll get at most 10.
sqs.receiveMessage({ QueueUrl:process.env.YOUR_QUEUE, MaxNumberOfMessages:10 }, (err,data) => {
  if (err)
    console.log('Error - ${err.stack}')
  else {
    console.log(data.Messages[0].Body) // Body is a string containing your message.
    console.log(data.Messages[0].ReceiptHandle) // used for deletion.
  }
})

sqs.deleteMessage({ QueueUrl:process.env.YOUR_QUEUE, ReceiptHandle:10 }, (err,data) => {
  if (err)
    console.log('Error - ${err.stack}')
  else {
    console.log(data.Messages[0].Body) // Body is a string containing your message.
    console.log(data.Messages[0].ReceiptHandle) // used for deletion.
  }
})

Back-pressure mess with SQS

This section was written based on this article: Lambda Concurrency Limits and SQS Triggers Don’t Mix Well (Sometimes)

  • Set the queue’s visibility timeout to at least 6 times the timeout that you configure on your function. The extra time allows for Lambda to retry if your function execution is throttled while your function is processing a previous batch.
  • Set the maxReceiveCount on the queue’s redrive policy to at least 5. This will help avoid sending messages to the dead-letter queue due to throttling.
  • Configure the dead-letter to retain failed messages long enough that you can move them back later to be reprocessed

If the Lambda's timeout is 30 seconds (which is the max for a Lambda configured to respond to API Gateway requests), then the VisibilityTimeout should be 600.

serverless.yml

service: sqs-trigger-demo

plugins:
  - serverless-iam-roles-per-function

provider:
  name: aws
  runtime: nodejs10.x
  
functions:
  sendMessageToSQS:
    handler: index.handler
    timeout: 300
    events:
      - http:
          path: /send
          method: POST
    iamRoleStatementsInherit: true
    iamRoleStatements:
      - Effect: 'Allow'
        Action:
          - sqs:SendMessage
        Resource: 
          Fn::GetAtt: 
            - YourQueue
            - Arn
    environment:
      YOUR_QUEUE: !Ref YourQueue

  processSQS:
    handler: index.handler
    events:
      - sqs: 
          arn:
            Fn::GetAtt: 
              - YourQueue
              - Arn
          batchSize: 1

resources:
  Resources:
    YourQueue:
      Type: AWS::SQS::Queue
      Properties:
        QueueName: your-main-queue-name
        VisibilityTimeout: 600
        RedrivePolicy:
          deadLetterTargetArn: !GetAtt DeadLetterQueue.Arn
          maxReceiveCount: 10

    DeadLetterQueue:
      Type: AWS::SQS::Queue
      Properties:
        QueueName: my-dead-letter-queue-name

SNS

Provisioning an SNS topic

Under the resources.Resources property of your serverless.yml, ad the following:

    YourTopic:
      Type: AWS::SNS::Topic
      Properties:
        TopicName: ${self:custom.your-topic-name}

The TopicName is optional.

WARNING: To configure accesses to an SNS topic, your need to use its ARN. Usually, this is achieved with the Fn::GetAtt intrinsic function, but for an SNS, you have to use !Ref:

iamRoleStatements:
    - Effect: 'Allow'
      Action:
        - sns:Publish
      Resource: !Ref YourTopic

Lambda triggered by a topic

exports.handler =  function(event, context, callback) {
  const records = event.Records
  console.log(JSON.stringify(records, null, ' '))
  return callback(null, 'records processed')
}

where Records is an array with items similar to the following:

{
    "EventSource": "aws:sns",
    "EventVersion": "1.0",
    "EventSubscriptionArn": "arn:aws:sns:ap-southeast-2:56757897",
    "Sns": {
  "Type": "Notification",
  "MessageId": "2f38-10a-1ba-af0-79313b1c",
  "TopicArn": "arn:aws:sns:ap-southeast-2",
  "Subject": "Hello Candy",
  "Message": "{\n  Hello: 'Candy'\n}",
  "Timestamp": "2018-10-15T04:18:19.761Z",
  "SignatureVersion": "1",
  "Signature": "jC6KCrv6U8nU2kd01PWGGBUcur9z/NWPJ9ND7EH83uI/9Oi5RyQoZS5DA2auYOYnNMtrDFB1cfP+pjnbS==",
  "SigningCertUrl": "https://sns.ap-southeast-2.amazonaws.com/SimpleNotificationService-6aad65c2f99.pem",
  "UnsubscribeUrl": "https://sns.ap-southeast-2.amazonaws.com/?Action=Unsubscribe&SubscriptionArn=arn:",
  "MessageAttributes": {
      "hello": {
    "Type": "String",
    "Value": "world"
      }
  }
    }
}
require('aws-sdk/lib/node_loader')
var AWS = require('aws-sdk/lib/core')
var Service = AWS.Service
var apiLoader = AWS.apiLoader
apiLoader.services['apigatewaymanagementapi'] = {}
AWS.ApiGatewayManagementApi = Service.defineService('apigatewaymanagementapi', ['2018-11-29'])
Object.defineProperty(apiLoader.services['apigatewaymanagementapi'], '2018-11-29', {
get: function get() {
var model = {
'metadata': {
'apiVersion': '2018-11-29',
'endpointPrefix': 'execute-api',
'signingName': 'execute-api',
'serviceFullName': 'AmazonApiGatewayManagementApi',
'serviceId': 'ApiGatewayManagementApi',
'protocol': 'rest-json',
'jsonVersion': '1.1',
'uid': 'apigatewaymanagementapi-2018-11-29',
'signatureVersion': 'v4'
},
'operations': {
'PostToConnection': {
'http': {
'requestUri': '/@connections/{connectionId}',
'responseCode': 200
},
'input': {
'type': 'structure',
'members': {
'Data': {
'type': 'blob'
},
'ConnectionId': {
'location': 'uri',
'locationName': 'connectionId'
}
},
'required': [
'ConnectionId',
'Data'
],
'payload': 'Data'
}
}
},
'shapes': {}
}
model.paginators = {
'pagination': {}
}
return model
},
enumerable: true,
configurable: true
})
module.exports = AWS.ApiGatewayManagementApi
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment