Skip to content

Instantly share code, notes, and snippets.

@OliverF
Created January 14, 2015 21:05
Show Gist options
  • Save OliverF/f0a75ed4dd38c029b779 to your computer and use it in GitHub Desktop.
Save OliverF/f0a75ed4dd38c029b779 to your computer and use it in GitHub Desktop.
import RPi.GPIO as GPIO
import time
import socket
import threading
import re
# Address of the queue system that sends key events to this script.
QUEUESYSTEM_IP = "127.0.0.1"
# Must be an integer: it is handed unchanged to socket.connect(), which
# rejects a string port.
QUEUESYSTEM_PORT = 1234

# Pin numbers for the "MA" outputs, driven by setDrive().
# They are interpreted with BCM numbering (see setupGPIO()).
MA_ENABLE = 4
MA_DIR1 = 17
MA_DIR2 = 27

# Pin numbers for the "MB" outputs, driven by setSteering().
MB_ENABLE = 10
MB_DIR1 = 9
MB_DIR2 = 11

# Key codes carried in the "up_<code>" / "down_<code>" messages.
# NOTE(review): 37-40 look like browser arrow-key keyCodes - confirm with
# the sender.
DIRECTION_NEUTRAL = 0
DIRECTION_LEFT = 37
DIRECTION_RIGHT = 39
DIRECTION_FORWARDS = 38
DIRECTION_BACKWARDS = 40

# Which key codes each control accepts. DIRECTION_NEUTRAL is valid for both.
keyDefinitions = {
    "steering": [DIRECTION_LEFT, DIRECTION_RIGHT, DIRECTION_NEUTRAL],
    "drive": [DIRECTION_FORWARDS, DIRECTION_BACKWARDS, DIRECTION_NEUTRAL],
}
def setupGPIO():
    """Select BCM pin numbering and configure the six motor pins as outputs."""
    GPIO.setmode(GPIO.BCM)
    # Same order as the constants are declared: MA pins first, then MB pins.
    outputPins = (MA_ENABLE, MA_DIR1, MA_DIR2, MB_ENABLE, MB_DIR1, MB_DIR2)
    for pin in outputPins:
        GPIO.setup(pin, GPIO.OUT)
def setSteering(direction):
    """Drive the MB_* pins for the requested steering direction.

    direction -- DIRECTION_LEFT, DIRECTION_RIGHT or DIRECTION_NEUTRAL.
                 Any other value is ignored and no pin is touched.
    """
    if direction not in keyDefinitions['steering']:
        return

    if direction == DIRECTION_NEUTRAL:
        # Neutral only drops the enable pin; the direction pins keep their state.
        GPIO.output(MB_ENABLE, GPIO.LOW)
        return

    GPIO.output(MB_ENABLE, GPIO.HIGH)
    turningLeft = direction == DIRECTION_LEFT
    # The two direction pins are always driven to opposite levels.
    GPIO.output(MB_DIR1, GPIO.HIGH if turningLeft else GPIO.LOW)
    GPIO.output(MB_DIR2, GPIO.LOW if turningLeft else GPIO.HIGH)
def setDrive(direction):
    """Drive the MA_* pins for the requested drive direction.

    direction -- DIRECTION_FORWARDS, DIRECTION_BACKWARDS or DIRECTION_NEUTRAL.
                 Any other value is ignored and no pin is touched.
    """
    if direction not in keyDefinitions['drive']:
        return

    if direction == DIRECTION_NEUTRAL:
        # Neutral only drops the enable pin; the direction pins keep their state.
        GPIO.output(MA_ENABLE, GPIO.LOW)
        return

    GPIO.output(MA_ENABLE, GPIO.HIGH)
    goingForwards = direction == DIRECTION_FORWARDS
    # The two direction pins are always driven to opposite levels.
    GPIO.output(MA_DIR1, GPIO.HIGH if goingForwards else GPIO.LOW)
    GPIO.output(MA_DIR2, GPIO.LOW if goingForwards else GPIO.HIGH)
def _connectToQueueSystem(address, port):
    """Retry a TCP connection every 5 seconds until it succeeds or `stop` is set.

    Returns the connected socket, or None if `stop` became true first.
    """
    while not stop:
        candidate = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        try:
            candidate.connect((address, port))
        except Exception as e:
            print("Error: Unable to connect to the queue system at {}:{}: {}".format(address, port, e))
            # Release the failed socket instead of leaving it to the garbage collector.
            candidate.close()
            time.sleep(5)
        else:
            return candidate
    return None


def queueSystem(address, port):
    """Receive key events from the queue system and drive the motors.

    Connects to (address, port), reconnecting after every disconnect, until
    the module-level `stop` flag becomes true. Each received chunk is scanned
    for "down_<code>" / "up_<code>" tokens:

    * down -- apply the key to steering or drive and remember it as the
      currently held key for that control.
    * up   -- return the control to neutral, but only if the released key is
      the one currently held (so releasing a stale key does not cancel a
      newer one).

    A chunk containing no key token at all (for example "stop") puts both
    controls in neutral.

    address -- host name or IP address of the queue system.
    port    -- TCP port; an int or a numeric string.

    Raises ValueError if port is not numeric.
    """
    # socket.connect() needs an integer port, but callers may pass a string.
    port = int(port)
    keyPattern = re.compile(r'(up|down)_(\d+)')
    currentSteer = False
    currentDrive = False

    while not stop:
        print("Connecting to queue system...")
        sock = _connectToQueueSystem(address, port)
        if sock is None:
            break
        print("Queue system connected!")

        try:
            while not stop:
                try:
                    data = sock.recv(1024)
                except socket.error as e:
                    # Treat a broken connection like a normal disconnect so
                    # the worker thread survives and reconnects.
                    print("Queue system connection error: {}".format(e))
                    data = b""
                if not data:
                    print("Queue system disconnected!")
                    break

                # Decode so the patterns work on Python 2 and Python 3 alike;
                # the command tokens are plain ASCII.
                text = data.decode("ascii", "ignore")

                # One recv() may carry several commands; handle all of them.
                events = keyPattern.findall(text)
                if not events:
                    setDrive(DIRECTION_NEUTRAL)
                    setSteering(DIRECTION_NEUTRAL)
                    continue

                for keyDirection, keyCode in events:
                    key = int(keyCode)
                    if keyDirection == "down":
                        if key in keyDefinitions['steering']:
                            setSteering(key)
                            currentSteer = key
                        elif key in keyDefinitions['drive']:
                            setDrive(key)
                            currentDrive = key
                        else:
                            print("Got invalid key: " + text)
                    elif key in keyDefinitions['steering'] and key == currentSteer:
                        setSteering(DIRECTION_NEUTRAL)
                    elif key in keyDefinitions['drive'] and key == currentDrive:
                        setDrive(DIRECTION_NEUTRAL)
        finally:
            sock.close()
            if not stop:
                # The controller is gone: do not leave the motors running on
                # the last command. Skipped during shutdown, when the main
                # thread is responsible for the pins.
                setDrive(DIRECTION_NEUTRAL)
                setSteering(DIRECTION_NEUTRAL)
                currentSteer = False
                currentDrive = False
# Configure the pins and make sure both motors start switched off.
setupGPIO()
setSteering(DIRECTION_NEUTRAL)
setDrive(DIRECTION_NEUTRAL)

# Polled by queueSystem(); set to True to ask the worker thread to wind down.
stop = False

queueSystemThread = threading.Thread(target=queueSystem, args=(QUEUESYSTEM_IP, QUEUESYSTEM_PORT))
queueSystemThread.daemon = True #don't wait for this thread to close before program closes
queueSystemThread.start()

# raw_input() only exists on Python 2; input() is its Python 3 equivalent.
try:
    readLine = raw_input
except NameError:
    readLine = input

try:
    # Block until the operator types "quit" on the console.
    while readLine() != "quit":
        pass
except (KeyboardInterrupt, EOFError):
    # Ctrl-C or a closed stdin are treated as a request to quit.
    pass
finally:
    # Signal the worker first, then release the pins on every exit path.
    stop = True
    GPIO.cleanup()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment