Last active
September 30, 2020 12:55
-
-
Save shijaz/aebf4fd5803bb6a6e877b82c867a6fbc to your computer and use it in GitHub Desktop.
shadow_listener.py to control the robot using AWS IoT and Amazon Lex using AWS Lambda functions
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
# shadow_listener.py - Looks for messages on the device shadow using MQTT and performs actions (move and see) | |
# Copyright (C) 2019 Shijaz Abdulla - www.awsomenow.com | |
#This bit runs on the RPi | |
import os, logging, subprocess, time, argparse, datetime | |
from bottle import route, request, response, redirect, hook, error, default_app, view, static_file, template, HTTPError | |
from gpiozero import CamJamKitRobot | |
import datetime, pygame, ssl | |
import paho.mqtt.client as mqtt | |
#from picamera import PiCamera | |
import boto3 | |
from contextlib import closing | |
global response | |
class txt: | |
HEADER = '\033[95m' | |
OKBLUE = '\033[94m' | |
OKGREEN = '\033[92m' | |
WARNING = '\033[93m' | |
FAIL = '\033[91m' | |
ENDC = '\033[0m' | |
BOLD = '\033[1m' | |
UNDERLINE = '\033[4m' | |
# Set variables from env | |
host = os.environ.get("AWS_IOT_MQTT_HOST") | |
port = int(os.environ.get("AWS_IOT_MQTT_PORT")) | |
rootCAPath = os.environ.get("AWS_IOT_ROOT_CA_FILENAME") | |
certificatePath = os.environ.get("AWS_IOTCERTIFICATE_FILENAME") | |
privateKeyPath = os.environ.get("AWS_PRIVATE_KEY_FILENAME") | |
publicKeyPath = os.environ.get("AWS_PUBLIC_KEY_FILENAME") | |
thingName = os.environ.get("AWS_IOT_THING_NAME") | |
clientId = os.environ.get("AWS_IOT_MQTT_CLIENT_ID") | |
robot = CamJamKitRobot() | |
keepalive_interval=45 | |
shadow_update_topic = "$aws/things/{0}/shadow/update".format(thingName) | |
shadow_update_accepted_topic = "$aws/things/{0}/shadow/update/accepted".format(thingName) | |
shadow_update_rejected_topic = "$aws/things/{0}/shadow/update/rejected".format(thingName) | |
shadow_update_delta_topic = "$aws/things/{0}/shadow/update/delta".format(thingName) | |
shadow_get_topic = "$aws/things/{0}/shadow/get".format(thingName) | |
shadow_get_accepted_topic = "$aws/things/{0}/shadow/get/accepted".format(thingName) | |
shadow_get_rejected_topic = "$aws/things/{0}/shadow/get/rejected".format(thingName) | |
shadow_state_doc_direction_forward = """{"state" : {"reported" : {"move_direction" : "FORWARD"}}}""" | |
shadow_state_doc_direction_backward = """{"state" : {"reported" : {"move_direction" : "BACKWARD"}}}""" | |
shadow_state_doc_direction_left = """{"state" : {"reported" : {"move_direction" : "LEFT"}}}""" | |
shadow_state_doc_direction_right = """{"state" : {"reported" : {"move_direction" : "RIGHT"}}}""" | |
shadow_state_doc_direction_stopped = """{"state" : {"reported" : {"move_direction" : "STOP"}}}""" | |
shadow_state_doc_cheese_see = """{"state" : {"reported" : {"cheese_option" : "SEE"}}}""" | |
shadow_state_doc_cheese_stop = """{"state" : {"reported" : {"cheese_option" : "STOP"}}}""" | |
# Logger information | |
logger = logging.getLogger(__name__) | |
logger.setLevel(logging.INFO) | |
streamHandler = logging.StreamHandler() | |
formatter = logging.Formatter('%(asctime)s - %(name)s - %(levelname)s - %(message)s') | |
streamHandler.setFormatter(formatter) | |
logger.addHandler(streamHandler) | |
# Initialize MQTT Client | |
mqttclient = mqtt.Client("client2") | |
def playsound(mp3file): | |
pygame.mixer.init() | |
pygame.mixer.music.load(mp3file) | |
pygame.mixer.music.set_volume(1.0) | |
pygame.mixer.music.play() | |
# I swapped my left and right functions because of the way I wired my motors. You may or may not have to switch them back. | |
def action_right(): | |
playsound("./speech/right.mp3") | |
robot.left() | |
time.sleep(0.2) | |
robot.stop() | |
def action_stop(): | |
robot.stop() | |
def action_left(): | |
playsound("./speech/left.mp3") | |
robot.right() | |
time.sleep(0.2) | |
robot.stop() | |
def action_forward(): | |
playsound("./speech/forward.mp3") | |
robot.forward() | |
time.sleep(0.3) | |
robot.stop() | |
def action_back(): | |
playsound("./speech/backward.mp3") | |
robot.backward() | |
time.sleep(0.3) | |
robot.stop() | |
def ultrasonic(): | |
return "{:.2f}".format(sensor.distance) | |
def detect_labels(bucket, key, max_labels=1, min_confidence=85, region="eu-west-1"): | |
print(txt.OKBLUE + "Recognizing objects with minimum confidence " + txt.WARNING + str(min_confidence) + txt.OKBLUE + "%..." + txt.ENDC) | |
playsound("./speech/recognizing.mp3") | |
rekognition = boto3.client("rekognition", region) | |
rekogresponse = rekognition.detect_labels( | |
Image={ | |
"S3Object": { | |
"Bucket": bucket, | |
"Name": key, | |
} | |
}, | |
MaxLabels=max_labels, | |
MinConfidence=min_confidence, | |
) | |
return rekogresponse['Labels'] | |
def cheese(): | |
s3=boto3.client('s3') | |
polly=boto3.client("polly") | |
#Generate a filename for the picture on S3 | |
now=datetime.datetime.now() | |
KEY = now.strftime("%Y%m%d%H%M%S")+".jpg" | |
#S3 bucket name for upload | |
BUCKET = "rekogrobot" | |
#take a picture without storing locally and send it to S3 | |
response.content_type = 'image/jpeg' | |
response.cache_control = 'no-store' | |
print (txt.OKBLUE + "Robot is looking..." + txt.ENDC) | |
playsound("./speech/looking.mp3") | |
with subprocess.Popen(["raspistill", "-w", "400", "-h", "300", "-o", "-"], stdout=subprocess.PIPE) as proc: | |
#playsound("./speech/sending.mp3") | |
s3.put_object(ACL='public-read', Bucket=BUCKET, Key=KEY, Body=proc.stdout.read()) | |
s3location="https://"+BUCKET+".s3-eu-west-1.amazonaws.com/"+KEY | |
labels=detect_labels(BUCKET,KEY) | |
if labels: | |
#Print and read out response using Polly | |
print(txt.OKGREEN + "\nResponse JSON from Rekognition: " + txt.ENDC) | |
print(labels) | |
for label in labels: | |
roundedconfidence=round(label['Confidence'],2) | |
speech= "I see a {Name} and I am ".format(**label) | |
speech= speech + str(roundedconfidence) + "% sure." | |
print("\n" + txt.OKGREEN + speech + txt.ENDC) | |
print(txt.OKBLUE + "\nRobot is speaking..." + txt.ENDC) | |
try: | |
pollyresponse = polly.synthesize_speech(Text=speech, OutputFormat="mp3", VoiceId="Salli") | |
except (BotoCoreError, ClientError) as error: | |
print(txt.FAIL + error + txt.ENDC) | |
sys.exit(-1) | |
if "AudioStream" in pollyresponse: | |
with closing(pollyresponse["AudioStream"]) as stream: | |
output = "./speech.mp3" | |
try: | |
with open(output, "wb") as file: | |
file.write(stream.read()) | |
except IOError as error: | |
print(txt.FAIL + error + txt.ENDC) | |
sys.exit(-1) | |
else: | |
print(txt.FAIL + "Could not stream audio" + txt.ENDC) | |
sys.exit(-1) | |
#open the file | |
pygame.mixer.init() | |
pygame.mixer.music.load(output) | |
pygame.mixer.music.set_volume(1.0) | |
pygame.mixer.music.play() | |
#Use the below if you want to code to wait till the robot finishes speaking | |
#while pygame.mixer.music.get_busy()==True: | |
# continue | |
else: | |
#Play a message that the robot couldn't identify any objects. | |
print(txt.WARNING + "I could not recognize any objects." + txt.ENDC) | |
pygame.mixer.init() | |
pygame.mixer.music.load("./notfound_Salli.mp3") | |
pygame.mixer.music.set_volume(1.0) | |
pygame.mixer.music.play() | |
#return b | |
def Direction_Status_Change(shadow_state_doc, Type): | |
# Parse Status from Shadow | |
desired_direction = "" | |
desired_cheese="" | |
print (txt.OKGREEN + "\nShadow JSON:" + txt.ENDC) | |
shadow_state_doc = json.loads(shadow_state_doc) | |
print (shadow_state_doc) | |
if Type == "DELTA": | |
if 'move_direction' in shadow_state_doc['state']: | |
desired_direction = shadow_state_doc['state']['move_direction'] | |
if 'cheese_option' in shadow_state_doc['state']: | |
desired_cheese = shadow_state_doc['state']['cheese_option'] | |
elif Type == "GET_REQ": | |
desired_direction = shadow_state_doc['state']['desired']['move_direction'] | |
desired_cheese=shadow_state_doc['state']['desired']['cheese_option'] | |
# if not desired_direction== "STOP": print (txt.OKGREEN + "Desired Direction: " + desired_direction + txt.ENDC) | |
# if not desired_direction=="STOP": print (txt.OKGREEN + "Desired See Action: " + desired_cheese + txt.ENDC) | |
# Control Direction | |
if not (desired_direction=="STOP" or desired_direction is None or desired_direction==""): print (txt.OKBLUE + "\nMoving " + desired_direction + "..." + txt.ENDC) | |
if desired_direction == "FORWARD": | |
action_forward() | |
# Report direction back to Shadow | |
shadow_state_doc_direction=shadow_state_doc_direction_forward | |
elif desired_direction == "BACKWARD": | |
action_back() | |
# Report direction back to Shadow | |
shadow_state_doc_direction=shadow_state_doc_direction_backward | |
elif desired_direction == "RIGHT": | |
action_right() | |
# Report direction back to Shadow | |
shadow_state_doc_direction=shadow_state_doc_direction_right | |
elif desired_direction == "LEFT": | |
action_left() | |
# Report direction back to Shadow | |
shadow_state_doc_direction=shadow_state_doc_direction_left | |
elif desired_direction == "STOP": | |
action_stop() | |
# Report direction back to Shadow | |
shadow_state_doc_direction=shadow_state_doc_direction_stopped | |
else: | |
print (txt.WARNING + "No command to move robot in IoT shadow." + txt.ENDC) | |
if not (desired_direction=="STOP" or desired_direction is None or desired_direction==""): print (txt.OKBLUE + "\nDirection set to " + desired_direction + ". Reporting status to IoT shadow..." + txt.ENDC) | |
if desired_direction!="": | |
mqttclient.publish(shadow_update_topic,shadow_state_doc_direction,qos=1) | |
# See Object | |
if desired_cheese == "SEE": print (txt.OKGREEN + "Robot vision command found in IoT shadow: " + txt.WARNING + desired_cheese + txt.ENDC) | |
if desired_cheese == "SEE": | |
cheese() | |
shadow_state_doc_cheese=shadow_state_doc_cheese_see | |
mqttclient.publish(shadow_update_topic,shadow_state_doc_cheese,qos=1) | |
else: | |
shadow_state_doc_cheese=shadow_state_doc_cheese_stop | |
mqttclient.publish(shadow_update_topic,shadow_state_doc_cheese,qos=1) | |
# We shall subscribe to Shadow Accepted and Rejected Topics in this function | |
def on_connect(self, mosq, obj, rc): | |
print (txt.OKBLUE + "Connected to AWS IoT." + txt.ENDC) | |
# Subscribe to Delta Topic | |
mqttclient.subscribe(shadow_update_delta_topic, 1) | |
# Subscribe to Update Topic | |
#mqttc.subscribe(shadow_update_topic, 1) | |
# Subscribe to Update Accepted and Rejected Topics | |
mqttclient.subscribe(shadow_update_accepted_topic, 1) | |
mqttclient.subscribe(shadow_update_rejected_topic, 1) | |
# Subscribe to Get Accepted and Rejected Topics | |
mqttclient.subscribe(shadow_get_accepted_topic, 1) | |
mqttclient.subscribe(shadow_get_rejected_topic, 1) | |
# This function will be invoked every time | |
# a new message arrives for the subscribed topic | |
def on_message(mosq, obj, msg): | |
if str(msg.topic) == shadow_update_delta_topic: | |
print (txt.OKGREEN + "\nNew delta message received..." + txt.ENDC) | |
shadow_state_delta = (msg.payload).decode('utf-8') | |
print (shadow_state_delta) | |
Direction_Status_Change(shadow_state_delta, "DELTA") | |
elif str(msg.topic) == shadow_get_accepted_topic: | |
print (txt.OKGREEN + "\nReceived state doc with Get request..." + txt.ENDC) | |
shadow_state_doc = (msg.payload).decode('utf-8') | |
print (shadow_state_doc) | |
Direction_Status_Change(shadow_state_doc, "GET_REQ") | |
elif str(msg.topic) == shadow_get_rejected_topic: | |
shadow_get_error = (msg.payload).decode('utf-8') | |
print (txt.FAIL + "\n---ERROR--- Unable to fetch Shadow Doc...\nError Response: " + txt.ENDC + shadow_get_error) | |
elif str(msg.topic) == shadow_update_accepted_topic: | |
print (txt.OKGREEN + "\nStatus change updated in IoT Shadow." + txt.ENDC) | |
print (txt.OKGREEN + "\nResponse JSON:" + txt.ENDC + '\n' + (msg.payload).decode('utf-8')) | |
elif str(msg.topic) == shadow_update_rejected_topic: | |
shadow_update_error = (msg.payload).decode('utf-8') | |
print (txt.FAIL + "\n---ERROR--- Failed to Update the Shadow...\nError Response: " + txt.ENDC + shadow_update_error) | |
else: | |
print (txt.OKGREEN + "AWS Response Topic: " + txt.ENDC + str(msg.topic)) | |
print (txt.OKGREEN + "QoS: " + txt.ENDC + str(msg.qos)) | |
print (txt.OKGREEN + "Payload: " + txt.ENDC + str(msg.payload).decode('utf-8')) | |
def on_subscribe(mosq, obj, mid, granted_qos): | |
#As we are subscribing to 3 Topics, wait till all 3 topics get subscribed | |
#for each subscription mid will get incremented by 1 (starting with 1) | |
if mid == 3: | |
# Fetch current Shadow status. Useful for reconnection scenario. | |
mqttclient.publish(shadow_get_topic,"",qos=1) | |
def on_disconnect(client, userdata, rc): | |
if rc != 0: | |
print (txt.WARNING + "Disconnected from AWS IoT. Trying to auto-reconnect..." + txt.ENDC) | |
# Register callback functions | |
mqttclient.on_message = on_message | |
mqttclient.on_connect = on_connect | |
mqttclient.on_subscribe = on_subscribe | |
mqttclient.on_disconnect = on_disconnect | |
def main(): | |
print(txt.OKBLUE + "Robot is now ready." + txt.ENDC) | |
playsound("./robotready_Salli.mp3") | |
#pygame.mixer.init() | |
#pygame.mixer.music.load("./robotready_Salli.mp3") | |
#pygame.mixer.music.set_volume(1.0) | |
#pygame.mixer.music.play() | |
parser = argparse.ArgumentParser() | |
# Verbose mode | |
parser.add_argument("--verbose", "-v", help="increase output verbosity", action="store_true") | |
args = parser.parse_args() | |
if args.verbose: | |
logging.basicConfig(level=logging.DEBUG) | |
else: | |
logging.basicConfig(level=logging.INFO) | |
log = logging.getLogger(__name__) | |
try: | |
robot.stop() | |
except Exception as e: | |
log.error(e) | |
print ("error") | |
exit() | |
print (txt.OKGREEN + "Root CA Path: " + txt.ENDC + rootCAPath) | |
print (txt.OKGREEN + "Certificate Path: " + txt.ENDC + certificatePath) | |
print (txt.OKGREEN + "Private Key Path: " + txt.ENDC + privateKeyPath) | |
# Configure TLS Set | |
mqttclient.tls_set(rootCAPath, certfile=certificatePath, keyfile=privateKeyPath, cert_reqs=ssl.CERT_REQUIRED, tls_version=ssl.PROTOCOL_TLSv1_2, ciphers=None) | |
# Connect with MQTT Broker | |
mqttclient.connect(host, port, keepalive_interval) | |
# Continue monitoring the incoming messages for subscribed topic | |
mqttclient.loop_forever() | |
if __name__ == '__main__': | |
main() |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment