Skip to content

Instantly share code, notes, and snippets.

Show Gist options
  • Save thewh1teagle/d4d42b53a62c9ff96202ca9d4f14a87d to your computer and use it in GitHub Desktop.
Save thewh1teagle/d4d42b53a62c9ff96202ca9d4f14a87d to your computer and use it in GitHub Desktop.
import serial
import threading
import time
import sys
class STM:
    """Line-oriented serial link to a device, with automatic reconnection.

    ``run`` opens the port and then prints every received line until it is
    interrupted; ``send`` writes text to the same port.  Whenever the port
    cannot be opened, or a read/write raises ``serial.SerialException``, the
    port is reopened, retrying every 2 seconds until that succeeds.

    ``run`` is intended to be used as a thread target while other threads
    call ``send``.  Replacing the port is serialised by an internal lock so
    that two threads noticing the same failure do not both open the device.
    """

    def __init__(self, port: str, baudrate: "int | str") -> None:
        """Store the connection settings; the port itself is not opened here.

        Args:
            port: Name of the serial device, e.g. ``'COM24'``.
            baudrate: Baud rate, forwarded unchanged to ``serial.Serial``.
        """
        # Currently open port, or None until the first successful connect.
        self.ser: "serial.Serial | None" = None
        self.port = port
        self.baudrate = baudrate
        # Guards replacement of ``self.ser`` (see ``_reconnect``).
        self._lock = threading.Lock()

    def run(self):
        """Open the port (blocking until it is available), then read forever."""
        self._reconnect(self.ser)
        self.read_serial()

    def open_serial_port(self, port):
        """Block until ``port`` can be opened and return the open port.

        Args:
            port: Name of the serial device to open.

        Returns:
            The ``serial.Serial`` instance, opened at ``self.baudrate``.
        """
        while True:
            try:
                ser = serial.Serial(port, baudrate=self.baudrate)
            except serial.SerialException:
                print(f"Failed to connect to {port}. Retrying in 2 seconds...")
                time.sleep(2)
            else:
                print(f"Connected to {port} at {self.baudrate} baud.")
                return ser

    @staticmethod
    def _close_quietly(port):
        """Close ``port`` if there is one, ignoring errors from a dead handle."""
        if port is None:
            return
        try:
            port.close()
        except OSError:
            # Best effort only: the handle is being discarded anyway.
            # serial.SerialException derives from OSError, so it is covered.
            pass

    def _reconnect(self, stale):
        """Replace ``stale`` with a freshly opened port, unless already done.

        Args:
            stale: The port object the caller saw fail, or ``None`` if the
                caller found no port at all.  If ``self.ser`` no longer
                refers to it, another thread has already reconnected and
                nothing is done.
        """
        with self._lock:
            if self.ser is not stale:
                return
            # Release the old handle first: leaving it open leaks it and may
            # keep the device busy while we try to reopen it.
            self._close_quietly(stale)
            self.ser = self.open_serial_port(self.port)

    def read_serial(self):
        """Print every line received from the port until interrupted.

        A ``serial.SerialException`` while reading triggers a reconnect and
        reading resumes on the new port.  Bytes that are not valid UTF-8 are
        shown as U+FFFD instead of aborting the loop.  The port is closed
        when the loop exits for any reason.

        Python raises ``KeyboardInterrupt`` in the main thread only, so the
        handler below fires only when this method runs there.
        """
        try:
            while True:
                port = self.ser
                if port is None:
                    # Called without run(): connect instead of crashing.
                    self._reconnect(None)
                    continue
                try:
                    raw = port.readline()
                except serial.SerialException:
                    print("Serial disconnected. Reconnecting...")
                    self._reconnect(port)
                    continue
                # errors='replace': line noise must not kill the reader.
                data = raw.decode('utf-8', errors='replace')
                print(f'Received: {data}')
        except KeyboardInterrupt:
            print("Read thread terminated.")
        finally:
            self._close_quietly(self.ser)

    def send(self, data: str):
        """Write ``data`` to the port as UTF-8, reconnecting until it is sent.

        If no port is open yet, or the write raises
        ``serial.SerialException``, the port is (re)opened and the write is
        retried, so this call blocks until the data has been handed to the
        port.

        Args:
            data: Text to transmit.
        """
        payload = data.encode('utf-8')
        # Iterative retry: repeated failures must not grow the call stack.
        while True:
            port = self.ser
            if port is None:
                self._reconnect(None)
                continue
            try:
                port.write(payload)
            except serial.SerialException:
                print("Failed to send data. Reconnecting...")
                self._reconnect(port)
                continue
            print(f'Sent: {data}')
            return
if __name__ == '__main__':
    # Script entry point: build the link for COM24 and hand its blocking
    # connect-and-read loop to a separate (non-daemon) thread.
    device = STM(port='COM24', baudrate='115200')
    reader = threading.Thread(target=device.run)
    reader.start()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment