@samirsogay
Created November 24, 2020 19:13
Python code for Toddler Following Rover
# -*- coding: utf-8 -*-
# mode: 1..hand control, 2..find green area, 3..find squares and recognize, 4..found
# 41..right_step1, 42..rs2, 43..rs3, ...
# 51..left_step1, 52..ls2, 53..ls3, ...
# 61, 62, 63 ... back to the middle
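# Of the modes listed above, this version only uses 1 (hand control via the web
# UI), 4 (find the person and drive towards them) and 62 (turn left, then go
# back to searching).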
from megapi_python3 import *
import numpy as np
import imutils
import cv2
from imutils.video import VideoStream
from flask import Response
from flask import Flask
from flask import render_template
import threading
import datetime
import time
import os
import sys  # used for the sys.exit() calls below
from tflite_runtime.interpreter import Interpreter
MODEL_NAME = 'Sample_TFLite_model'
GRAPH_NAME = 'detect.tflite'
LABELMAP_NAME = 'labelmap.txt'
min_conf_threshold = float(0.6)
imW, imH = 640, 480
# Get path to current working directory
CWD_PATH = os.getcwd()
# Path to .tflite file, which contains the model that is used for object detection
PATH_TO_CKPT = os.path.join(CWD_PATH,MODEL_NAME,GRAPH_NAME)
# Path to label map file
PATH_TO_LABELS = os.path.join(CWD_PATH,MODEL_NAME,LABELMAP_NAME)
# Load the label map
with open(PATH_TO_LABELS, 'r') as f:
    labels = [line.strip() for line in f.readlines()]

# Have to do a weird fix for label map if using the COCO "starter model" from
# https://www.tensorflow.org/lite/models/object_detection/overview
# First label is '???', which has to be removed.
if labels[0] == '???':
    del(labels[0])
interpreter = Interpreter(model_path=PATH_TO_CKPT)
interpreter.allocate_tensors()
# Get model details
input_details = interpreter.get_input_details()
output_details = interpreter.get_output_details()
height = input_details[0]['shape'][1]
width = input_details[0]['shape'][2]
floating_model = (input_details[0]['dtype'] == np.float32)
input_mean = 127.5
input_std = 127.5
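# With mean = std = 127.5 the normalisation in dete() below maps uint8 pixel
# values (0..255) into the range [-1, 1] expected by floating-point TFLite models.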
# Initialize frame rate calculation
frame_rate_calc = 1
freq = cv2.getTickFrequency()
# initialize the output frame and a lock used to ensure thread-safe
# exchange of the output frames (useful when multiple browsers/tabs
# are viewing the stream)
outputFrame = None
lock = threading.Lock()
detect=1
# initialize a flask object
app = Flask(__name__,static_url_path='/static')
@app.route("/")
def index():
# return the rendered template
return render_template("index.html")
@app.route("/forward")
def forward( ):
global key
key=82
return 'OK'
@app.route("/stop")
def stop( ):
global key
key = ord('t')
return 'OK'
@app.route("/backward")
def backward( ):
global key
key=84
return 'OK'
@app.route("/right")
def right( ):
global key
key = 83
return 'OK'
@app.route("/left")
def left( ):
global key
key=81
return 'OK'
@app.route("/hand")
def hand( ):
global key
key = ord('h')
return 'OK'
@app.route("/auto")
def auto( ):
global key
key=ord('s')
return 'OK'
@app.route("/quit")
def quit( ):
global key
key = ord('q')
return 'OK'
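# Each route above only stores a command code in the shared global `key`; the
# main() loop below polls `key` and drives the motors accordingly. The codes
# 81/82/83/84 appear to be the arrow-key values returned by cv2.waitKey() on
# Linux, presumably kept from an earlier keyboard-driven version of the script.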
def onRead(v):
    global dist
    dist = v

def onEnd(v):
    stop()

def stop():
    bot.encoderMotorRun(1, 0)
    bot.encoderMotorRun(2, 0)

def right(step):
    bot.encoderMotorMove(1, 120, -step, onEnd)
    bot.encoderMotorMove(2, 120, -step, onEnd)

def left(step):
    bot.encoderMotorMove(1, 120, step, onEnd)
    bot.encoderMotorMove(2, 120, step, onEnd)

def forw(step):
    bot.encoderMotorMove(1, 150, step, onEnd)
    bot.encoderMotorMove(2, 150, -step, onEnd)

def back(step):
    bot.encoderMotorMove(1, 150, -step, onEnd)
    bot.encoderMotorMove(2, 150, step, onEnd)

def s_forw():
    bot.encoderMotorRun(1, 150)
    bot.encoderMotorRun(2, -150)

def s_left():
    bot.encoderMotorRun(1, 120)
    bot.encoderMotorRun(2, 120)

def s_right():
    bot.encoderMotorRun(1, -120)
    bot.encoderMotorRun(2, -120)

def s_back():
    bot.encoderMotorRun(1, -150)
    bot.encoderMotorRun(2, 150)
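# stop(), right() and left() above reuse the names of three Flask route handlers.
# Flask holds its own references to the handlers it registered, so the web routes
# should keep working; plain calls such as the stop() inside onEnd() reach these
# motor helpers instead. Opposite signs on the two motors drive the rover straight
# (the motors are presumably mounted mirrored), equal signs spin it in place; the
# encoderMotorMove() helpers do measured moves for corrective turns, while the
# *_run() helpers keep the motors running for continuous hand control.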
def dete(im):  # person detection, adapted from Adrian Rosebrock's streaming example
    global dist, detect, hold_time_start, end_time, start_time, mode, person
    print('Detection begins')
    start_time = time.time()
    frame_rgb = cv2.cvtColor(im, cv2.COLOR_BGR2RGB)
    frame_resized = cv2.resize(frame_rgb, (width, height))
    input_data = np.expand_dims(frame_resized, axis=0)
    # Normalize pixel values if using a floating model (i.e. if model is non-quantized)
    if floating_model:
        input_data = (np.float32(input_data) - input_mean) / input_std
    start_time = time.time()
    # Perform the actual detection by running the model with the image as input
    interpreter.set_tensor(input_details[0]['index'], input_data)
    interpreter.invoke()
    end_time = time.time()
    zyc = end_time - start_time
    # Retrieve detection results
    boxes = interpreter.get_tensor(output_details[0]['index'])[0]    # Bounding box coordinates of detected objects
    classes = interpreter.get_tensor(output_details[1]['index'])[0]  # Class index of detected objects
    scores = interpreter.get_tensor(output_details[2]['index'])[0]   # Confidence of detected objects
    end_time = time.time()
    person = 0
    x = 0  # horizontal centre of the last detected person (0 = none found)
    for i in range(len(scores)):
        if (scores[i] > min_conf_threshold) and (scores[i] <= 1.0) and int(classes[i]) == 0:
            print('Confidence greater than 0.6 for class person')
            # Get bounding box coordinates and draw box
            # Interpreter can return coordinates that are outside of image dimensions,
            # need to force them to be within image using max() and min()
            ymin = int(max(1, (boxes[i][0] * imH)))
            xmin = int(max(1, (boxes[i][1] * imW)))
            ymax = int(min(imH, (boxes[i][2] * imH)))
            xmax = int(min(imW, (boxes[i][3] * imW)))
            cv2.rectangle(im, (xmin, ymin), (xmax, ymax), (10, 255, 0), 2)
            print('found person')
            person = 1
            x = (xmin + xmax) / 2
            print('x', x)
    if x == 0:  # nothing found
        left(50)  # turn left
        print('nothing found')
    elif x < 240:  # bounding-box mid-point too far left
        y = 240 - int(x)
        # y = y * 2 / 3
        print('y', y)
        left(int(y))
        sleep(1)
        print('Turning left to centre camera')
    elif x > 400:  # bounding-box mid-point too far right
        z = int(x) - 400
        # z = z * 2 / 3
        right(int(z))
        sleep(1)
        print('Turning right to centre camera')
    else:  # in the centre (240 ... 400)
        print('found person in centre')
        bot.ultrasonicSensorRead(7, onRead)
        sleep(0.1)
        a = round(dist)
        print('Distance =', a)
        hold_time_start = time.time()
        detect = 0
        if a == 400:
            print('Wrong reading')
            sleep(1)
        elif a > 200:
            forw(250)
            print('Forwarding 250 steps')
            sleep(1)
    return im
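# The x thresholds above split the 640-pixel-wide frame into three bands: x < 240
# means the detected person sits left of centre, x > 400 right of centre, and
# 240..400 counts as centred. The pixel offset itself is reused as the encoder
# step count, so a larger offset produces a proportionally larger corrective turn.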
# Main Program
camera = VideoStream(resolution=(imW,imH),framerate=30).start()
time.sleep(2.0)
bot = MegaPi()
bot.start()
stop()
mode = 1 # hand control
frcnt = 40 # Camera initialisation time 4 s (40 x 0.1 s)
key=ord('h')
person=0
hold_time_start=time.time()
def main():
    global frcnt, mode, lock, outputFrame, key, detect, person
    while True:  # loop takes 0.1 s
        # start_time = time.time()  # Seconds display
        frame = camera.read()
        frame = cv2.flip(frame, -1)
        frcnt = frcnt - 1  # show some frames without activity
        if frcnt < 1:
            # print('mode', mode)
            if mode == 1:  # hand control
                frcnt = 1
                if key == 82:  # arrow keys
                    s_forw()
                    bot.ultrasonicSensorRead(7, onRead)
                    sleep(0.1)
                    a = round(dist)
                    if a < 20:
                        key = ord('t')
                elif key == 84:
                    s_back()
                elif key == ord('t'):
                    stop()
                elif key == 81:
                    s_left()
                elif key == 83:
                    s_right()
                with lock:
                    outputFrame = frame.copy()
            elif mode == 62:
                left(150)
                mode = 4
                frcnt = 30
                with lock:
                    outputFrame = frame.copy()
            elif mode == 4:  # find person
                stop()
                current_time = time.time()
                hold_time = current_time - hold_time_start
                if hold_time >= 10:
                    detect = 1
                if detect == 1:
                    frame = dete(frame)  # recognize the picture now
                    zyc = end_time - start_time
                    label = "{:.1f}".format(zyc)
                    cv2.putText(frame, label, (5, 15), cv2.FONT_HERSHEY_SIMPLEX, 0.5, (0, 255, 0), 2)
                    if person == 1:
                        mode = 4
                    else:
                        mode = 62
                    person = 0
                with lock:
                    outputFrame = frame.copy()
        if key == ord('q'):
            break
        elif key == ord('h'):
            mode = 1
            print('hand control')
        elif key == ord('s'):
            key = 0
            mode = 4
            print('find Person')
    stop()
    bot.close()
    sys.exit()
def generate():
    # grab global references to the output frame and lock variables
    global outputFrame, lock
    # loop over frames from the output stream
    while True:
        # wait until the lock is acquired
        with lock:
            # check if the output frame is available, otherwise skip
            # this iteration of the loop
            if outputFrame is None:
                continue
            # encode the frame in JPEG format
            (flag, encodedImage) = cv2.imencode(".jpg", outputFrame)
            # ensure the frame was successfully encoded
            if not flag:
                continue
        # yield the output frame in the byte format
        yield (b'--frame\r\n' b'Content-Type: image/jpeg\r\n\r\n' +
               bytearray(encodedImage) + b'\r\n')

@app.route("/video_feed")
def video_feed():
    # return the response generated along with the specific media
    # type (mime type)
    return Response(generate(),
                    mimetype="multipart/x-mixed-replace; boundary=frame")
if __name__ == '__main__':
    print('[INFO] Abort with Ctrl+c')
    print('[INFO] (MegaPi threads terminate)')
    t = threading.Thread(target=main)
    t.daemon = True
    t.start()
    # start the flask app
    app.run(host='0.0.0.0', port=8000, debug=True,
            threaded=True, use_reloader=False)
    # clean up once the Flask server returns
    sleep(0.1)
    camera.stop()
    sys.exit()
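# The "/" route renders templates/index.html, which is expected to embed the MJPEG
# stream (for example with an <img> tag pointing at /video_feed) and to provide
# buttons that call the /forward, /backward, /left, /right, /stop, /hand, /auto
# and /quit routes defined above. To check the feed without the template, open
# http://<rover-ip>:8000/video_feed in a browser, or read it from another machine
# with OpenCV (hypothetical snippet, not part of the rover code):
#
#   cap = cv2.VideoCapture('http://<rover-ip>:8000/video_feed')
#   ok, img = cap.read()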