Skip to content

Instantly share code, notes, and snippets.

@Novakov
Created July 18, 2020 14:59
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save Novakov/9983f53ee28c6f1e291c7461fcc048a1 to your computer and use it in GitHub Desktop.
Save Novakov/9983f53ee28c6f1e291c7461fcc048a1 to your computer and use it in GitHub Desktop.
import serial
import serial.tools.list_ports
import threading
from time import sleep
class PortChecker:
    """Watch one serial port on a background thread and print what arrives.

    The worker opens ``port_name`` at 115200 baud with a read timeout of 1,
    then repeatedly reads up to 1024 bytes, printing either the received
    bytes or a "Nothing" marker when a read returns empty.

    Usage: ``start()`` launches the worker, ``stop()`` asks it to finish, and
    ``wait_stop()`` blocks until it has exited.
    """

    def __init__(self, port_name: str):
        """Prepare (but do not start) the worker for ``port_name``."""
        self.port_name = port_name
        self.thread = threading.Thread(target=self._run)
        self._abort = False

    def _run(self):
        """Thread body: open the port and poll it until asked to stop."""
        try:
            port = serial.Serial(self.port_name, baudrate=115200, timeout=1)
        except serial.SerialException as exc:
            # A busy or missing port must not kill the thread with a traceback.
            print(f'[{self.port_name}] Cannot open: {exc}')
            return

        # The context manager guarantees the port is closed on every exit path.
        with port:
            while not self._abort:
                try:
                    data = port.read(1024)
                except serial.SerialException as exc:
                    # An empty read is handled below; an exception here is a
                    # real I/O failure. Stop instead of spinning on a dead
                    # port and flooding the output.
                    print(f'[{self.port_name}] Read error: {exc}')
                    break

                if data != b'':
                    print(f'[{self.port_name}] GOT!!! {data}')
                else:
                    print(f'[{self.port_name}] Nothing')

    def start(self):
        """Start the background worker thread."""
        self.thread.start()

    def stop(self):
        """Request the worker to exit; it notices after the current read."""
        self._abort = True

    def wait_stop(self):
        """Block until the worker thread has finished."""
        self.thread.join()
def main():
    """Start a PortChecker on every detected serial port and run until Ctrl+C.

    Lists the available ports, starts one checker per port, idles until a
    KeyboardInterrupt arrives, then signals every checker to stop and waits
    for all of them to finish.
    """
    available = serial.tools.list_ports.comports()
    print(f'Found {len(available)} serial ports')

    watchers = [PortChecker(info.device) for info in available]

    print('Starting checkers')
    for watcher in watchers:
        watcher.start()

    # Idle in the main thread; Ctrl+C is the only way out of this loop.
    try:
        while True:
            sleep(0.5)
    except KeyboardInterrupt:
        pass

    print('Stopping checkers')
    # Signal all workers first so they wind down in parallel, then join them.
    for watcher in watchers:
        watcher.stop()
    for watcher in watchers:
        watcher.wait_stop()
# Run the port scan only when executed as a script, so that importing this
# module does not start the blocking monitor loop.
if __name__ == '__main__':
    main()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment