Skip to content

Instantly share code, notes, and snippets.

@breqdev
Created May 16, 2024 19:19
Show Gist options
  • Save breqdev/6968528b947ab1b3ddcaf0b69bc64e12 to your computer and use it in GitHub Desktop.
Save breqdev/6968528b947ab1b3ddcaf0b69bc64e12 to your computer and use it in GitHub Desktop.
Sunburst visualization of bandwidth usage by topic/namespace in a ROS 2 system
from typing import Literal
import pcapng
topics: dict[bytes, bytes] = {}
hits: dict[bytes, int] = {}
def parse_submessage(data: bytes, offset: int) -> int:
msg_id = data[offset]
endian: Literal["little"] | Literal["big"] = "little" if data[offset + 1] else "big"
length = int.from_bytes(
data[offset + 2 : offset + 4],
endian,
)
if msg_id == 0x09: # INFO_TS
pass
elif msg_id == 0x0E: # INFO_DST
pass
elif msg_id == 0x07: # HEARTBEAT
pass
elif msg_id == 0x13: # HEARTBEAT_FRAG
pass
elif msg_id == 0x06: # ACKNACK
pass
elif msg_id == 0x12: # NACK_FRAG
pass
elif msg_id == 0x15: # DATA
entity_id = data[offset + 12 : offset + 16]
writer_id = guid_prefix + entity_id
if writer_id[-4:-1] == b"\x00\x00\x03":
# subscriptions writer
data_start = offset + 16 + 8
# encapsulation kind
if data[data_start : data_start + 2] == b"\x00\x03":
# encapsulation options
assert data[data_start + 2 : data_start + 4] == b"\x00\x00"
data_start = data_start + 4
topic_name = None
topic_guid = None
while data_start < offset + 4 + length:
parameter_id = int.from_bytes(
data[data_start : data_start + 2],
endian,
)
parameter_len = int.from_bytes(
data[data_start + 2 : data_start + 4],
endian,
)
if parameter_id == 0x0005:
# topic name
topic_name = data[
data_start + 4 : data_start + 4 + parameter_len
][4:].rstrip(b"\x00")
elif parameter_id == 0x005A:
# topic guid
# 4 host, 4 app, 4 instance, 4 entity
assert parameter_len == 16
# multiple parts, entity id starts 12 blocks in
topic_guid = data[data_start + 4 : data_start + 4 + 16]
data_start += 4 + parameter_len
if (
topic_name is not None
and topic_name != b"ros_discovery_info"
and topic_guid is not None
and topic_guid[-1] == 0x03
):
if topic_guid in topics and topics[topic_guid] != topic_name:
print(
f"[WARN] same guid, different names: {topic_name.decode()} vs {topics[topic_guid].decode()}"
)
if topic_name in topics.values() and topic_guid not in topics:
expected_guid = next(
guid for guid, name in topics.items() if name == topic_name
)
print(
f"[WARN] same topic, different guids: {topic_guid.hex()} vs {expected_guid.hex()}",
topic_name,
)
topics[topic_guid] = topic_name
if writer_id[-1] != 0x03:
pass
elif writer_id in topics:
hits[topics[writer_id]] = hits.get(topics[writer_id], 0) + 1
else:
print(
"[WARN] unknown writer id",
writer_id.hex(),
)
topics[writer_id] = f"UNKNOWN ({writer_id.hex()})".encode()
elif msg_id == 0x16: # DATA_FRAG
# TODO
pass
elif msg_id == 0x08: # GAP
pass
elif msg_id == 0x01: # PAD
pass
else:
raise ValueError(f"UNKNOWN {msg_id}")
return offset + 4 + length
with open("2024-04-28-driving-2.pcapng", "rb") as f:
pcap = pcapng.FileScanner(f)
for block in pcap:
if isinstance(block, pcapng.blocks.BasePacketBlock):
# 14 byte Ethernet header
# 20 byte IP header
# 8 byte UDP header
RTPS_START = 14 + 20 + 8
if block.packet_data[RTPS_START : RTPS_START + 4] == b"RTPS":
# 4 byte magic
# 2 byte version
# 2 byte vendor ID
# 12 byte GUID prefix (4 host, 4 app, 4 instance)
guid_prefix = block.packet_data[RTPS_START + 8 : RTPS_START + 8 + 12]
RTPS_SUBMESSAGE_START = 4 + 2 + 2 + 12
submessage_idx = RTPS_START + RTPS_SUBMESSAGE_START
while submessage_idx < len(block.packet_data):
submessage_idx = parse_submessage(block.packet_data, submessage_idx)
# show top 10
for i, key in enumerate(sorted(hits, key=lambda k: hits.get(k), reverse=True)[:50]):
print(f"{i + 1}. {key.decode('utf-8')} ({hits[key]})")
def get_parent(topic: str):
if topic.count("/") == 1:
return ""
return "/".join(topic.split("/")[:-1])
topic_names = [k.decode() for k in hits if not k.decode().startswith("UNKNOWN")]
topic_parents = [get_parent(k) for k in topic_names]
topic_counts = [hits[k.encode()] for k in topic_names]
# to make plotly happy, we need to make sure the parent is in the list
for parent in topic_parents:
if parent not in topic_names:
topic_names.append(parent)
topic_parents.append(get_parent(parent))
topic_counts.append(0)
# sunburst plot
import plotly.express as px
fig = px.sunburst(
names=topic_names,
parents=topic_parents,
values=topic_counts,
)
fig.show()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment