@mtsukamoto
Created July 17, 2021 06:01
AWSBasics Amazon-Location-Service lambda.py modified.
from datetime import date, datetime
import json
import os
import boto3
# Update this to match the name of your Tracker resource
TRACKER_NAME = "handson-20210717"
"""
This Lambda function receives a payload from AWS IoT Core and publishes device updates to Amazon Location Service via the BatchUpdateDevicePosition API.
Parameter 'event' is the payload delivered from AWS IoT Core.
In this sample, we assume that the payload has a single top-level key 'payload' and a nested key
'location' with keys 'lat' and 'long'. We also assume that the name of the device is nested in
the payload as 'deviceid'. Finally, the timestamp of the payload is present as 'timestamp'. For
example:
>>> event
{ 'payload': { 'deviceid': 'thing123', 'timestamp': 1604940328,
'location': { 'lat': 49.2819, 'long': -123.1187 } } }
If your data does not match this schema, you can either use the AWS IoT Core rules engine to
format the data before delivering it to this Lambda function, or you can modify the code below to
match it.
"""
def json_serial(obj):
    # JSON fallback for datetime/date values in the API response
    # see also: https://www.yoheim.net/blog.php?q=20170703
    if isinstance(obj, (datetime, date)):
        return obj.isoformat()
    raise TypeError("Type %s not serializable" % type(obj))
def lambda_handler(event, context):
    # load the side-loaded Amazon Location Service model; needed during Public Preview
    os.environ["AWS_DATA_PATH"] = os.environ["LAMBDA_TASK_ROOT"]
    updates = [
        {
            "DeviceId": event["payload"]["deviceid"],
            "SampleTime": datetime.fromtimestamp(event["payload"]["timestamp"]).isoformat(),
            # Position is ordered [longitude, latitude]
            "Position": [
                event["payload"]["location"]["long"],
                event["payload"]["location"]["lat"]
            ]
        }
    ]
    client = boto3.client("location")
    response = client.batch_update_device_position(TrackerName=TRACKER_NAME, Updates=updates)
    return {
        "statusCode": 200,
        "body": json.dumps(response, default=json_serial)
    }
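
The payload-to-update mapping in the handler can be tried locally without AWS credentials. The following is a minimal sketch, not part of the original gist: `build_update` is a hypothetical helper name, and it assumes the 'timestamp' field is epoch seconds that should be read as UTC (the handler above uses the runtime's local time zone instead).

```python
from datetime import datetime, timezone


def build_update(event):
    """Hypothetical helper: map one IoT Core payload to one position update."""
    payload = event["payload"]
    location = payload["location"]
    return {
        "DeviceId": payload["deviceid"],
        # Assumption: epoch seconds are interpreted as UTC, so the result
        # does not depend on the time zone of the machine running the code.
        "SampleTime": datetime.fromtimestamp(
            payload["timestamp"], tz=timezone.utc
        ).isoformat(),
        # Same ordering as the handler: [longitude, latitude]
        "Position": [location["long"], location["lat"]],
    }


# Sample event taken from the docstring of the Lambda function.
sample_event = {
    "payload": {
        "deviceid": "thing123",
        "timestamp": 1604940328,
        "location": {"lat": 49.2819, "long": -123.1187},
    }
}

update = build_update(sample_event)
print(update)
```

Keeping the mapping in a separate function like this makes it possible to check the schema assumptions before the update is passed to `batch_update_device_position`.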