Created
May 27, 2018 14:08
-
-
Save notro/1063505b2eb72720dfe419ea74de1b31 to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import os | |
import re | |
import serial | |
import sys | |
import time | |
import serial.tools.list_ports | |
import sh | |
import shutil | |
class CPboardError(BaseException): | |
pass | |
# supervisor/messages/default.h: | |
MSG_OUTPUT_SUFFIX = b" output:\r\n" | |
MSG_NEWLINE = b"\r\n" | |
MSG_AUTORELOAD_ON = b"Auto-reload is on. Simply save files over USB to run them or enter REPL to disable.\r\n" | |
MSG_AUTORELOAD_OFF = b"Auto-reload is off.\r\n" | |
MSG_SAFE_MODE_ON = b"Running in safe mode! Auto-reload is off.\r\n" | |
MSG_SAFE_MODE_NO_MAIN = b"Running in safe mode! Not running saved code.\r\n" | |
MSG_SAFE_MODE_USER_REQUESTED = b"You requested starting safe mode by " | |
MSG_SAFE_MODE_USER_EXIT = b"To exit, please reset the board without " | |
MSG_BAD_SAFE_MODE = b"You are running in safe mode which means something really bad happened." | |
MSG_SAFE_MODE_CRASH = b"Looks like our core CircuitPython code crashed hard. Whoops!" | |
MSG_SAFE_MODE_FILE_ISSUE = b"Please file an issue here with the contents of your CIRCUITPY drive:" | |
MSG_SAFE_MODE_ISSUE_LINK = b"https://github.com/adafruit/circuitpython/issues" | |
MSG_SAFE_MODE_BROWN_OUT_LINE_1 = b"The microcontroller's power dipped. Please make sure your power supply provides" | |
MSG_SAFE_MODE_BROWN_OUT_LINE_2 = b"enough power for the whole circuit and press reset (after ejecting CIRCUITPY)." | |
MSG_WAIT_BEFORE_REPL = b"Press any key to enter the REPL. Use CTRL-D to reload." | |
MSG_SOFT_REBOOT = b"soft reboot" | |
MSG_DOUBLE_FILE_EXTENSION = b"WARNING: Your code filename has two extensions\r\n" | |
class RawREPL: | |
@classmethod | |
def execute(cls, repl, code, timeout, async): | |
repl.read() # Throw away | |
repl.write(REPL.CHAR_CTRL_A) | |
repl.read_until(b'\r\n>') | |
repl.write(code) | |
repl.write(REPL.CHAR_CTRL_D) | |
if async: | |
return b'', b'' | |
repl.read_until(b'OK') | |
output = repl.read_until(b'\x04', timeout=timeout) | |
output = output[:-1] | |
error = repl.read_until(b'\x04') | |
error = error[:-1] | |
return output, error | |
class FriendlyREPL: | |
@classmethod | |
def execute(cls, repl, code, timeout, async): | |
repl.read() # Throw away | |
repl.write(REPL.CHAR_CTRL_E) | |
repl.read_until(b'=== ') | |
repl.write(code) | |
repl.write(b'\r\n#EOF_REPL') | |
repl.read_until(b'EOF_REPL') | |
repl.write(REPL.CHAR_CTRL_D) | |
if async: | |
return b'', b'' | |
output = repl.read_until(b'>>> ', timeout=timeout) | |
output = output[:-4] | |
if output.startswith(b'\r\n'): | |
output = output[2:] | |
traceback = b'Traceback (most recent call last):\r\n' | |
output, _, error = output.partition(traceback) | |
if error: | |
error = traceback + error | |
return output, error | |
class REPL: | |
CHAR_CTRL_A = b'\x01' | |
CHAR_CTRL_B = b'\x02' | |
CHAR_CTRL_C = b'\x03' | |
CHAR_CTRL_D = b'\x04' | |
CHAR_CTRL_E = b'\x05' | |
CHAR_CTRL_F = b'\x06' | |
CHAR_CTRL_K = b'\x11' | |
CHAR_CTRL_N = b'\x14' | |
CHAR_CTRL_P = b'\x16' | |
CHAR_CTRL_U = b'\x21' | |
def __init__(self, board): | |
self.board = board | |
self.write_chunk_size = 32 | |
self.mode_raw = False | |
self.safe_mode = False | |
self.session = b'' | |
def __enter__(self): | |
self.reset() | |
return self | |
def __exit__(self, exception_type, exception_value, traceback): | |
pass | |
@property | |
def serial(self): | |
return self.board.serial | |
@property | |
def raw(self): | |
self.mode_raw = True | |
return self | |
def read(self): | |
if self.serial.inWaiting(): | |
data = self.serial.read(self.serial.inWaiting()) | |
else: | |
data = b'' | |
self.session += data | |
#print('READ: ', data) | |
return data | |
def read_until(self, ending, timeout=10, data_consumer=None): | |
data = b'' | |
timeout_count = 0 | |
while True: | |
if data.endswith(ending): | |
break | |
elif self.serial.inWaiting() > 0: | |
new_data = self.serial.read(1) | |
data += new_data | |
self.session += new_data | |
if data_consumer: | |
data_consumer(new_data) | |
timeout_count = 0 | |
else: | |
timeout_count += 1 | |
if timeout is not None and timeout_count >= 100 * timeout: | |
print('SESSION: ', self.session) | |
raise TimeoutError(110, "timeout waiting for", ending) | |
time.sleep(0.01) | |
#print('READ_UNTIL: ', data) | |
return data | |
def write(self, data, chunk_size=None): | |
if chunk_size is None: | |
chunk_size = self.write_chunk_size | |
if not isinstance(data, bytes): | |
data = bytes(data, encoding='utf8') | |
for i in range(0, len(data), chunk_size): | |
chunk = data[i:min(i + chunk_size, len(data))] | |
self.session += chunk | |
self.serial.write(chunk) | |
time.sleep(0.01) | |
#print('WRITE: ', data) | |
def reset(self): | |
try: | |
self.serial.reset_input_buffer() | |
except termios.error: | |
pass | |
self.session = b'' | |
self.write(b'\r' + REPL.CHAR_CTRL_C + REPL.CHAR_CTRL_C) # interrupt any running program | |
self.write(b'\r' + REPL.CHAR_CTRL_B) # enter or reset friendly repl | |
data = self.read_until(b'>>> ') | |
def execute(self, code, timeout=10, async=False): | |
if self.mode_raw: | |
cls = RawREPL | |
else: | |
cls = FriendlyREPL | |
output, error = cls.execute(self, code, timeout, async) | |
return output, error | |
def run(self): | |
if self.mode_raw: | |
raise RuntimeError("Can't run in raw repl") | |
if self.safe_mode: | |
raise RuntimeError("Can't run in safe mode") | |
self.read() # Throw away | |
self.write(REPL.CHAR_CTRL_D) | |
data = self.read_until(b' output:\r\n') | |
if b'Running in safe mode' in data: | |
self.safe_mode = True | |
raise RuntimeError("Can't run in safe mode") | |
# TODO: MSG_SAFE_MODE_CRASH | |
# TODO: BROWNOUT | |
marker = MSG_NEWLINE + MSG_WAIT_BEFORE_REPL + MSG_NEWLINE | |
data = self.read_until(marker) | |
data = data.split(marker)[0] | |
#print('\nRUN: ', self.session) | |
#print('data: ', data) | |
# Don't know why I have strip off this... | |
if data.endswith(b'\r\n\r\n'): | |
data = data[:-4] | |
#print('data: ', data) | |
return data | |
class Disk: | |
def __init__(self, dev): | |
self.dev = os.path.realpath(dev) | |
self.mountpoint = None | |
with open('/etc/mtab', 'r') as f: | |
mtab = f.read() | |
mount = [mount.split(' ') for mount in mtab.splitlines() if mount.startswith(self.dev)] | |
if mount: | |
self._path = mount[0][1] | |
else: | |
name = os.path.basename(dev) | |
#print('name', name) | |
sh.pmount("-tvfat", dev, name) | |
self.mountpoint = "/media/" + name | |
self._path = self.mountpoint | |
def __enter__(self): | |
return self | |
def __exit__(self, exception_type, exception_value, traceback): | |
try: | |
self.close() | |
except: | |
pass | |
@property | |
def path(self): | |
return self._path | |
def close(self): | |
if not self.mountpoint: | |
return | |
#print("Mount.close") | |
mountpoint = self.mountpoint | |
self.mountpoint = None | |
start_time = time.monotonic() | |
unmounted = False | |
while not unmounted and start_time - time.monotonic() < 30: | |
try: | |
sh.pumount(mountpoint) | |
unmounted = True | |
except sh.ErrorReturnCode_5: | |
time.sleep(0.1) | |
def sync(self): | |
#disk_device = os.path.basename(os.readlink(self.dev))[:-1] | |
disk_device = os.path.basename(self.dev)[:-1] | |
os.sync() | |
# Monitor the block device so we know when the sync request is actually finished. | |
with open("/sys/block/" + disk_device + "/stat", "r") as f: | |
disk_inflight = 1 | |
last_wait_time = 0 | |
wait_time = 1 | |
while disk_inflight > 0 or wait_time > last_wait_time: | |
f.seek(0) | |
stats = f.read() | |
block_stats = stats.split() | |
disk_inflight = int(block_stats[8]) | |
last_wait_time = wait_time | |
wait_time = int(block_stats[9]) | |
def copy(self, src, dst=None, sync=True): | |
if dst is None: | |
dst = os.path.basename(src) | |
shutil.copy(src, os.path.join(self.path, dst)) | |
if sync: | |
self.sync() | |
class Firmware: | |
def __init__(self, board): | |
self.board = board | |
@property | |
def disk(self): | |
disks = self.board.get_disks() | |
if len(disks) != 1: | |
raise RuntimeError("Boot disk not found for: " + self.board.device) | |
return Disk(disks[0]) | |
@property | |
def info(self): | |
with self.disk as disk: | |
fname = os.path.join(disk.path, 'INFO_UF2.TXT') | |
with open(fname, 'r') as f: | |
info = f.read() | |
lines = info.splitlines() | |
res = {} | |
res['header'] = lines[0] | |
for line in lines[1:]: | |
k, _, v = line.partition(':') | |
res[k.replace(':', '')] = v.strip() | |
return res | |
def upload(self, fw): | |
with open(fw, 'rb') as f: | |
header = f.read(32) | |
if header[0:4] != b'UF2\n': | |
raise ValueError('Only UF2 files are supported') | |
self.board.close() | |
with self.disk as disk: | |
disk.copy(fw, sync=False) | |
class CPboard: | |
@classmethod | |
def from_try_all(cls, name, **kwargs): | |
try: | |
return CPboard.from_build_name(name, **kwargs) | |
except ValueError: | |
vendor, _, product = name.partition(':') | |
try: | |
return CPboard.from_usb(**kwargs, idVendor=int(vendor, 16), idProduct=int(product, 16)) | |
except ValueError: | |
raise | |
pass | |
return CPboard(name, **kwargs) | |
@classmethod | |
def from_build_name(cls, name, **kwargs): | |
boards = { | |
#'arduino_zero', (,) | |
'circuitplayground_express' : (0x239a, 0x8019), | |
#'feather_m0_adalogger', (,) | |
#'feather_m0_basic', (,) | |
'feather_m0_express' : (0x239a, 0x8023), | |
#'feather_m0_rfm69', (,) | |
#'feather_m0_rfm9x', (,) | |
#'feather_m0_supersized', (,) | |
#'feather_m4_express', (,) | |
#'gemma_m0', (,) | |
#'itsybitsy_m0_express', (,) | |
#'itsybitsy_m4_express', (,) | |
#'metro_m0_express', (,) | |
#'metro_m4_express', (,) | |
#'metro_m4_express_revb', (,) | |
#'pirkey_m0', (,) | |
#'trinket_m0', (,) | |
#'trinket_m0_haxpress', (,) | |
#'ugame10', (,) | |
} | |
try: | |
vendor, product = boards[name] | |
except KeyError: | |
raise ValueError("Unknown build name: " + name) | |
return CPboard.from_usb(**kwargs, idVendor=vendor, idProduct=product) | |
@classmethod | |
def from_usb(cls, baudrate=115200, wait=0, timeout=10, **kwargs): | |
import usb.core | |
dev = usb.core.find(**kwargs) | |
if not dev: | |
s = "Can't find USB device: " | |
args = [] | |
for x in kwargs.items(): | |
try: | |
args.append('%s=0x%x' % x) | |
except: | |
args.append('%s = %s' % x) | |
raise RuntimeError("Can't find USB device: " + ', '.join(args)) | |
return cls(dev, baudrate=baudrate, wait=wait, timeout=timeout) | |
def __init__(self, device, baudrate=115200, wait=0, timeout=10): | |
self.device = device | |
self.usb_dev = None | |
try: | |
# Is it a usb.core.Device? | |
portstr = ':' + '.'.join(map(str, device.port_numbers)) + ':' | |
except: | |
pass | |
else: | |
serials = [serial for serial in os.listdir("/dev/serial/by-path") if portstr in serial] | |
if len(serials) != 1: | |
raise RuntimeError("Can't find excatly one matching usb serial device") | |
self.device = os.path.realpath("/dev/serial/by-path/" + serials[0]) | |
self.usb_dev = device | |
self.baudrate = baudrate | |
self.wait = wait | |
self.timeout = timeout | |
self.debug = True | |
self.mount = None | |
self.serial = None | |
self._repl = REPL(self) | |
#self.open() | |
def __enter__(self): | |
self.open() | |
return self | |
def __exit__(self, exception_type, exception_value, traceback): | |
self.close() | |
def open(self, baudrate=None, wait=None): | |
if self.serial: | |
return | |
if baudrate is None: | |
baudrate = self.baudrate | |
if wait is None: | |
wait = self.wait | |
delayed = False | |
for attempt in range(wait + 1): | |
try: | |
self.serial = serial.Serial(self.device, baudrate=self.baudrate, timeout=self.timeout, write_timeout=self.timeout, interCharTimeout=1) | |
break | |
except (OSError, IOError): # Py2 and Py3 have different errors | |
if wait == 0: | |
continue | |
if attempt == 0: | |
sys.stdout.write('Waiting {} seconds for board '.format(wait)) | |
delayed = True | |
time.sleep(1) | |
sys.stdout.write('.') | |
sys.stdout.flush() | |
else: | |
if delayed: | |
print('') | |
raise CPboardError('failed to access ' + self.device) | |
if delayed: | |
print('') | |
def close(self): | |
if self.serial: | |
self.serial.close() | |
self.serial = None | |
@property | |
def repl(self): | |
self._repl.mode_raw = False | |
return self._repl | |
@property | |
def rawrepl(self): | |
self._repl.mode_raw = True | |
return self._repl | |
@property | |
def exec_mode(self): | |
mode = os.environ.get('CPBOARD_EXEC_MODE') | |
if mode in ['raw', 'disk', 'friendly']: | |
return mode | |
else: | |
return 'raw' | |
@property | |
def repl_from_env(self): | |
if self.exec_mode == 'raw': | |
self._repl.mode_raw = True | |
else: | |
self._repl.mode_raw = False | |
return self._repl | |
def exec(self, command, timeout=10, async=False): | |
with self.repl_from_env as repl: | |
try: | |
output, error = repl.execute(command, timeout=timeout, async=async) | |
except OSError as e: | |
if self.debug: | |
print('exec: session: ', self.repl.session) | |
raise CPboardError('timeout', e) | |
if error: | |
raise CPboardError('exception', output, error) | |
return output | |
def eval(self, expression, timeout=10): | |
command = 'print({})'.format(expression) | |
with self.repl_from_env as repl: | |
output, error = repl.execute(command, timeout=timeout) | |
if error: | |
raise CPboardError('exception', output, error) | |
try: | |
res = eval(str(output, encoding='utf8')) | |
except: | |
raise CPboardError('failed to eval: %s' % output) | |
return res | |
def _reset(self, mode='NORMAL'): | |
self.exec("import microcontroller;microcontroller.on_next_reset(microcontroller.RunMode.%s)" % mode) | |
try: | |
self.exec("import microcontroller;microcontroller.reset()", async=True) | |
except OSError: | |
pass | |
def reset(self, safe_mode=False, delay=5): | |
self._reset('SAFE_MODE' if safe_mode else 'NORMAL') | |
self.close() | |
time.sleep(delay) | |
self.open(wait=10) | |
time.sleep(delay) | |
def reset_to_bootloader(self, repl=False): | |
if repl: | |
self._reset('BOOTLOADER') | |
self.close() | |
else: | |
self.close() | |
s = serial.Serial(self.device, 1200, write_timeout=4, timeout=4) | |
s.close() | |
def get_port_info(self): | |
portinfo = None | |
for port_iter in serial.tools.list_ports.comports(): | |
if port_iter.device == self.device: | |
portinfo = port_iter | |
break | |
return portinfo | |
@property | |
def serial_number(self): | |
try: # Maybe root permissions are needed to read the value | |
return self.usb_dev.serial_number | |
except: | |
pass | |
p = self.get_port_info() | |
return p.serial_number if p else None | |
def get_disks(self): | |
if self.usb_dev: | |
portstr = ':' + '.'.join(map(str, self.usb_dev.port_numbers)) + ':' | |
return ["/dev/disk/by-path/" + disk for disk in os.listdir("/dev/disk/by-path") if portstr in disk] | |
serial = self.serial_number | |
if not serial: | |
raise RuntimeError("Serial number not found for: " + self.device) | |
return ["/dev/disk/by-id/" + disk for disk in os.listdir("/dev/disk/by-id") if serial in disk] | |
@property | |
def disk(self): | |
disks = self.get_disks() | |
part = [part for part in disks if 'part1' in part] | |
#print(part) | |
if not part: | |
raise RuntimeError("Disk not found for: " + self.device) | |
disk = part[0] | |
#print(disk) | |
return Disk(disk) | |
@property | |
def firmware(self): | |
return Firmware(self) | |
def execfile_disk(self, filename): | |
with self.disk as disk: | |
disk.copy(filename, 'code.py') | |
with self.repl as repl: | |
try: | |
output = repl.run() | |
except OSError as e: | |
raise CPboardError('timeout', e) | |
except RuntimeError: | |
if self.repl.safe_mode: | |
raise PyboardError("Can't run in safe mode") | |
else: | |
raise | |
return output | |
def execfile(self, filename, timeout=10): | |
if self.exec_mode == 'disk': | |
return self.execfile_disk(filename) | |
else: | |
with open(filename, 'rb') as f: | |
pyfile = f.read() | |
ret = self.exec(pyfile, timeout=timeout) | |
#print('SESSION: ', self.repl.session) | |
return ret | |
# Just enough to make tests/run-tests work | |
PyboardError = CPboardError | |
class Pyboard: | |
def __init__(self, device, baudrate=115200, user='micro', password='python', wait=0): | |
self.board = CPboard(device, baudrate=baudrate, wait=wait) | |
with self.board.disk as disk: | |
disk.copy('skip_if.py') | |
def close(self): | |
self.board.close() | |
def enter_raw_repl(self): | |
self.board.open() | |
def execfile(self, filename): | |
#for f in ['repl_emacs_check.py', 'gen_stack_overflow.py']: | |
# if f in filename: | |
# return b'SKIP\n' | |
return self.board.execfile(filename) | |
def eval_namedtuple(board, command): | |
from collections import namedtuple | |
s = board.exec("print(%s)" % command) | |
s = s.decode().strip() | |
items = [key.split('=') for key in s[1:-1].split(', ')] | |
keys = [item[0] for item in items] | |
vals = [item[1] for item in items] | |
nt = namedtuple('eval', keys) | |
res = nt(*[eval(val) for val in vals]) | |
return res | |
def os_uname(board): | |
return eval_namedtuple(board, "__import__('os').uname()") | |
def upload(board, fw, verbose): | |
if verbose: | |
board.open() | |
if verbose > 1: | |
print("Serial number :", board.serial_number, flush=True) | |
if verbose: | |
print('Current version:', os_uname(board).version, flush=True) | |
if verbose > 1: | |
print('Reset to bootloader...', end='', flush=True) | |
board.reset_to_bootloader() | |
time.sleep(5) | |
if verbose > 1: | |
print('done', flush=True) | |
if verbose > 1: | |
print('Bootloader:', board.firmware.info) | |
if verbose > 1: | |
print('Upload firmware...', end='', flush=True) | |
board.firmware.upload(fw) | |
if verbose > 1: | |
print('done', flush=True) | |
if verbose > 1: | |
print('Wait for board...', end='', flush=True) | |
time.sleep(5) | |
if verbose > 1: | |
print('done', flush=True) | |
if verbose: | |
board.open(wait=10) | |
print('New version:', os_uname(board).version, flush=True) | |
def repl(board): | |
import subprocess | |
subprocess.call("screen " + board.device, shell=True) | |
def main(): | |
import argparse | |
cmd_parser = argparse.ArgumentParser(description='Circuit Python Board Tool') | |
cmd_parser.add_argument('board', help='build name, vid:pid or /dev/tty') | |
cmd_parser.add_argument('--firmware', '-f', help='upload UF2 firmware file') | |
cmd_parser.add_argument('--repl', action='store_true', help='enter REPL') | |
cmd_parser.add_argument('--tty', action='store_true', help='print tty') | |
cmd_parser.add_argument('--verbose', '-v', action='count', default=0, help='be verbose (-vv)') | |
args = cmd_parser.parse_args() | |
try: | |
board = CPboard.from_try_all(args.board) | |
except BaseException as e: | |
if args.verbose > 1: | |
raise | |
else: | |
print(e, file=sys.stderr) | |
sys.exit(1) | |
if args.verbose: | |
print('CPBOARD_EXEC_MODE =', os.environ.get('CPBOARD_EXEC_MODE')) | |
# Make sure we can open serial | |
if not args.firmware: | |
try: | |
with board: | |
pass | |
except BaseException as e: | |
if args.verbose > 1: | |
raise | |
else: | |
print(e, file=sys.stderr) | |
sys.exit(1) | |
if args.tty: | |
print(board.device) | |
elif args.repl: | |
repl(board) | |
elif args.firmware: | |
try: | |
upload(board, args.firmware, args.verbose) | |
except BaseException as e: | |
if args.verbose > 1: | |
raise | |
else: | |
print(e, file=sys.stderr) | |
sys.exit(1) | |
else: | |
with board as b: | |
print('board:', board.device) | |
uname = os_uname(b) | |
print('sysname:', uname.sysname) | |
print('nodename:', uname.nodename) | |
print('release:', uname.release) | |
print('version:', uname.version) | |
print('machine:', uname.machine) | |
if __name__ == "__main__": | |
main() |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment