Skip to content

Instantly share code, notes, and snippets.

@jhw
Last active March 24, 2024 11:43
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save jhw/719e3aad88cf897d380ab14ef62b4d0a to your computer and use it in GitHub Desktop.
Save jhw/719e3aad88cf897d380ab14ef62b4d0a to your computer and use it in GitHub Desktop.
DynamoDB + DDB Streams + EventBridge demo
env
*.pyc
__pycache__
tmp
setenv-priv.sh
AppName=streaming-table-demo
from botocore.exceptions import ClientError
import boto3, os, re, sys
def hungarorise(text):
    """Convert a dash/underscore-delimited name to PascalCase, e.g. 'app-table' -> 'AppTable'."""
    tokens = re.split(r"[-_]", text)
    return "".join(token.capitalize() for token in tokens)
def fetch_outputs(cf, stackname):
    """Collect OutputKey -> OutputValue across all stacks whose name starts with stackname."""
    return {output["OutputKey"]: output["OutputValue"]
            for stack in cf.describe_stacks()["Stacks"]
            if stack["StackName"].startswith(stackname) and "Outputs" in stack
            for output in stack["Outputs"]}
if __name__=="__main__":
try:
if len(sys.argv) < 3:
raise RuntimeError("please enter league name, team name")
leaguename, teamname = sys.argv[1:3]
props=dict([tuple(row.split("="))
for row in open("app.props").read().split("\n")
if row!=''])
stackname=props["AppName"]
cf=boto3.client("cloudformation")
outputs=fetch_outputs(cf, stackname)
tablekey=hungarorise("app-table")
if tablekey not in outputs:
raise RuntimeError("table not found")
tablename=outputs[tablekey]
table=boto3.resource("dynamodb").Table(tablename)
key={"pk": f"LEAGUE#{leaguename}",
"sk": f"TEAM#{teamname}"}
print (table.delete_item(Key=key))
except RuntimeError as error:
print ("Error: %s" % str(error))
except ClientError as error:
print ("Error: %s" % str(error))
#!/usr/bin/env bash
# Delete the CloudFormation stack named by AppName in app.props.
. app.props
# quote the expansion so an unexpected space/glob in AppName can't split the argument
aws cloudformation delete-stack --stack-name "$AppName"
#!/usr/bin/env bash
# Deploy/update the stack defined in stack.json.
# Requires SLACK_WEBHOOK_URL in the environment (see the setenv template).
. app.props
echo "SlackWebhookUrl: $SLACK_WEBHOOK_URL"
# quote expansions: webhook URLs can contain characters the shell would split or glob
aws cloudformation deploy --stack-name "$AppName" --template-file stack.json --capabilities CAPABILITY_NAMED_IAM --parameter-overrides "SlackWebhookUrl=$SLACK_WEBHOOK_URL"
from botocore.exceptions import ClientError
import boto3, os, re, sys, time
def fetch_log_events(logs, kwargs):
    """Fetch every event for a filter_log_events query, following pagination.

    logs: boto3 CloudWatch Logs client (or anything exposing filter_log_events)
    kwargs: base arguments for filter_log_events; never mutated (the original
            wrote nextToken back into the caller's dict)
    Returns the accumulated events sorted by timestamp.
    """
    events, token = [], None
    while True:
        params = dict(kwargs)  # copy per call so the caller's dict stays clean
        if token:
            params["nextToken"] = token
        resp = logs.filter_log_events(**params)
        events += resp["events"]
        token = resp.get("nextToken")
        if not token:
            break
    return sorted(events,
                  key=lambda event: event["timestamp"])
if __name__=="__main__":
try:
if len(sys.argv) < 3:
raise RuntimeError("please enter lambda name, window")
lambdaname, window = sys.argv[1:3]
if not re.search("^\\d+$", window):
raise RuntimeError("window is invalid")
window=int(window)
logs=boto3.client("logs")
starttime=int(1000*(time.time()-window))
loggroupname="/aws/lambda/%s" % lambdaname
kwargs={"logGroupName": loggroupname,
"startTime": starttime,
"interleaved": True}
events=fetch_log_events(logs, kwargs)
for event in events:
msg=re.sub("\\r|\\n", "", event["message"])
print (msg)
except RuntimeError as error:
print ("Error: %s" % str(error))
except ClientError as error:
print ("Error: %s" % str(error))
#!/usr/bin/env bash
# Show the event history for the app stack (timestamp, resource, type, status, reason).
. app.props
# quote the expansion so an unexpected space/glob in AppName can't split the argument
aws cloudformation describe-stack-events --stack-name "$AppName" --query "StackEvents[].{\"1.Timestamp\":Timestamp,\"2.Id\":LogicalResourceId,\"3.Type\":ResourceType,\"4.Status\":ResourceStatus,\"5.Reason\":ResourceStatusReason}"
#!/usr/bin/env bash
# Show the Outputs section of the app stack as a table.
. app.props
# quote the expansion so an unexpected space/glob in AppName can't split the argument
aws cloudformation describe-stacks --stack-name "$AppName" --query 'Stacks[0].Outputs' --output table
#!/usr/bin/env bash
# List the physical resources created by the app stack.
. app.props
# quote the expansion so an unexpected space/glob in AppName can't split the argument
aws cloudformation describe-stack-resources --stack-name "$AppName" --query "StackResources[].{\"1.Timestamp\":Timestamp,\"2.LogicalId\":LogicalResourceId,\"3.PhysicalId\":PhysicalResourceId,\"4.Type\":ResourceType,\"5.Status\":ResourceStatus}"
#!/usr/bin/env bash
# List every CloudFormation stack in the account/region with its name and status.
aws cloudformation describe-stacks --query "Stacks[].{\"1.Name\":StackName,\"2.Status\":StackStatus}"
from botocore.exceptions import ClientError
import boto3, os, re, sys
def hungarorise(text):
    """PascalCase a dash/underscore-delimited name, e.g. 'app-table' -> 'AppTable'."""
    return "".join(map(str.capitalize,
                       re.split("\\-|\\_", text)))
def fetch_outputs(cf, stackname):
    """Map OutputKey -> OutputValue over every stack whose name starts with stackname."""
    outputs = {}
    for stack in cf.describe_stacks()["Stacks"]:
        # skip stacks outside this app's namespace
        if not stack["StackName"].startswith(stackname):
            continue
        for output in stack.get("Outputs", []):
            outputs[output["OutputKey"]] = output["OutputValue"]
    return outputs
if __name__=="__main__":
try:
if len(sys.argv) < 3:
raise RuntimeError("please enter league name, team name")
leaguename, teamname = sys.argv[1:3]
props=dict([tuple(row.split("="))
for row in open("app.props").read().split("\n")
if row!=''])
stackname=props["AppName"]
cf=boto3.client("cloudformation")
outputs=fetch_outputs(cf, stackname)
tablekey=hungarorise("app-table")
if tablekey not in outputs:
raise RuntimeError("table not found")
tablename=outputs[tablekey]
table=boto3.resource("dynamodb").Table(tablename)
item={"pk": f"LEAGUE#{leaguename}",
"sk": f"TEAM#{teamname}"}
# print (item)
print (table.put_item(Item=item))
except RuntimeError as error:
print ("Error: %s" % str(error))
except ClientError as error:
print ("Error: %s" % str(error))
awscli
boto3
botocore
pyyaml
#!/usr/bin/env bash
# Template for private environment settings; fill in and keep out of version
# control (the gitignore lists setenv-priv.sh — presumably this is copied there; confirm).
export AWS_DEFAULT_OUTPUT=table
# NOTE(review): the #{...} tokens are placeholders to replace by hand; as written,
# each export sets the literal string "#{...}" (the '#' is part of the value here,
# not a comment, because it is not at the start of a word).
export AWS_PROFILE=#{your-aws-profile-here}
export AWS_REGION=#{your-aws-region-here}
export SLACK_WEBHOOK_URL=#{your-slack-webhook-url-here}
{
"Outputs": {
"AppTable": {
"Value": {
"Ref": "AppTable"
}
}
},
"Parameters": {
"SlackWebhookUrl": {
"Type": "String"
}
},
"Resources": {
"AppStreamingTableErrorSubscriptionFilter": {
"DependsOn": [
"AppStreamingTableLogStream",
"SlackErrorPermission"
],
"Properties": {
"DestinationArn": {
"Fn::GetAtt": [
"SlackErrorFunction",
"Arn"
]
},
"FilterPattern": "ERROR",
"LogGroupName": {
"Fn::Sub": "/aws/lambda/${AppStreamingTableFunction}"
}
},
"Type": "AWS::Logs::SubscriptionFilter"
},
"AppStreamingTableEventSourceMapping": {
"Properties": {
"EventSourceArn": {
"Fn::GetAtt": [
"AppTable",
"StreamArn"
]
},
"FunctionName": {
"Ref": "AppStreamingTableFunction"
},
"MaximumBatchingWindowInSeconds": 0,
"MaximumRetryAttempts": 0,
"StartingPosition": "LATEST"
},
"Type": "AWS::Lambda::EventSourceMapping"
},
"AppStreamingTableFunction": {
"Properties": {
"Code": {
"ZipFile": "import boto3, json, math, os\n\nclass Key:\n\n def __init__(self, pk, sk, eventname):\n self.pk = pk\n self.sk = sk\n self.eventname = eventname\n\n def __str__(self):\n return \"%s/%s/%s\" % (self.pk,\n self.sk,\n self.eventname)\n\n\"\"\"\n- EventBridge required fields are Source, DetailType, Detail\n- record[\"eventName\"] is used as DetailType\n- eventName could be INSERT, MODIFY, DELETE\n\"\"\"\n \nclass Entry:\n\n def __init__(self, key, records, source):\n self.key = key\n self.records = records\n self.source = source\n\n @property\n def entry(self): \n detail = {\"pk\": self.key.pk,\n \"sk\": self.key.sk,\n \"eventName\": self.key.eventname,\n \"records\": self.records}\n detailtype = self.key.eventname\n return {\"Source\": self.source,\n \"DetailType\": detailtype,\n \"Detail\": json.dumps(detail)}\n\ndef batch_records(records):\n keys, groups = {}, {}\n for record in records:\n pk = record[\"dynamodb\"][\"Keys\"][\"pk\"][\"S\"]\n sk = record[\"dynamodb\"][\"Keys\"][\"sk\"][\"S\"].split(\"#\")[0]\n eventname = record[\"eventName\"]\n key = Key(pk = pk,\n sk = sk,\n eventname = eventname)\n strkey = str(key)\n if strkey not in keys:\n keys[strkey] = key\n groups.setdefault(strkey, [])\n groups[strkey].append(record)\n return [(key, groups[strkey])\n for strkey, key in keys.items()]\n\n\"\"\"\n- EventBridge max batch size is 10\n\"\"\"\n\ndef handler(event, context, batchsize = 10):\n source = os.environ[\"TABLE_NAME\"]\n groups = batch_records(event[\"Records\"])\n entries = [Entry(k, v, source).entry\n for k, v in groups]\n if entries != []:\n events = boto3.client(\"events\")\n nbatches = math.ceil(len(entries)/batchsize)\n for i in range(nbatches):\n batch = entries[i*batchsize: (i+1)*batchsize]\n events.put_events(Entries = batch)\n\n"
},
"Environment": {
"Variables": {
"TABLE_NAME": {
"Ref": "AppTable"
}
}
},
"Handler": "index.handler",
"MemorySize": 512,
"Role": {
"Fn::GetAtt": [
"AppStreamingTableRole",
"Arn"
]
},
"Runtime": "python3.10",
"Timeout": 5
},
"Type": "AWS::Lambda::Function"
},
"AppStreamingTableLogGroup": {
"Properties": {
"LogGroupName": {
"Fn::Sub": "/aws/lambda/${AppStreamingTableFunction}"
},
"RetentionInDays": 3
},
"Type": "AWS::Logs::LogGroup"
},
"AppStreamingTableLogStream": {
"DependsOn": [
"AppStreamingTableLogGroup"
],
"Properties": {
"LogGroupName": {
"Fn::Sub": "/aws/lambda/${AppStreamingTableFunction}"
}
},
"Type": "AWS::Logs::LogStream"
},
"AppStreamingTablePolicy": {
"Properties": {
"PolicyDocument": {
"Statement": [
{
"Action": [
"dynamodb:GetRecords",
"dynamodb:GetShardIterator",
"dynamodb:DescribeStream",
"dynamodb:ListStreams"
],
"Effect": "Allow",
"Resource": {
"Fn::GetAtt": [
"AppTable",
"StreamArn"
]
}
},
{
"Action": [
"events:PutEvents"
],
"Effect": "Allow",
"Resource": "*"
},
{
"Action": [
"logs:CreateLogGroup",
"logs:CreateLogStream",
"logs:PutLogEvents"
],
"Effect": "Allow",
"Resource": "*"
}
],
"Version": "2012-10-17"
},
"PolicyName": {
"Fn::Sub": "app-streaming-table-policy-${AWS::StackName}"
},
"Roles": [
{
"Ref": "AppStreamingTableRole"
}
]
},
"Type": "AWS::IAM::Policy"
},
"AppStreamingTableRole": {
"Properties": {
"AssumeRolePolicyDocument": {
"Statement": [
{
"Action": [
"sts:AssumeRole"
],
"Effect": "Allow",
"Principal": {
"Service": "lambda.amazonaws.com"
}
}
],
"Version": "2012-10-17"
}
},
"Type": "AWS::IAM::Role"
},
"AppTable": {
"Properties": {
"AttributeDefinitions": [
{
"AttributeName": "pk",
"AttributeType": "S"
},
{
"AttributeName": "sk",
"AttributeType": "S"
}
],
"BillingMode": "PAY_PER_REQUEST",
"KeySchema": [
{
"AttributeName": "pk",
"KeyType": "HASH"
},
{
"AttributeName": "sk",
"KeyType": "RANGE"
}
],
"StreamSpecification": {
"StreamViewType": "NEW_AND_OLD_IMAGES"
}
},
"Type": "AWS::DynamoDB::Table"
},
"DemoErrorSubscriptionFilter": {
"DependsOn": [
"DemoLogStream",
"SlackErrorPermission"
],
"Properties": {
"DestinationArn": {
"Fn::GetAtt": [
"SlackErrorFunction",
"Arn"
]
},
"FilterPattern": "ERROR",
"LogGroupName": {
"Fn::Sub": "/aws/lambda/${DemoFunction}"
}
},
"Type": "AWS::Logs::SubscriptionFilter"
},
"DemoEventInvokeConfig": {
"Properties": {
"FunctionName": {
"Ref": "DemoFunction"
},
"MaximumRetryAttempts": 0,
"Qualifier": "$LATEST"
},
"Type": "AWS::Lambda::EventInvokeConfig"
},
"DemoFoobarRule": {
"Properties": {
"EventPattern": {
"detail": {
"eventName": [
"INSERT"
],
"pk": [
{
"prefix": "LEAGUE"
}
]
},
"source": [
{
"Ref": "AppTable"
}
]
},
"State": "ENABLED",
"Targets": [
{
"Arn": {
"Fn::GetAtt": [
"DemoFunction",
"Arn"
]
},
"Id": {
"Fn::Sub": "demo-foobar-${AWS::StackName}"
}
}
]
},
"Type": "AWS::Events::Rule"
},
"DemoFunction": {
"Properties": {
"Code": {
"ZipFile": "\nimport logging\n\nlogger=logging.getLogger()\nlogger.setLevel(logging.INFO)\n\ndef handler(event, context=None):\n logger.warning(str(event))\n"
},
"Handler": "index.handler",
"MemorySize": 512,
"Role": {
"Fn::GetAtt": [
"DemoRole",
"Arn"
]
},
"Runtime": "python3.10",
"Timeout": 5
},
"Type": "AWS::Lambda::Function"
},
"DemoLogGroup": {
"Properties": {
"LogGroupName": {
"Fn::Sub": "/aws/lambda/${DemoFunction}"
},
"RetentionInDays": 3
},
"Type": "AWS::Logs::LogGroup"
},
"DemoLogStream": {
"DependsOn": [
"DemoLogGroup"
],
"Properties": {
"LogGroupName": {
"Fn::Sub": "/aws/lambda/${DemoFunction}"
}
},
"Type": "AWS::Logs::LogStream"
},
"DemoPermission": {
"Properties": {
"Action": "lambda:InvokeFunction",
"FunctionName": {
"Ref": "DemoFunction"
},
"Principal": "events.amazonaws.com",
"SourceArn": {
"Fn::GetAtt": [
"DemoFoobarRule",
"Arn"
]
}
},
"Type": "AWS::Lambda::Permission"
},
"DemoPolicy": {
"Properties": {
"PolicyDocument": {
"Statement": [
{
"Action": [
"logs:CreateLogGroup",
"logs:CreateLogStream",
"logs:PutLogEvents"
],
"Effect": "Allow",
"Resource": "*"
},
{
"Action": [
"s3:GetObject"
],
"Effect": "Allow",
"Resource": "*"
}
],
"Version": "2012-10-17"
},
"PolicyName": {
"Fn::Sub": "demo-policy-${AWS::StackName}"
},
"Roles": [
{
"Ref": "DemoRole"
}
]
},
"Type": "AWS::IAM::Policy"
},
"DemoRole": {
"Properties": {
"AssumeRolePolicyDocument": {
"Statement": [
{
"Action": [
"sts:AssumeRole"
],
"Effect": "Allow",
"Principal": {
"Service": "lambda.amazonaws.com"
}
}
],
"Version": "2012-10-17"
}
},
"Type": "AWS::IAM::Role"
},
"DemoWarningSubscriptionFilter": {
"DependsOn": [
"DemoLogStream",
"SlackWarningPermission"
],
"Properties": {
"DestinationArn": {
"Fn::GetAtt": [
"SlackWarningFunction",
"Arn"
]
},
"FilterPattern": "%WARNING|Task timed out%",
"LogGroupName": {
"Fn::Sub": "/aws/lambda/${DemoFunction}"
}
},
"Type": "AWS::Logs::SubscriptionFilter"
},
"SlackErrorFunction": {
"Properties": {
"Code": {
"ZipFile": "import base64, gzip, json, os, urllib.request\n\n# https://colorswall.com/palette/3\n\nLevels={\"info\": \"#5bc0de\",\n \"warning\": \"#f0ad4e\",\n \"error\": \"#d9534f\"}\n\ndef post_webhook(struct, url):\n req = urllib.request.Request(url, method = \"POST\")\n req.add_header(\"Content-Type\", \"application/json\")\n data = json.dumps(struct).encode()\n return urllib.request.urlopen(req, data = data).read()\n\ndef handler(event, context = None,\n colour = Levels[os.environ[\"SLACK_LOGGING_LEVEL\"]],\n webhookurl = os.environ[\"SLACK_WEBHOOK_URL\"]):\n struct = json.loads(gzip.decompress(base64.b64decode(event[\"awslogs\"][\"data\"])))\n text = json.dumps(struct)\n struct = {\"attachments\": [{\"text\": text,\n \"color\": colour}]}\n post_webhook(struct, webhookurl)\n"
},
"Environment": {
"Variables": {
"SLACK_LOGGING_LEVEL": "error",
"SLACK_WEBHOOK_URL": {
"Ref": "SlackWebhookUrl"
}
}
},
"Handler": "index.handler",
"MemorySize": 512,
"Role": {
"Fn::GetAtt": [
"SlackErrorRole",
"Arn"
]
},
"Runtime": "python3.10",
"Timeout": 5
},
"Type": "AWS::Lambda::Function"
},
"SlackErrorPermission": {
"Properties": {
"Action": "lambda:InvokeFunction",
"FunctionName": {
"Ref": "SlackErrorFunction"
},
"Principal": "logs.amazonaws.com"
},
"Type": "AWS::Lambda::Permission"
},
"SlackErrorPolicy": {
"Properties": {
"PolicyDocument": {
"Statement": [
{
"Action": [
"logs:CreateLogGroup",
"logs:CreateLogStream",
"logs:PutLogEvents"
],
"Effect": "Allow",
"Resource": "*"
}
],
"Version": "2012-10-17"
},
"PolicyName": {
"Fn::Sub": "slack-error-policy-${AWS::StackName}"
},
"Roles": [
{
"Ref": "SlackErrorRole"
}
]
},
"Type": "AWS::IAM::Policy"
},
"SlackErrorRole": {
"Properties": {
"AssumeRolePolicyDocument": {
"Statement": [
{
"Action": [
"sts:AssumeRole"
],
"Effect": "Allow",
"Principal": {
"Service": "lambda.amazonaws.com"
}
}
],
"Version": "2012-10-17"
}
},
"Type": "AWS::IAM::Role"
},
"SlackWarningFunction": {
"Properties": {
"Code": {
"ZipFile": "import base64, gzip, json, os, urllib.request\n\n# https://colorswall.com/palette/3\n\nLevels={\"info\": \"#5bc0de\",\n \"warning\": \"#f0ad4e\",\n \"error\": \"#d9534f\"}\n\ndef post_webhook(struct, url):\n req = urllib.request.Request(url, method = \"POST\")\n req.add_header(\"Content-Type\", \"application/json\")\n data = json.dumps(struct).encode()\n return urllib.request.urlopen(req, data = data).read()\n\ndef handler(event, context = None,\n colour = Levels[os.environ[\"SLACK_LOGGING_LEVEL\"]],\n webhookurl = os.environ[\"SLACK_WEBHOOK_URL\"]):\n struct = json.loads(gzip.decompress(base64.b64decode(event[\"awslogs\"][\"data\"])))\n text = json.dumps(struct)\n struct = {\"attachments\": [{\"text\": text,\n \"color\": colour}]}\n post_webhook(struct, webhookurl)\n"
},
"Environment": {
"Variables": {
"SLACK_LOGGING_LEVEL": "warning",
"SLACK_WEBHOOK_URL": {
"Ref": "SlackWebhookUrl"
}
}
},
"Handler": "index.handler",
"MemorySize": 512,
"Role": {
"Fn::GetAtt": [
"SlackWarningRole",
"Arn"
]
},
"Runtime": "python3.10",
"Timeout": 5
},
"Type": "AWS::Lambda::Function"
},
"SlackWarningPermission": {
"Properties": {
"Action": "lambda:InvokeFunction",
"FunctionName": {
"Ref": "SlackWarningFunction"
},
"Principal": "logs.amazonaws.com"
},
"Type": "AWS::Lambda::Permission"
},
"SlackWarningPolicy": {
"Properties": {
"PolicyDocument": {
"Statement": [
{
"Action": [
"logs:CreateLogGroup",
"logs:CreateLogStream",
"logs:PutLogEvents"
],
"Effect": "Allow",
"Resource": "*"
}
],
"Version": "2012-10-17"
},
"PolicyName": {
"Fn::Sub": "slack-warning-policy-${AWS::StackName}"
},
"Roles": [
{
"Ref": "SlackWarningRole"
}
]
},
"Type": "AWS::IAM::Policy"
},
"SlackWarningRole": {
"Properties": {
"AssumeRolePolicyDocument": {
"Statement": [
{
"Action": [
"sts:AssumeRole"
],
"Effect": "Allow",
"Principal": {
"Service": "lambda.amazonaws.com"
}
}
],
"Version": "2012-10-17"
}
},
"Type": "AWS::IAM::Role"
}
}
}

short

pareto2

  • add slack logging to streaming table and task queue patterns
  • add eventName INSERT to sample pattern

done

  • scripts to delete ddb items
  • add note to sst inline code noting that top level detail fields are pk, sk, eventName, records
  • change demo detail to use {pk: [{prefix: LEAGUE}]}
  • streaming role needs table stream arn not table arn
"Resource": {
	"Fn::GetAtt": ["AppTable", "StreamArn"]
}

  • DynamoDB not Dynamodb
    • likely requires an irregular-case lookup in hungarorise, since tok.capitalize() can only produce "Dynamodb"
[ERROR] Runtime.UserCodeSyntaxError: Syntax error in module 'index': invalid syntax (index.py, line 63)
Traceback (most recent call last):
  File "/var/task/index.py" Line 63
        if entries! = []:
  • table not visible
  • check item exists in table
  • check pattern detail
  • test pushing item
  • script to push item to dynamodb
|  2024-03-20T11:37:29.127Z|  AppStreamingTableEventSourceMapping |  AWS::Lambda::EventSourceMapping |  CREATE_FAILED        |  Resource handler returned message: "Invalid request provided: Cannot access stream arn:aws:dynamodb:eu-west-1:119552584133:table/streaming-table-demo-AppTable-1PDU9XFLLH9BB/stream/2024-03-20T11:36:52.715. Please ensure the role can perform the GetRecords, GetShardIterator, DescribeStream, and ListStreams Actions on your stream in IAM. (Service: Lambda, Status Code: 400, Request ID: 6517bdda-dfcb-4265-813c-210751467501)" (RequestToken: db51b898-2f80-c92b-0cc4-851fafee9663, HandlerErrorCode: InvalidRequest)   |
  • test deploying stack
  • python env
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment