Skip to content

Instantly share code, notes, and snippets.

@mlapida
Last active June 6, 2019 04:58
Show Gist options
  • Star 1 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save mlapida/60748a378d4986170b6f to your computer and use it in GitHub Desktop.
Save mlapida/60748a378d4986170b6f to your computer and use it in GitHub Desktop.
A Lambda Function for receiving data from an API Endpoint and sending it to a DynamoDB table.
from __future__ import print_function
import logging
import boto3
from datetime import *
from boto3.dynamodb.conditions import Key, Attr
# Root logger at INFO level; in Lambda its output ends up in CloudWatch Logs.
logger = logging.getLogger()
logger.setLevel(logging.INFO)

# DynamoDB table that every handler invocation reads from and writes to.
dynamodb = boto3.resource('dynamodb')
table = dynamodb.Table('SleepData')

# Naive datetime for the Unix epoch (1970-01-01 00:00:00), used as the
# reference point when converting timestamps to epoch milliseconds.
epoch = datetime(1970, 1, 1)

# Captured once at import time, not per invocation.
# NOTE(review): nothing in the visible code reads `now` -- confirm before removing.
now = datetime.now()
def lambda_handler(event, context):
    """Store an incoming event in the DynamoDB table, depending on sleep state.

    The handler reads ``event['data']``, ``event['name']``,
    ``event['published_at']`` and ``event['source']``.

    * When ``event['data']`` is the string ``"True"`` (the buttons were
      triggered), a toggle record is written: the new state is ``'asleep'``
      if the current state is ``"awake"``, otherwise ``'awake'``.
    * Otherwise the event is treated as a reading. It is stored, with
      ``data`` converted to ``int``, only while the current state is
      ``"asleep"``; if not, ``"Not asleep"`` is printed and nothing is
      written.

    Args:
        event: Mapping with the keys listed above. ``published_at`` must
            match the format ``%Y-%m-%dT%H:%M:%S.%fZ``.
        context: Lambda context object; unused.

    Returns:
        The string ``"Success!"``.
    """
    # determine if the user is asleep
    current_state = sleepCheck()
    button_pressed = event['data'] == "True"

    if button_pressed:
        # the buttons toggle the state: awake -> asleep, anything else -> awake
        next_state = 'asleep' if current_state == "awake" else 'awake'
    elif current_state == "asleep":
        # plain readings are only recorded while the user is asleep
        next_state = 'asleep'
    else:
        next_state = None

    if next_state is None:
        print("Not asleep")
    else:
        table.put_item(
            Item={
                'event_name': event['name'],
                # convert the date/time to epoch milliseconds for storage
                'published_at': int(unix_time_millis(datetime.strptime(event['published_at'], "%Y-%m-%dT%H:%M:%S.%fZ"))),
                # toggle records carry the marker string 'true';
                # readings carry the integer value of the event data
                'data': 'true' if button_pressed else int(event['data']),
                'state': next_state,
                'deviceID': event['source'],
            }
        )

    print(event)
    return "Success!"
# convert a datetime to milliseconds since the epoch
def unix_time_millis(dt):
    """Return how far *dt* lies after the module-level ``epoch``, in milliseconds.

    Args:
        dt: A datetime that can be subtracted from ``epoch``.

    Returns:
        The elapsed time as a float number of milliseconds.
    """
    since_epoch = dt - epoch
    return since_epoch.total_seconds() * 1000.0
# a function to check if the user is currently "sleeping"
def sleepCheck():
    """Return the most recently recorded sleep state.

    Scans the table for toggle records (items whose ``data`` attribute
    equals the string ``'true'``) and returns the ``state`` of the one with
    the greatest ``published_at``. When several records share that
    timestamp, the one returned later by the scan wins.

    The scan is followed across every result page: a single ``scan`` call
    returns a bounded amount of data and signals that more remains through
    ``LastEvaluatedKey``.

    Returns:
        The state as a ``str``, or ``None`` when no toggle record exists.
    """
    scan_kwargs = {
        # Attr is the condition type intended for filter expressions
        'FilterExpression': Attr('data').eq('true'),
        'ProjectionExpression': "published_at, deviceID, #da, #st",
        # "data" and "state" are DynamoDB reserved words, so they are
        # referenced through placeholder names
        'ExpressionAttributeNames': {"#da": "data", "#st": "state"},
    }

    latest = None
    while True:
        response = table.scan(**scan_kwargs)
        for item in response['Items']:
            # scan order is not guaranteed to be chronological, so pick the
            # newest record by timestamp rather than by position
            if latest is None or item['published_at'] >= latest['published_at']:
                latest = item

        last_key = response.get('LastEvaluatedKey')
        if not last_key:
            break
        scan_kwargs['ExclusiveStartKey'] = last_key

    if latest is None:
        return None
    return str(latest['state'])
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment