Skip to content

Instantly share code, notes, and snippets.

@dogtopus
Last active March 15, 2024 08:40
Show Gist options
  • Star 2 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save dogtopus/aba02ea5abaae6d0e66552b50ba21ca9 to your computer and use it in GitHub Desktop.
Save dogtopus/aba02ea5abaae6d0e66552b50ba21ca9 to your computer and use it in GitHub Desktop.
Convert sigrok JSON trace containing UART HCI traffic to HCI PCAP file. Requires scapy.
#!/usr/bin/env python3
from __future__ import annotations
'''
Convert sigrok JSON trace containing UART HCI traffic to HCI PCAP file.
The JSON trace files are typically acquired with:
sigrok-cli -i some-sigrok-session.sr -P uart:baudrate=<hci-baud> -A uart=tx-data:rx-data --protocol-decoder-jsontrace > some-json-trace-file.json
Calling with piping is also possible:
sigrok-cli -i some-sigrok-session.sr -P uart:baudrate=<hci-baud> -A uart=tx-data:rx-data --protocol-decoder-jsontrace | python3 trace2pcap.py - some-pcap.pcap
If pins are not named "RX" and "TX" in the sigrok session file ("RXD" and "TXD" might also work; not sure), extra parameters may be needed for -P in order for the decoder to recognize the signals.
Note that the trace file should only contain TX and RX channels or unexpected things may happen. Therefore the -A parameter is necessary.
This can probably be easily adapted to also generate USBLL PCAP files from sigrok sessions, but that possibility has not been explored (and Wireshark support for USBLL is still slightly inferior compared to even sigrok, which is already somewhat bare-metal).
'''
from typing import (
TypedDict,
NamedTuple,
Tuple,
List,
Dict,
Iterator,
TextIO,
Optional,
Sequence,
)
import argparse
import json
import sys
import time
from scapy.all import wrpcap, Packet
from scapy.layers.bluetooth import HCI_PHDR_Hdr, HCI_Hdr
class TraceEvent(TypedDict):
    """One entry of the ``traceEvents`` list in the JSON trace, as consumed by ``load_jsontrace``."""
    # Event phase. The loader pairs 'B' (opening) with 'E' (closing) and ignores any other value.
    ph: str
    # Event timestamp; copied into ``ByteEvent.span_us`` as the start or end of a byte.
    ts: float
    # Becomes ``ByteEvent.decoder`` (compared against the --decoder-name filter).
    pid: str
    # Becomes ``ByteEvent.channel`` (looked up as 'RX' or 'TX' when building packets).
    tid: str
    # Parsed as a base-16 integer to obtain the byte value.
    name: str
class ByteEvent(NamedTuple):
    """A single decoded UART byte together with where and when it was observed."""
    # Byte value, parsed from the hexadecimal trace event name.
    byte: int
    # (opening timestamp, closing timestamp) of the byte, taken from the paired trace events.
    span_us: Tuple[float, float]
    # The trace event's ``pid`` field.
    decoder: str
    # The trace event's ``tid`` field.
    channel: str
class ChannelState(TypedDict):
    """Per-channel bookkeeping used while grouping bytes into packets."""
    # Closing timestamp of the most recently buffered byte; initialised to -1 (no byte seen yet).
    last_byte_ends_at: float
    # Opening timestamp of the first byte of the packet being assembled; initialised to -1.
    pkt_starts: float
    # Bytes collected so far for the packet being assembled.
    pkt_content: bytearray
# Key used to match an opening trace event with its closing event: (pid, tid, name).
EventKey = Tuple[str, str, str]
def parse_args() -> Tuple[argparse.ArgumentParser, argparse.Namespace]:
    """Build the command-line parser and parse ``sys.argv``.

    Returns:
        A ``(parser, namespace)`` pair: the configured ``ArgumentParser`` and
        the parsed arguments.
    """
    parser = argparse.ArgumentParser(
        description='Convert sigrok JSON trace containing UART HCI traffic to HCI PCAP file.',
    )

    # Positional arguments: input trace and output capture.
    parser.add_argument(
        'jsontrace',
        help='Chromium JSON trace file generated with sigrok-cli using --protocol-decoder-jsontrace. Use - to read from stdin.',
    )
    parser.add_argument(
        'pcap',
        help='Output PCAP file.',
    )

    # Optional tuning knobs.
    parser.add_argument(
        '-0', '--epoch',
        type=float,
        # Evaluated once, when the parser is built.
        default=time.time(),
        help='Set epoch for the PCAP dump (default is now).',
    )
    parser.add_argument(
        '-b', '--break-threshold',
        type=float,
        default=25.0,
        help='Set break threshold in microseconds (max amount of time between 2 bytes for the program to consider them in different packets) (default is 25.0).',
    )
    parser.add_argument(
        '-d', '--decoder-name',
        dest='decoder_names',
        action='append',
        help='Name of the sigrok protocol decoder responsible for UART decoding. Can be specified multiple times. Defaults to including all.',
    )
    parser.add_argument(
        '-r', '--reverse-rxtx', '--assume-controller',
        dest='reverse_rxtx',
        action='store_true',
        default=False,
        help="Assume the captured data were seen at the controller's perspective (i.e. TX=controller to host and RX=host to controller).",
    )

    namespace = parser.parse_args()
    return parser, namespace
def load_jsontrace(jsontrace_file: TextIO) -> Iterator[ByteEvent]:
    """Read a JSON trace and yield one ``ByteEvent`` per matched B/E event pair.

    The whole document is parsed up front and its ``traceEvents`` list is
    walked in order. A 'B' event opens a span and the 'E' event with the same
    (pid, tid, name) closes it; events with any other phase are ignored.

    Args:
        jsontrace_file: Open text stream containing the JSON trace.

    Yields:
        A ``ByteEvent`` for every closed span, in the order the closing
        events appear in the trace.

    Raises:
        ValueError: If a span is opened twice without being closed, if a
            closing event has no matching opening, or if an event name is
            not a valid hexadecimal number.
    """
    trace_events: List[TraceEvent] = json.load(jsontrace_file)['traceEvents']
    # Opening timestamps of spans that have not been closed yet.
    pending: Dict[EventKey, float] = {}

    for event in trace_events:
        key: EventKey = (event['pid'], event['tid'], event['name'])
        phase = event['ph']

        if phase == 'B':
            if key in pending:
                raise ValueError(f'Overlapping event on the same channel detected: {event}.')
            pending[key] = event['ts']
            continue

        if phase != 'E':
            # Not a span boundary; nothing to do.
            continue

        if key not in pending:
            raise ValueError(f'Unmatched event closing: {event}.')
        opened_at = pending.pop(key)
        value = int(event['name'], base=16)
        yield ByteEvent(value, (opened_at, event['ts']), event['pid'], event['tid'])
def generate_scapy_packets(
    events: Iterator[ByteEvent],
    decoder_names: Optional[Sequence[str]],
    break_threshold_us: float,
    epoch: float,
    reverse_rxtx: bool,
) -> Iterator[Packet]:
    """Group decoded UART bytes into HCI packets and yield them in timestamp order.

    Bytes are buffered per channel ('RX' / 'TX'). A gap of at least
    ``break_threshold_us`` between the end of the previous byte and the start
    of the next byte on the same channel closes the current packet.

    A packet is only known to be complete when the next packet on the same
    channel begins, so packets from the two channels complete in an order
    that does not follow their start times. To keep the capture
    chronological, finished packets are collected, sorted by start time
    (stable, so ties keep completion order), and only then yielded. This
    means the whole input is consumed before the first packet is produced.

    Args:
        events: Decoded bytes, as produced by ``load_jsontrace``.
        decoder_names: If non-empty, only bytes whose ``decoder`` is listed
            are used; ``None`` or an empty sequence accepts every decoder.
        break_threshold_us: Minimum inter-byte gap, in microseconds, that
            separates two packets.
        epoch: Timestamp, in seconds, corresponding to trace time zero.
        reverse_rxtx: Swap the direction values assigned to RX and TX.

    Yields:
        ``HCI_PHDR_Hdr / HCI_Hdr`` packets with ``time`` set to ``epoch``
        plus the packet's start time, ordered by start time.

    Raises:
        KeyError: If a byte arrives on a channel other than 'RX' or 'TX'.
    """
    # Direction value written to the HCI pseudo-header for each channel.
    rxtx_mapping: Dict[str, int] = {'RX': 0, 'TX': 1} if reverse_rxtx else {'RX': 1, 'TX': 0}
    channel_states: Dict[str, ChannelState] = {
        'RX': {'last_byte_ends_at': -1, 'pkt_starts': -1, 'pkt_content': bytearray()},
        'TX': {'last_byte_ends_at': -1, 'pkt_starts': -1, 'pkt_content': bytearray()},
    }
    # (start time in microseconds, packet) for every completed packet.
    finished: List[Tuple[float, Packet]] = []

    def finish_packet(channel: str, state: ChannelState) -> None:
        # Hand scapy an immutable copy: the channel buffer is cleared and
        # reused while the packet is still waiting in ``finished``.
        payload = bytes(state['pkt_content'])
        packet = HCI_PHDR_Hdr(direction=rxtx_mapping[channel]) / HCI_Hdr(payload)
        packet.time = epoch + state['pkt_starts'] / 1000 / 1000
        finished.append((state['pkt_starts'], packet))

    for byte in events:
        if decoder_names and byte.decoder not in decoder_names:
            continue
        state = channel_states[byte.channel]
        starts_at, ends_at = byte.span_us

        # First byte ever seen on this channel starts the first packet.
        if state['pkt_starts'] < 0:
            state['pkt_starts'] = starts_at

        # A long enough pause since the previous byte closes the current
        # packet; this byte then becomes the first byte of the next one.
        if state['last_byte_ends_at'] >= 0 and starts_at - state['last_byte_ends_at'] >= break_threshold_us:
            finish_packet(byte.channel, state)
            state['pkt_starts'] = starts_at
            state['pkt_content'].clear()

        state['pkt_content'].append(byte.byte)
        state['last_byte_ends_at'] = ends_at

    # Whatever is still buffered at end of input forms the last packet of its channel.
    for channel, state in channel_states.items():
        if state['pkt_content']:
            finish_packet(channel, state)

    finished.sort(key=lambda entry: entry[0])
    for _, packet in finished:
        yield packet
if __name__ == '__main__':
    # Script entry point: read the trace, rebuild HCI packets, write the PCAP.
    _, args = parse_args()
    # A jsontrace argument of '-' means the trace is piped in on stdin.
    trace_file = sys.stdin if args.jsontrace == '-' else open(args.jsontrace, 'r')
    with trace_file:
        byte_events = load_jsontrace(trace_file)
        packets = generate_scapy_packets(
            byte_events,
            args.decoder_names,
            args.break_threshold,
            args.epoch,
            args.reverse_rxtx,
        )
        wrpcap(args.pcap, packets)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment