Skip to content

Instantly share code, notes, and snippets.

Last active November 22, 2023 00:54
Show Gist options
  • Save Lokno/3a131d0c9e67c097e539e604f73961d7 to your computer and use it in GitHub Desktop.
Save Lokno/3a131d0c9e67c097e539e604f73961d7 to your computer and use it in GitHub Desktop.
WebSocket server that interfaces with PyFirmata, intended for use with Pixel Composer
import asyncio
import json
import logging
import platform

import pyfirmata
import websockets
# Interpreter check plus a compatibility shim applied before any board is opened.
major, minor, _ = platform.python_version_tuple()
if major != '3':
    logging.error('ERROR: Python 3 required to run.')
# python_version_tuple() returns strings; compare numerically, because as
# strings '9' >= '11' is true while the intent is "3.11 or newer".
if major == '3' and int(minor) >= 11:
    # 3.11 fix for property name change in inspect for pyfirmata
    import inspect
    if not hasattr(inspect, 'getargspec'):
        inspect.getargspec = inspect.getfullargspec
class BoardControl:
    """Write values to Arduino pins through pyfirmata.

    ``update`` receives a flat mapping whose keys look like ``pin_<name>``,
    ``value_<name>`` and ``type_<name>`` (plus a ``port`` entry) and writes
    each value to the matching pin, opening the board on first use and
    re-opening it whenever the requested port changes.
    """

    # io_type -> (pyfirmata pin kind, pin mode, value converter, lowest, highest)
    _IO_MODES = {
        'servo': ('d', 's', int, 0, 180),
        'digital': ('d', 'o', int, 0, 1),
        'analog': ('a', 'o', int, 0, 1),
        'digital_pwm': ('d', 'p', float, 0.0, 1.0),
        'analog_pwm': ('a', 'p', float, 0.0, 1.0),
    }

    def __init__(self):
        """Start with no port, no board and an empty pin cache."""
        self.port = None
        self.board = None
        # (pin kind, pin number) -> pin object returned by board.get_pin()
        self.io = {}

    def clamp(self, x, min_val, max_val):
        """Return ``x`` limited to the closed range ``[min_val, max_val]``."""
        return max(min_val, min(x, max_val))

    @staticmethod
    def _group_attributes(data):
        """Regroup flat ``attrib_name`` keys into ``{name: {attrib: value}}``."""
        grouped = {}
        for key, value in data.items():
            if key == "port":
                continue
            # Split on the first underscore only, so names may contain '_'.
            attrib, sep, name = key.partition('_')
            if not sep:
                continue  # not an attrib_name key
            grouped.setdefault(name, {})[attrib] = value
        return grouped

    def update(self, port, data):
        """Apply the pin values described by ``data`` to the board on ``port``.

        Args:
            port: Serial port of the board; a change triggers a reconnect.
            data: Mapping with ``pin_<name>``, ``value_<name>`` and
                ``type_<name>`` entries; the ``port`` key is ignored here.

        Entries missing one of the three attributes, or naming an unknown
        type, are skipped. Nothing is written while no board is open.
        """
        if port != self.port:
            if self.board is not None:
                # Release the previous serial connection before switching.
                self.board.exit()
                self.board = None
            self.io = {}
            # Forget the port until the new board opens, so a failed attempt
            # is retried on the next call instead of leaving stale state.
            self.port = None
            self.board = pyfirmata.Arduino(port)
            self.port = port

        if self.board is None:
            return

        for io_attrib in self._group_attributes(data).values():
            try:
                pin = int(io_attrib['pin'])
                value = io_attrib['value']
                io_type = io_attrib['type']
            except KeyError:
                continue  # incomplete description for this name

            mode = self._IO_MODES.get(io_type)
            if mode is None:
                continue
            kind, pin_mode, convert, low, high = mode

            # Key on kind as well: analog pin N and digital pin N differ.
            cache_key = (kind, pin)
            if cache_key not in self.io:
                self.io[cache_key] = self.board.get_pin(f'{kind}:{pin}:{pin_mode}')
            self.io[cache_key].write(self.clamp(convert(value), low, high))
# Module-level set, initially empty.
# NOTE(review): presumably meant to hold the currently connected websockets; confirm against the handlers.
connected_clients = set()
# Shared BoardControl instance; constructing it opens no port (port and board start as None).
servo_ctrl = BoardControl()
async def register(websocket):
    """Record a newly connected client and announce it on stdout."""
    connected_clients.add(websocket)
    print(f"Client connected: {websocket.remote_address}")


async def unregister(websocket):
    """Forget a client and announce the disconnect on stdout."""
    # discard() rather than remove(): safe if the client was never recorded.
    connected_clients.discard(websocket)
    print(f"Client disconnected: {websocket.remote_address}")
async def echo(websocket, path=None):
    """Handle one client: decode each JSON message and apply it to the board.

    Args:
        websocket: The client connection to read messages from.
        path: Request path; unused. Optional so the handler works whether or
            not the installed websockets version passes it.
    """
    await register(websocket)
    try:
        async for message in websocket:
            try:
                data = json.loads(message)
            except json.JSONDecodeError as e:
                # One malformed message should not drop the connection.
                print(f"Ignoring invalid JSON from {websocket.remote_address}: {e}")
                continue
            if not isinstance(data, dict):
                continue
            # NOTE(review): the message is assumed to carry the serial port
            # under "port", the key BoardControl.update skips — confirm.
            servo_ctrl.update(data.get('port'), data)
    except websockets.exceptions.ConnectionClosed as e:
        print(f"Connection closed with {websocket.remote_address}: {e.reason}")
    finally:
        # Always unregister, even when the loop exits through an exception.
        await unregister(websocket)
async def main(host="", port=22300):
    """Serve websocket clients with ``echo`` until the task is cancelled.

    Args:
        host: Interface to bind; the default "" is what the script has
            always passed to ``websockets.serve``.
        port: TCP port to listen on.
    """
    async with websockets.serve(echo, host, port):
        # A future that is never resolved keeps the server running.
        await asyncio.Future()
if __name__ == "__main__":
    # Start the event loop and run the server when executed as a script.
    asyncio.run(main())
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment