Skip to content

Instantly share code, notes, and snippets.

@RoganDawes
Last active October 23, 2020 13:56
Show Gist options
  • Save RoganDawes/247c503257776ff8fe426131e1b5987d to your computer and use it in GitHub Desktop.
Save RoganDawes/247c503257776ff8fe426131e1b5987d to your computer and use it in GitHub Desktop.
Some Quick and Dirty python code for reading from two serial ports at once. Also, a Teensy sketch for doing the same using actual simultaneous UARTs.
#!/usr/bin/env python3
import serial, time, sys, threading
from colorama import Fore, Style, init as colorama_init
from termcolor import colored
colorama_init()
# lock to serialize console output
lock = threading.Lock()
class Highlight:
def __init__(self, clazz, color):
self.color = color
self.clazz = clazz
def __enter__(self):
print(self.color, end="")
def __exit__(self, type, value, traceback):
if self.clazz == Fore:
print(Fore.RESET, end="")
else:
assert self.clazz == Style
print(Style.RESET_ALL, end="")
sys.stdout.flush()
if len(sys.argv) != 3 and len(sys.argv) != 4:
sys.stderr.write("Usage: %s <baud> <port1> [<port2>]\n" % (sys.argv[0]))
exit(1)
def hexdump( start, src, length=16, sep='.' ):
'''
@brief Return {src} in hex dump.
@param[in] length {Int} Nb Bytes by row.
@param[in] sep {Char} For the text part, {sep} will be used for non ASCII char.
@return {Str} The hexdump
@note Full support for python2 and python3 !
'''
result = [];
# Python3 support
try:
xrange(0,1);
except NameError:
xrange = range;
for i in xrange(0, len(src), length):
subSrc = src[i:i+length];
hexa = '';
isMiddle = False;
for h in xrange(0,len(subSrc)):
if h == length/2:
hexa += ' ';
h = subSrc[h];
if not isinstance(h, int):
h = ord(h);
h = hex(h).replace('0x','');
if len(h) == 1:
h = '0'+h;
hexa += h+' ';
hexa = hexa.strip(' ');
text = '';
for c in subSrc:
if not isinstance(c, int):
c = ord(c);
if 0x20 <= c < 0x7F:
text += chr(c);
else:
text += sep;
result.append(('%08X: %-'+str(length*(2+1)+1)+'s |%s|') % (start + i, hexa, text));
return '\n'.join(result);
def read_serial(port, baud, color, binary):
ser = serial.Serial()
ser.port = port
ser.baudrate = baud
ser.bytesize = serial.EIGHTBITS #number of bits per bytes
ser.parity = serial.PARITY_NONE #set parity check: no parity
ser.stopbits = serial.STOPBITS_ONE #number of stop bits
# ser.timeout = None #block read
ser.timeout = 0.1 # non blocking read
ser.xonxoff = False #disable software flow control
ser.rtscts = False #disable hardware (RTS/CTS) flow control
ser.dsrdtr = False #disable hardware (DSR/DTR) flow control
ser.writeTimeout = 2 #timeout for write
pos = 0
try:
ser.open()
except Exception as e:
print("error open serial port: " + str(e))
exit()
if ser.isOpen():
try:
ser.dtr=True
while True:
c = ser.read(size=16)
if len(c) > 0:
with lock:
# sys.stdout.write((color))
if (binary):
# sys.stdout.buffer.write(bytes(colored(hexdump(pos, c, 8)+'\n', color),"ascii"))
print(colored(hexdump(pos, c, 16), color))
pos += len(c)
else:
sys.stdout.buffer.write(c)
sys.stdout.buffer.flush()
ser.close()
except Exception as e1:
print ("error communicating...: " + str(e1))
exit()
else:
print("cannot open serial port ")
exit()
# Create two threads as follows
try:
t1 = threading.Thread(target=read_serial, args=(sys.argv[2], sys.argv[1], 'green', True ))
t1.daemon = True # thread dies when main thread (only non-daemon thread) exits.
t1.start()
if len(sys.argv) == 4:
t2 = threading.Thread(target=read_serial, args=(sys.argv[3], sys.argv[1], 'red', True ))
t2.daemon = True # thread dies when main thread (only non-daemon thread) exits.
t2.start()
except:
print("Error: unable to start thread")
try:
t1.join()
except KeyboardInterrupt:
exit()
#!/usr/bin/env python3
import serial, time, sys, threading
# lock to serialize console output
lock = threading.Lock()
if len(sys.argv) != 3 and len(sys.argv) != 4:
sys.stderr.write("Usage: %s <baud> <port1> [<port2>]\n" % (sys.argv[0]))
exit(1)
def read_serial(port, baud):
ser = serial.Serial()
ser.port = port
ser.baudrate = baud
ser.bytesize = serial.EIGHTBITS #number of bits per bytes
ser.parity = serial.PARITY_NONE #set parity check: no parity
ser.stopbits = serial.STOPBITS_ONE #number of stop bits
# ser.timeout = None #block read
ser.timeout = 0.1 # non blocking read
ser.xonxoff = False #disable software flow control
ser.rtscts = False #disable hardware (RTS/CTS) flow control
ser.dsrdtr = False #enable hardware (DSR/DTR) flow control
ser.writeTimeout = 2 #timeout for write
pos = 0
try:
ser.open()
except Exception as e:
print("error open serial port: " + str(e))
exit()
if ser.isOpen():
try:
ser.dtr=True
while True:
c = ser.read(size=65)
if len(c) > 0:
with lock:
sys.stdout.buffer.write(c)
sys.stdout.buffer.flush()
ser.close()
except Exception as e1:
print ("error communicating...: " + str(e1))
exit()
else:
print("cannot open serial port ")
exit()
# Create two threads as follows
try:
t1 = threading.Thread(target=read_serial, args=(sys.argv[2], sys.argv[1]))
t1.daemon = True # thread dies when main thread (only non-daemon thread) exits.
t1.start()
if len(sys.argv) == 4:
t2 = threading.Thread(target=read_serial, args=(sys.argv[3], sys.argv[1]))
t2.daemon = True # thread dies when main thread (only non-daemon thread) exits.
t2.start()
except:
print("Error: unable to start thread")
try:
t1.join()
except KeyboardInterrupt:
exit()
#include <elapsedMillis.h>
#define PIN_D2 2
#define MAX_BUFFER 16
uint8_t buffer1[MAX_BUFFER], buffer1pos = 0, buffer3[MAX_BUFFER], buffer3pos = 0;
char buffer1prefix[] = "E> ", buffer3prefix[] = "A> ";
elapsedMillis TimeSinceRead;
void setup() {
Serial.begin(250000);
while (!Serial.dtr());
Serial1.begin(250000);
Serial3.begin(250000);
pinMode(PIN_D2, OUTPUT);
digitalWrite(PIN_D2, HIGH);
Serial.println("In process");
}
void PrintBuffer(char *prefix, uint8_t buffer[], uint8_t *buffer_pos, bool flush) {
if (*buffer_pos == 0 || (*buffer_pos < MAX_BUFFER && !flush))
return;
char buff[12 + strlen(prefix) + MAX_BUFFER*4+5];
uint8_t o = 0;
o += sprintf(buff+o, "%010lu: ", millis());
o += sprintf(buff+o, "%s", prefix);
for (uint8_t i=0; i<MAX_BUFFER; i++) {
if (i < *buffer_pos) {
o += sprintf(buff+o, "%02X ", buffer[i]);
} else {
o += sprintf(buff+o, " ");
}
}
o += sprintf(buff+o, " |");
for (uint8_t i=0; i<MAX_BUFFER; i++) {
o += sprintf(buff+o, "%c", i<*buffer_pos ? (buffer[i] >31 && buffer[i] <= 0x7F ? buffer[i] : '.') : ' ');
}
o += sprintf(buff+o, "|\r\n");
*buffer_pos = 0;
Serial.write(buff, o);
}
void PrintBuffer1(bool flush) {
PrintBuffer(buffer1prefix, buffer1, &buffer1pos, flush);
}
void PrintBuffer3(bool flush) {
PrintBuffer(buffer3prefix, buffer3, &buffer3pos, flush);
}
void loop() {
int incomingByte;
if (Serial.available() > 0) {
incomingByte = Serial.read();
if (incomingByte == 32) {
Serial.print("\r\nReset!\r\n");
digitalWrite(PIN_D2, LOW);
delay(250);
digitalWrite(PIN_D2, HIGH);
delay(250);
}
}
if (Serial1.available() > 0) {
buffer1[buffer1pos++] = Serial1.read();
TimeSinceRead = 0;
PrintBuffer1(false);
}
if (Serial3.available() > 0) {
buffer3[buffer3pos++] = Serial3.read();
TimeSinceRead = 0;
PrintBuffer3(false);
}
if (TimeSinceRead > 100) {
PrintBuffer1(true);
PrintBuffer3(true);
TimeSinceRead = 0;
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment