Skip to content

Instantly share code, notes, and snippets.

@itzexor

itzexor/test Secret

Created July 25, 2025 04:48
Show Gist options
  • Save itzexor/bfa125296d1a0e8dd407c1312ec13866 to your computer and use it in GitHub Desktop.
Save itzexor/bfa125296d1a0e8dd407c1312ec13866 to your computer and use it in GitHub Desktop.
#!/usr/bin/env python3
"""
rpi_bridge.py
Python implementation of rpi-bridge-init and serial tunnel, supporting two modes:
Modes:
--mode source : USB detection, raw serial bridges for MCU and nozzle,
plus TCP-over-serial tunnel on third CDC-ACM interface.
--mode destination : Setup USB gadget, then framed serial tunnel (destination side):
reads from serial gadget interfaces and forwards to TCP.
Source flags:
--mcu-tty MCU serial port (e.g. /dev/ttyS7)
--nozzle-tty Nozzle serial port (e.g. /dev/ttyS1)
--baud Baud rate for raw serial bridges and tunnel (default 230400)
--bind-ip IP for tunnel listener (default 127.0.0.1)
--bind-port Port for tunnel listener (default 7125)
Destination flags:
--baud Baud rate for serial tunnel (default 115200)
--fwd-ip IP to forward TCP to (default 127.0.0.1)
--fwd-port Port to forward TCP to (default 7125)
Common flags:
--log Log file path
Note: USB vendor/product IDs and reset scripts are fixed to defaults and not configurable.
"""
import os
import sys
import time
import argparse
import logging
import signal
from logging.handlers import RotatingFileHandler
import subprocess
import threading
from pathlib import Path
import stat
import termios
import struct
import select
import socket
DEFAULT_RAW_BAUD = 230400
DEFAULT_TUNNEL_BAUD = 115200
DEFAULT_IP = "127.0.0.1"
DEFAULT_PORT = 7125
USB_VID = "1d6b"
USB_PID = "0104"
GADGET_NAME = "g1"
TUNNEL_FRAME_CHUNK_SIZE = 64
DETECT_TIMEOUT = 30 # seconds to wait for device enumeration
USB_RESET_DELAY = 5 # seconds to wait after usbreset command
LOG_FILE = "/tmp/rpi_bridge.log"
USBRESET_BIN = "/opt/bin/usbreset"
MCU_RESET_SCRIPT = "/usr/bin/mcu_reset.sh"
# Global state for cleanup
shutdown_event = threading.Event()
source_server = None
source_conns = []
dest_conns = []
# --- Logging Setup ---
def setup_logging(path):
fmt = logging.Formatter('%(asctime)s rpi-bridge: %(message)s','%Y-%m-%d %H:%M:%S')
fh = RotatingFileHandler(path, maxBytes=10*1024*1024, backupCount=3)
fh.setFormatter(fmt)
root = logging.getLogger()
root.setLevel(logging.INFO)
root.addHandler(fh)
sh = logging.StreamHandler(sys.stdout)
sh.setFormatter(fmt)
root.addHandler(sh)
# --- Signal Handler with cleanup ---
def handle_signal(signum, frame):
logging.info(f"Received signal {signum}, shutting down...")
shutdown_event.set()
# cleanup source tunnel
global source_server, source_conns, dest_conns
if source_server:
try:
source_server.close()
logging.info("Closed source server socket")
except:
pass
for conn in source_conns:
try:
conn.close()
except:
pass
# cleanup destination connections
for sock in dest_conns:
try:
sock.close()
except:
pass
# --- Raw Serial Helper ---
def open_serial_fd(dev, baud):
fd = os.open(dev, os.O_RDWR | os.O_NOCTTY)
attrs = termios.tcgetattr(fd)
attrs[0] = 0; attrs[1] = 0
attrs[2] = termios.CREAD | termios.CLOCAL | termios.CS8
attrs[3] = 0
attrs[6][termios.VMIN] = 1; attrs[6][termios.VTIME] = 0
b = getattr(termios, f"B{baud}", None)
if b is None:
raise ValueError(f"Unsupported baud rate: {baud}")
attrs[4] = b; attrs[5] = b
termios.tcsetattr(fd, termios.TCSANOW, attrs)
termios.tcflush(fd, termios.TCIOFLUSH)
return fd
# --- Framing Helpers ---
def send_frame(fd, sid, cmd, payload=b""):
hdr = struct.pack(">BBH", sid, cmd, len(payload))
frame = hdr + payload
for i in range(0, len(frame), TUNNEL_FRAME_CHUNK_SIZE):
os.write(fd, frame[i:i+TUNNEL_FRAME_CHUNK_SIZE])
termios.tcdrain(fd)
def read_exact(fd, n):
buf = bytearray()
while len(buf) < n:
chunk = os.read(fd, n - len(buf))
if not chunk:
raise RuntimeError("Serial read error or disconnect")
buf.extend(chunk)
return bytes(buf)
def read_frame(fd):
hdr = read_exact(fd, 4)
sid, cmd, ln = struct.unpack(">BBH", hdr)
data = read_exact(fd, ln)
return sid, cmd, data
# --- USB Gadget / ACM Detection with Verification ---
def wait_for_usb(vid, pid):
start = time.time()
while not shutdown_event.is_set():
if subprocess.run(["lsusb", "-d", f"{vid}:{pid}"], stdout=subprocess.DEVNULL).returncode == 0:
logging.info(f"USB gadget {vid}:{pid} detected")
return
if time.time() - start > DETECT_TIMEOUT:
logging.error("Timeout waiting for USB gadget")
sys.exit(1)
time.sleep(1)
def find_acm_devices(vid, pid):
devices = []
for tty in Path('/sys/class/tty').glob('ttyACM*'):
node = (tty / 'device').resolve()
while node != node.parent:
vfile, pfile = node / 'idVendor', node / 'idProduct'
if vfile.exists() and pfile.exists() and \
vfile.read_text().strip().lower() == vid.lower() and \
pfile.read_text().strip().lower() == pid.lower():
path = f"/dev/{tty.name}"
if os.path.exists(path) and stat.S_ISCHR(os.stat(path).st_mode):
try:
test_fd = os.open(path, os.O_RDWR | os.O_NOCTTY | os.O_NONBLOCK)
os.close(test_fd)
devices.append(path)
except:
logging.warning(f"Unresponsive ACM device skipped: {path}")
node = node.parent
devices.sort()
return devices
# --- Raw Serial Bridges ---
def serial_bridge(src, dst, baud):
def loop(rdev, wdev):
while not shutdown_event.is_set():
fr = fw = None
try:
logging.info(f"Bridge {rdev} -> {wdev} @ {baud}")
fr = open_serial_fd(rdev, baud)
fw = open_serial_fd(wdev, baud)
while not shutdown_event.is_set():
data = os.read(fr, 1024)
if data:
os.write(fw, data)
except Exception as e:
logging.error(f"Bridge failure {rdev}->{wdev}: {e}")
for fd_handle in (fr, fw):
if fd_handle is not None:
try: os.close(fd_handle)
except: pass
time.sleep(1)
threading.Thread(target=loop, args=(src, dst), daemon=True).start()
threading.Thread(target=loop, args=(dst, src), daemon=True).start()
# --- Tunnel: Source Mode ---
def tunnel_source(fd, bind_ip, bind_port):
global source_server, source_conns
source_server = socket.create_server((bind_ip, bind_port), backlog=10)
source_server.setblocking(False)
conns = {}
source_conns = []
next_id = 1
while not shutdown_event.is_set():
rlist, _, _ = select.select([fd, source_server] + list(conns.keys()), [], [], 0.1)
for s in rlist:
if s is source_server:
conn, _ = source_server.accept(); conn.setblocking(False)
source_conns.append(conn)
sid = next_id; next_id = (sid % 255) + 1
conns[conn] = sid
send_frame(fd, sid, 0)
elif s is fd:
try:
sid, cmd, data = read_frame(fd)
sock = next((c for c, i in conns.items() if i == sid), None)
if cmd == 1 and sock:
sock.sendall(data)
elif cmd == 2 and sock:
sock.close(); del conns[sock]
except Exception as e:
logging.error(f"Source frame error: {e}")
else:
sid = conns[s]
try:
d = s.recv(4096)
if d: send_frame(fd, sid, 1, d)
else: send_frame(fd, sid, 2); s.close(); del conns[s]
except:
send_frame(fd, sid, 2); s.close(); del conns[s]
# --- Gadget Setup for Destination ---
def setup_usb_gadget():
gadget_dir = Path(f"/sys/kernel/config/usb_gadget/{GADGET_NAME}")
if gadget_dir.exists():
logging.info("Gadget already configured, skipping setup")
return
subprocess.run(["modprobe", "libcomposite"], check=True)
os.makedirs(gadget_dir)
(gadget_dir / 'idVendor').write_text(USB_VID)
(gadget_dir / 'idProduct').write_text(USB_PID)
(gadget_dir / 'bcdDevice').write_text('0x0100')
(gadget_dir / 'bcdUSB').write_text('0x0200')
strings_dir = gadget_dir / 'strings' / '0x409'
os.makedirs(strings_dir)
serial = Path('/sys/firmware/devicetree/base/serial-number').read_text().strip()
(strings_dir / 'serialnumber').write_text(serial)
(strings_dir / 'manufacturer').write_text('Raspberry Pi')
(strings_dir / 'product').write_text('Multi-Serial Gadget')
cfg_str = gadget_dir / 'configs' / 'c.1' / 'strings' / '0x409'
os.makedirs(cfg_str, exist_ok=True)
(cfg_str / 'configuration').write_text('Config 1: Multi-Serial')
for i in range(3):
func = gadget_dir / 'functions' / f'acm.usb{i}'
os.makedirs(func, exist_ok=True)
try:
os.symlink(f"../functions/acm.usb{i}", gadget_dir / 'configs' / 'c.1' / f'acm.usb{i}')
except FileExistsError:
pass
udc = Path('/sys/class/udc').read_text().splitlines()[0]
(gadget_dir / 'UDC').write_text(udc)
# --- Tunnel: Destination Mode ---
def tunnel_destination(fd, fwd_ip, fwd_port):
global dest_conns
conns = {}
dest_conns = []
while not shutdown_event.is_set():
rlist, _, _ = select.select([fd] + list(conns.values()), [], [], 0.1)
for s in rlist:
if s is fd:
try:
sid, cmd, data = read_frame(fd)
if cmd == 0:
sock = socket.socket(); sock.connect((fwd_ip, fwd_port)); sock.setblocking(False)
conns[sid] = sock
dest_conns.append(sock)
elif cmd == 1 and sid in conns:
conns[sid].sendall(data)
elif cmd == 2 and sid in conns:
conns[sid].close(); del conns[sid]
except Exception as e:
logging.error(f"Destination frame error: {e}")
else:
sid = next(k for k, v in conns.items() if v == s)
try:
d = s.recv(4096)
if d: send_frame(fd, sid, 1, d)
else: send_frame(fd, sid, 2); s.close(); del conns[sid]
except:
send_frame(fd, sid, 2); s.close(); del conns[sid]
# --- Main ---
def main():
signal.signal(signal.SIGINT, handle_signal)
signal.signal(signal.SIGTERM, handle_signal)
p = argparse.ArgumentParser()
p.add_argument("--mode", choices=["source", "destination"], required=True)
p.add_argument("--log", default=LOG_FILE)
# source args
p.add_argument("--mcu-tty")
p.add_argument("--nozzle-tty")
p.add_argument("--raw-baud", type=int, default=DEFAULT_RAW_BAUD)
p.add_argument("--bind-ip", default=DEFAULT_IP)
p.add_argument("--bind-port", type=int, default=DEFAULT_PORT)
# destination args
p.add_argument("--tunnel-baud", type=int, default=DEFAULT_TUNNEL_BAUD)
p.add_argument("--fwd-ip", default=DEFAULT_IP)
p.add_argument("--fwd-port", type=int, default=DEFAULT_PORT)
args = p.parse_args()
setup_logging(args.log)
if args.mode == "source":
wait_for_usb(USB_VID, USB_PID)
start = time.time()
while not shutdown_event.is_set():
acms = find_acm_devices(USB_VID, USB_PID)
if len(acms) >= 3:
acms = acms[:3]
break
if time.time() - start > DETECT_TIMEOUT:
logging.error("Timeout waiting for ACM devices")
sys.exit(1)
time.sleep(1)
subprocess.run([USBRESET_BIN, f"{USB_VID}:{USB_PID}"], check=True)
time.sleep(USB_RESET_DELAY)
start = time.time()
while not shutdown_event.is_set():
acms = find_acm_devices(USB_VID, USB_PID)
if len(acms) >= 3:
acms = acms[:3]
break
if time.time() - start > DETECT_TIMEOUT:
logging.error("Timeout re-detecting ACM devices")
sys.exit(1)
time.sleep(1)
serial_bridge(args.mcu_tty, acms[0], args.raw_baud)
serial_bridge(args.nozzle_tty, acms[1], args.raw_baud)
fd = open_serial_fd(acms[2], args.tunnel_baud)
threading.Thread(target=tunnel_source, args=(fd, args.bind_ip, args.bind_port), daemon=True).start()
subprocess.run([MCU_RESET_SCRIPT], check=True)
shutdown_event.wait()
else:
setup_usb_gadget()
start = time.time()
while not shutdown_event.is_set():
gs = []
for t in Path('/sys/class/tty').glob('ttyGS*'):
node = (t / 'device').resolve()
while node != node.parent:
vfile = node / 'idVendor'; pfile = node / 'idProduct'
if vfile.exists() and pfile.exists() and \
vfile.read_text().strip().lower() == USB_VID and \
pfile.read_text().strip().lower() == USB_PID:
path = f"/dev/{t.name}"
if os.path.exists(path) and stat.S_ISCHR(os.stat(path).st_mode):
try:
fd_test = os.open(path, os.O_RDWR | os.O_NOCTTY | os.O_NONBLOCK)
os.close(fd_test)
gs.append(path)
except:
logging.warning(f"Unresponsive GS device skipped: {path}")
break
node = node.parent
gs.sort()
if gs:
dev = gs[-1]
break
if time.time() - start > DETECT_TIMEOUT:
logging.error("Timeout waiting for GS device")
sys.exit(1)
time.sleep(1)
fd = open_serial_fd(dev, args.baud)
logging.info(f"Destination tunnel on {dev} -> {args.fwd_ip}:{args.fwd_port}")
tunnel_destination(fd, args.fwd_ip, args.fwd_port)
if __name__ == "__main__":
main()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment