Skip to content

Instantly share code, notes, and snippets.

@shijaz
Last active September 30, 2020 12:55
Show Gist options
  • Save shijaz/aebf4fd5803bb6a6e877b82c867a6fbc to your computer and use it in GitHub Desktop.
Save shijaz/aebf4fd5803bb6a6e877b82c867a6fbc to your computer and use it in GitHub Desktop.
shadow_listener.py to control the robot using AWS IoT and Amazon Lex using AWS Lambda functions
# shadow_listener.py - Looks for messages on the device shadow using MQTT and performs actions (move and see)
# Copyright (C) 2019 Shijaz Abdulla - www.awsomenow.com
#This bit runs on the RPi
import os, logging, subprocess, time, argparse, datetime
from bottle import route, request, response, redirect, hook, error, default_app, view, static_file, template, HTTPError
from gpiozero import CamJamKitRobot
import datetime, pygame, ssl
import paho.mqtt.client as mqtt
#from picamera import PiCamera
import boto3
from contextlib import closing
global response
class txt:
HEADER = '\033[95m'
OKBLUE = '\033[94m'
OKGREEN = '\033[92m'
WARNING = '\033[93m'
FAIL = '\033[91m'
ENDC = '\033[0m'
BOLD = '\033[1m'
UNDERLINE = '\033[4m'
# Set variables from env
host = os.environ.get("AWS_IOT_MQTT_HOST")
port = int(os.environ.get("AWS_IOT_MQTT_PORT"))
rootCAPath = os.environ.get("AWS_IOT_ROOT_CA_FILENAME")
certificatePath = os.environ.get("AWS_IOTCERTIFICATE_FILENAME")
privateKeyPath = os.environ.get("AWS_PRIVATE_KEY_FILENAME")
publicKeyPath = os.environ.get("AWS_PUBLIC_KEY_FILENAME")
thingName = os.environ.get("AWS_IOT_THING_NAME")
clientId = os.environ.get("AWS_IOT_MQTT_CLIENT_ID")
robot = CamJamKitRobot()
keepalive_interval=45
shadow_update_topic = "$aws/things/{0}/shadow/update".format(thingName)
shadow_update_accepted_topic = "$aws/things/{0}/shadow/update/accepted".format(thingName)
shadow_update_rejected_topic = "$aws/things/{0}/shadow/update/rejected".format(thingName)
shadow_update_delta_topic = "$aws/things/{0}/shadow/update/delta".format(thingName)
shadow_get_topic = "$aws/things/{0}/shadow/get".format(thingName)
shadow_get_accepted_topic = "$aws/things/{0}/shadow/get/accepted".format(thingName)
shadow_get_rejected_topic = "$aws/things/{0}/shadow/get/rejected".format(thingName)
shadow_state_doc_direction_forward = """{"state" : {"reported" : {"move_direction" : "FORWARD"}}}"""
shadow_state_doc_direction_backward = """{"state" : {"reported" : {"move_direction" : "BACKWARD"}}}"""
shadow_state_doc_direction_left = """{"state" : {"reported" : {"move_direction" : "LEFT"}}}"""
shadow_state_doc_direction_right = """{"state" : {"reported" : {"move_direction" : "RIGHT"}}}"""
shadow_state_doc_direction_stopped = """{"state" : {"reported" : {"move_direction" : "STOP"}}}"""
shadow_state_doc_cheese_see = """{"state" : {"reported" : {"cheese_option" : "SEE"}}}"""
shadow_state_doc_cheese_stop = """{"state" : {"reported" : {"cheese_option" : "STOP"}}}"""
# Logger information
logger = logging.getLogger(__name__)
logger.setLevel(logging.INFO)
streamHandler = logging.StreamHandler()
formatter = logging.Formatter('%(asctime)s - %(name)s - %(levelname)s - %(message)s')
streamHandler.setFormatter(formatter)
logger.addHandler(streamHandler)
# Initialize MQTT Client
mqttclient = mqtt.Client("client2")
def playsound(mp3file):
pygame.mixer.init()
pygame.mixer.music.load(mp3file)
pygame.mixer.music.set_volume(1.0)
pygame.mixer.music.play()
# I swapped my left and right functions because of the way I wired my motors. You may or may not have to switch them back.
def action_right():
playsound("./speech/right.mp3")
robot.left()
time.sleep(0.2)
robot.stop()
def action_stop():
robot.stop()
def action_left():
playsound("./speech/left.mp3")
robot.right()
time.sleep(0.2)
robot.stop()
def action_forward():
playsound("./speech/forward.mp3")
robot.forward()
time.sleep(0.3)
robot.stop()
def action_back():
playsound("./speech/backward.mp3")
robot.backward()
time.sleep(0.3)
robot.stop()
def ultrasonic():
return "{:.2f}".format(sensor.distance)
def detect_labels(bucket, key, max_labels=1, min_confidence=85, region="eu-west-1"):
print(txt.OKBLUE + "Recognizing objects with minimum confidence " + txt.WARNING + str(min_confidence) + txt.OKBLUE + "%..." + txt.ENDC)
playsound("./speech/recognizing.mp3")
rekognition = boto3.client("rekognition", region)
rekogresponse = rekognition.detect_labels(
Image={
"S3Object": {
"Bucket": bucket,
"Name": key,
}
},
MaxLabels=max_labels,
MinConfidence=min_confidence,
)
return rekogresponse['Labels']
def cheese():
s3=boto3.client('s3')
polly=boto3.client("polly")
#Generate a filename for the picture on S3
now=datetime.datetime.now()
KEY = now.strftime("%Y%m%d%H%M%S")+".jpg"
#S3 bucket name for upload
BUCKET = "rekogrobot"
#take a picture without storing locally and send it to S3
response.content_type = 'image/jpeg'
response.cache_control = 'no-store'
print (txt.OKBLUE + "Robot is looking..." + txt.ENDC)
playsound("./speech/looking.mp3")
with subprocess.Popen(["raspistill", "-w", "400", "-h", "300", "-o", "-"], stdout=subprocess.PIPE) as proc:
#playsound("./speech/sending.mp3")
s3.put_object(ACL='public-read', Bucket=BUCKET, Key=KEY, Body=proc.stdout.read())
s3location="https://"+BUCKET+".s3-eu-west-1.amazonaws.com/"+KEY
labels=detect_labels(BUCKET,KEY)
if labels:
#Print and read out response using Polly
print(txt.OKGREEN + "\nResponse JSON from Rekognition: " + txt.ENDC)
print(labels)
for label in labels:
roundedconfidence=round(label['Confidence'],2)
speech= "I see a {Name} and I am ".format(**label)
speech= speech + str(roundedconfidence) + "% sure."
print("\n" + txt.OKGREEN + speech + txt.ENDC)
print(txt.OKBLUE + "\nRobot is speaking..." + txt.ENDC)
try:
pollyresponse = polly.synthesize_speech(Text=speech, OutputFormat="mp3", VoiceId="Salli")
except (BotoCoreError, ClientError) as error:
print(txt.FAIL + error + txt.ENDC)
sys.exit(-1)
if "AudioStream" in pollyresponse:
with closing(pollyresponse["AudioStream"]) as stream:
output = "./speech.mp3"
try:
with open(output, "wb") as file:
file.write(stream.read())
except IOError as error:
print(txt.FAIL + error + txt.ENDC)
sys.exit(-1)
else:
print(txt.FAIL + "Could not stream audio" + txt.ENDC)
sys.exit(-1)
#open the file
pygame.mixer.init()
pygame.mixer.music.load(output)
pygame.mixer.music.set_volume(1.0)
pygame.mixer.music.play()
#Use the below if you want to code to wait till the robot finishes speaking
#while pygame.mixer.music.get_busy()==True:
# continue
else:
#Play a message that the robot couldn't identify any objects.
print(txt.WARNING + "I could not recognize any objects." + txt.ENDC)
pygame.mixer.init()
pygame.mixer.music.load("./notfound_Salli.mp3")
pygame.mixer.music.set_volume(1.0)
pygame.mixer.music.play()
#return b
def Direction_Status_Change(shadow_state_doc, Type):
# Parse Status from Shadow
desired_direction = ""
desired_cheese=""
print (txt.OKGREEN + "\nShadow JSON:" + txt.ENDC)
shadow_state_doc = json.loads(shadow_state_doc)
print (shadow_state_doc)
if Type == "DELTA":
if 'move_direction' in shadow_state_doc['state']:
desired_direction = shadow_state_doc['state']['move_direction']
if 'cheese_option' in shadow_state_doc['state']:
desired_cheese = shadow_state_doc['state']['cheese_option']
elif Type == "GET_REQ":
desired_direction = shadow_state_doc['state']['desired']['move_direction']
desired_cheese=shadow_state_doc['state']['desired']['cheese_option']
# if not desired_direction== "STOP": print (txt.OKGREEN + "Desired Direction: " + desired_direction + txt.ENDC)
# if not desired_direction=="STOP": print (txt.OKGREEN + "Desired See Action: " + desired_cheese + txt.ENDC)
# Control Direction
if not (desired_direction=="STOP" or desired_direction is None or desired_direction==""): print (txt.OKBLUE + "\nMoving " + desired_direction + "..." + txt.ENDC)
if desired_direction == "FORWARD":
action_forward()
# Report direction back to Shadow
shadow_state_doc_direction=shadow_state_doc_direction_forward
elif desired_direction == "BACKWARD":
action_back()
# Report direction back to Shadow
shadow_state_doc_direction=shadow_state_doc_direction_backward
elif desired_direction == "RIGHT":
action_right()
# Report direction back to Shadow
shadow_state_doc_direction=shadow_state_doc_direction_right
elif desired_direction == "LEFT":
action_left()
# Report direction back to Shadow
shadow_state_doc_direction=shadow_state_doc_direction_left
elif desired_direction == "STOP":
action_stop()
# Report direction back to Shadow
shadow_state_doc_direction=shadow_state_doc_direction_stopped
else:
print (txt.WARNING + "No command to move robot in IoT shadow." + txt.ENDC)
if not (desired_direction=="STOP" or desired_direction is None or desired_direction==""): print (txt.OKBLUE + "\nDirection set to " + desired_direction + ". Reporting status to IoT shadow..." + txt.ENDC)
if desired_direction!="":
mqttclient.publish(shadow_update_topic,shadow_state_doc_direction,qos=1)
# See Object
if desired_cheese == "SEE": print (txt.OKGREEN + "Robot vision command found in IoT shadow: " + txt.WARNING + desired_cheese + txt.ENDC)
if desired_cheese == "SEE":
cheese()
shadow_state_doc_cheese=shadow_state_doc_cheese_see
mqttclient.publish(shadow_update_topic,shadow_state_doc_cheese,qos=1)
else:
shadow_state_doc_cheese=shadow_state_doc_cheese_stop
mqttclient.publish(shadow_update_topic,shadow_state_doc_cheese,qos=1)
# We shall subscribe to Shadow Accepted and Rejected Topics in this function
def on_connect(self, mosq, obj, rc):
print (txt.OKBLUE + "Connected to AWS IoT." + txt.ENDC)
# Subscribe to Delta Topic
mqttclient.subscribe(shadow_update_delta_topic, 1)
# Subscribe to Update Topic
#mqttc.subscribe(shadow_update_topic, 1)
# Subscribe to Update Accepted and Rejected Topics
mqttclient.subscribe(shadow_update_accepted_topic, 1)
mqttclient.subscribe(shadow_update_rejected_topic, 1)
# Subscribe to Get Accepted and Rejected Topics
mqttclient.subscribe(shadow_get_accepted_topic, 1)
mqttclient.subscribe(shadow_get_rejected_topic, 1)
# This function will be invoked every time
# a new message arrives for the subscribed topic
def on_message(mosq, obj, msg):
if str(msg.topic) == shadow_update_delta_topic:
print (txt.OKGREEN + "\nNew delta message received..." + txt.ENDC)
shadow_state_delta = (msg.payload).decode('utf-8')
print (shadow_state_delta)
Direction_Status_Change(shadow_state_delta, "DELTA")
elif str(msg.topic) == shadow_get_accepted_topic:
print (txt.OKGREEN + "\nReceived state doc with Get request..." + txt.ENDC)
shadow_state_doc = (msg.payload).decode('utf-8')
print (shadow_state_doc)
Direction_Status_Change(shadow_state_doc, "GET_REQ")
elif str(msg.topic) == shadow_get_rejected_topic:
shadow_get_error = (msg.payload).decode('utf-8')
print (txt.FAIL + "\n---ERROR--- Unable to fetch Shadow Doc...\nError Response: " + txt.ENDC + shadow_get_error)
elif str(msg.topic) == shadow_update_accepted_topic:
print (txt.OKGREEN + "\nStatus change updated in IoT Shadow." + txt.ENDC)
print (txt.OKGREEN + "\nResponse JSON:" + txt.ENDC + '\n' + (msg.payload).decode('utf-8'))
elif str(msg.topic) == shadow_update_rejected_topic:
shadow_update_error = (msg.payload).decode('utf-8')
print (txt.FAIL + "\n---ERROR--- Failed to Update the Shadow...\nError Response: " + txt.ENDC + shadow_update_error)
else:
print (txt.OKGREEN + "AWS Response Topic: " + txt.ENDC + str(msg.topic))
print (txt.OKGREEN + "QoS: " + txt.ENDC + str(msg.qos))
print (txt.OKGREEN + "Payload: " + txt.ENDC + str(msg.payload).decode('utf-8'))
def on_subscribe(mosq, obj, mid, granted_qos):
#As we are subscribing to 3 Topics, wait till all 3 topics get subscribed
#for each subscription mid will get incremented by 1 (starting with 1)
if mid == 3:
# Fetch current Shadow status. Useful for reconnection scenario.
mqttclient.publish(shadow_get_topic,"",qos=1)
def on_disconnect(client, userdata, rc):
if rc != 0:
print (txt.WARNING + "Disconnected from AWS IoT. Trying to auto-reconnect..." + txt.ENDC)
# Register callback functions
mqttclient.on_message = on_message
mqttclient.on_connect = on_connect
mqttclient.on_subscribe = on_subscribe
mqttclient.on_disconnect = on_disconnect
def main():
print(txt.OKBLUE + "Robot is now ready." + txt.ENDC)
playsound("./robotready_Salli.mp3")
#pygame.mixer.init()
#pygame.mixer.music.load("./robotready_Salli.mp3")
#pygame.mixer.music.set_volume(1.0)
#pygame.mixer.music.play()
parser = argparse.ArgumentParser()
# Verbose mode
parser.add_argument("--verbose", "-v", help="increase output verbosity", action="store_true")
args = parser.parse_args()
if args.verbose:
logging.basicConfig(level=logging.DEBUG)
else:
logging.basicConfig(level=logging.INFO)
log = logging.getLogger(__name__)
try:
robot.stop()
except Exception as e:
log.error(e)
print ("error")
exit()
print (txt.OKGREEN + "Root CA Path: " + txt.ENDC + rootCAPath)
print (txt.OKGREEN + "Certificate Path: " + txt.ENDC + certificatePath)
print (txt.OKGREEN + "Private Key Path: " + txt.ENDC + privateKeyPath)
# Configure TLS Set
mqttclient.tls_set(rootCAPath, certfile=certificatePath, keyfile=privateKeyPath, cert_reqs=ssl.CERT_REQUIRED, tls_version=ssl.PROTOCOL_TLSv1_2, ciphers=None)
# Connect with MQTT Broker
mqttclient.connect(host, port, keepalive_interval)
# Continue monitoring the incoming messages for subscribed topic
mqttclient.loop_forever()
if __name__ == '__main__':
main()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment