Skip to content

Instantly share code, notes, and snippets.

@jordan314 jordan314/robotarmlamda.py
Last active Feb 25, 2018

Embed
What would you like to do?
Robot Arm Lambda that hits another node endpoint
# -*- coding: utf-8 -*-
# Copyright 2017 Amazon.com, Inc. or its affiliates. All Rights Reserved.
#
# Licensed under the Amazon Software License (the "License"). You may not use this file except in
# compliance with the License. A copy of the License is located at
#
# http://aws.amazon.com/asl/
#
# or in the "license" file accompanying this file. This file is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, express or implied. See the License for the specific
# language governing permissions and limitations under the License.
"""Alexa Smart Home Lambda Function Sample Code.
This file demonstrates some key concepts when migrating an existing Smart Home skill Lambda to
v3, including recommendations on how to transfer endpoint/appliance objects, how v2 and vNext
handlers can be used together, and how to validate your v3 responses using the new Validation
Schema.
Note that this example does not deal with user authentication, only uses virtual devices, omits
a lot of implementation and error handling to keep the code simple and focused.
"""
import logging
import time
import json
import uuid
import urllib.request
# Imports for v3 validation
from validation import validate_message
# Setup logger
logger = logging.getLogger()
logger.setLevel(logging.INFO)
# To simplify this sample Lambda, we omit validation of access tokens and retrieval of a specific
# user's appliances. Instead, this array includes a variety of virtual appliances in v2 API syntax,
# and will be used to demonstrate transformation between v2 appliances and v3 endpoints.
# Every virtual appliance has the same shape; only the spoken label differs.
# The applianceId is "arm" followed by the label lower-cased with spaces removed.
SAMPLE_APPLIANCES = [
    {
        "applianceId": "arm" + label.replace(" ", "").lower(),
        "manufacturerName": "Designtion",
        "modelName": label,
        "version": "1",
        "friendlyName": label,
        "friendlyDescription": "Turn on " + label,
        "isReachable": True,
        # Fresh list/dict per record so the records never share mutable state.
        "actions": ["turnOn", "turnOff"],
        "additionalApplianceDetails": {},
    }
    for label in (
        "Open",
        "Close",
        "Back",
        "Forward",
        "Tilt Forward",
        "Tilt Back",
        "Left",
        "Right",
        "Up",
        "Down",
        "Stop",
        "Play with Cats",
        "Hold This",
        "Let Go",
        "Chop",
        "Throw this away",
    )
]
def lambda_handler(request, context):
    """Entry point called by AWS Lambda for each Smart Home directive.

    The incoming directive is logged, routed to the v3 handlers when its
    payload version is "3" and to the v2 handlers otherwise, and the resulting
    response is logged. v3 responses are additionally passed through
    ``validate_message`` before being returned.

    Args:
        request: The directive, as a JSON-serialisable dict.
        context: The Lambda context object; not used here.

    Returns:
        The response produced by the selected handler.

    Raises:
        ValueError: Logged and then re-raised unchanged.
    """
    try:
        logger.info("Directive:")
        logger.info(json.dumps(request, indent=4, sort_keys=True))

        version = get_directive_version(request)
        is_v3 = version == "3"

        if is_v3:
            logger.info("Received v3 directive!")
            directive_name = request["directive"]["header"]["name"]
            v3_handler = (
                handle_discovery_v3
                if directive_name == "Discover"
                else handle_non_discovery_v3
            )
            response = v3_handler(request)
        else:
            # Every version other than "3" (including the "-1" fallback) is
            # handled as a v2 directive.
            logger.info("Received v2 directive!")
            namespace = request["header"]["namespace"]
            response = (
                handle_discovery()
                if namespace == "Alexa.ConnectedHome.Discovery"
                else handle_non_discovery(request)
            )

        logger.info("Response:")
        logger.info(json.dumps(response, indent=4, sort_keys=True))

        if is_v3:
            logger.info("Validate v3 response")
            validate_message(request, response)

        return response
    except ValueError as error:
        logger.error(error)
        raise
# v2 handlers
def handle_discovery():
    """Build the v2 ``DiscoverAppliancesResponse``.

    Returns:
        A dict with a v2 discovery header (fresh ``messageId``) and a payload
        whose ``discoveredAppliances`` is the ``SAMPLE_APPLIANCES`` list itself.
    """
    return {
        "header": {
            "namespace": "Alexa.ConnectedHome.Discovery",
            "name": "DiscoverAppliancesResponse",
            "payloadVersion": "2",
            "messageId": get_uuid(),
        },
        "payload": {
            "discoveredAppliances": SAMPLE_APPLIANCES,
        },
    }
def handle_non_discovery(request):
    """Answer a v2 control request with the matching confirmation message.

    Only ``TurnOnRequest`` and ``TurnOffRequest`` are supported; no device
    action is performed for either, the confirmation is simply returned.

    Args:
        request: The v2 directive; ``request["header"]["name"]`` selects the
            confirmation to send.

    Returns:
        A dict with a v2 ``Alexa.ConnectedHome.Control`` header (fresh
        ``messageId``) and an empty payload.

    Raises:
        ValueError: If the request name is not one of the supported requests.
            ``lambda_handler`` logs and re-raises this.
    """
    confirmation_names = {
        "TurnOnRequest": "TurnOnConfirmation",
        "TurnOffRequest": "TurnOffConfirmation",
        # other handlers omitted in this example
    }

    request_name = request["header"]["name"]
    confirmation_name = confirmation_names.get(request_name)
    if confirmation_name is None:
        # Fail with a clear message instead of tripping over an unbound
        # ``header`` variable further down.
        raise ValueError("Unsupported v2 request: %s" % (request_name,))

    return {
        "header": {
            "namespace": "Alexa.ConnectedHome.Control",
            "name": confirmation_name,
            "payloadVersion": "2",
            "messageId": get_uuid(),
        },
        "payload": {},
    }
# v2 utility functions
def get_appliance_by_appliance_id(appliance_id):
    """Look up a sample appliance by its ``applianceId``.

    Args:
        appliance_id: The identifier to search for.

    Returns:
        The first entry of ``SAMPLE_APPLIANCES`` whose ``applianceId`` equals
        ``appliance_id``, or ``None`` when there is no such entry.
    """
    matches = (
        candidate
        for candidate in SAMPLE_APPLIANCES
        if candidate["applianceId"] == appliance_id
    )
    return next(matches, None)
def get_utc_timestamp(seconds=None):
    """Format a point in time as a UTC timestamp string.

    Args:
        seconds: Seconds since the epoch; ``None`` means the current time.

    Returns:
        A string such as ``1970-01-01T00:00:00.00Z``. The fractional part is
        the literal ``.00`` from the format string, not a measured value.
    """
    utc_fields = time.gmtime(seconds)
    return time.strftime("%Y-%m-%dT%H:%M:%S.00Z", utc_fields)
def get_uuid():
    """Return a newly generated random (version 4) UUID as a string."""
    fresh_id = uuid.uuid4()
    return str(fresh_id)
# v3 handlers
def handle_discovery_v3(request):
    """Build the v3 ``Discover.Response`` event.

    Each entry of ``SAMPLE_APPLIANCES`` is converted to a v3 endpoint with
    ``get_endpoint_from_v2_appliance``.

    Args:
        request: The Discover directive; not inspected.

    Returns:
        A dict holding the ``Alexa.Discovery`` event with a fresh
        ``messageId`` and the list of endpoints as its payload.
    """
    endpoints = [
        get_endpoint_from_v2_appliance(appliance)
        for appliance in SAMPLE_APPLIANCES
    ]
    event_header = {
        "namespace": "Alexa.Discovery",
        "name": "Discover.Response",
        "payloadVersion": "3",
        "messageId": get_uuid(),
    }
    return {
        "event": {
            "header": event_header,
            "payload": {"endpoints": endpoints},
        }
    }
# Prefix of the URL used to drive the arm; the command name is appended to it.
# The {mydomain}, {myport} and {mytoken} placeholders are sent verbatim and must
# be replaced with real values before this Lambda is deployed.
_ARM_COMMAND_URL = "http://{mydomain}:{myport}/?token={mytoken}&command="

# Upper bound, in seconds, on how long one arm request may block the handler.
_ARM_REQUEST_TIMEOUT_SECONDS = 5

# endpointId -> command sent to the arm service when that endpoint is turned on.
# TurnOff never sends a command (the turnoffopen / turnoffclose calls are
# intentionally not wired up).
_ARM_TURN_ON_COMMANDS = {
    "armopen": "turnonopen",
    "armclose": "turnonclose",
    "armback": "turnonback",
    "armforward": "turnonforward",
    "armtiltforward": "turnontiltforward",
    "armtiltback": "turnontiltback",
    "armleft": "turnonleft",
    "armright": "turnonright",
    "armup": "turnonup",
    "armdown": "turnondown",
    "armstop": "turnonstop",
    "armplaywithcats": "turnonplaywithcats",
    "armholdthis": "turnonholdthis",
    "armletgo": "turnonletgo",
    "armchop": "turnonchop",
    "armthrowthisaway": "turnonthrowthisaway",
}


def _send_arm_command(command):
    """Send one command to the arm service, read the reply and close it."""
    url = _ARM_COMMAND_URL + command
    with urllib.request.urlopen(url, timeout=_ARM_REQUEST_TIMEOUT_SECONDS) as reply:
        reply.read()


def handle_non_discovery_v3(request):
    """Handle every v3 directive other than Discover.

    A ``TurnOn`` aimed at one of the arm endpoints first forwards the matching
    command to the arm service. After that:

    * ``Alexa.PowerController`` directives get an ``Alexa`` / ``Response``
      event reporting ``powerState`` as "ON" for ``TurnOn`` and "OFF" for any
      other directive name.
    * ``Alexa.Authorization`` / ``AcceptGrant`` gets an
      ``AcceptGrant.Response`` event.
    * Anything else is not handled and yields ``None``.

    Args:
        request: The v3 directive, shaped
            ``{"directive": {"header": {...}, "endpoint": {...}, ...}}``.
            The ``endpoint`` object may be absent for directives that are not
            addressed to an endpoint.

    Returns:
        The response dict, or ``None`` for unhandled directives.

    Raises:
        KeyError: If a required field (header namespace/name, or the endpoint
            and correlationToken of a PowerController directive) is missing.
        Exception: Errors raised by ``urllib.request.urlopen`` while
            contacting the arm service propagate to the caller.
    """
    directive = request["directive"]
    header = directive["header"]
    request_namespace = header["namespace"]
    request_name = header["name"]

    # Not every directive is addressed to an endpoint, so do not require one
    # up front; the PowerController branch below still insists on it.
    endpoint_id = directive.get("endpoint", {}).get("endpointId")

    if request_name == "TurnOn":
        command = _ARM_TURN_ON_COMMANDS.get(endpoint_id)
        if command is not None:
            logger.info("Received TurnOn for %s, sending %s", endpoint_id, command)
            _send_arm_command(command)

    if request_namespace == "Alexa.PowerController":
        value = "ON" if request_name == "TurnOn" else "OFF"
        return {
            "context": {
                "properties": [
                    {
                        "namespace": "Alexa.PowerController",
                        "name": "powerState",
                        "value": value,
                        "timeOfSample": get_utc_timestamp(),
                        "uncertaintyInMilliseconds": 500,
                    }
                ]
            },
            "event": {
                "header": {
                    "namespace": "Alexa",
                    "name": "Response",
                    "payloadVersion": "3",
                    "messageId": get_uuid(),
                    "correlationToken": header["correlationToken"],
                },
                "endpoint": {
                    # NOTE(review): placeholder token carried over from the
                    # sample; confirm whether the request's own scope should
                    # be echoed back instead.
                    "scope": {
                        "type": "BearerToken",
                        "token": "access-token-from-Amazon",
                    },
                    "endpointId": directive["endpoint"]["endpointId"],
                },
                "payload": {},
            },
        }

    if request_namespace == "Alexa.Authorization" and request_name == "AcceptGrant":
        return {
            "event": {
                "header": {
                    "namespace": "Alexa.Authorization",
                    "name": "AcceptGrant.Response",
                    "payloadVersion": "3",
                    # Each response gets its own message id.
                    "messageId": get_uuid(),
                },
                "payload": {},
            }
        }

    # other handlers omitted in this example
    return None
# v3 utility functions
def get_endpoint_from_v2_appliance(appliance):
    """Convert a v2 appliance record into a v3 endpoint record.

    Args:
        appliance: A v2 appliance dict providing ``applianceId``,
            ``manufacturerName``, ``friendlyName``, ``friendlyDescription``
            and ``additionalApplianceDetails``.

    Returns:
        A dict with the keys ``endpointId``, ``manufacturerName``,
        ``friendlyName``, ``description``, ``displayCategories``, ``cookie``
        and ``capabilities``.
    """
    # Read the plain fields first, then derive categories and capabilities.
    endpoint_id = appliance["applianceId"]
    manufacturer = appliance["manufacturerName"]
    friendly_name = appliance["friendlyName"]
    description = appliance["friendlyDescription"]
    cookie = appliance["additionalApplianceDetails"]
    categories = get_display_categories_from_v2_appliance(appliance)
    capabilities = get_capabilities_from_v2_appliance(appliance)

    return {
        "endpointId": endpoint_id,
        "manufacturerName": manufacturer,
        "friendlyName": friendly_name,
        "description": description,
        "displayCategories": categories,
        "cookie": cookie,
        "capabilities": capabilities,
    }
def get_directive_version(request):
    """Return the payload version of a directive.

    The v3 location (``request["directive"]["header"]``) is tried first, then
    the v2 location (``request["header"]``).

    Args:
        request: The incoming directive.

    Returns:
        The ``payloadVersion`` value found, or the string ``"-1"`` when the
        request has neither layout.
    """
    # Only lookup failures mean "wrong layout"; anything else (for example
    # KeyboardInterrupt) must not be swallowed.
    try:
        return request["directive"]["header"]["payloadVersion"]
    except (LookupError, TypeError):
        pass

    try:
        return request["header"]["payloadVersion"]
    except (LookupError, TypeError):
        return "-1"
def get_endpoint_by_endpoint_id(endpoint_id):
    """Return the v3 endpoint for ``endpoint_id``.

    Args:
        endpoint_id: The endpoint identifier, which is also the v2
            ``applianceId``.

    Returns:
        The endpoint built by ``get_endpoint_from_v2_appliance``, or ``None``
        when ``get_appliance_by_appliance_id`` yields nothing (or a falsy
        record).
    """
    appliance = get_appliance_by_appliance_id(endpoint_id)
    if not appliance:
        return None
    return get_endpoint_from_v2_appliance(appliance)
def get_display_categories_from_v2_appliance(appliance):
    """Return the v3 display categories for a v2 appliance.

    Every appliance is presented as a switch, so the appliance record is not
    inspected.

    Args:
        appliance: The v2 appliance dict; accepted for interface
            compatibility only.

    Returns:
        A new list ``["SWITCH"]``.
    """
    return ["SWITCH"]
def get_capabilities_from_v2_appliance(appliance):
    """Return the v3 capability list advertised for a v2 appliance.

    Every appliance gets the same three interfaces, in this order:
    ``Alexa.PowerController``, ``Alexa.EndpointHealth`` and ``Alexa``. The
    appliance record is not inspected.

    Args:
        appliance: The v2 appliance dict; accepted for interface
            compatibility only.

    Returns:
        A new list of capability dicts.
    """
    # NOTE(review): both property-bearing interfaces advertise
    # "retrievable": True, yet handle_non_discovery_v3 in this file has no
    # branch for state-report requests -- confirm this is intended.
    return [
        {
            "type": "AlexaInterface",
            "interface": "Alexa.PowerController",
            "version": "3",
            "properties": {
                "supported": [
                    {"name": "powerState"},
                ],
                "proactivelyReported": True,
                "retrievable": True,
            },
        },
        # additional capabilities that are required for each endpoint
        {
            "type": "AlexaInterface",
            "interface": "Alexa.EndpointHealth",
            "version": "3",
            "properties": {
                "supported": [
                    {"name": "connectivity"},
                ],
                "proactivelyReported": True,
                "retrievable": True,
            },
        },
        {
            "type": "AlexaInterface",
            "interface": "Alexa",
            "version": "3",
        },
    ]
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
You can’t perform that action at this time.