Created
May 16, 2024 19:19
-
-
Save breqdev/6968528b947ab1b3ddcaf0b69bc64e12 to your computer and use it in GitHub Desktop.
Sunburst visualization of bandwidth usage by topic/namespace in a ROS 2 system
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
from typing import Literal | |
import pcapng | |
topics: dict[bytes, bytes] = {} | |
hits: dict[bytes, int] = {} | |
def parse_submessage(data: bytes, offset: int) -> int: | |
msg_id = data[offset] | |
endian: Literal["little"] | Literal["big"] = "little" if data[offset + 1] else "big" | |
length = int.from_bytes( | |
data[offset + 2 : offset + 4], | |
endian, | |
) | |
if msg_id == 0x09: # INFO_TS | |
pass | |
elif msg_id == 0x0E: # INFO_DST | |
pass | |
elif msg_id == 0x07: # HEARTBEAT | |
pass | |
elif msg_id == 0x13: # HEARTBEAT_FRAG | |
pass | |
elif msg_id == 0x06: # ACKNACK | |
pass | |
elif msg_id == 0x12: # NACK_FRAG | |
pass | |
elif msg_id == 0x15: # DATA | |
entity_id = data[offset + 12 : offset + 16] | |
writer_id = guid_prefix + entity_id | |
if writer_id[-4:-1] == b"\x00\x00\x03": | |
# subscriptions writer | |
data_start = offset + 16 + 8 | |
# encapsulation kind | |
if data[data_start : data_start + 2] == b"\x00\x03": | |
# encapsulation options | |
assert data[data_start + 2 : data_start + 4] == b"\x00\x00" | |
data_start = data_start + 4 | |
topic_name = None | |
topic_guid = None | |
while data_start < offset + 4 + length: | |
parameter_id = int.from_bytes( | |
data[data_start : data_start + 2], | |
endian, | |
) | |
parameter_len = int.from_bytes( | |
data[data_start + 2 : data_start + 4], | |
endian, | |
) | |
if parameter_id == 0x0005: | |
# topic name | |
topic_name = data[ | |
data_start + 4 : data_start + 4 + parameter_len | |
][4:].rstrip(b"\x00") | |
elif parameter_id == 0x005A: | |
# topic guid | |
# 4 host, 4 app, 4 instance, 4 entity | |
assert parameter_len == 16 | |
# multiple parts, entity id starts 12 blocks in | |
topic_guid = data[data_start + 4 : data_start + 4 + 16] | |
data_start += 4 + parameter_len | |
if ( | |
topic_name is not None | |
and topic_name != b"ros_discovery_info" | |
and topic_guid is not None | |
and topic_guid[-1] == 0x03 | |
): | |
if topic_guid in topics and topics[topic_guid] != topic_name: | |
print( | |
f"[WARN] same guid, different names: {topic_name.decode()} vs {topics[topic_guid].decode()}" | |
) | |
if topic_name in topics.values() and topic_guid not in topics: | |
expected_guid = next( | |
guid for guid, name in topics.items() if name == topic_name | |
) | |
print( | |
f"[WARN] same topic, different guids: {topic_guid.hex()} vs {expected_guid.hex()}", | |
topic_name, | |
) | |
topics[topic_guid] = topic_name | |
if writer_id[-1] != 0x03: | |
pass | |
elif writer_id in topics: | |
hits[topics[writer_id]] = hits.get(topics[writer_id], 0) + 1 | |
else: | |
print( | |
"[WARN] unknown writer id", | |
writer_id.hex(), | |
) | |
topics[writer_id] = f"UNKNOWN ({writer_id.hex()})".encode() | |
elif msg_id == 0x16: # DATA_FRAG | |
# TODO | |
pass | |
elif msg_id == 0x08: # GAP | |
pass | |
elif msg_id == 0x01: # PAD | |
pass | |
else: | |
raise ValueError(f"UNKNOWN {msg_id}") | |
return offset + 4 + length | |
with open("2024-04-28-driving-2.pcapng", "rb") as f: | |
pcap = pcapng.FileScanner(f) | |
for block in pcap: | |
if isinstance(block, pcapng.blocks.BasePacketBlock): | |
# 14 byte Ethernet header | |
# 20 byte IP header | |
# 8 byte UDP header | |
RTPS_START = 14 + 20 + 8 | |
if block.packet_data[RTPS_START : RTPS_START + 4] == b"RTPS": | |
# 4 byte magic | |
# 2 byte version | |
# 2 byte vendor ID | |
# 12 byte GUID prefix (4 host, 4 app, 4 instance) | |
guid_prefix = block.packet_data[RTPS_START + 8 : RTPS_START + 8 + 12] | |
RTPS_SUBMESSAGE_START = 4 + 2 + 2 + 12 | |
submessage_idx = RTPS_START + RTPS_SUBMESSAGE_START | |
while submessage_idx < len(block.packet_data): | |
submessage_idx = parse_submessage(block.packet_data, submessage_idx) | |
# show top 10 | |
for i, key in enumerate(sorted(hits, key=lambda k: hits.get(k), reverse=True)[:50]): | |
print(f"{i + 1}. {key.decode('utf-8')} ({hits[key]})") | |
def get_parent(topic: str): | |
if topic.count("/") == 1: | |
return "" | |
return "/".join(topic.split("/")[:-1]) | |
topic_names = [k.decode() for k in hits if not k.decode().startswith("UNKNOWN")] | |
topic_parents = [get_parent(k) for k in topic_names] | |
topic_counts = [hits[k.encode()] for k in topic_names] | |
# to make plotly happy, we need to make sure the parent is in the list | |
for parent in topic_parents: | |
if parent not in topic_names: | |
topic_names.append(parent) | |
topic_parents.append(get_parent(parent)) | |
topic_counts.append(0) | |
# sunburst plot | |
import plotly.express as px | |
fig = px.sunburst( | |
names=topic_names, | |
parents=topic_parents, | |
values=topic_counts, | |
) | |
fig.show() |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment