Skip to content

Instantly share code, notes, and snippets.

@notro
Created May 27, 2018 14:08
Show Gist options
  • Save notro/1063505b2eb72720dfe419ea74de1b31 to your computer and use it in GitHub Desktop.
Save notro/1063505b2eb72720dfe419ea74de1b31 to your computer and use it in GitHub Desktop.
import os
import re
import serial
import sys
import time
import serial.tools.list_ports
import sh
import shutil
class CPboardError(BaseException):
pass
# supervisor/messages/default.h:
MSG_OUTPUT_SUFFIX = b" output:\r\n"
MSG_NEWLINE = b"\r\n"
MSG_AUTORELOAD_ON = b"Auto-reload is on. Simply save files over USB to run them or enter REPL to disable.\r\n"
MSG_AUTORELOAD_OFF = b"Auto-reload is off.\r\n"
MSG_SAFE_MODE_ON = b"Running in safe mode! Auto-reload is off.\r\n"
MSG_SAFE_MODE_NO_MAIN = b"Running in safe mode! Not running saved code.\r\n"
MSG_SAFE_MODE_USER_REQUESTED = b"You requested starting safe mode by "
MSG_SAFE_MODE_USER_EXIT = b"To exit, please reset the board without "
MSG_BAD_SAFE_MODE = b"You are running in safe mode which means something really bad happened."
MSG_SAFE_MODE_CRASH = b"Looks like our core CircuitPython code crashed hard. Whoops!"
MSG_SAFE_MODE_FILE_ISSUE = b"Please file an issue here with the contents of your CIRCUITPY drive:"
MSG_SAFE_MODE_ISSUE_LINK = b"https://github.com/adafruit/circuitpython/issues"
MSG_SAFE_MODE_BROWN_OUT_LINE_1 = b"The microcontroller's power dipped. Please make sure your power supply provides"
MSG_SAFE_MODE_BROWN_OUT_LINE_2 = b"enough power for the whole circuit and press reset (after ejecting CIRCUITPY)."
MSG_WAIT_BEFORE_REPL = b"Press any key to enter the REPL. Use CTRL-D to reload."
MSG_SOFT_REBOOT = b"soft reboot"
MSG_DOUBLE_FILE_EXTENSION = b"WARNING: Your code filename has two extensions\r\n"
class RawREPL:
@classmethod
def execute(cls, repl, code, timeout, async):
repl.read() # Throw away
repl.write(REPL.CHAR_CTRL_A)
repl.read_until(b'\r\n>')
repl.write(code)
repl.write(REPL.CHAR_CTRL_D)
if async:
return b'', b''
repl.read_until(b'OK')
output = repl.read_until(b'\x04', timeout=timeout)
output = output[:-1]
error = repl.read_until(b'\x04')
error = error[:-1]
return output, error
class FriendlyREPL:
@classmethod
def execute(cls, repl, code, timeout, async):
repl.read() # Throw away
repl.write(REPL.CHAR_CTRL_E)
repl.read_until(b'=== ')
repl.write(code)
repl.write(b'\r\n#EOF_REPL')
repl.read_until(b'EOF_REPL')
repl.write(REPL.CHAR_CTRL_D)
if async:
return b'', b''
output = repl.read_until(b'>>> ', timeout=timeout)
output = output[:-4]
if output.startswith(b'\r\n'):
output = output[2:]
traceback = b'Traceback (most recent call last):\r\n'
output, _, error = output.partition(traceback)
if error:
error = traceback + error
return output, error
class REPL:
CHAR_CTRL_A = b'\x01'
CHAR_CTRL_B = b'\x02'
CHAR_CTRL_C = b'\x03'
CHAR_CTRL_D = b'\x04'
CHAR_CTRL_E = b'\x05'
CHAR_CTRL_F = b'\x06'
CHAR_CTRL_K = b'\x11'
CHAR_CTRL_N = b'\x14'
CHAR_CTRL_P = b'\x16'
CHAR_CTRL_U = b'\x21'
def __init__(self, board):
self.board = board
self.write_chunk_size = 32
self.mode_raw = False
self.safe_mode = False
self.session = b''
def __enter__(self):
self.reset()
return self
def __exit__(self, exception_type, exception_value, traceback):
pass
@property
def serial(self):
return self.board.serial
@property
def raw(self):
self.mode_raw = True
return self
def read(self):
if self.serial.inWaiting():
data = self.serial.read(self.serial.inWaiting())
else:
data = b''
self.session += data
#print('READ: ', data)
return data
def read_until(self, ending, timeout=10, data_consumer=None):
data = b''
timeout_count = 0
while True:
if data.endswith(ending):
break
elif self.serial.inWaiting() > 0:
new_data = self.serial.read(1)
data += new_data
self.session += new_data
if data_consumer:
data_consumer(new_data)
timeout_count = 0
else:
timeout_count += 1
if timeout is not None and timeout_count >= 100 * timeout:
print('SESSION: ', self.session)
raise TimeoutError(110, "timeout waiting for", ending)
time.sleep(0.01)
#print('READ_UNTIL: ', data)
return data
def write(self, data, chunk_size=None):
if chunk_size is None:
chunk_size = self.write_chunk_size
if not isinstance(data, bytes):
data = bytes(data, encoding='utf8')
for i in range(0, len(data), chunk_size):
chunk = data[i:min(i + chunk_size, len(data))]
self.session += chunk
self.serial.write(chunk)
time.sleep(0.01)
#print('WRITE: ', data)
def reset(self):
try:
self.serial.reset_input_buffer()
except termios.error:
pass
self.session = b''
self.write(b'\r' + REPL.CHAR_CTRL_C + REPL.CHAR_CTRL_C) # interrupt any running program
self.write(b'\r' + REPL.CHAR_CTRL_B) # enter or reset friendly repl
data = self.read_until(b'>>> ')
def execute(self, code, timeout=10, async=False):
if self.mode_raw:
cls = RawREPL
else:
cls = FriendlyREPL
output, error = cls.execute(self, code, timeout, async)
return output, error
def run(self):
if self.mode_raw:
raise RuntimeError("Can't run in raw repl")
if self.safe_mode:
raise RuntimeError("Can't run in safe mode")
self.read() # Throw away
self.write(REPL.CHAR_CTRL_D)
data = self.read_until(b' output:\r\n')
if b'Running in safe mode' in data:
self.safe_mode = True
raise RuntimeError("Can't run in safe mode")
# TODO: MSG_SAFE_MODE_CRASH
# TODO: BROWNOUT
marker = MSG_NEWLINE + MSG_WAIT_BEFORE_REPL + MSG_NEWLINE
data = self.read_until(marker)
data = data.split(marker)[0]
#print('\nRUN: ', self.session)
#print('data: ', data)
# Don't know why I have strip off this...
if data.endswith(b'\r\n\r\n'):
data = data[:-4]
#print('data: ', data)
return data
class Disk:
def __init__(self, dev):
self.dev = os.path.realpath(dev)
self.mountpoint = None
with open('/etc/mtab', 'r') as f:
mtab = f.read()
mount = [mount.split(' ') for mount in mtab.splitlines() if mount.startswith(self.dev)]
if mount:
self._path = mount[0][1]
else:
name = os.path.basename(dev)
#print('name', name)
sh.pmount("-tvfat", dev, name)
self.mountpoint = "/media/" + name
self._path = self.mountpoint
def __enter__(self):
return self
def __exit__(self, exception_type, exception_value, traceback):
try:
self.close()
except:
pass
@property
def path(self):
return self._path
def close(self):
if not self.mountpoint:
return
#print("Mount.close")
mountpoint = self.mountpoint
self.mountpoint = None
start_time = time.monotonic()
unmounted = False
while not unmounted and start_time - time.monotonic() < 30:
try:
sh.pumount(mountpoint)
unmounted = True
except sh.ErrorReturnCode_5:
time.sleep(0.1)
def sync(self):
#disk_device = os.path.basename(os.readlink(self.dev))[:-1]
disk_device = os.path.basename(self.dev)[:-1]
os.sync()
# Monitor the block device so we know when the sync request is actually finished.
with open("/sys/block/" + disk_device + "/stat", "r") as f:
disk_inflight = 1
last_wait_time = 0
wait_time = 1
while disk_inflight > 0 or wait_time > last_wait_time:
f.seek(0)
stats = f.read()
block_stats = stats.split()
disk_inflight = int(block_stats[8])
last_wait_time = wait_time
wait_time = int(block_stats[9])
def copy(self, src, dst=None, sync=True):
if dst is None:
dst = os.path.basename(src)
shutil.copy(src, os.path.join(self.path, dst))
if sync:
self.sync()
class Firmware:
def __init__(self, board):
self.board = board
@property
def disk(self):
disks = self.board.get_disks()
if len(disks) != 1:
raise RuntimeError("Boot disk not found for: " + self.board.device)
return Disk(disks[0])
@property
def info(self):
with self.disk as disk:
fname = os.path.join(disk.path, 'INFO_UF2.TXT')
with open(fname, 'r') as f:
info = f.read()
lines = info.splitlines()
res = {}
res['header'] = lines[0]
for line in lines[1:]:
k, _, v = line.partition(':')
res[k.replace(':', '')] = v.strip()
return res
def upload(self, fw):
with open(fw, 'rb') as f:
header = f.read(32)
if header[0:4] != b'UF2\n':
raise ValueError('Only UF2 files are supported')
self.board.close()
with self.disk as disk:
disk.copy(fw, sync=False)
class CPboard:
@classmethod
def from_try_all(cls, name, **kwargs):
try:
return CPboard.from_build_name(name, **kwargs)
except ValueError:
vendor, _, product = name.partition(':')
try:
return CPboard.from_usb(**kwargs, idVendor=int(vendor, 16), idProduct=int(product, 16))
except ValueError:
raise
pass
return CPboard(name, **kwargs)
@classmethod
def from_build_name(cls, name, **kwargs):
boards = {
#'arduino_zero', (,)
'circuitplayground_express' : (0x239a, 0x8019),
#'feather_m0_adalogger', (,)
#'feather_m0_basic', (,)
'feather_m0_express' : (0x239a, 0x8023),
#'feather_m0_rfm69', (,)
#'feather_m0_rfm9x', (,)
#'feather_m0_supersized', (,)
#'feather_m4_express', (,)
#'gemma_m0', (,)
#'itsybitsy_m0_express', (,)
#'itsybitsy_m4_express', (,)
#'metro_m0_express', (,)
#'metro_m4_express', (,)
#'metro_m4_express_revb', (,)
#'pirkey_m0', (,)
#'trinket_m0', (,)
#'trinket_m0_haxpress', (,)
#'ugame10', (,)
}
try:
vendor, product = boards[name]
except KeyError:
raise ValueError("Unknown build name: " + name)
return CPboard.from_usb(**kwargs, idVendor=vendor, idProduct=product)
@classmethod
def from_usb(cls, baudrate=115200, wait=0, timeout=10, **kwargs):
import usb.core
dev = usb.core.find(**kwargs)
if not dev:
s = "Can't find USB device: "
args = []
for x in kwargs.items():
try:
args.append('%s=0x%x' % x)
except:
args.append('%s = %s' % x)
raise RuntimeError("Can't find USB device: " + ', '.join(args))
return cls(dev, baudrate=baudrate, wait=wait, timeout=timeout)
def __init__(self, device, baudrate=115200, wait=0, timeout=10):
self.device = device
self.usb_dev = None
try:
# Is it a usb.core.Device?
portstr = ':' + '.'.join(map(str, device.port_numbers)) + ':'
except:
pass
else:
serials = [serial for serial in os.listdir("/dev/serial/by-path") if portstr in serial]
if len(serials) != 1:
raise RuntimeError("Can't find excatly one matching usb serial device")
self.device = os.path.realpath("/dev/serial/by-path/" + serials[0])
self.usb_dev = device
self.baudrate = baudrate
self.wait = wait
self.timeout = timeout
self.debug = True
self.mount = None
self.serial = None
self._repl = REPL(self)
#self.open()
def __enter__(self):
self.open()
return self
def __exit__(self, exception_type, exception_value, traceback):
self.close()
def open(self, baudrate=None, wait=None):
if self.serial:
return
if baudrate is None:
baudrate = self.baudrate
if wait is None:
wait = self.wait
delayed = False
for attempt in range(wait + 1):
try:
self.serial = serial.Serial(self.device, baudrate=self.baudrate, timeout=self.timeout, write_timeout=self.timeout, interCharTimeout=1)
break
except (OSError, IOError): # Py2 and Py3 have different errors
if wait == 0:
continue
if attempt == 0:
sys.stdout.write('Waiting {} seconds for board '.format(wait))
delayed = True
time.sleep(1)
sys.stdout.write('.')
sys.stdout.flush()
else:
if delayed:
print('')
raise CPboardError('failed to access ' + self.device)
if delayed:
print('')
def close(self):
if self.serial:
self.serial.close()
self.serial = None
@property
def repl(self):
self._repl.mode_raw = False
return self._repl
@property
def rawrepl(self):
self._repl.mode_raw = True
return self._repl
@property
def exec_mode(self):
mode = os.environ.get('CPBOARD_EXEC_MODE')
if mode in ['raw', 'disk', 'friendly']:
return mode
else:
return 'raw'
@property
def repl_from_env(self):
if self.exec_mode == 'raw':
self._repl.mode_raw = True
else:
self._repl.mode_raw = False
return self._repl
def exec(self, command, timeout=10, async=False):
with self.repl_from_env as repl:
try:
output, error = repl.execute(command, timeout=timeout, async=async)
except OSError as e:
if self.debug:
print('exec: session: ', self.repl.session)
raise CPboardError('timeout', e)
if error:
raise CPboardError('exception', output, error)
return output
def eval(self, expression, timeout=10):
command = 'print({})'.format(expression)
with self.repl_from_env as repl:
output, error = repl.execute(command, timeout=timeout)
if error:
raise CPboardError('exception', output, error)
try:
res = eval(str(output, encoding='utf8'))
except:
raise CPboardError('failed to eval: %s' % output)
return res
def _reset(self, mode='NORMAL'):
self.exec("import microcontroller;microcontroller.on_next_reset(microcontroller.RunMode.%s)" % mode)
try:
self.exec("import microcontroller;microcontroller.reset()", async=True)
except OSError:
pass
def reset(self, safe_mode=False, delay=5):
self._reset('SAFE_MODE' if safe_mode else 'NORMAL')
self.close()
time.sleep(delay)
self.open(wait=10)
time.sleep(delay)
def reset_to_bootloader(self, repl=False):
if repl:
self._reset('BOOTLOADER')
self.close()
else:
self.close()
s = serial.Serial(self.device, 1200, write_timeout=4, timeout=4)
s.close()
def get_port_info(self):
portinfo = None
for port_iter in serial.tools.list_ports.comports():
if port_iter.device == self.device:
portinfo = port_iter
break
return portinfo
@property
def serial_number(self):
try: # Maybe root permissions are needed to read the value
return self.usb_dev.serial_number
except:
pass
p = self.get_port_info()
return p.serial_number if p else None
def get_disks(self):
if self.usb_dev:
portstr = ':' + '.'.join(map(str, self.usb_dev.port_numbers)) + ':'
return ["/dev/disk/by-path/" + disk for disk in os.listdir("/dev/disk/by-path") if portstr in disk]
serial = self.serial_number
if not serial:
raise RuntimeError("Serial number not found for: " + self.device)
return ["/dev/disk/by-id/" + disk for disk in os.listdir("/dev/disk/by-id") if serial in disk]
@property
def disk(self):
disks = self.get_disks()
part = [part for part in disks if 'part1' in part]
#print(part)
if not part:
raise RuntimeError("Disk not found for: " + self.device)
disk = part[0]
#print(disk)
return Disk(disk)
@property
def firmware(self):
return Firmware(self)
def execfile_disk(self, filename):
with self.disk as disk:
disk.copy(filename, 'code.py')
with self.repl as repl:
try:
output = repl.run()
except OSError as e:
raise CPboardError('timeout', e)
except RuntimeError:
if self.repl.safe_mode:
raise PyboardError("Can't run in safe mode")
else:
raise
return output
def execfile(self, filename, timeout=10):
if self.exec_mode == 'disk':
return self.execfile_disk(filename)
else:
with open(filename, 'rb') as f:
pyfile = f.read()
ret = self.exec(pyfile, timeout=timeout)
#print('SESSION: ', self.repl.session)
return ret
# Just enough to make tests/run-tests work
PyboardError = CPboardError
class Pyboard:
def __init__(self, device, baudrate=115200, user='micro', password='python', wait=0):
self.board = CPboard(device, baudrate=baudrate, wait=wait)
with self.board.disk as disk:
disk.copy('skip_if.py')
def close(self):
self.board.close()
def enter_raw_repl(self):
self.board.open()
def execfile(self, filename):
#for f in ['repl_emacs_check.py', 'gen_stack_overflow.py']:
# if f in filename:
# return b'SKIP\n'
return self.board.execfile(filename)
def eval_namedtuple(board, command):
from collections import namedtuple
s = board.exec("print(%s)" % command)
s = s.decode().strip()
items = [key.split('=') for key in s[1:-1].split(', ')]
keys = [item[0] for item in items]
vals = [item[1] for item in items]
nt = namedtuple('eval', keys)
res = nt(*[eval(val) for val in vals])
return res
def os_uname(board):
return eval_namedtuple(board, "__import__('os').uname()")
def upload(board, fw, verbose):
if verbose:
board.open()
if verbose > 1:
print("Serial number :", board.serial_number, flush=True)
if verbose:
print('Current version:', os_uname(board).version, flush=True)
if verbose > 1:
print('Reset to bootloader...', end='', flush=True)
board.reset_to_bootloader()
time.sleep(5)
if verbose > 1:
print('done', flush=True)
if verbose > 1:
print('Bootloader:', board.firmware.info)
if verbose > 1:
print('Upload firmware...', end='', flush=True)
board.firmware.upload(fw)
if verbose > 1:
print('done', flush=True)
if verbose > 1:
print('Wait for board...', end='', flush=True)
time.sleep(5)
if verbose > 1:
print('done', flush=True)
if verbose:
board.open(wait=10)
print('New version:', os_uname(board).version, flush=True)
def repl(board):
import subprocess
subprocess.call("screen " + board.device, shell=True)
def main():
import argparse
cmd_parser = argparse.ArgumentParser(description='Circuit Python Board Tool')
cmd_parser.add_argument('board', help='build name, vid:pid or /dev/tty')
cmd_parser.add_argument('--firmware', '-f', help='upload UF2 firmware file')
cmd_parser.add_argument('--repl', action='store_true', help='enter REPL')
cmd_parser.add_argument('--tty', action='store_true', help='print tty')
cmd_parser.add_argument('--verbose', '-v', action='count', default=0, help='be verbose (-vv)')
args = cmd_parser.parse_args()
try:
board = CPboard.from_try_all(args.board)
except BaseException as e:
if args.verbose > 1:
raise
else:
print(e, file=sys.stderr)
sys.exit(1)
if args.verbose:
print('CPBOARD_EXEC_MODE =', os.environ.get('CPBOARD_EXEC_MODE'))
# Make sure we can open serial
if not args.firmware:
try:
with board:
pass
except BaseException as e:
if args.verbose > 1:
raise
else:
print(e, file=sys.stderr)
sys.exit(1)
if args.tty:
print(board.device)
elif args.repl:
repl(board)
elif args.firmware:
try:
upload(board, args.firmware, args.verbose)
except BaseException as e:
if args.verbose > 1:
raise
else:
print(e, file=sys.stderr)
sys.exit(1)
else:
with board as b:
print('board:', board.device)
uname = os_uname(b)
print('sysname:', uname.sysname)
print('nodename:', uname.nodename)
print('release:', uname.release)
print('version:', uname.version)
print('machine:', uname.machine)
if __name__ == "__main__":
main()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment