Created
December 9, 2022 09:31
-
-
Save jmrobles/4acbc459620ae7ca9c145906bf61698f to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import serial, time, sys | |
import subprocess | |
import argparse | |
import serials_find | |
def dbg_print(line=''):
    """Emit *line* followed by a newline on stdout and flush right away.

    The explicit flush pushes each message out immediately instead of
    leaving it in the stream buffer.
    """
    message = line + '\n'
    stream = sys.stdout
    stream.write(message)
    stream.flush()
# Best-effort bootstrap of the optional `streamexpect` dependency.
# Order of attempts: plain import -> pip install via the running interpreter
# -> install through a build-system `env` object (only if one happens to be
# defined in this module's namespace) -> give up and leave
# `streamexpect = None`.
try:
    import streamexpect
except ImportError:
    sys.stdout.write("Installing streamexpect\n")
    sys.stdout.flush()
    try:
        subprocess.check_call(
            [sys.executable, "-m", "pip", "install", "streamexpect"])
    except (subprocess.CalledProcessError, OSError) as error:
        # A failed install must not abort the import of this module; the
        # remaining fallbacks below still get their chance.
        sys.stdout.write("pip install streamexpect failed: %s\n" % error)
        sys.stdout.flush()
    try:
        import streamexpect
    except ImportError:
        try:
            # `env` is not defined anywhere in this file; it only exists if
            # the hosting build system injected it. Skip silently otherwise.
            env.Execute("$PYTHONEXE -m pip install streamexpect")
        except NameError:
            pass
        try:
            import streamexpect
        except ImportError:
            # Callers must check for None before using the module.
            streamexpect = None
def etx_passthrough_init(port, requestedBaudrate):
    """Drive the radio CLI on *port* into serial passthrough for rfmod 0.

    Opens the serial port at *requestedBaudrate* (8N1, no flow control),
    sends ``set pulses 0``, switches rfmod 0 power off and back on, and
    finally issues ``serialpassthrough rfmod 0 <baud>``. Each ``set``
    command is expected to be answered with ``set: `` followed by a ``> ``
    prompt within one second.

    :param port: serial device name passed to ``serial.Serial``
    :param requestedBaudrate: baud rate used both for the port and for the
        passthrough command
    :raises RuntimeError: if the ``streamexpect`` package could not be
        imported or installed
    """
    if streamexpect is None:
        # The module-level bootstrap falls back to None when the package is
        # unavailable; fail with a clear message instead of an
        # AttributeError on `None.wrap`.
        raise RuntimeError(
            "The 'streamexpect' package is required for passthrough "
            "initialisation but could not be imported or installed")

    sys.stdout.flush()
    dbg_print("======== PASSTHROUGH INIT ========")
    dbg_print(" Trying to initialize %s @ %s" % (port, requestedBaudrate))

    s = serial.Serial(port=port, baudrate=requestedBaudrate,
                      bytesize=8, parity='N', stopbits=1,
                      timeout=1, xonxoff=0, rtscts=0)

    # (command, seconds to wait after it has been acknowledged)
    setup_steps = (
        (b"set pulses 0\n", 0),
        (b"set rfmod 0 power off\n", 1.5),
        (b"set rfmod 0 power on\n", .1),
    )

    try:
        with streamexpect.wrap(s) as rl:
            rl.flush()
            for command, settle in setup_steps:
                rl.write(command)
                rl.expect_bytes(b"set: ", timeout=1.0)
                rl.expect_bytes(b"> ", timeout=1.0)
                if settle:
                    time.sleep(settle)

            cmd = "serialpassthrough rfmod 0 %s" % requestedBaudrate
            dbg_print("Enabling serial passthrough...")
            dbg_print(" CMD: '%s'" % cmd)
            rl.write(cmd.encode("utf-8"))
            rl.write(b'\n')
            time.sleep(.2)
    finally:
        # Release the port even when an expect call times out.
        s.close()

    dbg_print("======== PASSTHROUGH DONE ========")
def init_passthrough(source, target, env):
    """Build-action style entry point: detect the port, then init passthrough.

    *source* and *target* are accepted but not used. *env* must provide
    ``AutodetectUploadPort`` and the ``UPLOAD_PORT`` / ``UPLOAD_SPEED`` keys
    (presumably a PlatformIO/SCons environment -- confirm against caller).
    """
    # Let the environment fill in UPLOAD_PORT before it is read.
    env.AutodetectUploadPort([env])
    etx_passthrough_init(env['UPLOAD_PORT'], env['UPLOAD_SPEED'])
if __name__ == '__main__':
    # Command-line entry point: parse the baud rate / port options and run
    # the passthrough initialisation once.
    parser = argparse.ArgumentParser(
        description="Initialize EdgeTX passthrough to internal module")
    parser.add_argument("-b", "--baud", type=int, default=115200,
                        help="Baud rate for passthrough communication")
    parser.add_argument("-p", "--port", type=str,
                        help="Override serial port autodetection and use PORT")
    args = parser.parse_args()

    # No explicit port given: fall back to autodetection.
    if args.port is None:
        args.port = serials_find.get_serial_port()

    etx_passthrough_init(args.port, args.baud)
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import serial | |
import sys, glob | |
def serial_ports():
    """ Lists serial port names

        Ports that look like a flight controller (manufacturer 'FTDI' or
        'Betaflight', product containing "STM32", or USB vendor id 0x0483)
        are preferred. If none is found, a platform-specific list of
        candidate device names is used instead. Every candidate is then
        opened once to check that it is usable.

        :raises Exception:
            On unsupported or unknown platforms, or when opening a port
            fails with a "permission denied" error
        :returns:
            A list of the serial ports available on the system, in reverse
            probing order
    """
    candidates = []

    # pyserial's port enumeration is optional; without it we go straight to
    # the platform-specific fallback below.
    try:
        from serial.tools.list_ports import comports
    except ImportError:
        comports = None

    if comports is not None:
        print(" ** Searching flight controllers **")
        for info in comports():
            known_maker = info.manufacturer in ('FTDI', 'Betaflight')
            stm32_product = "STM32" in (info.product or "")
            if known_maker or stm32_product or info.vid == 0x0483:
                print(" > FC found from '%s'" % info.device)
                candidates.append(info.device)

    if not candidates:
        print(" ** No FC found, find all ports **")
        platform = sys.platform.lower()
        if platform.startswith('win'):
            candidates = ['COM%s' % number for number in range(1, 257)]
        elif platform.startswith(('linux', 'cygwin')):
            # Only ttyACM* and ttyUSB* are listed; a broader /dev/tty* glob
            # would include the current terminal.
            candidates = glob.glob('/dev/ttyACM*')
            candidates.extend(glob.glob('/dev/ttyUSB*'))
        elif platform.startswith('darwin'):
            candidates = glob.glob('/dev/tty.usbmodem*')
            candidates.extend(glob.glob('/dev/tty.SLAB*'))
        else:
            raise Exception('Unsupported platform')

    available = []
    for port in candidates:
        try:
            s = serial.Serial(port)
            s.close()
            available.append(port)
        except (OSError, serial.SerialException) as error:
            if "permission denied" in str(error).lower():
                raise Exception(
                    "You don't have permission to use serial port!"
                ) from error
            # Any other failure just means this port cannot be used; skip it.

    # get_serial_port() picks element 0, so reversing makes the last probed
    # port the preferred one.
    available.reverse()
    return available
def get_serial_port(debug=True):
    """Return the first port reported by ``serial_ports()``.

    :param debug: when true, print the full list of detected ports first
    :raises Exception: if no usable port was detected
    :returns: the first entry of the detected port list
    """
    found = serial_ports()

    if debug:
        # Blank line, header, one line per port, trailing blank line.
        report = ["", "Detected the following serial ports on this system:"]
        report.extend(" %s" % name for name in found)
        report.append("")
        print("\n".join(report))

    if not found:
        raise Exception('No valid serial port detected or port already open')

    return found[0]
if __name__ == '__main__':
    # Stand-alone use: run detection with the port listing enabled and
    # report the port that would be selected.
    print("Found: %s" % (get_serial_port(debug=True), ))
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment