Skip to content

Instantly share code, notes, and snippets.

@andreaswittig
Last active June 27, 2023 13:53
Show Gist options
  • Star 6 You must be signed in to star a gist
  • Fork 1 You must be signed in to fork a gist
  • Save andreaswittig/bea2818d83cc2e681c6edaf9bf178e4d to your computer and use it in GitHub Desktop.
Save andreaswittig/bea2818d83cc2e681c6edaf9bf178e4d to your computer and use it in GitHub Desktop.
---
Resources:
Api:
Type: 'AWS::ApiGatewayV2::Api'
Properties:
Name: !Ref 'AWS::StackName'
ProtocolType: WEBSOCKET
RouteSelectionExpression: '\$default'
DefaultRoute:
Type: 'AWS::ApiGatewayV2::Route'
Properties:
ApiId: !Ref Api
AuthorizationType: NONE
RouteKey: '$default'
Target: !Sub 'integrations/${KinesisIntegration}'
Stage:
Type: 'AWS::ApiGatewayV2::Stage'
Properties:
ApiId: !Ref Api
DeploymentId: !Ref Deployment
StageName: 'v1'
DefaultRouteSettings:
LoggingLevel: INFO
DataTraceEnabled: true
Deployment:
Type: 'AWS::ApiGatewayV2::Deployment'
DependsOn: DefaultRoute
Properties:
ApiId: !Ref Api
KinesisIntegration:
Type: 'AWS::ApiGatewayV2::Integration'
Properties:
ApiId: !Ref Api
CredentialsArn: !GetAtt IntegrationRole.Arn
IntegrationMethod: 'POST'
IntegrationType: 'AWS'
IntegrationUri: !Sub 'arn:aws:apigateway:${AWS::Region}:kinesis:action/PutRecord'
RequestTemplates:
default: !Sub |
#set($payload = $input.json('$'))
#set($data = "{""payload"": $payload, ""connectionId"": ""$context.connectionId""}")
{
"Data": "$util.base64Encode($data)",
"PartitionKey": "$context.connectionId",
"StreamName": "${EventStream}"
}
TemplateSelectionExpression: default
IntegrationRole:
Type: 'AWS::IAM::Role'
Properties:
AssumeRolePolicyDocument:
Version: '2012-10-17'
Statement:
- Effect: Allow
Principal:
Service:
- 'apigateway.amazonaws.com'
Action:
- 'sts:AssumeRole'
Policies:
- PolicyName: logs
PolicyDocument:
Version: '2012-10-17'
Statement:
- Effect: Allow
Action: 'kinesis:PutRecord'
Resource: !GetAtt 'EventStream.Arn'
EventStream:
Type: 'AWS::Kinesis::Stream'
Properties:
ShardCount: 1
EventStreamConsumer:
Type: 'AWS::Kinesis::StreamConsumer'
Properties:
ConsumerName: lambda
StreamARN: !GetAtt EventStream.Arn
EventSourceMapping:
Type: 'AWS::Lambda::EventSourceMapping'
Properties:
BatchSize: 16
Enabled: true
EventSourceArn: !Ref EventStreamConsumer
FunctionName: !GetAtt 'StreamFunction.Arn'
StartingPosition: LATEST
StreamFunction:
Type: 'AWS::Lambda::Function'
Properties:
Handler: 'index.handler'
Runtime: 'nodejs8.10'
Layers:
- !Sub 'arn:aws:lambda:${AWS::Region}:853553028582:layer:monitoring-jump-start:1'
MemorySize: 128
Timeout: 30
Role: !GetAtt 'StreamRole.Arn'
Code:
ZipFile: !Sub |
'use strict';
const AWS = require('aws-sdk');
const api = new AWS.ApiGatewayManagementApi({
apiVersion: '2018-11-29',
endpoint: '${Api}.execute-api.${AWS::Region}.amazonaws.com/${Stage}'
});
exports.handler = async (event) => {
console.log(JSON.stringify(event));
for (let r in event.Records) {
const data = JSON.parse(new Buffer(event.Records[r].kinesis.data, 'base64').toString());
try {
await api.postToConnection({
ConnectionId: data.connectionId,
Data: JSON.stringify(data.payload)
}).promise();
} catch (e) {
if (e.statusCode === 410) {
// do nothing, client disconnected
console.log('client disconnected');
} else {
throw e;
}
}
}
return "OK";
};
StreamRole:
Type: 'AWS::IAM::Role'
Properties:
AssumeRolePolicyDocument:
Version: '2012-10-17'
Statement:
- Effect: Allow
Principal:
Service: 'lambda.amazonaws.com'
Action: 'sts:AssumeRole'
Policies:
- PolicyName: lambda
PolicyDocument:
Statement:
- Effect: Allow
Action: 'execute-api:ManageConnections'
Resource: !Sub 'arn:aws:execute-api:${AWS::Region}:${AWS::AccountId}:${Api}/${Stage}/POST/@connections/*'
- Effect: Allow
Action:
- 'kinesis:ListShards'
Resource: '*'
- Effect: Allow
Action:
- 'kinesis:SubscribeToShard'
- 'kinesis:DescribeStreamSummary'
- 'kinesis:GetShardIterator'
- 'kinesis:GetRecords'
Resource:
- !GetAtt EventStream.Arn
- Effect: Allow
Action:
- 'kinesis:SubscribeToShard'
Resource:
- !GetAtt EventStreamConsumer.ConsumerARN
- Effect: Allow
Action:
- 'logs:CreateLogStream'
- 'logs:PutLogEvents'
Resource: '*'
StreamLogGroup:
Type: 'AWS::Logs::LogGroup'
Properties:
LogGroupName: !Sub '/aws/lambda/${StreamFunction}'
RetentionInDays: 14
Outputs:
WebSocketURI:
Value: !Sub 'wss://${Api}.execute-api.${AWS::Region}.amazonaws.com/${Stage}'
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment