Skip to content

Instantly share code, notes, and snippets.

@siku2
Last active January 6, 2023 18:54
Show Gist options
  • Save siku2/a33be938efb2b9bd54c4d3d7e9c004db to your computer and use it in GitHub Desktop.
Save siku2/a33be938efb2b9bd54c4d3d7e9c004db to your computer and use it in GitHub Desktop.
import asyncio
import concurrent.futures
import dataclasses
import re
import subprocess
import sys
import typing
import urllib.parse
import urllib.request
from http.client import HTTPResponse
_SERVER_BASE_URL = "http://localhost:19080"
_RTT_URL = f"{_SERVER_BASE_URL}/rtt.htm"
_RTT_SSE_OUTPUT_URL = f"{_SERVER_BASE_URL}/TabX_SSEOutput.cgi?rtt"
class Sse:
...
@dataclasses.dataclass(kw_only=True)
class Event(Sse):
kind: str
data: bytes
@dataclasses.dataclass(kw_only=True)
class Retry(Sse):
value: int
_MAX_LINE_LEN = 8 * 1024 * 1024
async def _sse_line_reader() -> typing.AsyncIterator[tuple[str | None, bytes]]:
loop = asyncio.get_running_loop()
# single-threaded executor
executor = concurrent.futures.ThreadPoolExecutor(max_workers=1)
def inner():
resp: HTTPResponse
with urllib.request.urlopen(_RTT_SSE_OUTPUT_URL) as resp:
assert resp.status == 200
while True:
line = resp.readline(_MAX_LINE_LEN)
if line == b"":
break
# trim newline
line = line[:-1]
yield line
# create the iterator in the thread
line_iter = await loop.run_in_executor(executor, inner)
sentinel = object()
while True:
line: typing.Any = await loop.run_in_executor(
executor, next, line_iter, sentinel
)
if line is sentinel:
break
if line == b"":
yield None, b""
continue
key, _, value = line.partition(b": ")
yield key.decode("ascii"), value
async def _iter_sse() -> typing.AsyncIterator[Sse]:
line_iter = _sse_line_reader()
async for key, value in line_iter:
if key is None:
continue
match key:
case "retry":
yield Retry(value=int(value))
case "event":
kind = value.decode("utf-8")
key2, value2 = await anext(line_iter)
assert key2 == "data"
yield Event(kind=kind, data=value2)
case _:
raise TypeError(f"unknown type: {key}")
async def _rtt_post(data: dict[str, str]) -> None:
loop = asyncio.get_running_loop()
raw_data = urllib.parse.urlencode(data).encode("utf-8")
def inner():
resp: HTTPResponse
with urllib.request.urlopen(_RTT_URL, data=raw_data) as resp:
assert resp.status == 200
await loop.run_in_executor(None, inner)
async def rtt_start_stop() -> None:
print("INFO: starting RTT")
await _rtt_post(
{
"RTT_RTTAddrSet": "",
"RTT_BtnStartStop": "",
}
)
async def rtt_set_hex_format() -> None:
print("INFO: setting format to HEX")
await _rtt_post(
{
"RTT_ComboBoxDataFormat": "0",
}
)
async def rtt_clear() -> None:
print("INFO: clearing buffer")
await _rtt_post(
{
"RTT_BtnClearRx": "1",
}
)
# server caps out at 64 KiB
_MAX_LEN_BEFORE_CLEAR = 48 * 1024
_RE_HEX_BYTE = re.compile(
rb"(?:^|\s|>)\b([0-9A-F]{2})(?=\s|$)", re.MULTILINE | re.IGNORECASE
)
async def _defmt_feeder(
defmt_proc: subprocess.Popen[bytes],
) -> typing.AsyncGenerator[None, bytes | None]:
assert defmt_proc.stdin
prev_match_count = 0
clearing_fut: asyncio.Future[None] | None = None
while True:
rtt_hex = yield
assert isinstance(rtt_hex, bytes)
hex_byte_matches: list[bytes] = _RE_HEX_BYTE.findall(rtt_hex)
match_count = len(hex_byte_matches)
if match_count > prev_match_count:
new_hex_byte_matches = hex_byte_matches[prev_match_count:]
elif match_count < prev_match_count:
# assume we were cleared, so all the bytes we have can be considered new
new_hex_byte_matches = hex_byte_matches
else:
# same length, nothing new
new_hex_byte_matches = []
prev_match_count = match_count
if prev_match_count > _MAX_LEN_BEFORE_CLEAR:
if clearing_fut is None or clearing_fut.done():
clearing_fut = asyncio.create_task(rtt_clear())
if new_hex_byte_matches:
rtt_data = bytes(map(lambda b: int(b, 16), new_hex_byte_matches))
defmt_proc.stdin.write(rtt_data)
defmt_proc.stdin.flush()
async def main():
target = sys.argv[1]
defmt_proc = subprocess.Popen(
["defmt-print", "-e", target],
stdin=subprocess.PIPE,
)
defmt_feeder = _defmt_feeder(defmt_proc)
await defmt_feeder.asend(None)
wait_is_running = False
wait_is_hex = False
wait_control_block = False
async for sse in _iter_sse():
if isinstance(sse, Event) and sse.kind == "RTT_SSE_CSVPageData":
fields = sse.data.split(b"$")
is_running = fields[1] == b"1"
if not is_running:
if wait_is_running:
continue
await rtt_start_stop()
wait_is_running = True
else:
wait_is_running = False
data_format_hex = b";0|selected" in fields[21]
if not data_format_hex:
if wait_is_hex:
continue
await rtt_set_hex_format()
wait_is_hex = True
else:
wait_is_hex = False
looking_for_control_block = fields[7] == b"Not valid (yet)"
if looking_for_control_block:
if wait_control_block:
continue
print("INFO: searching for control block...")
wait_control_block = True
elif wait_control_block:
print("INFO: control block found")
wait_control_block = False
rtt_hex = fields[23]
await defmt_feeder.asend(rtt_hex)
if __name__ == "__main__":
asyncio.run(main())
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment