Skip to content

Instantly share code, notes, and snippets.

@jipengxiang
Last active May 11, 2019 04:41
Show Gist options
  • Save jipengxiang/67e8167fd06a1fd8ed85210dfe51a47d to your computer and use it in GitHub Desktop.
Save jipengxiang/67e8167fd06a1fd8ed85210dfe51a47d to your computer and use it in GitHub Desktop.
Serverless Architecture
{
"AWSTemplateFormatVersion": "2010-09-09",
"Description" : "CCA Lab 12 - v3.1: Implementing a Serverless Architecture with AWS Managed Services",
"Resources": {
"InputS3BucketForTransactionsFiles": {
"Type": "AWS::S3::Bucket"
},
"CustomerDynamoDBTable": {
"Type": "AWS::DynamoDB::Table",
"Properties": {
"TableName": "Customer",
"AttributeDefinitions": [
{
"AttributeName": "CustomerId",
"AttributeType": "S"
}
],
"KeySchema": [
{
"AttributeName": "CustomerId",
"KeyType": "HASH"
}
],
"ProvisionedThroughput": {
"ReadCapacityUnits": "5",
"WriteCapacityUnits": "5"
}
}
},
"TransactionsDynamoDBTable": {
"Type": "AWS::DynamoDB::Table",
"DependsOn": "CustomerDynamoDBTable",
"Properties": {
"TableName": "Transactions",
"AttributeDefinitions": [
{
"AttributeName": "CustomerId",
"AttributeType": "S"
},
{
"AttributeName": "TransactionId",
"AttributeType": "S"
}
],
"KeySchema": [
{
"AttributeName": "CustomerId",
"KeyType": "HASH"
},
{
"AttributeName": "TransactionId",
"KeyType": "RANGE"
}
],
"ProvisionedThroughput": {
"ReadCapacityUnits": "5",
"WriteCapacityUnits": "5"
},
"StreamSpecification": {
"StreamViewType": "NEW_IMAGE"
}
}
},
"TransactionTotalDynamoDBTable": {
"Type": "AWS::DynamoDB::Table",
"DependsOn": "TransactionsDynamoDBTable",
"Properties": {
"TableName": "TransactionTotal",
"AttributeDefinitions": [
{
"AttributeName": "CustomerId",
"AttributeType": "S"
}
],
"KeySchema": [
{
"AttributeName": "CustomerId",
"KeyType": "HASH"
}
],
"ProvisionedThroughput": {
"ReadCapacityUnits": "5",
"WriteCapacityUnits": "5"
}
}
},
"TransactionProcessorRole": {
"Type": "AWS::IAM::Role",
"Properties": {
"RoleName": "TransactionProcessorRole",
"AssumeRolePolicyDocument": {
"Version": "2012-10-17",
"Statement": [
{
"Effect": "Allow",
"Principal": {
"Service": "lambda.amazonaws.com"
},
"Action": "sts:AssumeRole"
}
]
},
"ManagedPolicyArns": [
"arn:aws:iam::aws:policy/AmazonS3ReadOnlyAccess",
"arn:aws:iam::aws:policy/AmazonDynamoDBFullAccess"
],
"Policies": [ {
"PolicyName": "CWLogsPolicy",
"PolicyDocument": {
"Version" : "2012-10-17",
"Statement": [
{
"Action": [
"logs:CreateLogGroup",
"logs:CreateLogStream",
"logs:PutLogEvents"
],
"Resource": "arn:aws:logs:*:*:*",
"Effect": "Allow"
}
]}
}
]
}
},
"TotalNotifierRole": {
"Type": "AWS::IAM::Role",
"Properties": {
"RoleName": "TotalNotifierRole",
"AssumeRolePolicyDocument": {
"Version": "2012-10-17",
"Statement": [
{
"Sid": "",
"Effect": "Allow",
"Principal": {
"Service": "lambda.amazonaws.com"
},
"Action": "sts:AssumeRole"
}
]
},
"ManagedPolicyArns": [
"arn:aws:iam::aws:policy/AmazonSNSFullAccess",
"arn:aws:iam::aws:policy/AmazonDynamoDBFullAccess"
],
"Policies": [
{
"PolicyName": "CWLogsPolicy",
"PolicyDocument": {
"Version" : "2012-10-17",
"Statement": [
{
"Action": [
"logs:CreateLogGroup",
"logs:CreateLogStream",
"logs:PutLogEvents"
],
"Resource": "arn:aws:logs:*:*:*",
"Effect": "Allow"
}
]}
},
{
"PolicyName": "StreamsPolicy",
"PolicyDocument": {
"Version" : "2012-10-17",
"Statement": [
{
"Action": [
"dynamodb:GetRecords",
"dynamodb:GetShardIterator",
"dynamodb:DescribeStream",
"dynamodb:ListStreams"
],
"Resource": "*",
"Effect": "Allow"
}
]}
}
]
}
}
},
"Outputs": {
"InputS3BucketForTransactionsFilesName": {
"Value": {
"Ref": "InputS3BucketForTransactionsFiles"
},
"Description": "Name of the S3 bucket in which transactions text file should be uploaded"
},
"CustomerDynamoDBTable": {
"Value": {
"Ref": "CustomerDynamoDBTable"
},
"Description": "Customer table in DynamoDB"
},
"TransactionsDynamoDBTable": {
"Value": {
"Ref": "TransactionsDynamoDBTable"
},
"Description": "Transactions table in DynamoDB. This table has streams enabled"
},
"TransactionTotalDynamoDBTable": {
"Value": {
"Ref": "TransactionTotalDynamoDBTable"
},
"Description": "TansactionTotal table in DynamoDB"
}
}
}
# TotalNotifier Lambda function
#
# This function is triggered when values are inserted into the Transactions DynamoDB table.
# Transaction totals are calculated and notifications are sent to SNS if limits are exceeded.
from __future__ import print_function
import json, boto3
# Connect to SNS
sns = boto3.client('sns')
alertTopic = 'HighBalanceAlert'
snsTopicArn = [t['TopicArn'] for t in sns.list_topics()['Topics'] if t['TopicArn'].endswith(':' + alertTopic)][0]
# Connect to DynamoDB
dynamodb = boto3.resource('dynamodb')
transactionTotalTableName = 'TransactionTotal'
transactionsTotalTable = dynamodb.Table(transactionTotalTableName);
# This handler is executed every time the Lambda function is triggered
def lambda_handler(event, context):
# Show the incoming event in the debug log
print("Event received by Lambda function: " + json.dumps(event, indent=2))
# For each transaction added, calculate the new Transactions Total
for record in event['Records']:
customerId = record['dynamodb']['NewImage']['CustomerId']['S']
transactionAmount = int(record['dynamodb']['NewImage']['TransactionAmount']['N'])
# Update the customer's total in the TransactionTotal DynamoDB table
response = transactionsTotalTable.update_item(
Key={
'CustomerId': customerId
},
UpdateExpression="add accountBalance :val",
ExpressionAttributeValues={
':val': transactionAmount
},
ReturnValues="UPDATED_NEW"
)
# Retrieve the latest account balance
latestAccountBalance = response['Attributes']['accountBalance']
print("Latest account balance: " + format(latestAccountBalance))
# If balance > $1500, send a message to SNS
if latestAccountBalance >= 1500:
# Construct message to be sent
message = '{"customerID": "' + customerId + '", ' + '"accountBalance": "' + str(latestAccountBalance) + '"}'
print(message)
# Send message to SNS
sns.publish(
TopicArn=snsTopicArn,
Message=message,
Subject='Warning! Account balance is very high',
MessageStructure='raw'
)
# Finished!
return 'Successfully processed {} records.'.format(len(event['Records']))
# TransactionProcessor Lambda function
#
# This function is triggered by an object being created in an Amazon S3 bucket.
# The file is downloaded and each line is inserted into DynamoDB tables.
from __future__ import print_function
import json, urllib, boto3, csv
# Connect to S3 and DynamoDB
s3 = boto3.resource('s3')
dynamodb = boto3.resource('dynamodb')
# Connect to the DynamoDB tables
customerTable = dynamodb.Table('Customer');
transactionsTable = dynamodb.Table('Transactions');
# This handler is executed every time the Lambda function is triggered
def lambda_handler(event, context):
# Show the incoming event in the debug log
print("Event received by Lambda function: " + json.dumps(event, indent=2))
# Get the bucket and object key from the Event
bucket = event['Records'][0]['s3']['bucket']['name']
key = urllib.unquote_plus(event['Records'][0]['s3']['object']['key']).decode('utf8')
localFilename = '/tmp/transactions.txt'
# Download the file from S3 to the local filesystem
try:
s3.meta.client.download_file(bucket, key, localFilename)
except Exception as e:
print(e)
print('Error getting object {} from bucket {}. Make sure they exist and your bucket is in the same region as this function.'.format(key, bucket))
raise e
# Read the Transactions CSV file. Delimiter is the '|' character
with open(localFilename) as csvfile:
reader = csv.DictReader(csvfile, delimiter='|')
# Read each row in the file
rowCount = 0
for row in reader:
rowCount += 1
# Show the row in the debug log
print(row['customer_id'], row['customer_address'], row['trn_id'], row['trn_date'], row['trn_amount'])
try:
# Insert Customer ID and Address into Customer DynamoDB table
customerTable.put_item(
Item={
'CustomerId': row['customer_id'],
'Address': row['customer_address']})
# Insert transaction details into Transactions DynamoDB table
transactionsTable.put_item(
Item={
'CustomerId': row['customer_id'],
'TransactionId': row['trn_id'],
'TransactionDate': row['trn_date'],
'TransactionAmount': int(row['trn_amount'])})
except Exception as e:
print(e)
print("Unable to insert data into DynamoDB table".format(e))
# Finished!
return "%d transactions inserted" % rowCount
customer_id|customer_address|trn_id|trn_date|trn_amount
C1|1 Smith Street, London|T01|03/16/2017|100
C2|2 Smith Street, London|T02|03/16/2017|200
C2|2 Smith Street, London|T03|03/16/2017|50
C2|2 Smith Street, London|T04|03/16/2017|300
C2|2 Smith Street, London|T05|03/16/2017|100
C2|2 Smith Street, London|T06|03/16/2017|150
C2|2 Smith Street, London|T07|03/16/2017|400
C2|2 Smith Street, London|T08|03/16/2017|50
C2|2 Smith Street, London|T09|03/16/2017|50
C2|2 Smith Street, London|T10|03/16/2017|10
C2|2 Smith Street, London|T11|03/16/2017|10
C2|2 Smith Street, London|T12|03/16/2017|10
C2|2 Smith Street, London|T13|03/16/2017|20
C1|1 Smith Street, London|T14|03/16/2017|51
C1|1 Smith Street, London|T15|03/16/2017|25
C1|1 Smith Street, London|T16|03/16/2017|27
C1|1 Smith Street, London|T17|03/16/2017|29
C1|1 Smith Street, London|T18|03/16/2017|19
C1|1 Smith Street, London|T19|03/16/2017|33
C1|1 Smith Street, London|T20|03/16/2017|35
C1|1 Smith Street, London|T21|03/16/2017|39
C1|1 Smith Street, London|T22|03/16/2017|41
C1|1 Smith Street, London|T23|03/16/2017|199
C2|2 Smith Street, London|T24|03/16/2017|400
@jipengxiang
Copy link
Author

image

@jipengxiang
Copy link
Author

image
image
image

@jipengxiang
Copy link
Author

jipengxiang commented May 11, 2019

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment