Skip to content

Instantly share code, notes, and snippets.

@chrisstubbs93
Created September 19, 2018 21:14
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save chrisstubbs93/5bc55706ba59853af0bd52d1fb2580f0 to your computer and use it in GitHub Desktop.
Save chrisstubbs93/5bc55706ba59853af0bd52d1fb2580f0 to your computer and use it in GitHub Desktop.
Python Websockets server for motor control on Pi GPIO
import socket, hashlib, base64, threading
import RPi.GPIO as GPIO
import time
from collections import namedtuple
from functools import wraps
import threading
from threading import Timer
from functools import partial
GPIO.setmode(GPIO.BOARD)
pin_list = [11, 12, 13, 16, 18] #enable, right track 1, right track 2, left track 1, left track 2
GPIO.setup(pin_list, GPIO.OUT)
global lasttime
lasttime = 0
#define motor controls
def stp():
GPIO.output(pin_list, [0,0,0,0,0])
def fwd():
GPIO.output(pin_list, [1,0,1,0,1])
global lasttime
lasttime = int(time.time())
def bck():
GPIO.output(pin_list, [1,1,0,1,0])
global lasttime
lasttime = int(time.time())
def right(): #on the spot turn right
GPIO.output(pin_list, [1,0,1,1,0])
global lasttime
lasttime = int(time.time())
def left(): #on the spot turn left
GPIO.output(pin_list, [1,1,0,0,1])
global lasttime
lasttime = int(time.time())
def fr(): #forward right turn
GPIO.output(pin_list, [1,0,1,0,0])
global lasttime
lasttime = int(time.time())
def fl(): #forward left turn
GPIO.output(pin_list, [1,0,0,0,1])
global lasttime
lasttime = int(time.time())
def br(): #reverse right turn
GPIO.output(pin_list, [1,1,0,0,0])
global lasttime
lasttime = int(time.time())
def bl(): #reverse left turn
GPIO.output(pin_list, [1,0,0,1,0])
global lasttime
lasttime = int(time.time())
class PyWSock:
MAGIC = '258EAFA5-E914-47DA-95CA-C5AB0DC85B11'
HSHAKE_RESP = "HTTP/1.1 101 Switching Protocols\r\n" + \
"Upgrade: websocket\r\n" + \
"Connection: Upgrade\r\n" + \
"Sec-WebSocket-Accept: %s\r\n" + \
"\r\n"
LOCK = threading.Lock()
clients = []
def recv_data (self, client):
# as a simple server, we expect to receive:
# - all data at one go and one frame
# - one frame at a time
# - text protocol
# - no ping pong messages
data = bytearray(client.recv(512))
if(len(data) < 6):
raise Exception("Error reading data")
# FIN bit must be set to indicate end of frame
assert(0x1 == (0xFF & data[0]) >> 7)
# data must be a text frame
# 0x8 (close connection) is handled with assertion failure
assert(0x1 == (0xF & data[0]))
# assert that data is masked
assert(0x1 == (0xFF & data[1]) >> 7)
datalen = (0x7F & data[1])
#print("received data len %d" %(datalen,))
str_data = ''
if(datalen > 0):
mask_key = data[2:6]
masked_data = data[6:(6+datalen)]
unmasked_data = [masked_data[i] ^ mask_key[i%4] for i in range(len(masked_data))]
str_data = str(bytearray(unmasked_data))
return str_data
def broadcast_resp(self, data):
# 1st byte: fin bit set. text frame bits set.
# 2nd byte: no mask. length set in 1 byte.
resp = bytearray([0b10000001, len(data)])
# append the data bytes
for d in bytearray(data):
resp.append(d)
self.LOCK.acquire()
for client in self.clients:
try:
client.send(resp)
except:
print("error sending to a client")
self.LOCK.release()
def parse_headers (self, data):
headers = {}
lines = data.splitlines()
for l in lines:
parts = l.split(": ", 1)
if len(parts) == 2:
headers[parts[0]] = parts[1]
headers['code'] = lines[len(lines) - 1]
return headers
def handshake (self, client):
# print('Handshaking...')
data = client.recv(2048)
headers = self.parse_headers(data)
# print('Got headers:')
# for k, v in headers.iteritems():
# print(k, ':', v)
key = headers['Sec-WebSocket-Key']
resp_data = self.HSHAKE_RESP % ((base64.b64encode(hashlib.sha1(key+self.MAGIC).digest()),))
# print('Response: [%s]' % (resp_data,))
return client.send(resp_data)
def handle_client (self, client, addr):
self.handshake(client)
try:
while 1:
data = self.recv_data(client)
# print("received [%s]" % (data,))
if data == "s":
# print("motors stop")
stp() #motors go forward for 0.5s
if data == "f":
# print("motors forward")
fwd() #motors go forward for 0.5s
if data == "b":
# print("motors rev")
bck() #motors go rev for 0.5s
if data == "l":
# print("motors left")
left() #motors go left for 0.5s
if data == "r":
# print("motors right")
right() #motors go right for 0.5s
if data == "fl":
# print("motors forward-left")
fl() #motors go forward-left for 0.5s
if data == "fr":
# print("motors forward-right")
fr() #motors go forward-right for 0.5s
if data == "bl":
# print("motors rev-left")
bl() #motors go rev-left for 0.5s
if data == "br":
# print("motors rev-right")
br() #motors go rev-right for 0.5s
self.broadcast_resp(data)
except Exception as e:
print("Exception %s" % (str(e)))
print('Client closed: ' + str(addr))
self.LOCK.acquire()
self.clients.remove(client)
self.LOCK.release()
client.close()
def start_server (self, port):
s = socket.socket()
s.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
s.bind(('', port))
s.listen(5)
while(1):
print ('Waiting for connection...')
conn, addr = s.accept()
print ('Connection from: ' + str(addr))
threading.Thread(target = self.handle_client, args = (conn, addr)).start()
self.LOCK.acquire()
self.clients.append(conn)
self.LOCK.release()
#@periodic(interval=1)
def task1():
try:
global lasttime
print("Lasttime:" + str(lasttime) + " Now:" + str(int(time.time())))
if (int(time.time())>=int(lasttime+2)):
print("timeout")
stp()
except:
print("closing task1")
class Interval(object):
def __init__(self, interval, function, args=[], kwargs={}):
"""
Runs the function at a specified interval with given arguments.
"""
self.interval = interval
self.function = partial(function, *args, **kwargs)
self.running = False
self._timer = None
def __call__(self):
"""
Handler function for calling the partial and continuting.
"""
self.running = False # mark not running
self.start() # reset the timer for the next go
self.function() # call the partial function
def start(self):
"""
Starts the interval and lets it run.
"""
if self.running:
# Don't start if we're running!
return
# Create the timer object, start and set state.
self._timer = Timer(self.interval, self)
self._timer.start()
self.running = True
def stop(self):
"""
Cancel the interval (no more function calls).
"""
if self._timer:
self._timer.cancel()
self.running = False
self._timer = None
try:
interval = Interval(0.5, task1,)
# print "Starting Interval, press CTRL+C to stop."
interval.start()
except:
print("closing interval")
interval.stop()
ws = PyWSock()
ws.start_server(9876)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment