Skip to content

Instantly share code, notes, and snippets.

@jhw
Last active March 23, 2024 20:39
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save jhw/ab1bd1c60243e343de31cb696bdc43c4 to your computer and use it in GitHub Desktop.
Save jhw/ab1bd1c60243e343de31cb696bdc43c4 to your computer and use it in GitHub Desktop.
SQS -> Lambda -> EventBridge -> Worker -> Slack
env
*.pyc
__pycache__
tmp
setenv-priv.sh
AppName=task-queue-demo
#!/usr/bin/env bash
. app.props
aws cloudformation delete-stack --stack-name $AppName
#!/usr/bin/env bash
. app.props
echo "SlackWebhookUrl: $SLACK_WEBHOOK_URL"
aws cloudformation deploy --stack-name $AppName --template-file stack.json --capabilities CAPABILITY_NAMED_IAM --parameter-overrides SlackWebhookUrl=$SLACK_WEBHOOK_URL
#!/usr/bin/env bash
. app.props
aws cloudformation describe-stack-events --stack-name $AppName --query "StackEvents[].{\"1.Timestamp\":Timestamp,\"2.Id\":LogicalResourceId,\"3.Type\":ResourceType,\"4.Status\":ResourceStatus,\"5.Reason\":ResourceStatusReason}"
#!/usr/bin/env bash
. app.props
aws cloudformation describe-stacks --stack-name $AppName --query 'Stacks[0].Outputs' --output table
#!/usr/bin/env bash
. app.props
aws cloudformation describe-stack-resources --stack-name $AppName --query "StackResources[].{\"1.Timestamp\":Timestamp,\"2.LogicalId\":LogicalResourceId,\"3.PhysicalId\":PhysicalResourceId,\"4.Type\":ResourceType,\"5.Status\":ResourceStatus}"
#!/usr/bin/env bash
aws cloudformation describe-stacks --query "Stacks[].{\"1.Name\":StackName,\"2.Status\":StackStatus}"
from botocore.exceptions import ClientError
import boto3, json, os, re, sys
def hungarorise(text):
return "".join([tok.capitalize()
for tok in re.split("\\-|\\_", text)])
def fetch_outputs(cf, stackname):
outputs={}
for stack in cf.describe_stacks()["Stacks"]:
if (stack["StackName"].startswith(stackname) and
"Outputs" in stack):
for output in stack["Outputs"]:
outputs[output["OutputKey"]]=output["OutputValue"]
return outputs
if __name__=="__main__":
try:
if len(sys.argv) < 2:
raise RuntimeError("please enter message")
message=sys.argv[1]
props=dict([tuple(row.split("="))
for row in open("app.props").read().split("\n")
if row!=''])
stackname=props["AppName"]
cf=boto3.client("cloudformation")
outputs=fetch_outputs(cf, stackname)
queuekey=hungarorise("hello-queue")
if queuekey not in outputs:
raise RuntimeError("queue not found")
queueurl=outputs[queuekey]
body=json.dumps({"message": message})
sqs=boto3.client("sqs")
print (sqs.send_message(QueueUrl=queueurl,
MessageBody=body))
except RuntimeError as error:
print ("Error: %s" % str(error))
except ClientError as error:
print ("Error: %s" % str(error))
awscli
boto3
botocore
pyyaml
#!/usr/bin/env bash
export AWS_DEFAULT_OUTPUT=table
export AWS_PROFILE=#{your-aws-profile-here}
export AWS_REGION=#{your-aws-region-here}
export SLACK_WEBHOOK_URL=#{your-slack-webhook-url-here}
{
"Outputs": {
"HelloQueue": {
"Value": {
"Ref": "HelloQueue"
}
}
},
"Parameters": {
"SlackWebhookUrl": {
"Type": "String"
}
},
"Resources": {
"DemoErrorSubscriptionFilter": {
"DependsOn": [
"DemoLogStream",
"LogsErrorPermission"
],
"Properties": {
"DestinationArn": {
"Fn::GetAtt": [
"LogsErrorFunction",
"Arn"
]
},
"FilterPattern": "ERROR",
"LogGroupName": {
"Fn::Sub": "/aws/lambda/${DemoFunction}"
}
},
"Type": "AWS::Logs::SubscriptionFilter"
},
"DemoEventInvokeConfig": {
"Properties": {
"FunctionName": {
"Ref": "DemoFunction"
},
"MaximumRetryAttempts": 0,
"Qualifier": "$LATEST"
},
"Type": "AWS::Lambda::EventInvokeConfig"
},
"DemoFunction": {
"Properties": {
"Code": {
"ZipFile": "\nimport logging\n\nlogger=logging.getLogger()\nlogger.setLevel(logging.INFO)\n\ndef handler(event, context=None):\n logger.warning(str(event))\n"
},
"Handler": "index.handler",
"MemorySize": 512,
"Role": {
"Fn::GetAtt": [
"DemoRole",
"Arn"
]
},
"Runtime": "python3.10",
"Timeout": 5
},
"Type": "AWS::Lambda::Function"
},
"DemoLogGroup": {
"Properties": {
"LogGroupName": {
"Fn::Sub": "/aws/lambda/${DemoFunction}"
},
"RetentionInDays": 3
},
"Type": "AWS::Logs::LogGroup"
},
"DemoLogStream": {
"DependsOn": [
"DemoLogGroup"
],
"Properties": {
"LogGroupName": {
"Fn::Sub": "/aws/lambda/${DemoFunction}"
}
},
"Type": "AWS::Logs::LogStream"
},
"DemoPermission": {
"Properties": {
"Action": "lambda:InvokeFunction",
"FunctionName": {
"Ref": "DemoFunction"
},
"Principal": "events.amazonaws.com",
"SourceArn": {
"Fn::GetAtt": [
"DemoWhatevsRule",
"Arn"
]
}
},
"Type": "AWS::Lambda::Permission"
},
"DemoPolicy": {
"Properties": {
"PolicyDocument": {
"Statement": [
{
"Action": [
"logs:CreateLogGroup",
"logs:CreateLogStream",
"logs:PutLogEvents"
],
"Effect": "Allow",
"Resource": "*"
}
],
"Version": "2012-10-17"
},
"PolicyName": {
"Fn::Sub": "demo-policy-${AWS::StackName}"
},
"Roles": [
{
"Ref": "DemoRole"
}
]
},
"Type": "AWS::IAM::Policy"
},
"DemoRole": {
"Properties": {
"AssumeRolePolicyDocument": {
"Statement": [
{
"Action": [
"sts:AssumeRole"
],
"Effect": "Allow",
"Principal": {
"Service": "lambda.amazonaws.com"
}
}
],
"Version": "2012-10-17"
}
},
"Type": "AWS::IAM::Role"
},
"DemoWarningSubscriptionFilter": {
"DependsOn": [
"DemoLogStream",
"LogsWarningPermission"
],
"Properties": {
"DestinationArn": {
"Fn::GetAtt": [
"LogsWarningFunction",
"Arn"
]
},
"FilterPattern": "%WARNING|Task timed out%",
"LogGroupName": {
"Fn::Sub": "/aws/lambda/${DemoFunction}"
}
},
"Type": "AWS::Logs::SubscriptionFilter"
},
"DemoWhatevsRule": {
"Properties": {
"EventPattern": {
"source": [
{
"Ref": "HelloQueue"
}
]
},
"State": "ENABLED",
"Targets": [
{
"Arn": {
"Fn::GetAtt": [
"DemoFunction",
"Arn"
]
},
"Id": {
"Fn::Sub": "demo-whatevs-${AWS::StackName}"
}
}
]
},
"Type": "AWS::Events::Rule"
},
"HelloQueue": {
"Properties": {},
"Type": "AWS::SQS::Queue"
},
"HelloTaskQueueEventSourceMapping": {
"Properties": {
"EventSourceArn": {
"Fn::GetAtt": [
"HelloQueue",
"Arn"
]
},
"FunctionName": {
"Ref": "HelloTaskQueueFunction"
}
},
"Type": "AWS::Lambda::EventSourceMapping"
},
"HelloTaskQueueFunction": {
"Properties": {
"Code": {
"ZipFile": "import boto3, json, math, os\n\n\"\"\"\n- EventBridge max batch size is 10\n\"\"\"\n\ndef handler(event, context, batchsize = 10):\n source = os.environ[\"QUEUE_URL\"]\n entries = [{\"Detail\": json.dumps(record),\n \"DetailType\": \"record\",\n \"Source\": source}\n for record in event[\"Records\"]]\n if entries != []:\n events = boto3.client(\"events\")\n nbatches = math.ceil(len(entries)/batchsize)\n for i in range(nbatches):\n batch = entries[i*batchsize: (i+1)*batchsize]\n events.put_events(Entries = batch)\n\n"
},
"Environment": {
"Variables": {
"QUEUE_URL": {
"Ref": "HelloQueue"
}
}
},
"Handler": "index.handler",
"MemorySize": 512,
"Role": {
"Fn::GetAtt": [
"HelloTaskQueueRole",
"Arn"
]
},
"Runtime": "python3.10",
"Timeout": 5
},
"Type": "AWS::Lambda::Function"
},
"HelloTaskQueuePolicy": {
"Properties": {
"PolicyDocument": {
"Statement": [
{
"Action": [
"sqs:DeleteMessage",
"sqs:GetQueueAttributes",
"sqs:ReceiveMessage"
],
"Effect": "Allow",
"Resource": {
"Fn::GetAtt": [
"HelloQueue",
"Arn"
]
}
},
{
"Action": [
"events:PutEvents"
],
"Effect": "Allow",
"Resource": "*"
},
{
"Action": [
"logs:CreateLogGroup",
"logs:CreateLogStream",
"logs:PutLogEvents"
],
"Effect": "Allow",
"Resource": "*"
}
],
"Version": "2012-10-17"
},
"PolicyName": {
"Fn::Sub": "hello-task-queue-policy-${AWS::StackName}"
},
"Roles": [
{
"Ref": "HelloTaskQueueRole"
}
]
},
"Type": "AWS::IAM::Policy"
},
"HelloTaskQueueRole": {
"Properties": {
"AssumeRolePolicyDocument": {
"Statement": [
{
"Action": [
"sts:AssumeRole"
],
"Effect": "Allow",
"Principal": {
"Service": "lambda.amazonaws.com"
}
}
],
"Version": "2012-10-17"
}
},
"Type": "AWS::IAM::Role"
},
"LogsErrorFunction": {
"Properties": {
"Code": {
"ZipFile": "import base64, gzip, json, os, urllib.request\n\n# https://colorswall.com/palette/3\n\nLevels={\"info\": \"#5bc0de\",\n \"warning\": \"#f0ad4e\",\n \"error\": \"#d9534f\"}\n\ndef post_webhook(struct, url):\n req = urllib.request.Request(url, method = \"POST\")\n req.add_header(\"Content-Type\", \"application/json\")\n data = json.dumps(struct).encode()\n return urllib.request.urlopen(req, data = data).read()\n\ndef handler(event, context = None,\n colour = Levels[os.environ[\"SLACK_LOGGING_LEVEL\"]],\n webhookurl = os.environ[\"SLACK_WEBHOOK_URL\"]):\n struct = json.loads(gzip.decompress(base64.b64decode(event[\"awslogs\"][\"data\"])))\n text = json.dumps(struct)\n struct = {\"attachments\": [{\"text\": text,\n \"color\": colour}]}\n post_webhook(struct, webhookurl)\n"
},
"Environment": {
"Variables": {
"SLACK_LOGGING_LEVEL": "error",
"SLACK_WEBHOOK_URL": {
"Ref": "SlackWebhookUrl"
}
}
},
"Handler": "index.handler",
"MemorySize": 512,
"Role": {
"Fn::GetAtt": [
"LogsErrorRole",
"Arn"
]
},
"Runtime": "python3.10",
"Timeout": 5
},
"Type": "AWS::Lambda::Function"
},
"LogsErrorPermission": {
"Properties": {
"Action": "lambda:InvokeFunction",
"FunctionName": {
"Ref": "LogsErrorFunction"
},
"Principal": "logs.amazonaws.com"
},
"Type": "AWS::Lambda::Permission"
},
"LogsErrorPolicy": {
"Properties": {
"PolicyDocument": {
"Statement": [
{
"Action": [
"logs:CreateLogGroup",
"logs:CreateLogStream",
"logs:PutLogEvents"
],
"Effect": "Allow",
"Resource": "*"
}
],
"Version": "2012-10-17"
},
"PolicyName": {
"Fn::Sub": "logs-error-policy-${AWS::StackName}"
},
"Roles": [
{
"Ref": "LogsErrorRole"
}
]
},
"Type": "AWS::IAM::Policy"
},
"LogsErrorRole": {
"Properties": {
"AssumeRolePolicyDocument": {
"Statement": [
{
"Action": [
"sts:AssumeRole"
],
"Effect": "Allow",
"Principal": {
"Service": "lambda.amazonaws.com"
}
}
],
"Version": "2012-10-17"
}
},
"Type": "AWS::IAM::Role"
},
"LogsWarningFunction": {
"Properties": {
"Code": {
"ZipFile": "import base64, gzip, json, os, urllib.request\n\n# https://colorswall.com/palette/3\n\nLevels={\"info\": \"#5bc0de\",\n \"warning\": \"#f0ad4e\",\n \"error\": \"#d9534f\"}\n\ndef post_webhook(struct, url):\n req = urllib.request.Request(url, method = \"POST\")\n req.add_header(\"Content-Type\", \"application/json\")\n data = json.dumps(struct).encode()\n return urllib.request.urlopen(req, data = data).read()\n\ndef handler(event, context = None,\n colour = Levels[os.environ[\"SLACK_LOGGING_LEVEL\"]],\n webhookurl = os.environ[\"SLACK_WEBHOOK_URL\"]):\n struct = json.loads(gzip.decompress(base64.b64decode(event[\"awslogs\"][\"data\"])))\n text = json.dumps(struct)\n struct = {\"attachments\": [{\"text\": text,\n \"color\": colour}]}\n post_webhook(struct, webhookurl)\n"
},
"Environment": {
"Variables": {
"SLACK_LOGGING_LEVEL": "warning",
"SLACK_WEBHOOK_URL": {
"Ref": "SlackWebhookUrl"
}
}
},
"Handler": "index.handler",
"MemorySize": 512,
"Role": {
"Fn::GetAtt": [
"LogsWarningRole",
"Arn"
]
},
"Runtime": "python3.10",
"Timeout": 5
},
"Type": "AWS::Lambda::Function"
},
"LogsWarningPermission": {
"Properties": {
"Action": "lambda:InvokeFunction",
"FunctionName": {
"Ref": "LogsWarningFunction"
},
"Principal": "logs.amazonaws.com"
},
"Type": "AWS::Lambda::Permission"
},
"LogsWarningPolicy": {
"Properties": {
"PolicyDocument": {
"Statement": [
{
"Action": [
"logs:CreateLogGroup",
"logs:CreateLogStream",
"logs:PutLogEvents"
],
"Effect": "Allow",
"Resource": "*"
}
],
"Version": "2012-10-17"
},
"PolicyName": {
"Fn::Sub": "logs-warning-policy-${AWS::StackName}"
},
"Roles": [
{
"Ref": "LogsWarningRole"
}
]
},
"Type": "AWS::IAM::Policy"
},
"LogsWarningRole": {
"Properties": {
"AssumeRolePolicyDocument": {
"Statement": [
{
"Action": [
"sts:AssumeRole"
],
"Effect": "Allow",
"Principal": {
"Service": "lambda.amazonaws.com"
}
}
],
"Version": "2012-10-17"
}
},
"Type": "AWS::IAM::Role"
}
}
}

short

  • test pushing message

pareto2

  • SQS not Sqs
    • irregular required

done

  • extend push_message to push actual message
  • stack
  • script to push message to queue
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment