Skip to content

Instantly share code, notes, and snippets.

@xssfox
Created October 26, 2022 07:53
Show Gist options
  • Save xssfox/8b320bde5c9785408e16d3ce4208abfb to your computer and use it in GitHub Desktop.
Save xssfox/8b320bde5c9785408e16d3ce4208abfb to your computer and use it in GitHub Desktop.
import kissfix # TODO do we need to worry about the python3 issue / kissfix
import serial
import os, pty, serial, tty, termios
import threading
import logging
import sys, traceback
import fcntl
import time
# Configure the root logger for verbose output, then keep a module-level
# handle to it for the rest of the script.
logging.basicConfig(level=logging.DEBUG)
logger = logging.getLogger()
# This deals with encoding and decoding KISS frames
class KissInterface():
    """KISS TNC endpoint exposed to other applications through a PTY.

    A ``kissfix.SerialKISS`` object is started and its underlying file
    descriptor is then replaced with the master side of a freshly opened
    PTY pair.  Frames read from that descriptor are delivered to
    ``callback`` by a background ``KissThread``; ``tx`` writes frames in
    the other direction.

    Attributes:
        k: the ``kissfix.SerialKISS`` instance used for frame decoding.
        control: master-side PTY file descriptor (what this class reads
            from and writes to).
        user_port: slave-side PTY file descriptor.
        ttyname: filesystem path of the slave side, i.e. the "TNC port"
            an external application opens.
        rx_thread: daemon ``KissThread`` feeding received frames to
            ``callback``.
    """

    def __init__(self, callback):
        """Open the PTY, wire it into kissfix and start the receive thread.

        Args:
            callback: callable invoked with the ``bytes`` payload of every
                frame received on the PTY.
        """
        self.k = kissfix.SerialKISS('/dev/ptmx', 9600)
        self.k.start()

        # Override the serial interface with our own PTY file descriptor
        self.control, self.user_port = pty.openpty()
        self.ttyname = os.ttyname(self.user_port)
        # we need to override the serial port with the fd from pty
        self.k.interface.fd = self.control
        # this makes the tty act more like a serial port
        tty.setraw(self.control, termios.TCSANOW)

        # change flags to be non blocking so that buffer full doesn't cause
        # issues (os.write raises BlockingIOError instead of stalling)
        flags = fcntl.fcntl(self.control, fcntl.F_GETFL)
        fcntl.fcntl(self.control, fcntl.F_SETFL, flags | os.O_NONBLOCK)

        self.rx_thread = KissThread(callback, self.k)
        # Thread.setDaemon() is deprecated; set the attribute instead.
        self.rx_thread.daemon = True
        self.rx_thread.start()

    def tx(self, bytes_in: bytes):
        """Wrap ``bytes_in`` in a KISS frame and write it to the PTY.

        The frame is ``FEND``, a ``0x00`` byte, the escaped payload, then
        ``FEND``.  If the PTY buffer is full the frame is dropped, an error
        is logged, and the pending data is read off the slave side so that
        later writes can succeed.

        Args:
            bytes_in: raw payload to send to the connected application.
        """
        frame = kissfix.FEND + b'\x00' + kissfix.escape_special_codes(bytes_in) + kissfix.FEND
        try:
            os.write(self.control, frame)
        except BlockingIOError:
            logging.error("PTY interface buffer is full. The connected application may have crashed or isn't reading fast enough. Data loss is likely. Alternatively you aren't using the PTY interface and should have used --no-pty. Clearing the buffer now so we can keep going")
            # remember what the state was before
            was_blocking = os.get_blocking(self.user_port)
            os.set_blocking(self.user_port, False)
            try:
                # read off the buffer until we've cleared it; an empty read
                # means EOF, so stop rather than spin forever
                while os.read(self.user_port, 32):
                    pass
            except BlockingIOError:
                pass  # nothing left to read: buffer is drained
            finally:
                # restore the state after, even if the drain failed
                os.set_blocking(self.user_port, was_blocking)
class KissThread(threading.Thread):
    """Thread that polls a KISS interface and hands frames to a callback.

    Args:
        callback: callable invoked with each frame's payload as ``bytes``.
        interface: object exposing ``read(readmode=False)`` that returns an
            iterable of received frames (a ``kissfix.SerialKISS`` in this
            script).
    """

    def __init__(self, callback, interface):
        super().__init__()
        self.callback = callback
        self._running = True
        self.interface = interface

    def run(self):
        """Poll the interface until ``terminate`` is called.

        The stop flag is only checked between ``read`` calls, so the loop
        ends after the current batch of frames has been dispatched.
        """
        while self._running:
            # check TNC port
            for frame in self.interface.read(readmode=False):
                # Strip the first byte, which is the TNC port number, and
                # pass the rest on as an immutable bytes object.
                self.callback(bytes(frame[1:]))

    def terminate(self):
        """Ask the polling loop to stop after the current iteration."""
        self._running = False
def kiss_rx_callback(frame: bytes):
    """Forward a frame received on the KISS PTY to the radio.

    Frames shorter than 18 bytes have 18 zero bytes appended before being
    written; the (possibly extended) frame is then logged as hex.
    """
    # I don't know the rules yet
    # NOTE(review): this appends 18 zero bytes rather than padding *to* 18
    # bytes - confirm which one the radio actually expects.
    too_short = len(frame) < 18
    if too_short:
        frame = frame + bytes(18)
    radio.write(frame)
    logging.debug(f"Received KISS frame: {frame.hex()}")
# Open the radio-side serial port first: KissInterface starts its receive
# thread inside __init__, and that thread's callback (kiss_rx_callback)
# writes to the module-level `radio`, so it must already exist.
radio = kissfix.SerialKISS('/dev/ttyAMA0', 9600)
radio.start()

# PTY-side KISS endpoint that other applications connect to.
tnc_interface = KissInterface(kiss_rx_callback)
logger.info(f'TNC port is at : {tnc_interface.ttyname}')
def radio_rx(frame: bytes) -> None:
    """Relay a frame received from the radio to the PTY-side TNC interface.

    The frame is handed to ``tnc_interface.tx`` first and logged as hex
    afterwards.
    """
    send_to_pty = tnc_interface.tx
    send_to_pty(frame)
    logging.debug(f"Received KISS frame from radio: {frame.hex()}")
# Poll the radio in the background and push every frame to the PTY side.
radio_thread = KissThread(radio_rx, radio)
# Thread.setDaemon() is deprecated; set the attribute instead.
radio_thread.daemon = True
radio_thread.start()
print("running")

# Both worker threads are daemons, so the main thread has to stay alive
# for the bridge to keep running.
while True:
    time.sleep(0.1)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment