Skip to content

Instantly share code, notes, and snippets.

Embed
What would you like to do?
A SerialTransporter using multiprocessing. For use in future projects.
#!python
import logging
import queue
from multiprocessing import Queue, Process, Event, Value

import serial
class SerialTransporter(object):
    """Line-oriented serial link serviced by a background process.

    ``run()`` starts a worker process that opens the port, pushes every
    received line onto ``rx_q`` and writes every command found on ``tx_q``
    to the port.  The owning process talks to the worker only through
    ``send()``, ``receive()`` and ``empty()``.

    Received data is split on CR and LF; each terminator ends a message, so
    a CRLF pair produces the line followed by an empty message.  Messages
    are delivered as ``str`` (decoded as UTF-8, undecodable bytes replaced).
    """

    # Seconds the worker blocks waiting for an outgoing command when the
    # port has nothing to read; keeps the loop from spinning on an idle link.
    _IDLE_POLL_SECONDS = 0.01
    # Byte values of LF and CR, the two message terminators.
    _TERMINATORS = (10, 13)
    _ENCODING = "utf-8"

    def __init__(self, port, baud=115200, timeout=1):
        """Store the port settings and create the inter-process primitives.

        Args:
            port: Device name passed to ``serial.Serial`` (e.g. ``"COM3"``).
            baud: Baud rate passed to ``serial.Serial``.
            timeout: Read timeout in seconds passed to ``serial.Serial``.
        """
        self.port = port
        self.baud = baud
        self.timeout = timeout
        self.isRunning = Event()
        self.isRunning.clear()
        self.rx_q = Queue()
        self.tx_q = Queue()
        self.error_counters = {
            "generic": Value('i', 0)
        }
        # Worker process handle; None until run() is called.
        self.r = None
        # Serial handle and open flag; only meaningful inside the worker.
        self.ser = None
        self.isOpen = False

    @staticmethod
    def _extract_messages(pending, data):
        """Append *data* to *pending* and return the completed messages.

        Args:
            pending: ``bytearray`` holding the bytes of the unfinished
                message; mutated in place so it carries over between calls.
            data: Newly received bytes.

        Returns:
            List of decoded messages, one per CR or LF seen in *data*.
        """
        messages = []
        for byte in bytearray(data):
            if byte in SerialTransporter._TERMINATORS:
                # Decode whole messages only, so a multi-byte character
                # split across two reads is never decoded in halves.
                messages.append(
                    pending.decode(SerialTransporter._ENCODING, "replace"))
                del pending[:]
            else:
                pending.append(byte)
        return messages

    @staticmethod
    def _encode_command(cmd):
        """Return *cmd* as bytes for the port, or None if it is not text."""
        if isinstance(cmd, str):
            return cmd.encode(SerialTransporter._ENCODING)
        if isinstance(cmd, (bytes, bytearray)):
            return bytes(cmd)
        return None

    def _count_error(self):
        """Increment the shared ``generic`` error counter."""
        counter = self.error_counters["generic"]
        with counter.get_lock():
            counter.value += 1

    def rx_process(self):
        """Worker loop: open the port, then shuttle data until stopped.

        Runs until ``isRunning`` is cleared.  Any exception is counted,
        logged and re-raised; the port is closed on every exit path.
        """
        pending = bytearray()
        try:
            self.ser = serial.Serial(port=self.port, baudrate=self.baud,
                                     bytesize=8, parity='N',
                                     stopbits=1, timeout=self.timeout,
                                     xonxoff=0,
                                     rtscts=0)
            self.isOpen = True
            while self.isRunning.is_set():
                waiting = self.ser.inWaiting()
                if waiting:
                    # Drain everything available in one read instead of
                    # issuing one read call per byte.
                    data = self.ser.read(waiting)
                    for msg in self._extract_messages(pending, data):
                        self.rx_q.put(msg)
                        logging.info("%s", msg)
                try:
                    if waiting:
                        cmd = self.tx_q.get_nowait()
                    else:
                        # Nothing to read: block briefly on the transmit
                        # queue rather than busy-waiting.
                        cmd = self.tx_q.get(timeout=self._IDLE_POLL_SECONDS)
                except queue.Empty:
                    continue
                payload = self._encode_command(cmd)
                if payload is None:
                    self._count_error()
                    logging.warning("Dropping non-text command: %r", cmd)
                    continue
                self.ser.write(payload)
                logging.info("%s", cmd)
        except Exception:
            self._count_error()
            logging.exception("Serial worker for %s stopped", self.port)
            raise
        finally:
            if self.ser is not None:
                self.ser.close()
            self.isOpen = False

    def run(self):
        """Start the worker process; does nothing if it is already alive."""
        if self.r is not None and self.r.is_alive():
            return
        self.r = Process(target=self.rx_process, args=())
        self.isRunning.set()
        self.r.start()

    def join(self):
        """Signal the worker to stop and wait for it to exit.

        Safe to call when ``run()`` was never called.
        """
        self.isRunning.clear()
        if self.r is not None:
            self.r.join()

    def send(self, x):
        """Queue *x* (``str`` or bytes) for transmission by the worker."""
        self.tx_q.put(x)

    def receive(self):
        """Return the next received message, or None if none is available."""
        # get_nowait() avoids the empty()-then-get() race, where a blocking
        # get() could hang if the queue state changed between the two calls.
        try:
            return self.rx_q.get_nowait()
        except queue.Empty:
            return None

    def empty(self):
        """Return True if no received message is currently available."""
        return self.rx_q.empty()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
You can’t perform that action at this time.