Skip to content

Instantly share code, notes, and snippets.

@Lokno
Last active November 22, 2023 00:54
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save Lokno/3a131d0c9e67c097e539e604f73961d7 to your computer and use it in GitHub Desktop.
Save Lokno/3a131d0c9e67c097e539e604f73961d7 to your computer and use it in GitHub Desktop.
WebSocket server that interfaces with PyFirmata, intended to be used with Pixel Composer.
import asyncio
import json
import logging
import platform
import sys

import pyfirmata
import websockets
# Refuse to start on anything other than Python 3.
if sys.version_info.major != 3:
    logging.error('ERROR: Python 3 required to run.')
    sys.exit(-1)

# Compare the version as a tuple of ints: comparing the minor version as a
# string is lexicographic ('9' >= '11' is true, '100' >= '11' is false).
if sys.version_info >= (3, 11):
    # 3.11 fix for pyfirmata: inspect.getargspec no longer exists there, so
    # alias it to getfullargspec when it is missing.
    import inspect
    if not hasattr(inspect, 'getargspec'):
        inspect.getargspec = inspect.getfullargspec
class BoardControl:
    """Apply flat pin-control messages to a board opened with pyfirmata.

    A message is a mapping whose keys have the form ``<attrib>_<name>``.
    ``attrib`` is one of ``pin``, ``value`` or ``type``; ``name`` ties the
    three attributes of one output together and may itself contain
    underscores. A ``port`` key is skipped because the serial port is passed
    to :meth:`update` as its own argument.

    Each supported ``type`` selects the pin definition string handed to
    ``board.get_pin`` and the range the value is clamped to before writing:

    * ``servo``       -> ``d:<pin>:s``, int clamped to 0..180
    * ``digital``     -> ``d:<pin>:o``, int clamped to 0..1
    * ``analog``      -> ``a:<pin>:o``, int clamped to 0..1
    * ``digital_pwm`` -> ``d:<pin>:p``, float clamped to 0.0..1.0
    * ``analog_pwm``  -> ``a:<pin>:p``, float clamped to 0.0..1.0

    Outputs with any other ``type`` are ignored.
    """

    # io_type -> (pin bank letter, mode letter, value converter, low, high)
    _IO_TYPES = {
        'servo': ('d', 's', int, 0, 180),
        'digital': ('d', 'o', int, 0, 1),
        'analog': ('a', 'o', int, 0, 1),
        'digital_pwm': ('d', 'p', float, 0.0, 1.0),
        'analog_pwm': ('a', 'p', float, 0.0, 1.0),
    }

    def __init__(self):
        # Serial port of the currently open board; None while disconnected.
        self.port = None
        # Board object returned by pyfirmata.Arduino; None while disconnected.
        self.board = None
        # Cache of pin objects already requested from the board, keyed by
        # (bank letter, pin number) so analog pin N and digital pin N do not
        # share an entry.
        self.io = {}

    def clamp(self, x, min_val, max_val):
        """Return ``x`` limited to the closed range ``[min_val, max_val]``."""
        return max(min_val, min(x, max_val))

    def update(self, port, data):
        """Connect to ``port`` if needed and write every output in ``data``.

        Args:
            port: Serial port of the board. When it differs from the port
                currently in use, the old board is closed and a new one is
                opened.
            data: Mapping of ``<attrib>_<name>`` keys to values, as described
                in the class docstring.

        Raises:
            ValueError: If a key other than ``port`` contains no underscore,
                or a pin/value cannot be converted to a number.
            KeyError: If an output lacks its ``pin``, ``value`` or ``type``.
        """
        self._connect(port)

        for io_attrib in self._group_by_name(data).values():
            pin = int(io_attrib['pin'])
            value = io_attrib['value']
            io_type = io_attrib['type']

            spec = self._IO_TYPES.get(io_type) if isinstance(io_type, str) else None
            if spec is None:
                # Unknown output types are ignored.
                continue
            bank, mode, convert, low, high = spec

            key = (bank, pin)
            if key not in self.io:
                self.io[key] = self.board.get_pin(f'{bank}:{pin}:{mode}')
            self.io[key].write(self.clamp(convert(value), low, high))

    def _connect(self, port):
        """Open the board on ``port`` unless it is already the active port."""
        if port == self.port:
            return
        if self.board is not None:
            self.board.exit()
        # Drop every trace of the old connection *before* opening the new one.
        # If opening fails, self.port stays None, so the next call retries
        # instead of treating the failed port as connected.
        self.board = None
        self.port = None
        self.io = {}
        self.board = pyfirmata.Arduino(port)
        self.port = port

    @staticmethod
    def _group_by_name(data):
        """Regroup ``{'pin_x': 1, 'type_x': 't'}`` as ``{'x': {'pin': 1, 'type': 't'}}``."""
        grouped = {}
        for key, value in data.items():
            if key == 'port':
                continue
            # Split on the first underscore only, so names may contain more.
            attrib, name = key.split('_', 1)
            grouped.setdefault(name, {})[attrib] = value
        return grouped
# Websocket connections currently being served; register() adds to this set
# and unregister() removes from it.
connected_clients = set()
# Single shared board controller that echo() feeds every received message to.
# Despite the name it handles every output type BoardControl supports, not
# only servos.
servo_ctrl = BoardControl()
async def register(websocket):
    """Add ``websocket`` to the set of connected clients and announce it."""
    connected_clients.add(websocket)
    notice = f"Client connected: {websocket.remote_address}"
    print(notice)
async def unregister(websocket):
    """Remove ``websocket`` from the set of connected clients and announce it.

    Raises ``KeyError`` (from ``set.remove``) if the client was not registered.
    """
    connected_clients.remove(websocket)
    notice = f"Client disconnected: {websocket.remote_address}"
    print(notice)
async def echo(websocket, path=None):
    """Serve one client: apply every JSON message it sends to the board.

    Each message is parsed as JSON and passed to ``servo_ctrl.update``,
    using the message's ``port`` entry as the serial port. The client is
    registered for the lifetime of the connection and always unregistered on
    the way out.

    Args:
        websocket: The client connection to read messages from.
        path: Request path. Unused. It is optional because older
            ``websockets`` releases pass it as a second argument while newer
            ones call the handler with the connection only.
    """
    await register(websocket)
    try:
        async for message in websocket:
            try:
                data = json.loads(message)
                # NOTE: update() is a plain synchronous call, so it runs on
                # the event loop thread until it returns.
                servo_ctrl.update(data['port'], data)
            except (KeyError, TypeError, ValueError) as e:
                # One malformed message (invalid JSON, missing key, bad
                # value) is reported and skipped rather than being allowed
                # to tear down the whole connection.
                print(f"Ignoring bad message from {websocket.remote_address}: {e!r}")
    except websockets.exceptions.ConnectionClosed as e:
        print(f"Connection closed with {websocket.remote_address}: {e.reason}")
    finally:
        await unregister(websocket)
async def main():
    """Serve ``echo`` on 127.0.0.1:22300 until the task is cancelled."""
    server = websockets.serve(echo, "127.0.0.1", 22300)
    async with server:
        # Awaiting a future that nothing ever resolves keeps the server up.
        never_done = asyncio.Future()
        await never_done
# Script entry point: start the websocket server only when this file is
# executed directly, not when it is imported.
if __name__ == "__main__":
    asyncio.run(main())
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment