Skip to content

Instantly share code, notes, and snippets.

@AshMartian
Created January 12, 2019 06:15
Show Gist options
  • Save AshMartian/eb46959adfa6899976e7d5d3ce455213 to your computer and use it in GitHub Desktop.
Save AshMartian/eb46959adfa6899976e7d5d3ce455213 to your computer and use it in GitHub Desktop.
Vector Sees
import argparse
import matplotlib.pyplot as plt
from mxnet import nd, image
# Backward-compatible alias: mxnet exposes no top-level `img` module, so the
# original `from mxnet import nd, image, img` raised ImportError. Code below
# that calls `img.imdecode(...)` resolves through this alias instead.
from mxnet import image as img
from mxnet.gluon.data.vision import transforms
from gluoncv.model_zoo import get_model
from gluoncv.data.transforms.presets.imagenet import transform_eval
# Load Model
class Prediction(object):
    """One classification result: a human-readable label plus its score.

    Attribute names (`description`, `score`) mirror the Google Cloud Vision
    label objects that the callers in this gist originally consumed.
    """

    def __init__(self, human, score):
        # Store the confidence first, then the label text; callers read
        # `.description` and `.score` directly.
        self.score = score
        self.description = human
def detect_image(image, model="resnet152_v2"):
    """Classify encoded image bytes with a pretrained ImageNet model.

    Args:
        image: raw encoded image bytes (e.g. a JPEG, as produced by
            ``BytesIO.getvalue()``).
        model: gluoncv model-zoo name of the pretrained network to load.

    Returns:
        list[Prediction]: the top-5 classes with their softmax scores,
        in descending-confidence order.
    """
    # Local alias: the `image` parameter shadows mxnet's `image` module, and
    # the file-level `from mxnet import ... img` fails (mxnet has no `img`).
    from mxnet import image as mx_image
    net = get_model(model, pretrained=True)
    # Decode the raw bytes into an NDArray, then apply the standard
    # ImageNet evaluation transform (resize/crop/normalize).
    decoded_img = mx_image.imdecode(image)
    decoded_img = transform_eval(decoded_img)
    pred = net(decoded_img)
    topK = 5
    ind = nd.topk(pred, k=topK)[0].astype('int')
    # Compute softmax once, instead of once per result as the original did.
    probs = nd.softmax(pred)[0]
    results = []
    for i in range(topK):
        human_string = net.classes[ind[i].asscalar()]
        score = probs[ind[i]].asscalar()
        print('\t[%s], with probability %.3f.'%
              (human_string, score))
        results.append(Prediction(human_string, score))
    return results
import anki_vector
from anki_vector.events import Events
#from google.cloud import vision
#from google.cloud.vision import types
import time
import datetime
import io
import random
import functools
import threading
import asyncio
import mx_classify
import sys
# Signaling primitive — NOTE(review): never used below; appears to be a leftover.
evt = threading.Event()
#client = vision.ImageAnnotatorClient()
# True while Vector is in "announce what I see" mode (set on seeing a face).
saw_face = False
# When announce mode last started; used to time it out after max_runtime.
saw_face_time = time.time()
# NOTE(review): never read or written in the code shown — appears unused.
runtime = 0
# Seconds that announce mode stays active after a face is seen.
max_runtime = 360
# Frames classified since announce mode started; the mode ends after 60.
images_processed = 0
# time.time() of the most recently classified frame (0 = never).
last_image = 0
async def main(loop):
    """Run the "Vector says what it sees" loop.

    Connects to the robot, subscribes to face/object/state events, and — once
    a recognized face has been seen — periodically grabs camera frames,
    classifies them with mx_classify, and has Vector speak the best label.
    """

    async def auto_reconnect(conn: anki_vector.connection.Connection):
        # Block until the SDK reports that behavior control was lost.
        # NOTE(review): defined but never awaited anywhere in this file.
        await conn.control_lost_event.wait()

    def on_robot_status_update(robot, event_type, event):
        """robot_state handler: leave the charger when full, else maybe classify."""
        battery_state = robot.get_battery_state()
        if battery_state.battery_level == 3 and battery_state.is_on_charger_platform:
            # Fully charged and still docked: take control long enough to undock.
            print("{0}: Requesting control to explore".format(datetime.datetime.now()))
            robot.conn.request_control()
            robot.behavior.drive_off_charger()
            robot.conn.release_control()
            time.sleep(5)
        else:
            # Throttle classification: every 6 s while announcing, otherwise
            # every 45 s with a higher confidence threshold.
            if saw_face and last_image < time.time() - 6:
                image_recognition(robot)
            elif last_image < time.time() - 45:
                image_recognition(robot, probability=0.4)

    def on_robot_observed_object(robot, event_type, event):
        """Object-seen handler: classify at most every 2 s while announcing."""
        global saw_face, last_image
        if saw_face and last_image < time.time() - 2:
            image_recognition(robot)

    def on_robot_observed_face(robot, event_type, event):
        """Face-seen handler: the first recognized face turns announce mode on."""
        for face in robot.world.visible_faces:
            print(f"Face ID: {face.face_id}")
            if face.face_id != -1:  # -1 = face detected but not recognized
                battery_state = robot.get_battery_state()
                if not battery_state.is_on_charger_platform:
                    global saw_face, saw_face_time
                    if not saw_face:
                        saw_face = True
                        saw_face_time = time.time()
                        robot.conn.request_control(timeout=5.0)
                        robot.say_text("I am now going to start saying what I see for a few minutes.")
                        robot.conn.release_control()
                        time.sleep(5)
                # One recognized face is enough per event.
                break

    def on_robot_wake_word():
        # Hearing the wake word cancels announce mode.
        # NOTE(review): subscribed handlers may be invoked with event arguments
        # by the SDK — confirm this zero-argument signature matches the
        # anki_vector version in use.
        global saw_face
        saw_face = False

    async def setup_vector():
        """Connect to Vector, wire up handlers, and poll until interrupted.

        Retries the connection every 20 s; announce mode times out after
        `max_runtime` seconds or after 60 processed images.
        """
        try:
            # The connection is established by Robot.__enter__, so the `with`
            # must sit inside this try for VectorNotFoundException to be
            # caught (the original placed the try inside the with, where the
            # connect failure could never reach the handler).
            with anki_vector.Robot(requires_behavior_control=False,
                                   enable_camera_feed=True,
                                   enable_face_detection=True) as robot:
                on_robot_observed_face_event = functools.partial(on_robot_observed_face, robot)
                on_robot_observed_object_event = functools.partial(on_robot_observed_object, robot)
                on_robot_status_update_event = functools.partial(on_robot_status_update, robot)
                robot.events.subscribe(on_robot_observed_face_event, Events.robot_observed_face)
                robot.events.subscribe(on_robot_status_update_event, Events.robot_state)
                robot.events.subscribe(on_robot_observed_object_event, Events.robot_observed_object)
                robot.events.subscribe(on_robot_observed_object_event, Events.object_appeared)
                robot.events.subscribe(on_robot_observed_object_event, Events.object_stopped_moving)
                robot.events.subscribe(on_robot_wake_word, Events.wake_word)
                try:
                    global saw_face, images_processed, saw_face_time
                    while True:
                        time.sleep(5)
                        # `and` binds tighter than `or`: announce mode ends after
                        # max_runtime seconds, or after 60 images regardless.
                        if saw_face and saw_face_time < time.time() - max_runtime or images_processed > 60:
                            saw_face = False
                            images_processed = 0
                            print("Not saying things anymore")
                except KeyboardInterrupt:
                    pass
                robot.events.unsubscribe(on_robot_observed_face_event, Events.robot_observed_face)
                # Fixed typo: original unsubscribed from `Events.robot_observec_object`.
                robot.events.unsubscribe(on_robot_observed_object_event, Events.robot_observed_object)
                robot.events.unsubscribe(on_robot_observed_object_event, Events.object_appeared)
                robot.events.unsubscribe(on_robot_status_update_event, Events.robot_state)
        except anki_vector.exceptions.VectorNotFoundException:
            print("Unable to connect to Vector, re-trying in 20 seconds")
            time.sleep(20)
            await setup_vector()

    def image_recognition(robot, probability=0.15):
        """Grab the latest camera frame and, if the robot is idle, classify it."""
        try:
            global images_processed, last_image
            battery_state = robot.get_battery_state()
            if battery_state:
                # Only classify while off the charger and not otherwise busy.
                if (not battery_state.is_on_charger_platform
                        and not robot.status.is_carrying_block
                        and not robot.status.is_charging
                        and not robot.status.is_animating
                        and not robot.status.is_pathing):
                    print("Getting image from vector")
                    robot.camera.init_camera_feed()
                    image = robot.camera.latest_image
                    output = io.BytesIO()
                    image.save(output, format='JPEG')
                    get_label(robot, output, probability=probability)
                    last_image = time.time()
                    images_processed = images_processed + 1
        except AttributeError as e:
            # e.g. latest_image is None before the first frame arrives.
            print(e)
        except Exception as e:
            print("Unexpected error:", sys.exc_info()[0])
            print(e)
            print("Could not get image")

    # Classifier backbones tried in order by get_label.
    models = ["resnet152_v2", "vgg19_bn", "alexnet", "densenet201", "squeezenet1.1"]

    def get_label(robot, image_data, attempt=1, probability=0.15):
        """Classify the JPEG in *image_data*, escalating through `models` until
        a label clears *probability*, then have Vector say it."""
        if attempt >= len(models):
            # All models tried without a confident label: grab a fresh frame and
            # start over. The original guard (`attempt > 7`) was missing this
            # `return` and allowed models[5..7] — an IndexError on a 5-item list.
            image_recognition(robot)
            return
        # NOTE(review): attempt starts at 1, so models[0] ("resnet152_v2") is
        # never used here — confirm whether skipping it is intentional.
        print("Getting labels with " + models[attempt])
        labels = mx_classify.detect_image(image_data.getvalue(), models[attempt])
        # Keep labels above the threshold, dropping generic categories.
        possibilities = [
            label for label in labels
            if label.score > probability
            and "auto" not in label.description
            and "product" not in label.description
            and "technology" not in label.description
        ]
        if len(possibilities) > 0:
            take_control_and_say(robot, possibilities)
        else:
            get_label(robot, image_data, attempt + 1, probability)

    def get_text(robot, image_data):
        """OCR path via Google Cloud Vision — currently dead code.

        NOTE(review): `client` is commented out at module level, so calling
        this raises NameError; its only call site is also commented out.
        """
        # Occasionally fall back to label detection instead of OCR.
        if random.randint(0, 3) == 2:
            get_label(robot, image_data)
        print("Getting text")
        response = client.text_detection(image=image_data)
        texts = response.text_annotations
        possibilities = []
        for text in texts:
            if text.description and text.locale == "en":
                possibilities.append(text)
                take_control_and_say(robot, [text])
                break
        if len(possibilities) == 0:
            get_label(robot, image_data)

    def take_control_and_say(robot, text):
        """Take behavior control, speak one label (or two joined), release."""
        print("------ Vector saying %s --------" % text)
        if len(text) == 2:
            text_say = str(text[0].description) + " and " + str(text[1].description)
        else:
            text_say = random.choice(text).description
        robot.conn.request_control(timeout=5.0)
        robot.say_text(text_say)
        robot.conn.release_control()
        time.sleep(5)

    await setup_vector()
if __name__ == "__main__":
    # Drive the async entry point to completion, then tear the loop down.
    loop = asyncio.get_event_loop()
    loop.run_until_complete(main(loop))
    loop.close()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment