Created
July 15, 2018 19:22
-
-
Save shijaz/15225fc67945938b126d3a6064d2d7af to your computer and use it in GitHub Desktop.
lambda_function.py for AWS Lambda function - 'MoveRobot'
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
# lambda_function.py for AWS Lambda function - 'MoveRobot'
# Shijaz Abdulla - www.awsomenow.com
import os | |
import logging | |
import shadow_updater | |
# Connection settings read from the Lambda environment. Each value is None
# when its variable is not set; all values (including the port) are kept as
# the raw strings that os.environ provides.
clientId = os.environ.get('AWS_IOT_MQTT_CLIENT_ID')
thingName = os.environ.get("AWS_IOT_THING_NAME")
host = os.environ.get("AWS_IOT_MQTT_HOST")
port = os.environ.get("AWS_IOT_MQTT_PORT_UPDATE")

# Configure logging: use the root logger and emit everything from DEBUG up.
logger = logging.getLogger()
logger.setLevel(logging.DEBUG)
def get_slots(intent_request):
    """Return the slots mapping of the current intent.

    Args:
        intent_request: Event dict containing ``currentIntent`` -> ``slots``.

    Returns:
        The object stored at ``intent_request['currentIntent']['slots']``
        (the same object, not a copy).

    Raises:
        KeyError: If ``currentIntent`` or ``slots`` is missing.
    """
    return intent_request['currentIntent']['slots']
def close(session_attributes, fulfillment_state, message):
    """Build a response whose dialog action type is 'Close'.

    Args:
        session_attributes: Value placed under ``sessionAttributes``.
        fulfillment_state: Value placed under ``fulfillmentState``
            (callers in this file pass 'Fulfilled').
        message: Value placed under ``message``; callers in this file pass
            a dict with ``contentType`` and ``content`` keys.

    Returns:
        A dict with ``sessionAttributes`` and ``dialogAction`` keys.
    """
    return {
        'sessionAttributes': session_attributes,
        'dialogAction': {
            'type': 'Close',
            'fulfillmentState': fulfillment_state,
            'message': message,
        },
    }
def delegate(session_attributes, slots):
    """Build a response whose dialog action type is 'Delegate'.

    Args:
        session_attributes: Value placed under ``sessionAttributes``.
        slots: Value placed under ``dialogAction`` -> ``slots``.

    Returns:
        A dict with ``sessionAttributes`` and ``dialogAction`` keys.
    """
    return {
        'sessionAttributes': session_attributes,
        'dialogAction': {
            'type': 'Delegate',
            'slots': slots,
        },
    }
def build_validation_result(is_valid, violated_slot, message_content):
    """Build the dict describing the outcome of a slot validation.

    Args:
        is_valid: Stored under ``isValid``.
        violated_slot: Stored under ``violatedSlot`` (name of the offending
            slot, or None).
        message_content: Text for the user, or None for no message.

    Returns:
        A dict with ``isValid`` and ``violatedSlot``. When
        ``message_content`` is not None it also has a ``message`` entry of
        the form ``{'contentType': 'PlainText', 'content': message_content}``.
    """
    result = {
        'isValid': is_valid,
        'violatedSlot': violated_slot,
    }
    if message_content is not None:
        result['message'] = {
            'contentType': 'PlainText',
            'content': message_content,
        }
    return result
def validate_direction(move_direction):
    """Check that the requested direction is one the robot supports.

    The comparison is case-insensitive, so 'Forward', 'forward' and
    'FORWARD' are all accepted. A direction of None (slot not filled yet)
    is treated as valid so that the dialog can go on to ask for it.

    Args:
        move_direction: The value of the 'Direction' slot, or None.

    Returns:
        A validation-result dict from ``build_validation_result``. It is
        always a dict: callers index it with ``['isValid']``, so returning
        None for the valid case would make them fail.
    """
    move_directions = ('FORWARD', 'BACKWARD', 'LEFT', 'RIGHT', 'STOP')
    if move_direction is not None and move_direction.upper() not in move_directions:
        return build_validation_result(
            False,
            'Direction',
            'Robot cannot go {}, would you like a different direction?'.format(move_direction))
    return build_validation_result(True, None, None)
def move_robot(intent_request):
    """Handle the 'Move' intent: validate the direction, then move the robot.

    When invoked as a 'DialogCodeHook', the 'Direction' slot is validated;
    an invalid value is cleared and re-elicited, otherwise control is
    delegated back with the current slots. For any other invocation source
    the direction (upper-cased) is passed to ``shadow_updater.update_shadow``
    and a 'Close' response is returned.

    Args:
        intent_request: Event dict; reads ``currentIntent`` (``name``,
            ``slots``), ``invocationSource`` and ``sessionAttributes``.

    Returns:
        A response dict with ``sessionAttributes`` and ``dialogAction``.

    Raises:
        KeyError: If an expected key is missing from ``intent_request``.
    """
    slots = get_slots(intent_request)
    move_direction = slots["Direction"]
    session_attributes = intent_request['sessionAttributes']

    if intent_request['invocationSource'] == 'DialogCodeHook':
        # Perform basic validation on the supplied input slots.
        validation_result = validate_direction(move_direction)
        # Tolerate a validator that reports "no problem" by returning None
        # instead of a result dict.
        if validation_result is not None and not validation_result['isValid']:
            slots[validation_result['violatedSlot']] = None
            return _elicit_slot(session_attributes,
                                intent_request['currentIntent']['name'],
                                slots,
                                validation_result['violatedSlot'],
                                validation_result['message'])
        return delegate(session_attributes, slots)

    if move_direction is None:
        # Nothing to send to the robot; report failure instead of crashing
        # on None.upper().
        return close(session_attributes,
                     'Failed',
                     {'contentType': 'PlainText',
                      'content': 'No direction was given, so the robot did not move.'})

    # Normalise once so every comparison below is case-insensitive.
    direction = move_direction.upper()

    # Update thing shadow with a direction
    shadow_updater.update_shadow({"move_direction": direction})

    # Update thing shadow with a stop to allow two similar updates
    # consecutively.
    # -- begin --
    # Remove the IF line and keep the statement under it (minus indent) if
    # you want the robot to stop automatically during FORWARD and BACKWARD.
    # Forward/backward are allowed to run until the user says 'stop'.
    if direction not in ('FORWARD', 'BACKWARD'):
        shadow_updater.update_shadow({"move_direction": "STOP"})
    # -- end --

    # Send appropriate message to user via Lex while closing
    if direction != 'STOP':
        content = 'Moving {}.'.format(move_direction)
    else:
        content = 'Robot is stopping.'
    return close(session_attributes,
                 'Fulfilled',
                 {'contentType': 'PlainText', 'content': content})


def _elicit_slot(session_attributes, intent_name, slots, slot_to_elicit, message):
    """Build a response whose dialog action type is 'ElicitSlot'."""
    return {
        'sessionAttributes': session_attributes,
        'dialogAction': {
            'type': 'ElicitSlot',
            'intentName': intent_name,
            'slots': slots,
            'slotToElicit': slot_to_elicit,
            'message': message,
        },
    }
# Intents
def dispatch(intent_request):
    """Route an intent request to the handler for its intent name.

    Args:
        intent_request: Event dict; reads ``userId`` and
            ``currentIntent`` -> ``name``.

    Returns:
        The response produced by the matching intent handler.

    Raises:
        Exception: If the intent name has no handler.
    """
    intent_name = intent_request['currentIntent']['name']
    logger.debug('dispatch userId=%s, intentName=%s',
                 intent_request['userId'], intent_name)

    # Dispatch to your bot's intent handlers
    if intent_name == 'Move':
        return move_robot(intent_request)

    raise Exception('Intent with name ' + intent_name + ' not supported')
# Main handler
def lambda_handler(event, context):
    """Entry point: log the bot name and dispatch the event.

    Args:
        event: Event dict; reads ``bot`` -> ``name`` and is passed on to
            ``dispatch`` unchanged.
        context: Unused.

    Returns:
        Whatever ``dispatch(event)`` returns.
    """
    logger.debug('event.bot.name=%s', event['bot']['name'])
    return dispatch(event)
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment