Skip to content

Instantly share code, notes, and snippets.

@shijaz
Created July 15, 2018 19:59
Show Gist options
  • Save shijaz/32208144aeeae1f2ab93fa73e4ffd95e to your computer and use it in GitHub Desktop.
Save shijaz/32208144aeeae1f2ab93fa73e4ffd95e to your computer and use it in GitHub Desktop.
lambda_function.py for the Lambda Function 'SeeRobot'
# lambda_function.py for the Lambda Function 'SeeRobot'
# Shijaz Abdulla - www.awsomenow.com
import os
import logging
import shadow_updater
# Get the environment variables
clientId = os.environ.get('AWS_IOT_MQTT_CLIENT_ID')
thingName = os.environ.get("AWS_IOT_THING_NAME")
host = os.environ.get("AWS_IOT_MQTT_HOST")
port = os.environ.get("AWS_IOT_MQTT_PORT_UPDATE")
# Configure logging
logger = logging.getLogger()
logger.setLevel(logging.DEBUG)
def close(session_attributes, fulfillment_state, message):
response = {
'sessionAttributes': session_attributes,
'dialogAction': {
'type': 'Close',
'fulfillmentState': fulfillment_state,
'message': message
}
}
return response
def delegate(session_attributes, slots):
return {
'sessionAttributes': session_attributes,
'dialogAction': {
'type': 'Delegate',
'slots': slots
}
}
def build_validation_result(is_valid, message_content):
if message_content is None:
return {
"isValid": is_valid,
"violatedSlot": violated_slot,
}
return {
'isValid': is_valid,
'violatedSlot': violated_slot,
'message': {'contentType': 'PlainText', 'content': message_content}
}
def validate_cheese(cheese_option):
cheese_options = ['SEE', 'STOP']
if cheese_option is not None and cheese_option not in cheese_options:
return build_validation_result(False, 'I do not understand {}, try again.'.format(cheese_option))
def see_robot(intent_request):
cheese_option = "SEE"
source = intent_request['invocationSource']
# Update thing shadow
new_value_dict = {"cheese_option":cheese_option.upper()}
shadow_updater.update_shadow(new_value_dict)
# Update thing shadow with a stop to allow it to see twice consecutively
new_value_dict = {"cheese_option":"STOP"}
shadow_updater.update_shadow(new_value_dict)
# Send appropriate message to user via Lex while closing
if cheese_option != 'STOP':
return close(intent_request['sessionAttributes'],
'Fulfilled',
{'contentType': 'PlainText',
'content': 'Robot is looking for objects, and will speak to you.'})
else:
return close(intent_request['sessionAttributes'],
'Fulfilled',
{'contentType': 'PlainText',
'content': 'I\'ve stopped looking.'})
# Intents
def dispatch(intent_request):
# Called when the user specifies an intent for this bot.
logger.debug('dispatch userId={}, intentName={}'.format(intent_request['userId'], intent_request['currentIntent']['name']))
intent_name = intent_request['currentIntent']['name']
# Dispatch to your bot's intent handlers
if intent_name == 'SeeObject':
return see_robot(intent_request)
raise Exception('Intent with name ' + intent_name + ' not supported')
# Main handler
def lambda_handler(event, context):
logger.debug('event.bot.name={}'.format(event['bot']['name']))
return dispatch(event)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment