Skip to content

Instantly share code, notes, and snippets.

@shijaz
Created July 15, 2018 19:22
Show Gist options
  • Save shijaz/15225fc67945938b126d3a6064d2d7af to your computer and use it in GitHub Desktop.
Save shijaz/15225fc67945938b126d3a6064d2d7af to your computer and use it in GitHub Desktop.
lambda_function.py for AWS Lambda function - 'MoveRobot'
# lambda_function.py for AWS Lambda function - 'MoveRobot'
# Shijaz Abdulla - www.awsomenow.com
import os
import logging
import shadow_updater
# Connection settings are read from the Lambda environment. Each value is a
# string, or None when the variable is not set (the port is NOT converted to int).
# NOTE(review): none of these names are referenced again in this file;
# presumably shadow_updater or other tooling relies on them - confirm.
clientId = os.getenv("AWS_IOT_MQTT_CLIENT_ID")
thingName = os.getenv("AWS_IOT_THING_NAME")
host = os.getenv("AWS_IOT_MQTT_HOST")
port = os.getenv("AWS_IOT_MQTT_PORT_UPDATE")

# Use the root logger and let everything from DEBUG upwards through.
logger = logging.getLogger()
logger.setLevel(logging.DEBUG)
def get_slots(intent_request):
    """Return the slots mapping of the intent carried by *intent_request*.

    Looks up ``intent_request['currentIntent']['slots']`` and returns that
    object as-is (no copy). Raises ``KeyError`` if either key is missing.
    """
    current_intent = intent_request['currentIntent']
    return current_intent['slots']
def close(session_attributes, fulfillment_state, message):
    """Build a response whose dialog action has type 'Close'.

    The returned dict carries *session_attributes* unchanged under
    'sessionAttributes' and a 'dialogAction' holding the given
    *fulfillment_state* and *message*.
    """
    dialog_action = {
        'type': 'Close',
        'fulfillmentState': fulfillment_state,
        'message': message,
    }
    return {
        'sessionAttributes': session_attributes,
        'dialogAction': dialog_action,
    }
def delegate(session_attributes, slots):
    """Build a response whose dialog action has type 'Delegate'.

    *session_attributes* and *slots* are placed into the response unchanged.
    """
    dialog_action = dict(type='Delegate', slots=slots)
    return dict(sessionAttributes=session_attributes, dialogAction=dialog_action)
def build_validation_result(is_valid, violated_slot, message_content):
    """Package the outcome of a slot validation as a dict.

    The result always has 'isValid' and 'violatedSlot'. A 'message' entry
    (PlainText, with *message_content* as its content) is added only when
    *message_content* is not None.
    """
    outcome = {'isValid': is_valid, 'violatedSlot': violated_slot}
    if message_content is not None:
        outcome['message'] = {
            'contentType': 'PlainText',
            'content': message_content,
        }
    return outcome
def validate_direction(move_direction):
    """Check that *move_direction* is a direction the robot understands.

    Args:
        move_direction: The value of the 'Direction' slot, or None when the
            slot has not been filled yet.

    Returns:
        A validation-result dict from ``build_validation_result``. It is
        always a dict: ``isValid`` is True for None or a known direction,
        and False (with ``violatedSlot`` 'Direction' and a message) otherwise.
    """
    move_directions = ('FORWARD', 'BACKWARD', 'LEFT', 'RIGHT', 'STOP')
    # Compare case-insensitively: the rest of this module handles mixed-case
    # slot values such as 'Forward' and 'Stop' and upper-cases them itself.
    if move_direction is not None and move_direction.upper() not in move_directions:
        return build_validation_result(
            False,
            'Direction',
            'Robot cannot go {}, would you like a different direction?'.format(move_direction))
    # Always hand back a result so callers can safely read ['isValid'].
    return build_validation_result(True, None, None)
def move_robot(intent_request):
    """Handle the 'Move' intent.

    When invoked as a 'DialogCodeHook' the 'Direction' slot is validated:
    an invalid value is cleared and the user is asked for it again,
    otherwise control is delegated back with the current slots.

    For any other invocation source the direction is written (upper-cased)
    to the thing shadow through ``shadow_updater.update_shadow`` and the
    conversation is closed with a confirmation message.

    Args:
        intent_request: The event dict; must contain 'invocationSource',
            'sessionAttributes' and 'currentIntent' (with 'name' and a
            'slots' dict holding 'Direction').

    Returns:
        A response dict with 'sessionAttributes' and 'dialogAction'.
    """
    slots = get_slots(intent_request)
    move_direction = slots["Direction"]
    session_attributes = intent_request['sessionAttributes']

    if intent_request['invocationSource'] == 'DialogCodeHook':
        # Perform basic validation on the supplied input slots.
        validation_result = validate_direction(move_direction)
        # A validator that returns nothing is treated as "no objection".
        if validation_result is not None and not validation_result['isValid']:
            violated_slot = validation_result['violatedSlot']
            slots[violated_slot] = None
            return _elicit_slot(session_attributes,
                                intent_request['currentIntent']['name'],
                                slots,
                                violated_slot,
                                validation_result.get('message'))
        return delegate(session_attributes, slots)

    if move_direction is None:
        # Nothing to send to the robot; answer instead of crashing on .upper().
        return close(session_attributes,
                     'Failed',
                     {'contentType': 'PlainText',
                      'content': 'No direction was given for the robot.'})

    direction = move_direction.upper()

    # Update thing shadow with a direction
    shadow_updater.update_shadow({"move_direction": direction})

    # Update thing shadow with a stop to allow two similar updates consecutively.
    # FORWARD and BACKWARD are exempt so the robot keeps running until the user
    # says 'stop'. Drop the condition (keep the update) if the robot should
    # stop automatically after FORWARD and BACKWARD as well.
    if direction not in ('FORWARD', 'BACKWARD'):
        shadow_updater.update_shadow({"move_direction": "STOP"})

    # Send appropriate message to user via Lex while closing
    if direction == 'STOP':
        content = 'Robot is stopping.'
    else:
        content = 'Moving {}.'.format(move_direction)
    return close(session_attributes,
                 'Fulfilled',
                 {'contentType': 'PlainText', 'content': content})


def _elicit_slot(session_attributes, intent_name, slots, slot_to_elicit, message):
    """Build a response that asks the user to fill *slot_to_elicit* again."""
    return {
        'sessionAttributes': session_attributes,
        'dialogAction': {
            'type': 'ElicitSlot',
            'intentName': intent_name,
            'slots': slots,
            'slotToElicit': slot_to_elicit,
            'message': message
        }
    }
# Intents
def dispatch(intent_request):
    """Route an incoming request to the handler for its intent.

    Logs the user id and intent name at DEBUG level. Only the 'Move' intent
    is handled (by ``move_robot``); any other intent name raises ``Exception``.
    """
    user_id = intent_request['userId']
    intent_name = intent_request['currentIntent']['name']
    logger.debug('dispatch userId={}, intentName={}'.format(user_id, intent_name))

    # Reject anything this bot has no handler for.
    if intent_name != 'Move':
        raise Exception('Intent with name ' + intent_name + ' not supported')
    return move_robot(intent_request)
# Main handler
def lambda_handler(event, context):
    """Entry point: log the bot name from *event* and dispatch the request.

    *context* is accepted but not used. Returns whatever ``dispatch`` returns.
    """
    bot_name = event['bot']['name']
    logger.debug('event.bot.name={}'.format(bot_name))
    return dispatch(event)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment