Skip to content

Instantly share code, notes, and snippets.

@jhw
Last active March 24, 2024 11:16
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save jhw/f918f6df7f014b2f593843cb89746049 to your computer and use it in GitHub Desktop.
Save jhw/f918f6df7f014b2f593843cb89746049 to your computer and use it in GitHub Desktop.
Lambda + EventBridge worker pattern demo
env
*.pyc
__pycache__
tmp
setenv-priv.sh
AppName=event-worker-demo
#!/usr/bin/env bash
. app.props
aws cloudformation delete-stack --stack-name $AppName
#!/usr/bin/env bash
. app.props
echo "SlackWebhookUrl: $SLACK_WEBHOOK_URL"
aws cloudformation deploy --stack-name $AppName --template-file stack.json --capabilities CAPABILITY_NAMED_IAM --parameter-overrides SlackWebhookUrl=$SLACK_WEBHOOK_URL
#!/usr/bin/env bash
. app.props
aws cloudformation describe-stack-events --stack-name $AppName --query "StackEvents[].{\"1.Timestamp\":Timestamp,\"2.Id\":LogicalResourceId,\"3.Type\":ResourceType,\"4.Status\":ResourceStatus,\"5.Reason\":ResourceStatusReason}"
#!/usr/bin/env bash
. app.props
aws cloudformation describe-stacks --stack-name $AppName --query 'Stacks[0].Outputs' --output table
#!/usr/bin/env bash
. app.props
aws cloudformation describe-stack-resources --stack-name $AppName --query "StackResources[].{\"1.Timestamp\":Timestamp,\"2.LogicalId\":LogicalResourceId,\"3.PhysicalId\":PhysicalResourceId,\"4.Type\":ResourceType,\"5.Status\":ResourceStatus}"
#!/usr/bin/env bash
aws cloudformation describe-stacks --query "Stacks[].{\"1.Name\":StackName,\"2.Status\":StackStatus}"
import boto3, json
if __name__=="__main__":
try:
events=boto3.client("events")
struct={"hello": "world"}
entry={"Detail": json.dumps(struct),
"DetailType": "stuff",
"Source": "push_event.py"}
entries=[entry]
print (events.put_events(Entries=entries))
except RuntimeError as error:
print ("Error: %s" % str(error))
awscli
boto3
botocore
pyyaml
#!/usr/bin/env bash
export AWS_DEFAULT_OUTPUT=table
export AWS_PROFILE=#{your-aws-profile-here}
export SLACK_WEBHOOK_URL=#{your-slack-webhook-url-here}
{
"Outputs": {},
"Parameters": {
"SlackWebhookUrl": {
"Type": "String"
}
},
"Resources": {
"MyErrorSubscriptionFilter": {
"DependsOn": [
"MyLogStream",
"SlackErrorPermission"
],
"Properties": {
"DestinationArn": {
"Fn::GetAtt": [
"SlackErrorFunction",
"Arn"
]
},
"FilterPattern": "ERROR",
"LogGroupName": {
"Fn::Sub": "/aws/lambda/${MyFunction}"
}
},
"Type": "AWS::Logs::SubscriptionFilter"
},
"MyEventInvokeConfig": {
"Properties": {
"FunctionName": {
"Ref": "MyFunction"
},
"MaximumRetryAttempts": 0,
"Qualifier": "$LATEST"
},
"Type": "AWS::Lambda::EventInvokeConfig"
},
"MyFunction": {
"Properties": {
"Code": {
"ZipFile": "\nimport logging\n\nlogger=logging.getLogger()\nlogger.setLevel(logging.INFO)\n\ndef handler(event, context=None):\n logger.warning(str(event))\n"
},
"Handler": "index.handler",
"MemorySize": 512,
"Role": {
"Fn::GetAtt": [
"MyRole",
"Arn"
]
},
"Runtime": "python3.10",
"Timeout": 5
},
"Type": "AWS::Lambda::Function"
},
"MyLogGroup": {
"Properties": {
"LogGroupName": {
"Fn::Sub": "/aws/lambda/${MyFunction}"
},
"RetentionInDays": 3
},
"Type": "AWS::Logs::LogGroup"
},
"MyLogStream": {
"DependsOn": [
"MyLogGroup"
],
"Properties": {
"LogGroupName": {
"Fn::Sub": "/aws/lambda/${MyFunction}"
}
},
"Type": "AWS::Logs::LogStream"
},
"MyPermission": {
"Properties": {
"Action": "lambda:InvokeFunction",
"FunctionName": {
"Ref": "MyFunction"
},
"Principal": "events.amazonaws.com",
"SourceArn": {
"Fn::GetAtt": [
"MyWhatevsRule",
"Arn"
]
}
},
"Type": "AWS::Lambda::Permission"
},
"MyPolicy": {
"Properties": {
"PolicyDocument": {
"Statement": [
{
"Action": [
"logs:CreateLogGroup",
"logs:CreateLogStream",
"logs:PutLogEvents"
],
"Effect": "Allow",
"Resource": "*"
}
],
"Version": "2012-10-17"
},
"PolicyName": {
"Fn::Sub": "my-policy-${AWS::StackName}"
},
"Roles": [
{
"Ref": "MyRole"
}
]
},
"Type": "AWS::IAM::Policy"
},
"MyRole": {
"Properties": {
"AssumeRolePolicyDocument": {
"Statement": [
{
"Action": [
"sts:AssumeRole"
],
"Effect": "Allow",
"Principal": {
"Service": "lambda.amazonaws.com"
}
}
],
"Version": "2012-10-17"
}
},
"Type": "AWS::IAM::Role"
},
"MyWarningSubscriptionFilter": {
"DependsOn": [
"MyLogStream",
"SlackWarningPermission"
],
"Properties": {
"DestinationArn": {
"Fn::GetAtt": [
"SlackWarningFunction",
"Arn"
]
},
"FilterPattern": "%WARNING|Task timed out%",
"LogGroupName": {
"Fn::Sub": "/aws/lambda/${MyFunction}"
}
},
"Type": "AWS::Logs::SubscriptionFilter"
},
"MyWhatevsRule": {
"Properties": {
"EventPattern": {
"detail": {
"hello": [
"world"
]
}
},
"State": "ENABLED",
"Targets": [
{
"Arn": {
"Fn::GetAtt": [
"MyFunction",
"Arn"
]
},
"Id": {
"Fn::Sub": "my-whatevs-${AWS::StackName}"
}
}
]
},
"Type": "AWS::Events::Rule"
},
"SlackErrorFunction": {
"Properties": {
"Code": {
"ZipFile": "import base64, gzip, json, os, urllib.request\n\n# https://colorswall.com/palette/3\n\nLevels={\"info\": \"#5bc0de\",\n \"warning\": \"#f0ad4e\",\n \"error\": \"#d9534f\"}\n\ndef post_webhook(struct, url):\n req = urllib.request.Request(url, method = \"POST\")\n req.add_header(\"Content-Type\", \"application/json\")\n data = json.dumps(struct).encode()\n return urllib.request.urlopen(req, data = data).read()\n\ndef handler(event, context = None,\n colour = Levels[os.environ[\"SLACK_LOGGING_LEVEL\"]],\n webhookurl = os.environ[\"SLACK_WEBHOOK_URL\"]):\n struct = json.loads(gzip.decompress(base64.b64decode(event[\"awslogs\"][\"data\"])))\n text = json.dumps(struct)\n struct = {\"attachments\": [{\"text\": text,\n \"color\": colour}]}\n post_webhook(struct, webhookurl)\n"
},
"Environment": {
"Variables": {
"SLACK_LOGGING_LEVEL": "error",
"SLACK_WEBHOOK_URL": {
"Ref": "SlackWebhookUrl"
}
}
},
"Handler": "index.handler",
"MemorySize": 512,
"Role": {
"Fn::GetAtt": [
"SlackErrorRole",
"Arn"
]
},
"Runtime": "python3.10",
"Timeout": 5
},
"Type": "AWS::Lambda::Function"
},
"SlackErrorPermission": {
"Properties": {
"Action": "lambda:InvokeFunction",
"FunctionName": {
"Ref": "SlackErrorFunction"
},
"Principal": "logs.amazonaws.com"
},
"Type": "AWS::Lambda::Permission"
},
"SlackErrorPolicy": {
"Properties": {
"PolicyDocument": {
"Statement": [
{
"Action": [
"logs:CreateLogGroup",
"logs:CreateLogStream",
"logs:PutLogEvents"
],
"Effect": "Allow",
"Resource": "*"
}
],
"Version": "2012-10-17"
},
"PolicyName": {
"Fn::Sub": "slack-error-policy-${AWS::StackName}"
},
"Roles": [
{
"Ref": "SlackErrorRole"
}
]
},
"Type": "AWS::IAM::Policy"
},
"SlackErrorRole": {
"Properties": {
"AssumeRolePolicyDocument": {
"Statement": [
{
"Action": [
"sts:AssumeRole"
],
"Effect": "Allow",
"Principal": {
"Service": "lambda.amazonaws.com"
}
}
],
"Version": "2012-10-17"
}
},
"Type": "AWS::IAM::Role"
},
"SlackWarningFunction": {
"Properties": {
"Code": {
"ZipFile": "import base64, gzip, json, os, urllib.request\n\n# https://colorswall.com/palette/3\n\nLevels={\"info\": \"#5bc0de\",\n \"warning\": \"#f0ad4e\",\n \"error\": \"#d9534f\"}\n\ndef post_webhook(struct, url):\n req = urllib.request.Request(url, method = \"POST\")\n req.add_header(\"Content-Type\", \"application/json\")\n data = json.dumps(struct).encode()\n return urllib.request.urlopen(req, data = data).read()\n\ndef handler(event, context = None,\n colour = Levels[os.environ[\"SLACK_LOGGING_LEVEL\"]],\n webhookurl = os.environ[\"SLACK_WEBHOOK_URL\"]):\n struct = json.loads(gzip.decompress(base64.b64decode(event[\"awslogs\"][\"data\"])))\n text = json.dumps(struct)\n struct = {\"attachments\": [{\"text\": text,\n \"color\": colour}]}\n post_webhook(struct, webhookurl)\n"
},
"Environment": {
"Variables": {
"SLACK_LOGGING_LEVEL": "warning",
"SLACK_WEBHOOK_URL": {
"Ref": "SlackWebhookUrl"
}
}
},
"Handler": "index.handler",
"MemorySize": 512,
"Role": {
"Fn::GetAtt": [
"SlackWarningRole",
"Arn"
]
},
"Runtime": "python3.10",
"Timeout": 5
},
"Type": "AWS::Lambda::Function"
},
"SlackWarningPermission": {
"Properties": {
"Action": "lambda:InvokeFunction",
"FunctionName": {
"Ref": "SlackWarningFunction"
},
"Principal": "logs.amazonaws.com"
},
"Type": "AWS::Lambda::Permission"
},
"SlackWarningPolicy": {
"Properties": {
"PolicyDocument": {
"Statement": [
{
"Action": [
"logs:CreateLogGroup",
"logs:CreateLogStream",
"logs:PutLogEvents"
],
"Effect": "Allow",
"Resource": "*"
}
],
"Version": "2012-10-17"
},
"PolicyName": {
"Fn::Sub": "slack-warning-policy-${AWS::StackName}"
},
"Roles": [
{
"Ref": "SlackWarningRole"
}
]
},
"Type": "AWS::IAM::Policy"
},
"SlackWarningRole": {
"Properties": {
"AssumeRolePolicyDocument": {
"Statement": [
{
"Action": [
"sts:AssumeRole"
],
"Effect": "Allow",
"Principal": {
"Service": "lambda.amazonaws.com"
}
}
],
"Version": "2012-10-17"
}
},
"Type": "AWS::IAM::Role"
}
}
}

short

pareto2

medium

done

  • subscription filter log group refs are fixed and not declarative!
  • test pushing hello world message
  • script to push eventbridge message
|  2024-03-18T20:30:00.823Z|  LogsWarningPermission |  AWS::Lambda::Permission        |  CREATE_FAILED        |  Resource handler returned message: "The provided principal was invalid. Please check the principal and try again. (Service: Lambda, Status Code: 400, Request ID: f50d83d5-ff16-4391-8e8c-8de7999e3f18)" (RequestToken: c17e58aa-d2cf-12e6-32e4-cb8925592e8b, HandlerErrorCode: InvalidRequest)   |
  • stack.json
  • slack webhook url
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment