Skip to content

Instantly share code, notes, and snippets.

@Salamandar
Created March 23, 2022 08:52
Show Gist options
  • Star 1 You must be signed in to star a gist
  • Fork 1 You must be signed in to fork a gist
  • Save Salamandar/bebc62a5428a22ad193d763d2d596aa3 to your computer and use it in GitHub Desktop.
Save Salamandar/bebc62a5428a22ad193d763d2d596aa3 to your computer and use it in GitHub Desktop.
Duplicate TTY serial port using socat in Python to log everything that goes through it.
#!/usr/bin/env python3
import sys
import os
import signal
import threading
import subprocess
from typing import Optional
from pathlib import Path
class SerialDuplicator(threading.Thread):
    """Thread that mirrors a serial TTY onto a pseudo-terminal using ``socat``.

    A shell pipeline is spawned that connects ``real_tty`` to a PTY linked at
    ``fake_tty``, passing the traffic through ``tee`` into ``log_file`` and
    ``tmp_file``.  Whenever the pipeline exits without :meth:`stop` having
    been requested, a marker line is appended to ``log_file`` and the
    pipeline is started again.
    """

    def __init__(self, real_tty: Path, fake_tty: Path, log_file: Path, *args, **kwargs):
        """Store the paths and prepare the thread; nothing is spawned yet.

        Args:
            real_tty: Path handed to ``socat`` as the device to duplicate.
            fake_tty: Path at which ``socat`` links the PTY it creates.
            log_file: File that ``tee`` writes to and that receives the
                "TTY DISAPPEARED" marker.
            *args, **kwargs: Forwarded unchanged to ``threading.Thread``.
        """
        super().__init__(*args, **kwargs)
        self.real_tty = real_tty
        self.fake_tty = fake_tty
        self.log_file = log_file
        # Only the final component of fake_tty is used, so this is a path
        # relative to the current working directory.
        self.tmp_file = Path(self.fake_tty.name + ".tmp")
        # Deliberately NOT named ``_stop``: threading.Thread defines a private
        # ``_stop()`` method on CPython <= 3.12 and calls it when the thread
        # finishes or is joined; shadowing it with an Event raises
        # "TypeError: 'Event' object is not callable".
        self._stop_event = threading.Event()
        # Makes "check the stop flag, then spawn" in run() atomic with respect
        # to stop(), so a stop request can never slip in just before a respawn.
        self._spawn_lock = threading.Lock()
        self._subproc: Optional[subprocess.Popen] = None

    def dup_tty(self):
        """Spawn the ``socat``/``tee`` shell pipeline and remember its handle.

        The pipeline is started in its own session (``start_new_session``) so
        that :meth:`stop` can signal the whole process group at once.
        """
        # NOTE(review): the paths are interpolated unquoted into a shell
        # command; paths containing spaces or shell metacharacters will break
        # it. Only use with trusted paths.
        # NOTE(review): ``tee`` is run without ``-a``, so log_file is
        # truncated each time the pipeline (re)starts, which also discards a
        # previously written "TTY DISAPPEARED" marker -- confirm intended.
        command = f'''socat \
            '{self.real_tty},raw,echo=0' \
            'SYSTEM:tee {self.log_file} | socat - "PTY,link={self.fake_tty},raw,echo=0,waitslave,ignoreeof" | tee {self.tmp_file}'
        '''
        self._subproc = subprocess.Popen(
            command, shell=True, stderr=sys.stderr, stdout=sys.stdout, start_new_session=True)

    def stop(self):
        """Request termination and SIGTERM the running pipeline, if any.

        Safe to call when no pipeline is running or when it already exited.
        """
        self._stop_event.set()
        with self._spawn_lock:
            proc, self._subproc = self._subproc, None
        if proc is None:
            return
        try:
            os.killpg(os.getpgid(proc.pid), signal.SIGTERM)
        except ProcessLookupError:
            # The pipeline exited (and was reaped) before we could signal it.
            pass
        proc.wait()

    def stopped(self):
        """Return True once :meth:`stop` has been called."""
        return self._stop_event.is_set()

    def run(self):
        """Keep the pipeline alive until :meth:`stop` is called, then clean up."""
        # NOTE(review): there is no delay between restarts; if socat exits
        # immediately (e.g. real_tty is absent) it is respawned back-to-back.
        while True:
            with self._spawn_lock:
                if self.stopped():
                    break
                self.dup_tty()
                # Local reference: stop() may reset self._subproc to None
                # while we are blocked in wait() below.
                proc = self._subproc
            proc.wait()
            if self.stopped():
                break
            print("dup_tty stopped")
            with open(self.log_file, "a", encoding="utf-8") as logfile:
                logfile.write("#################### TTY DISAPPEARED\n")
        self._cleanup()

    def _cleanup(self):
        """Remove tmp_file and the fake_tty link, each independently."""
        for leftover in (self.tmp_file, self.fake_tty):
            try:
                leftover.unlink()
            except FileNotFoundError:
                # Already gone; still attempt the remaining path.
                pass
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment