-
-
Save itzexor/bfa125296d1a0e8dd407c1312ec13866 to your computer and use it in GitHub Desktop.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| #!/usr/bin/env python3 | |
| """ | |
| rpi_bridge.py | |
| Python implementation of rpi-bridge-init and serial tunnel, supporting two modes: | |
| Modes: | |
| --mode source : USB detection, raw serial bridges for MCU and nozzle, | |
| plus TCP-over-serial tunnel on third CDC-ACM interface. | |
| --mode destination : Setup USB gadget, then framed serial tunnel (destination side): | |
| reads from serial gadget interfaces and forwards to TCP. | |
| Source flags: | |
| --mcu-tty MCU serial port (e.g. /dev/ttyS7) | |
| --nozzle-tty Nozzle serial port (e.g. /dev/ttyS1) | |
| --baud Baud rate for raw serial bridges and tunnel (default 230400) | |
| --bind-ip IP for tunnel listener (default 127.0.0.1) | |
| --bind-port Port for tunnel listener (default 7125) | |
| Destination flags: | |
| --baud Baud rate for serial tunnel (default 115200) | |
| --fwd-ip IP to forward TCP to (default 127.0.0.1) | |
| --fwd-port Port to forward TCP to (default 7125) | |
| Common flags: | |
| --log Log file path | |
| Note: USB vendor/product IDs and reset scripts are fixed to defaults and not configurable. | |
| """ | |
| import os | |
| import sys | |
| import time | |
| import argparse | |
| import logging | |
| import signal | |
| from logging.handlers import RotatingFileHandler | |
| import subprocess | |
| import threading | |
| from pathlib import Path | |
| import stat | |
| import termios | |
| import struct | |
| import select | |
| import socket | |
| DEFAULT_RAW_BAUD = 230400 | |
| DEFAULT_TUNNEL_BAUD = 115200 | |
| DEFAULT_IP = "127.0.0.1" | |
| DEFAULT_PORT = 7125 | |
| USB_VID = "1d6b" | |
| USB_PID = "0104" | |
| GADGET_NAME = "g1" | |
| TUNNEL_FRAME_CHUNK_SIZE = 64 | |
| DETECT_TIMEOUT = 30 # seconds to wait for device enumeration | |
| USB_RESET_DELAY = 5 # seconds to wait after usbreset command | |
| LOG_FILE = "/tmp/rpi_bridge.log" | |
| USBRESET_BIN = "/opt/bin/usbreset" | |
| MCU_RESET_SCRIPT = "/usr/bin/mcu_reset.sh" | |
| # Global state for cleanup | |
| shutdown_event = threading.Event() | |
| source_server = None | |
| source_conns = [] | |
| dest_conns = [] | |
| # --- Logging Setup --- | |
| def setup_logging(path): | |
| fmt = logging.Formatter('%(asctime)s rpi-bridge: %(message)s','%Y-%m-%d %H:%M:%S') | |
| fh = RotatingFileHandler(path, maxBytes=10*1024*1024, backupCount=3) | |
| fh.setFormatter(fmt) | |
| root = logging.getLogger() | |
| root.setLevel(logging.INFO) | |
| root.addHandler(fh) | |
| sh = logging.StreamHandler(sys.stdout) | |
| sh.setFormatter(fmt) | |
| root.addHandler(sh) | |
| # --- Signal Handler with cleanup --- | |
| def handle_signal(signum, frame): | |
| logging.info(f"Received signal {signum}, shutting down...") | |
| shutdown_event.set() | |
| # cleanup source tunnel | |
| global source_server, source_conns, dest_conns | |
| if source_server: | |
| try: | |
| source_server.close() | |
| logging.info("Closed source server socket") | |
| except: | |
| pass | |
| for conn in source_conns: | |
| try: | |
| conn.close() | |
| except: | |
| pass | |
| # cleanup destination connections | |
| for sock in dest_conns: | |
| try: | |
| sock.close() | |
| except: | |
| pass | |
| # --- Raw Serial Helper --- | |
| def open_serial_fd(dev, baud): | |
| fd = os.open(dev, os.O_RDWR | os.O_NOCTTY) | |
| attrs = termios.tcgetattr(fd) | |
| attrs[0] = 0; attrs[1] = 0 | |
| attrs[2] = termios.CREAD | termios.CLOCAL | termios.CS8 | |
| attrs[3] = 0 | |
| attrs[6][termios.VMIN] = 1; attrs[6][termios.VTIME] = 0 | |
| b = getattr(termios, f"B{baud}", None) | |
| if b is None: | |
| raise ValueError(f"Unsupported baud rate: {baud}") | |
| attrs[4] = b; attrs[5] = b | |
| termios.tcsetattr(fd, termios.TCSANOW, attrs) | |
| termios.tcflush(fd, termios.TCIOFLUSH) | |
| return fd | |
| # --- Framing Helpers --- | |
| def send_frame(fd, sid, cmd, payload=b""): | |
| hdr = struct.pack(">BBH", sid, cmd, len(payload)) | |
| frame = hdr + payload | |
| for i in range(0, len(frame), TUNNEL_FRAME_CHUNK_SIZE): | |
| os.write(fd, frame[i:i+TUNNEL_FRAME_CHUNK_SIZE]) | |
| termios.tcdrain(fd) | |
| def read_exact(fd, n): | |
| buf = bytearray() | |
| while len(buf) < n: | |
| chunk = os.read(fd, n - len(buf)) | |
| if not chunk: | |
| raise RuntimeError("Serial read error or disconnect") | |
| buf.extend(chunk) | |
| return bytes(buf) | |
| def read_frame(fd): | |
| hdr = read_exact(fd, 4) | |
| sid, cmd, ln = struct.unpack(">BBH", hdr) | |
| data = read_exact(fd, ln) | |
| return sid, cmd, data | |
| # --- USB Gadget / ACM Detection with Verification --- | |
| def wait_for_usb(vid, pid): | |
| start = time.time() | |
| while not shutdown_event.is_set(): | |
| if subprocess.run(["lsusb", "-d", f"{vid}:{pid}"], stdout=subprocess.DEVNULL).returncode == 0: | |
| logging.info(f"USB gadget {vid}:{pid} detected") | |
| return | |
| if time.time() - start > DETECT_TIMEOUT: | |
| logging.error("Timeout waiting for USB gadget") | |
| sys.exit(1) | |
| time.sleep(1) | |
| def find_acm_devices(vid, pid): | |
| devices = [] | |
| for tty in Path('/sys/class/tty').glob('ttyACM*'): | |
| node = (tty / 'device').resolve() | |
| while node != node.parent: | |
| vfile, pfile = node / 'idVendor', node / 'idProduct' | |
| if vfile.exists() and pfile.exists() and \ | |
| vfile.read_text().strip().lower() == vid.lower() and \ | |
| pfile.read_text().strip().lower() == pid.lower(): | |
| path = f"/dev/{tty.name}" | |
| if os.path.exists(path) and stat.S_ISCHR(os.stat(path).st_mode): | |
| try: | |
| test_fd = os.open(path, os.O_RDWR | os.O_NOCTTY | os.O_NONBLOCK) | |
| os.close(test_fd) | |
| devices.append(path) | |
| except: | |
| logging.warning(f"Unresponsive ACM device skipped: {path}") | |
| node = node.parent | |
| devices.sort() | |
| return devices | |
| # --- Raw Serial Bridges --- | |
| def serial_bridge(src, dst, baud): | |
| def loop(rdev, wdev): | |
| while not shutdown_event.is_set(): | |
| fr = fw = None | |
| try: | |
| logging.info(f"Bridge {rdev} -> {wdev} @ {baud}") | |
| fr = open_serial_fd(rdev, baud) | |
| fw = open_serial_fd(wdev, baud) | |
| while not shutdown_event.is_set(): | |
| data = os.read(fr, 1024) | |
| if data: | |
| os.write(fw, data) | |
| except Exception as e: | |
| logging.error(f"Bridge failure {rdev}->{wdev}: {e}") | |
| for fd_handle in (fr, fw): | |
| if fd_handle is not None: | |
| try: os.close(fd_handle) | |
| except: pass | |
| time.sleep(1) | |
| threading.Thread(target=loop, args=(src, dst), daemon=True).start() | |
| threading.Thread(target=loop, args=(dst, src), daemon=True).start() | |
| # --- Tunnel: Source Mode --- | |
| def tunnel_source(fd, bind_ip, bind_port): | |
| global source_server, source_conns | |
| source_server = socket.create_server((bind_ip, bind_port), backlog=10) | |
| source_server.setblocking(False) | |
| conns = {} | |
| source_conns = [] | |
| next_id = 1 | |
| while not shutdown_event.is_set(): | |
| rlist, _, _ = select.select([fd, source_server] + list(conns.keys()), [], [], 0.1) | |
| for s in rlist: | |
| if s is source_server: | |
| conn, _ = source_server.accept(); conn.setblocking(False) | |
| source_conns.append(conn) | |
| sid = next_id; next_id = (sid % 255) + 1 | |
| conns[conn] = sid | |
| send_frame(fd, sid, 0) | |
| elif s is fd: | |
| try: | |
| sid, cmd, data = read_frame(fd) | |
| sock = next((c for c, i in conns.items() if i == sid), None) | |
| if cmd == 1 and sock: | |
| sock.sendall(data) | |
| elif cmd == 2 and sock: | |
| sock.close(); del conns[sock] | |
| except Exception as e: | |
| logging.error(f"Source frame error: {e}") | |
| else: | |
| sid = conns[s] | |
| try: | |
| d = s.recv(4096) | |
| if d: send_frame(fd, sid, 1, d) | |
| else: send_frame(fd, sid, 2); s.close(); del conns[s] | |
| except: | |
| send_frame(fd, sid, 2); s.close(); del conns[s] | |
| # --- Gadget Setup for Destination --- | |
| def setup_usb_gadget(): | |
| gadget_dir = Path(f"/sys/kernel/config/usb_gadget/{GADGET_NAME}") | |
| if gadget_dir.exists(): | |
| logging.info("Gadget already configured, skipping setup") | |
| return | |
| subprocess.run(["modprobe", "libcomposite"], check=True) | |
| os.makedirs(gadget_dir) | |
| (gadget_dir / 'idVendor').write_text(USB_VID) | |
| (gadget_dir / 'idProduct').write_text(USB_PID) | |
| (gadget_dir / 'bcdDevice').write_text('0x0100') | |
| (gadget_dir / 'bcdUSB').write_text('0x0200') | |
| strings_dir = gadget_dir / 'strings' / '0x409' | |
| os.makedirs(strings_dir) | |
| serial = Path('/sys/firmware/devicetree/base/serial-number').read_text().strip() | |
| (strings_dir / 'serialnumber').write_text(serial) | |
| (strings_dir / 'manufacturer').write_text('Raspberry Pi') | |
| (strings_dir / 'product').write_text('Multi-Serial Gadget') | |
| cfg_str = gadget_dir / 'configs' / 'c.1' / 'strings' / '0x409' | |
| os.makedirs(cfg_str, exist_ok=True) | |
| (cfg_str / 'configuration').write_text('Config 1: Multi-Serial') | |
| for i in range(3): | |
| func = gadget_dir / 'functions' / f'acm.usb{i}' | |
| os.makedirs(func, exist_ok=True) | |
| try: | |
| os.symlink(f"../functions/acm.usb{i}", gadget_dir / 'configs' / 'c.1' / f'acm.usb{i}') | |
| except FileExistsError: | |
| pass | |
| udc = Path('/sys/class/udc').read_text().splitlines()[0] | |
| (gadget_dir / 'UDC').write_text(udc) | |
| # --- Tunnel: Destination Mode --- | |
| def tunnel_destination(fd, fwd_ip, fwd_port): | |
| global dest_conns | |
| conns = {} | |
| dest_conns = [] | |
| while not shutdown_event.is_set(): | |
| rlist, _, _ = select.select([fd] + list(conns.values()), [], [], 0.1) | |
| for s in rlist: | |
| if s is fd: | |
| try: | |
| sid, cmd, data = read_frame(fd) | |
| if cmd == 0: | |
| sock = socket.socket(); sock.connect((fwd_ip, fwd_port)); sock.setblocking(False) | |
| conns[sid] = sock | |
| dest_conns.append(sock) | |
| elif cmd == 1 and sid in conns: | |
| conns[sid].sendall(data) | |
| elif cmd == 2 and sid in conns: | |
| conns[sid].close(); del conns[sid] | |
| except Exception as e: | |
| logging.error(f"Destination frame error: {e}") | |
| else: | |
| sid = next(k for k, v in conns.items() if v == s) | |
| try: | |
| d = s.recv(4096) | |
| if d: send_frame(fd, sid, 1, d) | |
| else: send_frame(fd, sid, 2); s.close(); del conns[sid] | |
| except: | |
| send_frame(fd, sid, 2); s.close(); del conns[sid] | |
| # --- Main --- | |
| def main(): | |
| signal.signal(signal.SIGINT, handle_signal) | |
| signal.signal(signal.SIGTERM, handle_signal) | |
| p = argparse.ArgumentParser() | |
| p.add_argument("--mode", choices=["source", "destination"], required=True) | |
| p.add_argument("--log", default=LOG_FILE) | |
| # source args | |
| p.add_argument("--mcu-tty") | |
| p.add_argument("--nozzle-tty") | |
| p.add_argument("--raw-baud", type=int, default=DEFAULT_RAW_BAUD) | |
| p.add_argument("--bind-ip", default=DEFAULT_IP) | |
| p.add_argument("--bind-port", type=int, default=DEFAULT_PORT) | |
| # destination args | |
| p.add_argument("--tunnel-baud", type=int, default=DEFAULT_TUNNEL_BAUD) | |
| p.add_argument("--fwd-ip", default=DEFAULT_IP) | |
| p.add_argument("--fwd-port", type=int, default=DEFAULT_PORT) | |
| args = p.parse_args() | |
| setup_logging(args.log) | |
| if args.mode == "source": | |
| wait_for_usb(USB_VID, USB_PID) | |
| start = time.time() | |
| while not shutdown_event.is_set(): | |
| acms = find_acm_devices(USB_VID, USB_PID) | |
| if len(acms) >= 3: | |
| acms = acms[:3] | |
| break | |
| if time.time() - start > DETECT_TIMEOUT: | |
| logging.error("Timeout waiting for ACM devices") | |
| sys.exit(1) | |
| time.sleep(1) | |
| subprocess.run([USBRESET_BIN, f"{USB_VID}:{USB_PID}"], check=True) | |
| time.sleep(USB_RESET_DELAY) | |
| start = time.time() | |
| while not shutdown_event.is_set(): | |
| acms = find_acm_devices(USB_VID, USB_PID) | |
| if len(acms) >= 3: | |
| acms = acms[:3] | |
| break | |
| if time.time() - start > DETECT_TIMEOUT: | |
| logging.error("Timeout re-detecting ACM devices") | |
| sys.exit(1) | |
| time.sleep(1) | |
| serial_bridge(args.mcu_tty, acms[0], args.raw_baud) | |
| serial_bridge(args.nozzle_tty, acms[1], args.raw_baud) | |
| fd = open_serial_fd(acms[2], args.tunnel_baud) | |
| threading.Thread(target=tunnel_source, args=(fd, args.bind_ip, args.bind_port), daemon=True).start() | |
| subprocess.run([MCU_RESET_SCRIPT], check=True) | |
| shutdown_event.wait() | |
| else: | |
| setup_usb_gadget() | |
| start = time.time() | |
| while not shutdown_event.is_set(): | |
| gs = [] | |
| for t in Path('/sys/class/tty').glob('ttyGS*'): | |
| node = (t / 'device').resolve() | |
| while node != node.parent: | |
| vfile = node / 'idVendor'; pfile = node / 'idProduct' | |
| if vfile.exists() and pfile.exists() and \ | |
| vfile.read_text().strip().lower() == USB_VID and \ | |
| pfile.read_text().strip().lower() == USB_PID: | |
| path = f"/dev/{t.name}" | |
| if os.path.exists(path) and stat.S_ISCHR(os.stat(path).st_mode): | |
| try: | |
| fd_test = os.open(path, os.O_RDWR | os.O_NOCTTY | os.O_NONBLOCK) | |
| os.close(fd_test) | |
| gs.append(path) | |
| except: | |
| logging.warning(f"Unresponsive GS device skipped: {path}") | |
| break | |
| node = node.parent | |
| gs.sort() | |
| if gs: | |
| dev = gs[-1] | |
| break | |
| if time.time() - start > DETECT_TIMEOUT: | |
| logging.error("Timeout waiting for GS device") | |
| sys.exit(1) | |
| time.sleep(1) | |
| fd = open_serial_fd(dev, args.baud) | |
| logging.info(f"Destination tunnel on {dev} -> {args.fwd_ip}:{args.fwd_port}") | |
| tunnel_destination(fd, args.fwd_ip, args.fwd_port) | |
| if __name__ == "__main__": | |
| main() |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment