Skip to content

Instantly share code, notes, and snippets.

@chrisconlon-klaviyo
Created August 11, 2019 22:46
Show Gist options
  • Save chrisconlon-klaviyo/6ab0996780e66d2640076857bc89124e to your computer and use it in GitHub Desktop.
Save chrisconlon-klaviyo/6ab0996780e66d2640076857bc89124e to your computer and use it in GitHub Desktop.
A python3 script that can find + connect to an Ardunio and web socket, and relay commands between them.
import argparse
import asyncio
import colored
import serial
from serial.tools import list_ports
import sys
import uuid
import websockets
parser = argparse.ArgumentParser(
description='Listen for finger game players and control hardware.',
)
parser.add_argument(
'--server',
dest='server_address',
action='store',
default='localhost:9000',
help='The URL and port of the coordinating server to connect to.',
)
parser.add_argument(
'--debug',
dest='debug',
action='store_true',
default=False,
help='Whether to print debugging information or not.',
)
args = parser.parse_args()
VALID_COMMANDS = [
'BEGIN_TOUCHING_CUP',
'STOPPED_TOUCHING_CUP',
'HEALTH_CHECK',
]
BAUD_RATE = 9600
def log(message, color):
color_code = {
'danger': 13,
'good': 40,
'info': 6,
}.get(color, 9)
print(colored.stylize(message, colored.bg(color_code)))
def find_device():
arduino_devices = []
for device in list_ports.comports():
if 'arduino' in device.description.lower() or (device.manufacturer and 'arduino' in device.manufacturer.lower()):
arduino_devices.append(device)
if len(arduino_devices) > 1:
log(
' - more than one arduino found: {}'.format(','.join([device.product for device in arduino_devices])),
'danger',
)
if not arduino_devices:
log(
' - no arduino like devices found.',
'danger',
)
return
return arduino_devices[0]
async def client(identifier, arduino):
async with websockets.connect('wss://{}'.format(args.server_address)) as websocket:
await websocket.send('REGISTER:{}'.format(identifier))
while True:
command = await websocket.recv()
if command not in VALID_COMMANDS:
if args.debug:
log('Unknown command: {}'.format(command), 'danger')
else:
if command == 'HEALTH_CHECK':
await websocket.send('HEALTHY:{}'.format(identifier))
else:
log(command, 'good')
arduino.write('{}\n'.format(command).encode('utf-8'))
if __name__ == '__main__':
identifier = str(uuid.uuid4())
arduino = find_device()
if not arduino:
sys.exit(1)
try:
arduino = serial.Serial(arduino.device, BAUD_RATE)
log(' - arduino found and connected', 'info')
log(' - connecting client to server as: {}'.format(identifier), 'info')
asyncio.get_event_loop().run_until_complete(client(identifier, arduino))
except websockets.exceptions.ConnectionClosed:
log(' - connection closed by remote', 'danger')
except OSError as error:
log(' - cannot connect to remote server: {}'.format(error), 'danger')
except KeyboardInterrupt:
log(' - disconnecting', 'info')
except serial.serialutil.SerialException:
log(' - cannot connect to arduino, it is in use', 'danger')
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment