# Last active November 22, 2023 00:54
# WebSocket server that interfaces with PyFirmata, intended to be used with Pixel Composer.
import asyncio
import json
import logging
import platform

import pyfirmata
import websockets
# Interpreter version check plus a compatibility shim applied at import time.
major, minor, _ = platform.python_version_tuple()
if major != '3':
    logging.error('ERROR: Python 3 required to run.')
# Compare the minor version numerically: as strings, '9' >= '11' is True,
# which would apply the shim on interpreters that do not need it.
if int(minor) >= 11:
    # 3.11 fix for property name change in inspect for pyfirmata
    import inspect
    if not hasattr(inspect, 'getargspec'):
        # Alias the removed name to its replacement so existing callers of
        # inspect.getargspec keep working.
        inspect.getargspec = inspect.getfullargspec
class BoardControl:
    """Drive the pins of a pyfirmata board from flat attribute dictionaries.

    Attributes:
        port: Serial port of the current board, or ``None`` when no board
            is connected.
        board: The ``pyfirmata.Arduino`` instance for ``port``, or ``None``.
        pins: Cache of pin objects returned by ``board.get_pin``, keyed by
            the integer pin number.
    """

    # pyfirmata pin-definition templates, keyed by the message "type" value.
    _PIN_SPECS = {
        'servo': 'd:{}:s',
        'digital': 'd:{}:o',
        'analog': 'a:{}:o',
        'digital_pwm': 'd:{}:p',
        'analog_pwm': 'a:{}:p',
    }

    def __init__(self):
        self.port = None
        self.board = None
        self.pins = {}

    def clamp(self, x, min_val, max_val):
        """Return ``x`` limited to the inclusive range [min_val, max_val]."""
        return max(min_val, min(x, max_val))

    def update(self, port, data):
        """Apply one message worth of pin values to the board on ``port``.

        Args:
            port: Serial port name. When it differs from the current port
                the previous board (if any) is closed and a new one opened.
            data: Mapping of ``"<attrib>_<name>"`` keys to values. Entries
                sharing ``<name>`` describe one output and must provide the
                attribs ``pin``, ``value`` and ``type``. A ``"port"`` key is
                ignored.

        Raises:
            KeyError: An output is missing ``pin``, ``value`` or ``type``.
            ValueError: A key other than ``"port"`` has no ``_`` separator,
                or a pin/value cannot be converted to a number.
        """
        if port != self.port:
            if self.board is not None:
                self.board.exit()
            # Forget the old connection before opening the new one, so a
            # failed connect leaves a clean state and is retried on the
            # next call instead of being mistaken for an open board.
            self.board = None
            self.port = None
            self.pins = {}
            self.board = pyfirmata.Arduino(port)
            self.port = port

        # Regroup the flat "<attrib>_<name>" keys into {name: {attrib: value}}.
        gdata = {}
        for key, value in data.items():
            if key == "port":
                continue
            # Split on the first underscore only so names may contain "_".
            attrib, name = key.split('_', 1)
            gdata.setdefault(name, {})[attrib] = value

        for io_attrib in gdata.values():
            pin = int(io_attrib['pin'])
            value = io_attrib['value']
            io_type = io_attrib['type']

            spec = self._PIN_SPECS.get(io_type)
            if spec is None:
                # Unknown types are ignored: no pin is claimed or written.
                continue

            # NOTE(review): the cache is keyed by pin number alone, so a
            # digital and an analog output with the same number share one
            # entry — confirm whether that can happen in practice.
            if pin not in self.pins:
                self.pins[pin] = self.board.get_pin(spec.format(pin))

            if io_type == 'servo':
                output = self.clamp(int(value), 0, 180)
            elif io_type in ('digital', 'analog'):
                output = self.clamp(int(value), 0, 1)
            else:
                # PWM types take a fraction; float() also accepts numeric
                # strings, matching the int() coercion of the other types.
                output = self.clamp(float(value), 0.0, 1.0)
            self.pins[pin].write(output)
# Client sockets currently connected (presumably maintained by
# register/unregister — verify against those handlers).
connected_clients = set()
# Module-level BoardControl instance; it starts with no port and no board.
servo_ctrl = BoardControl()
async def register(websocket):
    """Record a newly connected client and log its remote address."""
    connected_clients.add(websocket)
    print(f"Client connected: {websocket.remote_address}")
async def unregister(websocket):
    """Forget a client and log its remote address."""
    # discard() rather than remove(): unregistering a socket that was never
    # recorded must not raise during connection cleanup.
    connected_clients.discard(websocket)
    print(f"Client disconnected: {websocket.remote_address}")
async def echo(websocket, path=None):
    """Handle one client: apply every JSON message it sends to the board.

    Args:
        websocket: The client connection; iterated for incoming messages.
        path: Request path. Unused; optional so the handler can be called
            with either one or two arguments.
    """
    await register(websocket)
    try:
        async for message in websocket:
            try:
                data = json.loads(message)
            except json.JSONDecodeError as e:
                # One malformed message should not drop the connection.
                print(f"Ignoring malformed message from {websocket.remote_address}: {e}")
                continue
            # The message carries its own "port" entry, which
            # BoardControl.update skips while grouping pin attributes.
            # NOTE(review): update() is a synchronous call made from the
            # event loop — confirm that its latency is acceptable.
            servo_ctrl.update(data["port"], data)
    except websockets.exceptions.ConnectionClosed as e:
        print(f"Connection closed with {websocket.remote_address}: {e.reason}")
    finally:
        # Always unregister, whether the loop ended normally or by error.
        await unregister(websocket)
async def main():
    """Serve ``echo`` over WebSocket on port 22300 until cancelled."""
    async with websockets.serve(echo, "", 22300):
        # A fresh Future is never resolved, so awaiting it keeps the
        # server context open until the task is cancelled.
        await asyncio.Future()
if __name__ == "__main__":
    # Script entry point: run the server coroutine on a new event loop.
    asyncio.run(main())