Skip to content

Instantly share code, notes, and snippets.

Last active May 25, 2017 14:15
Show Gist options
  • Save dummerbd/75316ab8c109b5bbc46b to your computer and use it in GitHub Desktop.
Save dummerbd/75316ab8c109b5bbc46b to your computer and use it in GitHub Desktop.
Asyncio compatible pyserial adapter
import asyncio
import threading
import queue
import serial
import logging
logger = logging.getLogger(__name__)
class LinesBuffer:
    r"""Simple buffer for splitting a byte string on new lines.

    Raw bytes are appended with :meth:`add`; complete lines are pulled out
    one at a time with :meth:`next`.  ``\r``, ``\n``, ``\r\n`` and ``\n\r``
    each count as a single line terminator, including when the two bytes of
    a pair arrive in separate :meth:`add` calls.
    """

    _CR = 13  # ord('\r')
    _LF = 10  # ord('\n')

    def __init__(self):
        self._buffer = bytearray()
        # Index at which the next newline scan resumes, so bytes that were
        # already inspected are not scanned again on every call.
        self._last_i = 0
        # Terminator byte of the last returned line when it was the final
        # byte in the buffer; its partner (if any) may still be on the way.
        self._pending = None

    def add(self, data):
        """Add raw data bytes to the buffer."""
        self._buffer += data

    def _find_newline(self, start):
        """Return the index of the first CR or LF at or after `start`, or -1."""
        cr = self._buffer.find(b'\r', start)
        lf = self._buffer.find(b'\n', start)
        if cr < 0:
            return lf
        if lf < 0:
            return cr
        return min(cr, lf)

    def next(self, maxlen):
        r"""Get the next line from the buffer.

        Returns a ``bytearray`` holding the next line with its terminator
        normalized to ``\n``.  When no terminator is buffered but at least
        `maxlen` bytes are, the first `maxlen` bytes are returned as-is (no
        ``\n`` appended).  Otherwise returns ``None``.
        """
        buf = self._buffer
        newlines = (self._CR, self._LF)

        # The previous line ended exactly at the end of the buffer: if the
        # data that arrived since starts with the *other* newline byte, it
        # is the second half of a CRLF/LFCR pair, not an empty line.
        if self._pending is not None and buf:
            first = buf[0]
            if first != self._pending and first in newlines:
                del buf[0]
            self._pending = None

        idx = self._find_newline(self._last_i)
        if idx < 0:
            if len(buf) >= maxlen:
                # Max characters read in with no newline
                chunk = buf[:maxlen]
                del buf[:maxlen]
                self._last_i = 0
                return chunk
            # Everything buffered so far has been scanned; resume after it.
            self._last_i = len(buf)
            return None

        terminator = buf[idx]
        line = buf[:idx]
        end = idx + 1  # consume the newline character
        if end < len(buf):
            # Consume the paired newline character if present
            follower = buf[end]
            if follower != terminator and follower in newlines:
                end += 1
        else:
            self._pending = terminator

        # Trim the consumed bytes and restart scanning from the front.
        del buf[:end]
        self._last_i = 0
        line += b'\n'
        return line

    def clear(self):
        """Clear the buffer and reset all scanning state."""
        self._buffer = bytearray()
        # A stale scan position would skip newlines in data added later.
        self._last_i = 0
        self._pending = None
class SerialTransport(asyncio.Transport):
    """Threaded serial transport.

    All blocking serial-port calls run in a daemon worker thread; protocol
    callbacks triggered by that thread are scheduled onto the event loop
    with ``call_soon_threadsafe``.  Outgoing data is handed to the worker
    through a bounded queue, so the "write buffer size" used for flow
    control is the number of queued chunks, not a byte count.
    """

    write_buffer_size = 16  # capacity of the outgoing queue, in chunks
    write_buffer_low = 4
    write_buffer_high = 10
    # NOTE(review): the limit originally passed to LinesBuffer.next() could
    # not be recovered from the source; 1024 is a placeholder -- confirm.
    max_line_length = 1024
    # How long the worker waits for outgoing data before polling the port
    # again; keeps the worker from spinning at 100% CPU while idle.
    poll_interval = 0.01

    def __init__(self, loop, serial_factory, protocol):
        """Set up the transport and start its worker thread.

        `serial_factory` is a zero-argument callable returning the serial
        port object; it is invoked inside the worker thread.
        """
        super().__init__()
        self._loop = loop
        self._protocol = protocol
        self._is_closing = False
        self._pause_reading = False
        self._extra_info = {}
        self._out = queue.Queue(self.write_buffer_size)
        self._out_high = self.write_buffer_high
        self._out_low = self.write_buffer_low
        self._out_paused = False
        self._out_lock = threading.Lock()
        self._worker_thread = threading.Thread(
            target=self._worker, name='SerialTransport.worker', daemon=True,
            args=(serial_factory,))
        # Schedule connection_made before the worker can schedule anything,
        # so the protocol always sees it first.
        self._loop.call_soon(self._protocol.connection_made, self)
        # NOTE(review): no start() call was visible in the source; without
        # it the worker never runs -- confirm placement.
        self._worker_thread.start()

    def _worker(self, serial_factory):
        """Worker thread that handles the blocking calls of the serial port."""
        ser = None
        error = None
        try:
            ser = serial_factory()
            ser.timeout = 1.0
            ser.writeTimeout = 1.0
            buf = LinesBuffer()
            read_in = []
            # Record some extra info about the serial port
            self._extra_info = {k: getattr(ser, k, None) for k in [
                'name', 'port', 'baudrate', 'bytesize', 'parity', 'stopbits'
            ]}
            while not self._is_closing and ser.isOpen():
                # Get output data to write; the short blocking wait doubles
                # as the idle delay of this polling loop.
                try:
                    data = self._out.get(timeout=self.poll_interval)
                except queue.Empty:
                    pass
                else:
                    ser.write(data)
                # Resume writing if needed
                self._maybe_resume_protocol(threadsafe=True)
                # Read in any incoming data
                waiting = ser.inWaiting()
                if waiting > 0:
                    buf.add(ser.read(waiting))
                    # Drain every complete line: one read may contain
                    # several, and leftovers would otherwise sit in the
                    # buffer until more bytes arrive.
                    line = buf.next(self.max_line_length)
                    while line:
                        read_in.append(line)
                        line = buf.next(self.max_line_length)
                # Only call data_received when not paused
                if not self._pause_reading and read_in:
                    lines = bytearray().join(read_in)
                    self._loop.call_soon_threadsafe(
                        self._protocol.data_received, lines)
                    read_in = []
        except Exception as exc:
            error = exc
        finally:
            try:
                if ser is not None and ser.isOpen():
                    ser.close()
            except Exception as exc:
                # Report the first failure; a close error only matters if
                # nothing went wrong before it.
                if error is None:
                    error = exc
            self._extra_info = {}
            # Always tell the protocol the connection is gone: with the
            # exception on failure, with None after a requested close.
            self._close(error)

    def _call_protocol(self, method_name, threadsafe):
        """Invoke a flow-control callback on the protocol.

        With `threadsafe` the call is scheduled on the event loop; otherwise
        it is made directly.  Failures are reported to the loop's exception
        handler instead of propagating to the caller.
        """
        try:
            callback = getattr(self._protocol, method_name)
            if threadsafe:
                self._loop.call_soon_threadsafe(callback)
            else:
                callback()
        except Exception as exc:
            self._loop.call_exception_handler({
                'message': 'protocol.%s() failed' % method_name,
                'exception': exc,
                'transport': self,
                'protocol': self._protocol,
            })

    def _maybe_pause_protocol(self, threadsafe=False):
        """Pause the protocol if the write buffer is above the high mark.

        This is a modified version of the asyncio _FlowControlMixin method
        of the same name.  It is called when the write buffer increases in
        size.
        """
        with self._out_lock:
            if self._out_paused:
                return
            if self.get_write_buffer_size() <= self._out_high:
                return
            self._out_paused = True
        # Called outside the (non-reentrant) lock so a protocol that writes
        # from inside pause_writing() cannot deadlock on it.
        self._call_protocol('pause_writing', threadsafe)

    def _maybe_resume_protocol(self, threadsafe=False):
        """Resume the protocol if the write buffer is at or below the low mark.

        This is a modified version of the asyncio _FlowControlMixin method
        of the same name.  It is called when the write buffer decreases in
        size.
        """
        with self._out_lock:
            if not self._out_paused:
                return
            if self.get_write_buffer_size() > self._out_low:
                return
            self._out_paused = False
        # See _maybe_pause_protocol for why this runs outside the lock.
        self._call_protocol('resume_writing', threadsafe)

    def _close(self, exc):
        """Mark the transport closed and report `exc` to the protocol.

        `exc` is the exception that ended the connection, or None for a
        requested close.
        """
        self._is_closing = True
        try:
            self._loop.call_soon_threadsafe(self._protocol.connection_lost, exc)
        except RuntimeError:
            # The event loop is already closed; nobody is left to notify.
            pass

    def close(self):
        """Close the serial port.

        Only requests the close: the worker thread notices the flag, closes
        the port and then reports connection_lost.  Data still queued for
        writing at that point is not sent.
        """
        self._is_closing = True

    def is_closing(self):
        """Is the serial port closed/closing?"""
        return self._is_closing

    def get_extra_info(self, name, default=None):
        """Provide extra info about the serial port.

        Known names while the port is open: 'name', 'port', 'baudrate',
        'bytesize', 'parity' and 'stopbits'.
        """
        return self._extra_info.get(name, default)

    def pause_reading(self):
        """Pause reading until resume_reading is called."""
        self._pause_reading = True

    def resume_reading(self):
        """Resume reading if it was paused."""
        self._pause_reading = False

    def abort(self):
        """Abort write operations."""
        # NOTE(review): body was not visible in the source; reconstructed as
        # a plain close(), which already drops queued data -- confirm.
        self.close()

    def write(self, data):
        """Write data to the serial port.

        The data is queued for the worker thread.  Returns True when it was
        queued and False when the queue was full and the data was dropped.
        """
        try:
            self._out.put_nowait(data)
        except queue.Full:
            success = False
        else:
            success = True
        self._maybe_pause_protocol()
        return success

    def writelines(self, data_list):
        """Write multiple lines to the serial port."""
        for data in data_list:
            self.write(data)

    def get_write_buffer_size(self):
        """Get the size of the write buffer (number of queued chunks)."""
        return self._out.qsize()

    def get_write_buffer_limits(self):
        """Get the low and high water marks for the write buffer, as (low, high)."""
        return (self._out_low, self._out_high)

    def set_write_buffer_limits(self, high=None, low=None):
        """Set the high and low water marks for the write buffer.

        `high` defaults to the class-level `write_buffer_high`.  `low`
        defaults to half of the high mark and is never allowed above it; a
        high mark of 0 forces the low mark to 0 as well.
        """
        self._out_high = high if high is not None else self.write_buffer_high
        if self._out_high == 0:
            self._out_low = 0
        elif low is None:
            self._out_low = self._out_high // 2
        else:
            self._out_low = min(self._out_high, low)

    def can_write_eof(self):
        """EOF not supported."""
        return False

    def write_eof(self):
        """Not supported."""
        raise NotImplementedError
def create_connection(protocol_factory, loop=None, **serial_kwargs):
    """Create a serial streaming transport connection to a serial port.

    `protocol_factory` is called with no arguments to build the protocol.
    `serial_kwargs` are forwarded unchanged to ``serial.Serial`` by the
    factory handed to the transport.  When `loop` is None the current
    event loop is used.

    Returns a ``(transport, protocol)`` pair.
    """
    event_loop = asyncio.get_event_loop() if loop is None else loop

    def open_port():
        # The port is only constructed when the transport calls the factory.
        return serial.Serial(**serial_kwargs)

    protocol = protocol_factory()
    transport = SerialTransport(event_loop, open_port, protocol)
    return transport, protocol
async def open_connection(limit=1024, loop=None, **serial_kwargs):
    """Get a stream reader and a stream writer for a serial connection.

    `limit` is the buffer limit given to the ``asyncio.StreamReader``;
    `serial_kwargs` are forwarded to `create_connection`.  When `loop` is
    None the current event loop is used.

    Returns a ``(reader, writer)`` pair.
    """
    if loop is None:
        loop = asyncio.get_event_loop()
    reader = asyncio.StreamReader(limit=limit, loop=loop)
    protocol = asyncio.StreamReaderProtocol(reader, loop=loop)
    connection = create_connection(lambda: protocol, loop=loop, **serial_kwargs)
    if not isinstance(connection, tuple):
        # create_connection may be a coroutine rather than a plain function;
        # only then is there something to await.  Delegating with
        # `yield from` to the plain tuple would iterate it instead.
        connection = await connection
    transport, _ = connection
    writer = asyncio.StreamWriter(transport, protocol, reader, loop)
    return reader, writer
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment