Created
November 19, 2014 01:50
-
-
Save marcdjulien/b0e49b6fd0e4f80b5397 to your computer and use it in GitHub Desktop.
Raphael Turtle Graphics Robot - Uses WiringPi and OpenCV
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
""" | |
Marc-Daniel Julien | |
Akiva Krauthamer | |
Intro to Physical Compting | |
Carnegie Mellon University | |
CarLogic implements all of the behavior of the robot via a | |
Finite State Machine | |
""" | |
from VideoThread import VideoThread | |
from MotorController import MotorController | |
from threading import Lock, Thread | |
import time | |
import random | |
STANDARD_STATE = "STANDARD_STATE" | |
CIRCLE_STATE = "CIRCLE_STATE" | |
LINE_STATE = "LINE_STATE" | |
CROSS_STATE = "CROSS_STATE" | |
BORDER_STATE = "BORDER_STATE" | |
RUN_STATE = "RUN_STATE" | |
DONE_STATE = "DONE_STATE" | |
BORDER_ACTION = "BORDER_ACTION" | |
CIRCLE_ACTION = "CIRCLE_ACTION" | |
LINE_ACTION = "LINE_ACTION" | |
CROSS_ACTION = "CROSS_ACTION" | |
RUN_ACTION = "RUN_ACTION" | |
LEFT = "LEFT" | |
RIGHT = "RIGHT" | |
RUN_TIME = 10.0 | |
class CarLogic(Thread): | |
def __init__(self): | |
super(CarLogic, self).__init__() | |
self.state_mutex = Lock() | |
self.action_mutex = Lock() | |
self.mc = MotorController() | |
self.start_state_time = 0 | |
self.state = None | |
self.action = None | |
self.done = False | |
self.run_time = time.time() | |
def run(self): | |
self.set_state(STANDARD_STATE) | |
while self.get_state() != DONE_STATE: | |
# Get state | |
state = self.get_state() | |
# Act based on state | |
# Border state gets priority | |
if state == BORDER_STATE: | |
self.border_state() | |
# Check if should run | |
if self.run_timer() > RUN_TIME: | |
action = [RUN_ACTION, random.randint(1,360), random.randint(1000,4000)/1000.0] | |
self.set_state(RUN_STATE, action) | |
self.run_time = time.time() | |
elif state == STANDARD_STATE: | |
self.standard_state() | |
elif state == CIRCLE_STATE: | |
self.circle_state() | |
elif state == LINE_STATE: | |
self.line_state() | |
elif state == CROSS_STATE: | |
self.cross_state() | |
elif state == RUN_STATE: | |
self.run_state() | |
print "Done" | |
def border_state(self): | |
t = self.time_in_state() | |
action = self.get_action() | |
if action[0] != BORDER_ACTION: | |
return | |
stop_time = action[1] | |
reverse_time = stop_time+2.000 | |
spin_time = reverse_time + self.degree_to_time(45+random.randint(1,45)) | |
if (0 <= t) and (t < stop_time): | |
self.mc.stop() | |
elif (stop_time <= t) and (t <= reverse_time): | |
self.mc.reverse() | |
elif (reverse_time < t) and (t <= spin_time): | |
self.mc.left() | |
else: | |
self.set_state(STANDARD_STATE) | |
def standard_state(self): | |
self.mc.forward() | |
def run_state(self): | |
t = self.time_in_state() | |
action = self.get_action() | |
if action[0] != RUN_ACTION: | |
return | |
spin_time = self.degree_to_time(action[1]) | |
run_time = spin_time + action[2] | |
if (0 <= t) and (t < spin_time): | |
self.mc.right() | |
elif (spin_time <= t) and (t < run_time): | |
self.mc.forward() | |
else: | |
self.set_state(STANDARD_STATE) | |
# Reseting timer each run through | |
self.run_time = time.time() | |
def circle_state(self): | |
t = self.time_in_state() | |
action = self.get_action() | |
if action[0] != CIRCLE_ACTION: | |
return | |
spin_time = self.degree_to_time(action[1]) | |
if (0 <= t) and (t < spin_time): | |
self.mc.right() | |
else: | |
self.set_state(STANDARD_STATE) | |
def line_state(self): | |
t = self.time_in_state() | |
action = self.get_action() | |
if action[0] != LINE_ACTION: | |
return | |
spin_time = self.degree_to_time(45) | |
if (0 <= t) and (t < spin_time): | |
if self.action[1] == RIGHT: | |
self.mc.right() | |
else: | |
self.mc.left() | |
else: | |
self.set_state(STANDARD_STATE) | |
def cross_state(self): | |
t = self.time_in_state() | |
action = self.get_action() | |
if action[0] != CROSS_ACTION: | |
return | |
spin_time = self.degree_to_time(action[1]) | |
if (0 <= t) and (t < spin_time): | |
self.mc.right() | |
else: | |
self.set_state(STANDARD_STATE) | |
# Determine how much time the bot needs to spin | |
# to turn a numebr of degrees | |
def degree_to_time(self, degree): | |
FACTOR = 5.5/360.0 | |
return FACTOR * degree | |
def time_in_state(self): | |
return time.time() - self.start_state_time | |
# Time until bot flees in one direction | |
def run_timer(self): | |
return time.time() - self.run_time | |
def get_state(self): | |
self.state_mutex.acquire() | |
state = self.state | |
self.state_mutex.release() | |
return state | |
def set_state(self, state, action=None): | |
if state == self.get_state(): | |
return | |
self.state_mutex.acquire() | |
self.state = state | |
self.start_state_time = time.time() | |
if action != None: | |
self.set_action(action) | |
print action | |
self.state_mutex.release() | |
print state | |
def set_action(self, action): | |
self.action_mutex.acquire() | |
self.action = action | |
self.action_mutex.release() | |
def get_action(self): | |
self.action_mutex.acquire() | |
action = self.action | |
self.action_mutex.release() | |
return action | |
# Called from VieoThread when the robot should | |
# change its behavior | |
def signal(self, action): | |
if action[0] == BORDER_ACTION: | |
if self.get_state() != BORDER_STATE: | |
self.set_state(BORDER_STATE, action) | |
elif action[0] == CIRCLE_ACTION: | |
if self.get_state() == STANDARD_STATE: | |
self.set_state(CIRCLE_STATE, action) | |
elif action[0] == LINE_ACTION: | |
if self.get_state() == STANDARD_STATE: | |
self.set_state(LINE_STATE, action) | |
elif action[0] == CROSS_ACTION: | |
if self.get_state() == STANDARD_STATE: | |
self.set_state(CROSS_STATE, action) | |
def end(self): | |
self.set_state(DONE_STATE) | |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
""" | |
Marc-Daniel Julien | |
Akiva Krauthamer | |
Intro to Physical Compting | |
Carnegie Mellon University | |
The FrameGrabber solely grabs frames and stores them for the VideoThread to use | |
""" | |
import cv2 | |
import time | |
import numpy as np | |
from threading import Thread, Lock | |
DISPLAY = False | |
FPS = 20 | |
SLEEP_PERIOD = 1/FPS | |
LOWER_WHITE = np.array([180*(000.0/360.0), 0, 0], dtype=np.uint8) | |
HIGHER_WHITE = np.array([180*(360.0/360.0), 100, 255], dtype=np.uint8) | |
class FrameGrabber(Thread): | |
def __init__(self): | |
super(FrameGrabber, self).__init__() | |
self.frames = [] | |
self.frames_mutex = Lock() | |
self.done = True | |
self.capture = cv2.VideoCapture(-1) | |
def run(self): | |
# Check camera | |
if not self.capture.isOpened(): | |
print "No camera" | |
# Begin main loop | |
else: | |
while True: | |
start = time.time() | |
frame = self.get_frame() | |
self.add_frame(frame) | |
proc_time = time.time()-start | |
time.sleep(max(0,SLEEP_PERIOD-proc_time)) | |
def get_frame(self): | |
ret, frame1 = self.capture.read() | |
frame1 = cv2.blur(frame1, (5,5)) | |
frame1 = cv2.resize(frame1, (frame1.shape[1]/4,frame1.shape[0]/4), interpolation = cv2.INTER_CUBIC) | |
frame2 = cv2.cvtColor(frame1, cv2.COLOR_BGR2HSV) | |
frame2 = cv2.inRange(frame2, LOWER_WHITE, HIGHER_WHITE) | |
frame1[frame2 == 255] = [255,255,255] | |
return frame1 | |
def add_frame(self, frame): | |
self.frames_mutex.acquire() | |
self.frames.append(frame) | |
self.frames_mutex.release() | |
# Called by Videothread to get the latest fame | |
def last_frame(self): | |
frame = None | |
self.frames_mutex.acquire() | |
if len(self.frames) > 0: | |
frame = self.frames.pop() | |
del self.frames | |
self.frames = [] | |
self.frames_mutex.release() | |
return frame |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
""" | |
Marc-Daniel Julien | |
Akiva Krauthamer | |
Intro to Physical Compting | |
Carnegie Mellon University | |
Program entry point, starts all of the threads | |
""" | |
from VideoThread import VideoThread | |
from CarLogic import CarLogic | |
from FrameGrabber import FrameGrabber | |
def main(): | |
cl = CarLogic() | |
fg = FrameGrabber() | |
vt = VideoThread(cl,fg) | |
fg.start() | |
cl.start() | |
vt.start() | |
if __name__ == '__main__': | |
main() |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
""" | |
Marc-Daniel Julien | |
Akiva Krauthamer | |
Intro to Physical Compting | |
Carnegie Mellon University | |
""" | |
import wiringpi2 as wpi | |
# Setup constants | |
INPUT = 0 | |
OUTPUT = 1 | |
PWM = 2 | |
HIGH = 1 | |
LOW = 0 | |
# GPIO Pins | |
SPEED_PIN = 18 | |
DIR_PIN = 24 | |
INA_PIN = 4 | |
INB_PIN = 17 | |
class MotorController(object): | |
# Initialize WiringPi/GPIO pins | |
def __init__(self): | |
wpi.wiringPiSetupGpio() | |
wpi.pinMode(DIR_PIN, OUTPUT) | |
wpi.softPwmCreate(SPEED_PIN, 0, 100) | |
wpi.pinMode(INA_PIN, OUTPUT) | |
wpi.pinMode(INB_PIN, OUTPUT) | |
# Spin wheel 2 left | |
def w2left(self): | |
wpi.digitalWrite(INA_PIN, HIGH) | |
wpi.digitalWrite(INB_PIN, LOW) | |
# Spin wheel 2 right | |
def w2right(self): | |
wpi.digitalWrite(INA_PIN, LOW) | |
wpi.digitalWrite(INB_PIN, HIGH) | |
# Spin wheel 1 right speed 1 | |
def w1speed1r(self): | |
wpi.softPwmWrite(SPEED_PIN, 50) | |
wpi.digitalWrite(DIR_PIN, LOW) | |
# Spin wheel 1 right speed 2 | |
def w1speed2r(self): | |
wpi.softPwmWrite(SPEED_PIN, 100) | |
wpi.digitalWrite(DIR_PIN, LOW) | |
# Spin wheel 1 left speed 1 | |
def w1speed1l(self): | |
wpi.softPwmWrite(SPEED_PIN, 50) | |
wpi.digitalWrite(DIR_PIN, HIGH) | |
# Spin wheel 1 left speed 2 | |
def w1speed2l(self): | |
wpi.softPwmWrite(SPEED_PIN, 0) | |
wpi.digitalWrite(DIR_PIN, HIGH) | |
# Turn robot left | |
def left(self): | |
self.w2left() | |
self.w1speed1r() | |
# Turn robot right | |
def right(self): | |
self.w2right() | |
self.w1speed1l() | |
# Move robot forward | |
def forward(self): | |
self.w2right() | |
self.w1speed2r() | |
# Move robot backward | |
def reverse(self): | |
self.w2left() | |
self.w1speed2l() | |
# Stop robot | |
def stop(self): | |
wpi.digitalWrite(INA_PIN, HIGH) | |
wpi.digitalWrite(INB_PIN, HIGH) | |
wpi.softPwmWrite(SPEED_PIN, 100) | |
wpi.digitalWrite(DIR_PIN, HIGH) |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
""" | |
Marc-Daniel Julien | |
Akiva Krauthamer | |
Intro to Physical Compting | |
Carnegie Mellon University | |
The VideoThread takes the most recent frame from the FrameGrabber thread and | |
analyzes the frame to determine the robots behavior | |
""" | |
import cv2 | |
import numpy as np | |
import threading | |
import time | |
import random | |
import math | |
BORDER_ACTION = "BORDER_ACTION" | |
CIRCLE_ACTION = "CIRCLE_ACTION" | |
LINE_ACTION = "LINE_ACTION" | |
CROSS_ACTION = "CROSS_ACTION" | |
LEFT = "LEFT" | |
RIGHT = "RIGHT" | |
LINE = "LINE" | |
INTERSECTION = "INTERSECTION" | |
LOWER_BLUE = np.array([180*(135.0/360.0), 100, 100], dtype=np.uint8) | |
HIGHER_BLUE = np.array([180*(270.0/360.0), 255, 255], dtype=np.uint8) | |
LOWER_RED = np.array([180*(000.0/360.0), 100, 100], dtype=np.uint8) | |
HIGHER_RED = np.array([180*(30.0/360.0), 255, 255], dtype=np.uint8) | |
THRESH = 100 | |
DISPLAY = False | |
class VideoThread(threading.Thread): | |
# Assigne MotorController and FrameGrabber | |
def __init__(self, ct, fg): | |
super(VideoThread, self).__init__() | |
self.ct = ct | |
self.fg = fg | |
def run(self): | |
while True: | |
# Grab last frame | |
frame = self.fg.last_frame() | |
if frame == None: | |
continue | |
# Convert to grayscale | |
gray_frame = cv2.cvtColor(frame, cv2.COLOR_BGR2GRAY) | |
#Analyze the grayscale and color frame to determine action | |
action = self.analyze_frame(frame, gray_frame) | |
# Signal the MotorController as to what it should do next | |
if action != None: | |
self.ct.signal(action) | |
if DISPLAY: cv2.imshow("Frame", new_frame) | |
self.ct.end() | |
# Finds circles in frame | |
def find_circles(self, frame): | |
FACTOR = 0.5 | |
minDist = frame.shape[1]*FACTOR | |
#dp = 4.62 | |
return cv2.HoughCircles(frame, cv2.cv.CV_HOUGH_GRADIENT, dp=6, minDist=3) | |
# Draws circles in frame | |
def draw_circles(self, frame, circles): | |
new_frame = frame | |
if circles != None: | |
for i in circles[0, :]: | |
cv2.circle(new_frame, (i[0], i[1]), i[2], (255,0,0), 1) | |
cv2.circle(new_frame, (i[0], i[1]), 1, (255,255,0), 1) | |
return new_frame | |
# Draws locations of corners in frame | |
def draw_corners(self, new_frame, corners): | |
if corners != None: | |
y = corners[0] | |
x = corners[1] | |
for i in xrange(x.shape[0]): | |
a,b = x[i],y[i] | |
cv2.circle(new_frame, (a, b), 1, (255,0,255), 1) | |
return new_frame | |
# Find lines in frame | |
def find_lines(self, frame): | |
frame = cv2.Canny(frame, 50, 150, apertureSize=3) | |
if DISPLAY: cv2.imshow("Edges",frame) | |
#return cv2.HoughLinesP(frame, rho=10, theta=3.1415927/30.0, threshold=10, minLineLength=20, maxLineGap=10) | |
return cv2.HoughLines(frame, rho=1, theta=3.1415927/32.0, threshold=20) | |
# Draws lines in frame | |
def draw_lines(self, frame, lines): | |
if lines != None: | |
for i in lines[0, :]: | |
a = np.cos(i[1]) | |
b = np.sin(i[1]) | |
x0 = a*i[0] | |
y0 = b*i[0] | |
x1 = int(x0 + 1000*(-b)) | |
y1 = int(y0 + 1000*(a)) | |
x2 = int(x0 - 1000*(-b)) | |
y2 = int(y0 - 1000*(a)) | |
cv2.line(frame, (x1, y1), (x2, y2), (0,255,0)) | |
return frame | |
# Detect red markings, which indicate border | |
def detect_border(self, frame): | |
frame = cv2.cvtColor(frame, cv2.COLOR_BGR2HSV) | |
frame = cv2.inRange(frame, LOWER_RED, HIGHER_RED) | |
return np.any(frame) | |
# Find corners in frame | |
def find_corners(self, frame): | |
frame = np.float32(frame) | |
corners = cv2.cornerHarris(frame, blockSize=2, ksize=5, k=0.2) | |
if corners == None: | |
return None | |
corners = corners > 0.01*corners.max() | |
x,y = np.where(corners == True) | |
return x,y | |
# Determine if if a single line or an intersection of | |
# lines is in the frame by calculating the standard | |
# deviation of the line orientation (angle) | |
def line_or_intersection(self, lines): | |
angles = self.get_angles(lines) | |
std = np.std(angles) | |
if std > 20: | |
return INTERSECTION | |
else: | |
return LINE | |
# Get list of line angles | |
def get_angles(self, lines): | |
angles = [] | |
for l in lines[0, :]: | |
angles.append(l[1]*180.0/math.pi) | |
return np.array(angles) | |
def angle(self, p): | |
return math.atan2(p[3]-p[1], p[2]-p[0])*180/math.pi | |
def analyze_frame(self, frame, gray_frame): | |
# Check for border immediately | |
if self.detect_border(frame): | |
return [BORDER_ACTION, random.randint(1,5000)/1000.0] | |
# Check for circles | |
circles = self.find_circles(gray_frame) | |
if circles != None: | |
action = [CIRCLE_ACTION, 360 + random.randint(1,360)] | |
return action | |
# Check for lines | |
lines = self.find_lines(gray_frame) | |
if lines != None: | |
line_type = self.line_or_intersection(lines) | |
if line_type == LINE: | |
direction = LEFT if random.randint(1,2) == 1 else RIGHT | |
action = [LINE_ACTION, direction] | |
return action | |
else: #line_type == INTERSECTION | |
angles = self.get_angles(lines) | |
avg = np.mean(angles) | |
action = [CROSS_ACTION, avg%360] | |
return action | |
return None | |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment