Skip to content

Instantly share code, notes, and snippets.

@wireboy5
Last active December 6, 2023 03:04
Show Gist options
  • Save wireboy5/969f9d9524b6c32cc5f462d34722fda5 to your computer and use it in GitHub Desktop.
Save wireboy5/969f9d9524b6c32cc5f462d34722fda5 to your computer and use it in GitHub Desktop.
A very simple socket server and client that allows sending simple one-way commands from a client to a server, and receiving a response back.
"""
# uSock
A very simple socket server and client that allows sending simple one-way commands from a client to a server, and receiving a response back.
"""
try:
import uasyncio as asyncio
import ustruct as struct
MICROPYTHON = True
except ModuleNotFoundError:
import asyncio
import struct
MICROPYTHON = False
class uSockClient:
    """Client side of the uSock protocol.

    A request is sent as ``<name length><name><data length><data>`` and the
    reply is read back as ``<length><payload>``.  Every length field is a
    4-byte unsigned integer in network byte order (``struct`` format ``"!I"``).
    """

    host: str
    port: int
    writer: asyncio.StreamWriter
    reader: asyncio.StreamReader
    # Bytes received from the server that have not been consumed yet
    # (reads are buffered manually for MicroPython support).
    buf: bytes

    def __init__(self, host: str, port: int):
        """Store the server address; no connection is opened until connect()."""
        self.host = host
        self.port = port
        self.reader = None
        self.writer = None
        self.buf = bytes()

    async def connect(self):
        """Open the stream connection to ``host``/``port``."""
        reader, writer = await asyncio.open_connection(
            self.host, self.port)
        self.reader = reader
        self.writer = writer

    async def _read_exact(self, count: int) -> bytes:
        """Consume exactly ``count`` bytes from the connection.

        Raises:
            EOFError: the server closed the connection before ``count`` bytes
                arrived.
        """
        while len(self.buf) < count:
            # Only ask for what is still missing so that a following
            # response is never partially swallowed by this read.
            chunk = await self.reader.read(count - len(self.buf))
            if not chunk:
                # An empty read means EOF; looping again would spin forever.
                raise EOFError(
                    "Connection closed before the full response was received.")
            self.buf += chunk
        data = self.buf[:count]
        self.buf = self.buf[count:]
        return data

    async def request(self, event: str, data: bytes) -> bytes:
        """Send ``event`` with ``data`` to the server and return its reply.

        Args:
            event: Name of the event to trigger on the server.
            data: Raw payload passed to the server-side handler.

        Returns:
            The response payload (possibly empty).

        Raises:
            RuntimeError: ``connect()`` has not been called yet.
            EOFError: the connection was closed before a complete response
                was received.
        """
        if self.reader is None or self.writer is None:
            raise RuntimeError("You must call the connect function first.")

        # Encode the name and build the length-prefixed packet
        name = event.encode()
        packet = (
            struct.pack("!I", len(name)) + name
            + struct.pack("!I", len(data)) + data
        )

        # Send the packet and make sure it actually leaves the write buffer
        self.writer.write(packet)
        await self.writer.drain()

        # Read the response length, then the response itself
        resp_length = struct.unpack("!I", await self._read_exact(4))[0]
        return await self._read_exact(resp_length)

    def close(self):
        """Close the connection if one was opened."""
        if self.writer:
            self.writer.close()
class uSock:
    """Server side of the uSock protocol.

    Each request on a connection is ``<name length><name><data length><data>``
    and is answered with ``<length><payload>``.  Every length field is a
    4-byte unsigned integer in network byte order (``struct`` format ``"!I"``).
    Handlers are coroutine functions that receive the request data as
    ``bytes`` and return the response ``bytes`` (or ``None`` for an empty
    response).
    """

    handlers: dict

    def __init__(self):
        """Create a server with no registered handlers."""
        self.handlers = {}

    def handle(self, event: str):
        """Return a decorator that registers a coroutine as handler of ``event``."""
        def decorator(handler):
            self.handlers[event] = handler
            return handler
        return decorator

    def add_handler(self, event: str, handler):
        """Register ``handler`` for ``event``, replacing any previous one."""
        self.handlers[event] = handler

    async def _read_exact(self, reader, count: int) -> bytes:
        """Read exactly ``count`` bytes from ``reader``.

        Raises:
            EOFError: the peer closed the connection before ``count`` bytes
                arrived.
        """
        data = b""
        while len(data) < count:
            # Request only the missing bytes so the next packet stays unread.
            chunk = await reader.read(count - len(data))
            if not chunk:
                # An empty read means EOF; looping again would spin forever.
                raise EOFError("Connection closed by peer.")
            data += chunk
        return data

    async def _read_request(self, reader):
        """Read one request and return it as ``(event name, data)``."""
        # Lengths are unsigned integers in network endianness (big endian)
        name_length = struct.unpack("!I", await self._read_exact(reader, 4))[0]
        # The event name is transmitted as utf-8
        name = (await self._read_exact(reader, name_length)).decode()
        data_length = struct.unpack("!I", await self._read_exact(reader, 4))[0]
        data = await self._read_exact(reader, data_length)
        return name, data

    async def _handle_client(self, reader: asyncio.StreamReader, writer: asyncio.StreamWriter):
        """Serve one connection: answer requests until the client disconnects.

        Events without a registered handler, and handlers returning ``None``,
        are answered with an empty response.  The writer is closed when the
        loop ends, whether through a disconnect or an exception.
        """
        try:
            # Loop, waiting for packets
            while True:
                try:
                    name, data = await self._read_request(reader)
                except EOFError:
                    # Client went away; stop serving this connection
                    break

                # If the event has a handler, call it
                handler = self.handlers.get(name)
                resp = None if handler is None else await handler(data)
                # Missing handler or a None result both mean "empty response"
                if resp is None:
                    resp = b""

                # Send the length-prefixed response.  write() is a plain
                # function on both CPython and MicroPython; only drain() is
                # awaitable.
                writer.write(struct.pack("!I", len(resp)) + resp)
                await writer.drain()
        finally:
            writer.close()

    async def run(self, host: str, port: int):
        """
        Runs the uSock server

        On CPython this serves until cancelled.  On MicroPython the server
        is scheduled as a background task and this coroutine returns
        immediately.
        """
        if not MICROPYTHON:
            # Start a server
            server = await asyncio.start_server(self._handle_client, host, port)
            async with server:
                await server.serve_forever()
        else:
            asyncio.create_task(asyncio.start_server(self._handle_client, host, port))
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment