Skip to content

Instantly share code, notes, and snippets.

@gbhorwood
Created May 6, 2019 22:15
Show Gist options
  • Save gbhorwood/c3c0f8d8cf2adca10c4d52c3f3047ba4 to your computer and use it in GitHub Desktop.
Save gbhorwood/c3c0f8d8cf2adca10c4d52c3f3047ba4 to your computer and use it in GitHub Desktop.
AWS Lambda sample service source file
import json
import os
import sys
# Make the dependencies bundled in the "packages" directory beside this file importable.
# NOTE(review): append() puts this directory last on sys.path, so a same-named
# package found earlier on the path takes precedence -- confirm that is intended.
packages_path = os.path.join(os.path.dirname(__file__), "packages")
sys.path.append(packages_path)
#----------------------------------
# Handler functions
#
#
def postSubscription(event, context):
    """
    Writes a new subscriber to the `registrations` table

    Parameters:
    event (dict): The event; handed unchanged to sqlInserRegistrations()
    context: Not used

    Returns: dict with statusCode 201 and a wildcard CORS header (no body)
    """
    # the insert helper's result is not needed for the response, so it is not kept
    sqlInserRegistrations(event)
    return {
        "statusCode": 201,
        "headers": {
            "Access-Control-Allow-Origin": "*"
        }
    }
def getSubscriptions(event, context):
    """
    Returns one page of subscribers from the `registrations` table.
    The page is picked with the optional query string args 'page' and 'size'.
    Default 'page' is 1, default 'size' is 10.

    Parameters:
    event (dict): The event carrying the query string
    context: Not used

    Returns: dict with statusCode 200, JSON and CORS headers, and the rows
    serialised as a JSON body under the key "data"
    """
    rows = sqlSelectRegistrations(event)
    headers = {
        "Content-Type": "application/json",
        "Access-Control-Allow-Origin": "*"
    }
    return {
        "statusCode": 200,
        "headers": headers,
        "body": json.dumps({"data": rows})
    }
#----------------------------------
# SQL functions
#
#
def sqlSelectRegistrations(event):
    """
    Select from `registrations` table with optional paging

    Parameters:
    event (dict): The event; 'size' and 'page' are read from its query string

    Returns: the result of executeRdsDataSql() for the select

    Raises: ValueError if 'size' or 'page' is not a whole number
    """
    # int() guarantees that only numbers are ever formatted into the sql.
    # Clamp both values: a page below 1 or a negative size would otherwise
    # yield a negative OFFSET/LIMIT, which the database rejects.
    size = max(int(getQueryStringElement("size", event, 10)), 0)
    page = max(int(getQueryStringElement("page", event, 1)), 1)
    limit = size
    offset = (page - 1) * size
    # NOTE(review): there is no ORDER BY, so the database may return rows in
    # any order and consecutive pages are not guaranteed to be disjoint.
    sql = (
        "\n"
        "SELECT first_name,\n"
        "last_name,\n"
        "email\n"
        "FROM registrations\n"
        "LIMIT {}\n"
        "OFFSET {}\n"
    ).format(limit, offset)
    return executeRdsDataSql(sql)
def sqlInserRegistrations(event):
    """
    Inserts a record into the `registrations` table

    The values come straight from the event, so they are escaped before being
    placed in the statement. executeRdsDataSql() only accepts a finished sql
    string, which is why the escaping has to happen here.

    Parameters:
    event (dict): The event; 'first_name', 'last_name' and 'email' are read from it

    Returns: None
    """
    def sqlLiteral(value):
        # a missing field becomes NULL rather than the text 'None'
        if value is None:
            return "NULL"
        # Double backslashes and single quotes so the value cannot terminate
        # its own literal.
        # NOTE(review): backslash doubling assumes a MySQL-compatible database,
        # where backslash is an escape character in string literals -- confirm.
        escaped = str(value).replace("\\", "\\\\").replace("'", "''")
        return "'{}'".format(escaped)

    sql = (
        "\n"
        "INSERT\n"
        "INTO registrations\n"
        "(first_name, last_name, email)\n"
        "VALUES ({}, {}, {})\n"
    ).format(
        sqlLiteral(event.get('first_name')),
        sqlLiteral(event.get('last_name')),
        sqlLiteral(event.get('email')),
    )
    executeRdsDataSql(sql)
#----------------------------------
# Private functions
#
#
def executeRdsDataSql(sql):
    """
    Executes the passed sql through the rds-data data api and returns the munged result.
    A new client is created on every call.

    Parameters:
    sql (string): The sql to execute

    Returns: the execute_sql() response after conversion by mungeBoto3Rds()
    """
    rdsData = getRdsDataClient()
    # the secret, database name and cluster arn are looked up via getDotEnv()
    request = {
        "awsSecretStoreArn": getDotEnv("AWSSECRETSTOREARN"),
        "database": getDotEnv("DATABASE"),
        "dbClusterOrInstanceArn": getDotEnv("DBCLUSTERORINSTANCEARN"),
        "sqlStatements": sql,
    }
    rawResult = rdsData.execute_sql(**request)
    # turn the rds response json into key/val json
    return mungeBoto3Rds(rawResult)
def getRdsDataClient():
    """
    Creates a boto3 rds-data client for use by executeRdsDataSql()
    Note: You must bundle boto3 and botocore with your app as the AWS default boto3 does not include rds-data

    The access key, secret key and region are looked up via getDotEnv(). A key
    that is not set is passed to boto3 as None, which leaves boto3 to use its
    own default lookup, instead of being sent as the literal text "None".

    Returns: client
    """
    import boto3

    def setting(key):
        # str(None) would produce the bogus value "None"; keep a missing key as None
        value = getDotEnv(key)
        return None if value is None else str(value)

    return boto3.client(
        'rds-data',
        aws_access_key_id=setting("ACCESS_KEY_ID"),
        aws_secret_access_key=setting("SECRET_ACCESS_KEY"),
        region_name=setting("REGION_NAME"),
    )
def getDotEnv(element):
    """
    Looks up one key after loading the .env file that sits beside this source file.
    The value is read back with os.getenv(), so a variable already present in
    the process environment is visible as well.

    Parameters:
    element (string): The name of the key to look up

    Returns: string or None
    """
    from dotenv import load_dotenv
    envFile = os.path.join(os.path.dirname(__file__), '.env')
    load_dotenv(envFile)
    return os.getenv(element)
def getQueryStringElement(element, event, default=None):
    """
    Reads one value from the event's query string, falling back to a default

    Parameters:
    element (string): The name of the element to get from the query string
    event (dict): The event
    default : Returned when the event, its 'queryStringParameters' or the element itself is missing; None if not set

    Returns: the stored value, or the default
    """
    # no event, or an event without a query string entry at all
    if event is None or 'queryStringParameters' not in event:
        return default
    params = event['queryStringParameters']
    # the entry exists but is empty, or does not hold the requested element
    if params is None or element not in params:
        return default
    return params.get(element)
def mungeBoto3Rds(response):
    """
    Converts the json returned by rds-data execute_sql() into key/val json

    Only the first entry of 'sqlStatementResults' is read. A response without
    a 'resultFrame' yields an empty list.

    Parameters:
    response (dict): The response from the call to execute_sql()

    Returns: list of dict, one per record, keyed by column name
    """
    results = response.get('sqlStatementResults') or [{}]
    # every level may be absent, so fall back to empty containers instead of failing on None
    frame = results[0].get('resultFrame') or {}
    records = frame.get('records') or []
    metadata = frame.get('resultSetMetadata') or {}
    keysList = [str(column.get('name')) for column in metadata.get('columnMetadata') or []]

    # python type to apply for each rds-data value tag
    converters = {
        "stringValue": str,
        "blobValue": str,
        "intValue": int,
        "bigIntValue": int,
        "bitValue": bool,
        "doubleValue": float,
        "realValue": float,
    }

    def convert(cell):
        # a sql NULL is reported as {'isNull': True}; it must become None, not True
        if cell.get("isNull"):
            return None
        for dataType, raw in cell.items():
            if dataType == "isNull":
                continue
            converter = converters.get(dataType)
            # unrecognised value tags are reported as None
            return converter(raw) if converter else None
        return None

    data = []
    for record in records:
        values = record.get('values') or []
        data.append({key: convert(cell) for key, cell in zip(keysList, values)})
    return data
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment