Skip to content

Instantly share code, notes, and snippets.

@marcdjulien
Created November 19, 2014 01:50
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save marcdjulien/b0e49b6fd0e4f80b5397 to your computer and use it in GitHub Desktop.
Save marcdjulien/b0e49b6fd0e4f80b5397 to your computer and use it in GitHub Desktop.
Raphael Turtle Graphics Robot - Uses WiringPi and OpenCV
"""
Marc-Daniel Julien
Akiva Krauthamer
Intro to Physical Compting
Carnegie Mellon University
CarLogic implements all of the behavior of the robot via a
Finite State Machine
"""
from VideoThread import VideoThread
from MotorController import MotorController
from threading import Lock, Thread
import time
import random
STANDARD_STATE = "STANDARD_STATE"
CIRCLE_STATE = "CIRCLE_STATE"
LINE_STATE = "LINE_STATE"
CROSS_STATE = "CROSS_STATE"
BORDER_STATE = "BORDER_STATE"
RUN_STATE = "RUN_STATE"
DONE_STATE = "DONE_STATE"
BORDER_ACTION = "BORDER_ACTION"
CIRCLE_ACTION = "CIRCLE_ACTION"
LINE_ACTION = "LINE_ACTION"
CROSS_ACTION = "CROSS_ACTION"
RUN_ACTION = "RUN_ACTION"
LEFT = "LEFT"
RIGHT = "RIGHT"
RUN_TIME = 10.0
class CarLogic(Thread):
def __init__(self):
super(CarLogic, self).__init__()
self.state_mutex = Lock()
self.action_mutex = Lock()
self.mc = MotorController()
self.start_state_time = 0
self.state = None
self.action = None
self.done = False
self.run_time = time.time()
def run(self):
self.set_state(STANDARD_STATE)
while self.get_state() != DONE_STATE:
# Get state
state = self.get_state()
# Act based on state
# Border state gets priority
if state == BORDER_STATE:
self.border_state()
# Check if should run
if self.run_timer() > RUN_TIME:
action = [RUN_ACTION, random.randint(1,360), random.randint(1000,4000)/1000.0]
self.set_state(RUN_STATE, action)
self.run_time = time.time()
elif state == STANDARD_STATE:
self.standard_state()
elif state == CIRCLE_STATE:
self.circle_state()
elif state == LINE_STATE:
self.line_state()
elif state == CROSS_STATE:
self.cross_state()
elif state == RUN_STATE:
self.run_state()
print "Done"
def border_state(self):
t = self.time_in_state()
action = self.get_action()
if action[0] != BORDER_ACTION:
return
stop_time = action[1]
reverse_time = stop_time+2.000
spin_time = reverse_time + self.degree_to_time(45+random.randint(1,45))
if (0 <= t) and (t < stop_time):
self.mc.stop()
elif (stop_time <= t) and (t <= reverse_time):
self.mc.reverse()
elif (reverse_time < t) and (t <= spin_time):
self.mc.left()
else:
self.set_state(STANDARD_STATE)
def standard_state(self):
self.mc.forward()
def run_state(self):
t = self.time_in_state()
action = self.get_action()
if action[0] != RUN_ACTION:
return
spin_time = self.degree_to_time(action[1])
run_time = spin_time + action[2]
if (0 <= t) and (t < spin_time):
self.mc.right()
elif (spin_time <= t) and (t < run_time):
self.mc.forward()
else:
self.set_state(STANDARD_STATE)
# Reseting timer each run through
self.run_time = time.time()
def circle_state(self):
t = self.time_in_state()
action = self.get_action()
if action[0] != CIRCLE_ACTION:
return
spin_time = self.degree_to_time(action[1])
if (0 <= t) and (t < spin_time):
self.mc.right()
else:
self.set_state(STANDARD_STATE)
def line_state(self):
t = self.time_in_state()
action = self.get_action()
if action[0] != LINE_ACTION:
return
spin_time = self.degree_to_time(45)
if (0 <= t) and (t < spin_time):
if self.action[1] == RIGHT:
self.mc.right()
else:
self.mc.left()
else:
self.set_state(STANDARD_STATE)
def cross_state(self):
t = self.time_in_state()
action = self.get_action()
if action[0] != CROSS_ACTION:
return
spin_time = self.degree_to_time(action[1])
if (0 <= t) and (t < spin_time):
self.mc.right()
else:
self.set_state(STANDARD_STATE)
# Determine how much time the bot needs to spin
# to turn a numebr of degrees
def degree_to_time(self, degree):
FACTOR = 5.5/360.0
return FACTOR * degree
def time_in_state(self):
return time.time() - self.start_state_time
# Time until bot flees in one direction
def run_timer(self):
return time.time() - self.run_time
def get_state(self):
self.state_mutex.acquire()
state = self.state
self.state_mutex.release()
return state
def set_state(self, state, action=None):
if state == self.get_state():
return
self.state_mutex.acquire()
self.state = state
self.start_state_time = time.time()
if action != None:
self.set_action(action)
print action
self.state_mutex.release()
print state
def set_action(self, action):
self.action_mutex.acquire()
self.action = action
self.action_mutex.release()
def get_action(self):
self.action_mutex.acquire()
action = self.action
self.action_mutex.release()
return action
# Called from VieoThread when the robot should
# change its behavior
def signal(self, action):
if action[0] == BORDER_ACTION:
if self.get_state() != BORDER_STATE:
self.set_state(BORDER_STATE, action)
elif action[0] == CIRCLE_ACTION:
if self.get_state() == STANDARD_STATE:
self.set_state(CIRCLE_STATE, action)
elif action[0] == LINE_ACTION:
if self.get_state() == STANDARD_STATE:
self.set_state(LINE_STATE, action)
elif action[0] == CROSS_ACTION:
if self.get_state() == STANDARD_STATE:
self.set_state(CROSS_STATE, action)
def end(self):
self.set_state(DONE_STATE)
"""
Marc-Daniel Julien
Akiva Krauthamer
Intro to Physical Compting
Carnegie Mellon University
The FrameGrabber solely grabs frames and stores them for the VideoThread to use
"""
import cv2
import time
import numpy as np
from threading import Thread, Lock
DISPLAY = False
FPS = 20
SLEEP_PERIOD = 1/FPS
LOWER_WHITE = np.array([180*(000.0/360.0), 0, 0], dtype=np.uint8)
HIGHER_WHITE = np.array([180*(360.0/360.0), 100, 255], dtype=np.uint8)
class FrameGrabber(Thread):
def __init__(self):
super(FrameGrabber, self).__init__()
self.frames = []
self.frames_mutex = Lock()
self.done = True
self.capture = cv2.VideoCapture(-1)
def run(self):
# Check camera
if not self.capture.isOpened():
print "No camera"
# Begin main loop
else:
while True:
start = time.time()
frame = self.get_frame()
self.add_frame(frame)
proc_time = time.time()-start
time.sleep(max(0,SLEEP_PERIOD-proc_time))
def get_frame(self):
ret, frame1 = self.capture.read()
frame1 = cv2.blur(frame1, (5,5))
frame1 = cv2.resize(frame1, (frame1.shape[1]/4,frame1.shape[0]/4), interpolation = cv2.INTER_CUBIC)
frame2 = cv2.cvtColor(frame1, cv2.COLOR_BGR2HSV)
frame2 = cv2.inRange(frame2, LOWER_WHITE, HIGHER_WHITE)
frame1[frame2 == 255] = [255,255,255]
return frame1
def add_frame(self, frame):
self.frames_mutex.acquire()
self.frames.append(frame)
self.frames_mutex.release()
# Called by Videothread to get the latest fame
def last_frame(self):
frame = None
self.frames_mutex.acquire()
if len(self.frames) > 0:
frame = self.frames.pop()
del self.frames
self.frames = []
self.frames_mutex.release()
return frame
"""
Marc-Daniel Julien
Akiva Krauthamer
Intro to Physical Compting
Carnegie Mellon University
Program entry point, starts all of the threads
"""
from VideoThread import VideoThread
from CarLogic import CarLogic
from FrameGrabber import FrameGrabber
def main():
cl = CarLogic()
fg = FrameGrabber()
vt = VideoThread(cl,fg)
fg.start()
cl.start()
vt.start()
if __name__ == '__main__':
main()
"""
Marc-Daniel Julien
Akiva Krauthamer
Intro to Physical Compting
Carnegie Mellon University
"""
import wiringpi2 as wpi
# Setup constants
INPUT = 0
OUTPUT = 1
PWM = 2
HIGH = 1
LOW = 0
# GPIO Pins
SPEED_PIN = 18
DIR_PIN = 24
INA_PIN = 4
INB_PIN = 17
class MotorController(object):
# Initialize WiringPi/GPIO pins
def __init__(self):
wpi.wiringPiSetupGpio()
wpi.pinMode(DIR_PIN, OUTPUT)
wpi.softPwmCreate(SPEED_PIN, 0, 100)
wpi.pinMode(INA_PIN, OUTPUT)
wpi.pinMode(INB_PIN, OUTPUT)
# Spin wheel 2 left
def w2left(self):
wpi.digitalWrite(INA_PIN, HIGH)
wpi.digitalWrite(INB_PIN, LOW)
# Spin wheel 2 right
def w2right(self):
wpi.digitalWrite(INA_PIN, LOW)
wpi.digitalWrite(INB_PIN, HIGH)
# Spin wheel 1 right speed 1
def w1speed1r(self):
wpi.softPwmWrite(SPEED_PIN, 50)
wpi.digitalWrite(DIR_PIN, LOW)
# Spin wheel 1 right speed 2
def w1speed2r(self):
wpi.softPwmWrite(SPEED_PIN, 100)
wpi.digitalWrite(DIR_PIN, LOW)
# Spin wheel 1 left speed 1
def w1speed1l(self):
wpi.softPwmWrite(SPEED_PIN, 50)
wpi.digitalWrite(DIR_PIN, HIGH)
# Spin wheel 1 left speed 2
def w1speed2l(self):
wpi.softPwmWrite(SPEED_PIN, 0)
wpi.digitalWrite(DIR_PIN, HIGH)
# Turn robot left
def left(self):
self.w2left()
self.w1speed1r()
# Turn robot right
def right(self):
self.w2right()
self.w1speed1l()
# Move robot forward
def forward(self):
self.w2right()
self.w1speed2r()
# Move robot backward
def reverse(self):
self.w2left()
self.w1speed2l()
# Stop robot
def stop(self):
wpi.digitalWrite(INA_PIN, HIGH)
wpi.digitalWrite(INB_PIN, HIGH)
wpi.softPwmWrite(SPEED_PIN, 100)
wpi.digitalWrite(DIR_PIN, HIGH)
"""
Marc-Daniel Julien
Akiva Krauthamer
Intro to Physical Compting
Carnegie Mellon University
The VideoThread takes the most recent frame from the FrameGrabber thread and
analyzes the frame to determine the robots behavior
"""
import cv2
import numpy as np
import threading
import time
import random
import math
BORDER_ACTION = "BORDER_ACTION"
CIRCLE_ACTION = "CIRCLE_ACTION"
LINE_ACTION = "LINE_ACTION"
CROSS_ACTION = "CROSS_ACTION"
LEFT = "LEFT"
RIGHT = "RIGHT"
LINE = "LINE"
INTERSECTION = "INTERSECTION"
LOWER_BLUE = np.array([180*(135.0/360.0), 100, 100], dtype=np.uint8)
HIGHER_BLUE = np.array([180*(270.0/360.0), 255, 255], dtype=np.uint8)
LOWER_RED = np.array([180*(000.0/360.0), 100, 100], dtype=np.uint8)
HIGHER_RED = np.array([180*(30.0/360.0), 255, 255], dtype=np.uint8)
THRESH = 100
DISPLAY = False
class VideoThread(threading.Thread):
# Assigne MotorController and FrameGrabber
def __init__(self, ct, fg):
super(VideoThread, self).__init__()
self.ct = ct
self.fg = fg
def run(self):
while True:
# Grab last frame
frame = self.fg.last_frame()
if frame == None:
continue
# Convert to grayscale
gray_frame = cv2.cvtColor(frame, cv2.COLOR_BGR2GRAY)
#Analyze the grayscale and color frame to determine action
action = self.analyze_frame(frame, gray_frame)
# Signal the MotorController as to what it should do next
if action != None:
self.ct.signal(action)
if DISPLAY: cv2.imshow("Frame", new_frame)
self.ct.end()
# Finds circles in frame
def find_circles(self, frame):
FACTOR = 0.5
minDist = frame.shape[1]*FACTOR
#dp = 4.62
return cv2.HoughCircles(frame, cv2.cv.CV_HOUGH_GRADIENT, dp=6, minDist=3)
# Draws circles in frame
def draw_circles(self, frame, circles):
new_frame = frame
if circles != None:
for i in circles[0, :]:
cv2.circle(new_frame, (i[0], i[1]), i[2], (255,0,0), 1)
cv2.circle(new_frame, (i[0], i[1]), 1, (255,255,0), 1)
return new_frame
# Draws locations of corners in frame
def draw_corners(self, new_frame, corners):
if corners != None:
y = corners[0]
x = corners[1]
for i in xrange(x.shape[0]):
a,b = x[i],y[i]
cv2.circle(new_frame, (a, b), 1, (255,0,255), 1)
return new_frame
# Find lines in frame
def find_lines(self, frame):
frame = cv2.Canny(frame, 50, 150, apertureSize=3)
if DISPLAY: cv2.imshow("Edges",frame)
#return cv2.HoughLinesP(frame, rho=10, theta=3.1415927/30.0, threshold=10, minLineLength=20, maxLineGap=10)
return cv2.HoughLines(frame, rho=1, theta=3.1415927/32.0, threshold=20)
# Draws lines in frame
def draw_lines(self, frame, lines):
if lines != None:
for i in lines[0, :]:
a = np.cos(i[1])
b = np.sin(i[1])
x0 = a*i[0]
y0 = b*i[0]
x1 = int(x0 + 1000*(-b))
y1 = int(y0 + 1000*(a))
x2 = int(x0 - 1000*(-b))
y2 = int(y0 - 1000*(a))
cv2.line(frame, (x1, y1), (x2, y2), (0,255,0))
return frame
# Detect red markings, which indicate border
def detect_border(self, frame):
frame = cv2.cvtColor(frame, cv2.COLOR_BGR2HSV)
frame = cv2.inRange(frame, LOWER_RED, HIGHER_RED)
return np.any(frame)
# Find corners in frame
def find_corners(self, frame):
frame = np.float32(frame)
corners = cv2.cornerHarris(frame, blockSize=2, ksize=5, k=0.2)
if corners == None:
return None
corners = corners > 0.01*corners.max()
x,y = np.where(corners == True)
return x,y
# Determine if if a single line or an intersection of
# lines is in the frame by calculating the standard
# deviation of the line orientation (angle)
def line_or_intersection(self, lines):
angles = self.get_angles(lines)
std = np.std(angles)
if std > 20:
return INTERSECTION
else:
return LINE
# Get list of line angles
def get_angles(self, lines):
angles = []
for l in lines[0, :]:
angles.append(l[1]*180.0/math.pi)
return np.array(angles)
def angle(self, p):
return math.atan2(p[3]-p[1], p[2]-p[0])*180/math.pi
def analyze_frame(self, frame, gray_frame):
# Check for border immediately
if self.detect_border(frame):
return [BORDER_ACTION, random.randint(1,5000)/1000.0]
# Check for circles
circles = self.find_circles(gray_frame)
if circles != None:
action = [CIRCLE_ACTION, 360 + random.randint(1,360)]
return action
# Check for lines
lines = self.find_lines(gray_frame)
if lines != None:
line_type = self.line_or_intersection(lines)
if line_type == LINE:
direction = LEFT if random.randint(1,2) == 1 else RIGHT
action = [LINE_ACTION, direction]
return action
else: #line_type == INTERSECTION
angles = self.get_angles(lines)
avg = np.mean(angles)
action = [CROSS_ACTION, avg%360]
return action
return None
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment