Skip to content

Instantly share code, notes, and snippets.

@calebccff
Last active April 24, 2024 12:24
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save calebccff/435d944eb1a7c518d6029badf6894950 to your computer and use it in GitHub Desktop.
Save calebccff/435d944eb1a7c518d6029badf6894950 to your computer and use it in GitHub Desktop.
A script to watch U-Boot serial output and automatically decode register dumps when a synchronous abort occurs
#!/usr/bin/env python3
# SPDX-License-Identifier: GPL-2.0
# Copyright 2023 Linaro Ltd.
# Author: Caleb Connolly <caleb.connolly@linaro.org>
#
# Listens to a serial port and automatically detects and decodes
# Synchronous aborts. DO NOT run your serial monitor at the same
# time as this tool! It will configure a PTY and symlink it to
# /tmp/abortwatch.pty, you should point your serial monitor there
# instead!
from argparse import ArgumentParser
import serial
import os
import pty
import select
from threading import Thread
from queue import Queue, Empty
from time import sleep
from pathlib import Path
import re
def enqueue_output(in_fd, queue: Queue):
in_fh = os.fdopen(in_fd, 'rb')
while True:
b = in_fh.read(1)
queue.put(b)
class Port(serial.Serial):
def __init__(self, port: str, baud: int):
super().__init__(port, baud, timeout=0)
# Master , Slave
(pty_in, pty_out) = pty.openpty()
self.pty_name = os.ttyname(pty_out)
self.pty_out = os.fdopen(pty_in, 'ab', buffering=0)
self.pty_fd = pty_in
self.q = Queue()
self.t = Thread(target=enqueue_output, args=(pty_in, self.q))
self.t.daemon = True
self.t.start()
def __del__(self) -> None:
os.close(self.pty_fd) # This ought to cause the thread to die
return super().__del__()
# Wrap serial read to write all bytes read to
# the PTY
def read(self, size: int = 1) -> bytes:
data = super().read(size)
#print(f"READ {data}")
self.pty_out.write(data)
return data
def open_port(port: str, baud: int, wait_for_port: bool=True) -> Port:
retry = 3
if not os.path.exists(port):
if not wait_for_port:
print(f"Port '{port}' doesn't exit and --no-wait specified, exiting")
raise FileNotFoundError()
print(f"Waiting for port '{port}'...")
while not os.path.exists(port):
sleep(0.1)
error: Exception = None
for _ in range(retry):
try:
console = Port(port, baud)
print(f"Opened port '{port}'")
print(f"Port mirror is {console.pty_name}")
return console
except PermissionError as e:
error = e
sleep(0.05) # We might be racing with udev to set correct permissions
print(f"Failed to open port after {retry} tries...")
print(error)
def record_abort(console: Port, firstline: str) -> [str]:
print(f"{firstline}", end="")
abort = [firstline]
console.timeout = None # Blocking read
state = 0
while True:
line = console.readline().decode()
abort.append(line)
if state == 1:
if line.startswith("Backtrace:"):
pass
elif line.strip().startswith("<0x"):
pass
elif len(line.strip()) == 0:
pass
else:
console.timeout = 0
return abort
if line.startswith("Code: "):
state = 1
# console.timeout = 0 # Non-blocking again
# return abort
print(f"{line}", end="")
if len(abort) > 24 + 10 * state:
print("!!! Code line not found after 24 lines, giving up")
console.timeout = 0
return abort
def detect_abort(console: Port) -> [str]:
line = ""
while True:
(fds, _, _) = select.select([console.fd], [], [], 0.05)
for fd in fds:
match fd:
case console.fd:
data = console.read_until()
try:
line += data.decode()
except UnicodeDecodeError:
line += repr(data)
while True:
try:
c = console.q.get_nowait()
console.write(c)
except Empty:
break
if len(line) == 0 or line[-1] != '\n':
continue
# If we see a new line, check if the last line was an abort
if "\"Synchronous Abort\" handler" not in line:
line = ""
continue
print("\n\n")
# Strip the start of the line
line = line[line.index("\"Synchronous Abort\" handler"):]
return record_abort(console, line)
# Parse the u-boot.map file and find address, offset, file, and function
# that the LR register address corresponds to (ie, where u-boot crashed)
def find_func_file(addr: int, outdir: str) -> tuple[int, int, str, str]:
syms = open(os.path.join(outdir, "u-boot.sym")).readlines()
syms.sort()
with open(os.path.join(outdir, "u-boot.sym.sorted"), "w") as f:
f.write("".join(syms))
func = None
last_func = None
fn_addr = 0
last_addr = 0
file = None
match_sym = re.compile(r"^([a-fA-F0-9]+).* ([a-zA-Z0-9_.-]+)$")
while len(syms) > 0:
line = syms.pop(0)
m = match_sym.search(line)
#print(f"{line} -- {m}")
if not m:
continue
fn_addr = int(m.group(1), 16)
if fn_addr == 0:
continue
last_func = func
func = m.group(2)
#print(m.groups())
if fn_addr > addr:
break
last_addr = fn_addr
if len(syms) == 0:
print(f"Couldn't find function for address {addr:#x}")
return (0, 0, "", "")
# now find the file...
lines = open(os.path.join(outdir, "u-boot.map")).readlines()
while "Discarded input sections" not in lines.pop(0):
continue
is_next = False
for line in lines:
if is_next:
file = line.strip().split()[-1]
break
if last_func in line:
is_next = True
return (last_addr, addr - last_addr, file, last_func)
# Parse the register dump, find the nearest function symbol and decode the object
# file it belongs to, then find the line number in the file of the instruction that
# caused the abort and print it. Or just print the file path
def decode_abort(abort: [str], outdir: str):
cross_compile = ""
if "CROSS_COMPILE" in os.environ:
cross_compile = os.environ["CROSS_COMPILE"]
code = os.popen(f"bash -c 'echo \"{abort[-1]}\" | scripts/decodecode'").readlines()
for line in code:
print(f"\t{line}", end="")
print()
elr = int(abort[1].split(":")[1].strip().split(" ")[0].strip(), 16)
#print(f"Searching for function @ [{elr:#x}]")
(addr, offset, file, func) = find_func_file(elr, outdir)
if addr == 0:
return
print(f"Crashed in {func}() +{offset:#x} (symbol addr {addr:#x})")
decomp_path = os.path.join("/tmp/", f"{os.path.basename(file)[:-2]}.S")
os.popen(f"bash -c '{cross_compile}objdump -Sdl {os.path.join(outdir, file)} > {decomp_path}'").readlines()
decomp = open(decomp_path).readlines()
m_func = None
path = None
match_path = re.compile(r"^([/a-zA-Z0-9_.-]+:\d+)")
match_func = re.compile(r"^[a-fA-F0-9]+ <([a-zA-Z0-9_]+)>:$")
match_off = re.compile(r"^\s*([a-fA-F0-9]+):")
for (i, line) in enumerate(decomp):
# Keep track of the last source path in the decomp
m = match_path.search(line)
if m:
path = m.group(1)
continue
m = match_func.search(line)
if not m and not m_func:
continue
elif m:
m_func = m.group(1)
if m_func != func:
continue
# if ":" not in line:
# continue
m = match_off.search(line)
if not m:
continue
off = int(m.group(1), 16)
if off == offset:
print()
if path:
print(f"U-Boot crashed in {Path(path).resolve()}")
print(f"Disassembly: {decomp_path}:{i+1}")
return
print(f"Couldn't automatically determine crashing line in {decomp_path}")
if __name__ == '__main__':
ap = ArgumentParser(description='Synchronous abort decoder')
ap.add_argument('--outdir', '-o', help='U-Boot output directory', default='.')
ap_action = ap.add_subparsers(dest='action', required=True)
ap_decode = ap_action.add_parser('decode', help='Decode a single abort from a file')
ap_decode.add_argument('decode', help='File to decode')
ap_port = ap_action.add_parser('port', help='Serial port to watch on')
ap_port.add_argument('--baudrate', '-b', help='Serial port baud rate', type=int,
default=115200)
ap_port.add_argument('--no-wait', help="Don't wait for serial port device",
action='store_false', default=False)
ap_port.add_argument('port', help='Serial port device')
args = ap.parse_args()
if args.action == 'decode':
abort = open(args.decode).readlines()
decode_abort(abort, args.outdir)
exit(0)
console = None
while True:
if console is None:
console = open_port(args.port, args.baudrate, not args.no_wait)
abort = ""
try:
abort = detect_abort(console)
decode_abort(abort, args.outdir)
except serial.serialutil.SerialException as e:
print(f"Serial exception {e}")
if not args.no_wait:
console = None
continue
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment