Skip to content

Instantly share code, notes, and snippets.

@chrisdel101
Last active July 20, 2022 15:14
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save chrisdel101/ac8d17ac32b8e7214f3fe2a21975d672 to your computer and use it in GitHub Desktop.
Save chrisdel101/ac8d17ac32b8e7214f3fe2a21975d672 to your computer and use it in GitHub Desktop.
Example of the fake serial message exchange; 2 examples of trying to write and read to it, and the resuls
# It hangs, or else I thik this would be almost correct
Python 3.7.3 (default, Jan 22 2021, 20:04:44)
[GCC 8.3.0] on linux
Type "help", "copyright", "credits" or "license" for more information.
>>> from Dyno.Classes.Dyno.Dyno import Dyno
ENV development
>>> dyno = Dyno()
Listening on ('localhost', 65432)
>>> dyno.makeSerialConnection()
Opening serial: If hangs, check e-stop
ENV development
command: b'test2\r\n'
result: b'I dont understand\r\n'
# send some bytes to the serialPort right away
def makeSerialConnection(self):
try:
# ideall this would be inside the class. Niether work right now
serial_exchange = FakeSerialExchange()
# create a separate thread that listens on the master device for commands
thread = threading.Thread(
target=serial_exchange.listener,
args=[serial_exchange.pty_tuple[0]]
)
thread.start()
self.serial_port = serial.Serial(
port=serial_exchange.slave_file_name, baudrate=dyno_constants.UART_RX_BAUD_RATE, timeout=5
)
#testing it immediatley
self.serial_port.write(b'test2\r\n') # write the first command
res = b""
self.lock2.acquire()
while not res.endswith(b'\r\n'):
# read the response
res += self.serial_port.read()
print("result: %s" % res)
self.lock2.release()
self.serial_port.write(b'QPGS\r\n') # write a second command
res = b""
while not res.endswith(b'\r\n'):
# read the response
res += self.serial_port.read()
print("result: %s" % res)
# it accepts only the first write to the serial port, then nothing
>>> from Dyno.Classes.Dyno.Dyno import Dyno
>>> dyno = Dyno()
Listening on ('localhost', 65432)
>>> dyno.makeSerialConnection()
Opening serial: If hangs, check e-stop
ENV development
makeSerialConnection: created OK
>>> dyno.serial_port
Serial<id=0xb5ecc830, open=True>(port='/dev/pts/6', baudrate=230400, bytesize=8, parity='N', stopbits=1, timeout=5, xonxoff=False, rtscts=False, dsrdtr=False)
>>> dyno.serial_port.write(b'test2\r\n')
7
>>> command: b'test2\r\n'
dyno.serial_port.write(b'hello\r\n')
7
>>> dyno.serial_port.write(b'test2\r\n')
7
# send some bytes to the serialPort after init. Purpose of two sep checks is b/c this way, the proper way, was not working at all b4.
# Now it works a little bit
def makeSerialConnection(self):
try:
# ideall this would be inside the class. Niether work right now
serial_exchange = FakeSerialExchange()
# create a separate thread that listens on the master device for commands
thread = threading.Thread(
target=serial_exchange.listener,
args=[serial_exchange.pty_tuple[0]]
)
thread.start()
self.serial_port = serial.Serial(
port=serial_exchange.slave_file_name, baudrate=dyno_constants.UART_RX_BAUD_RATE, timeout=5
)
from enum import Enum
import os
import pty
import time
import threading
import logging
format = "%(asctime)s: %(message)s"
logging.basicConfig(
format=format,
level=logging.INFO,
datefmt="%H:%M:%S"
)
class FakeSerialExchange:
def __init__(self):
# create 2 terminals, master/slave
self.pty_tuple = self.createPtys()
# get name of file_desc, dyno write to this file
self.slave_file_name = self.openPty(self.pty_tuple[1])
def listener(self, master_pty):
res = b""
while not res.endswith(b"\r\n"):
# keep reading one byte at a time until we have a full line
res += os.read(master_pty, 1)
print("command: %s" % res)
# write back the response
if res == b'QPGS\r\n':
os.write(master_pty, b"correct result\r\n")
else:
os.write(master_pty, b"I dont understand\r\n")
def createPtys(self):
master, slave = pty.openpty()
return master, slave
def openPty(self, slave_pty):
filename = os.ttyname(slave_pty)
return filename
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment