Skip to content

Instantly share code, notes, and snippets.

@paulo-raca
Last active February 14, 2020 02:17
Show Gist options
  • Save paulo-raca/dfffdd1740e299d1000d5fcdd3964b92 to your computer and use it in GitHub Desktop.
Save paulo-raca/dfffdd1740e299d1000d5fcdd3964b92 to your computer and use it in GitHub Desktop.
Quick and dirty parser for Android traces
#!/usr/bin/env python3
"""
Parser for Android Traces.
Based on documentation from https://rhye.org/post/android-profiling-flamegraphs/
"""
from dataclasses import dataclass
from enum import Enum
import struct
import typing
from io import StringIO
from collections import defaultdict
import sys
class JavaUtil:
@staticmethod
def short_typename(classname):
return classname.split(".")[-1]
@staticmethod
def typename(signature):
primitive_type_names = {
"V": "void",
"Z": "boolean",
"B": "byte",
"C": "char",
"S": "short",
"I": "int",
"J": "long",
"F": "float",
"D": "double",
}
if signature in primitive_type_names:
return primitive_type_names[signature]
elif signature.startswith("["):
return JavaUtil.typename(signature[1:]) + "[]"
elif signature.startswith("L") and signature.endswith(";"):
return signature[1:-1].replace("/", ".")
else:
raise ValueError(f"Unknown Java type '{signature}'")
@staticmethod
def _read_signature_chunk(io):
ret = io.read(1)
while ret[-1] == "[":
ret += io.read(1)
if ret[-1] == "L":
while ret[-1] != ';':
ret += io.read(1)
return ret
@staticmethod
def parse_signature(signature):
io = StringIO(signature)
chunk = JavaUtil._read_signature_chunk(io)
if chunk != "(":
raise ValueError(f"Invalid method signature: {signature}")
args = []
while True:
chunk = JavaUtil._read_signature_chunk(io)
if chunk == ")":
break
else:
args.append(JavaUtil.typename(chunk))
ret = JavaUtil.typename(JavaUtil._read_signature_chunk(io))
return args, ret
@dataclass(frozen=True)
class Method:
id: int
file: str
classname: str
name: str
args: typing.List[str]
ret: str
def __str__(self):
return f"{JavaUtil.short_typename(self.classname)}.{self.name}({', '.join([JavaUtil.short_typename(arg) for arg in self.args])})"
@dataclass(frozen=True)
class Thread:
id: int
name: str
def __str__(self):
return f"{self.id:04} {self.name}"
class MethodAction(Enum):
Enter = 0
Exit = 1
Unroll = 2
@dataclass(frozen=True)
class TraceRecord:
thread: Thread
method: Method
action: MethodAction
cpu_time: int
wall_time: int
def __str__(self):
return f"[{self.cpu_time*1e-6:04.03}] {'+' if self.action == MethodAction.Enter else '-'} {self.method} "
class TraceParser:
def __init__(self, trace_data):
self.methods = {}
self.threads = {}
self.attrs = {}
self.trace = []
self._parse_text_section(trace_data)
self._parse_binary_section(trace_data)
def _parse_text_section(self, trace_data):
# Parse text prefix
section = None
while True:
line = trace_data.readline().decode("utf-8").strip()
if line.startswith("*"):
section = line[1:]
#print(section)
if section == "version":
version = int(trace_data.readline().decode("utf-8").strip())
if version != 3:
raise Exception("Expected trace file version 3")
elif section == "end":
break
elif section is None:
raise Exception("Not a section")
elif section == "version":
key, value = line.split("=", 1)
self.attrs[key] = value
#print(f" - {key} = {value}")
elif section == "threads":
tid, tname = line.split("\t", 1)
thread = Thread(id=int(tid), name=tname)
self.threads[thread.id] = thread
#print(f" - {thread.id} = {thread.name}")
#print(f" - {thread}")
elif section == "methods":
mid, mclass, mname, msig, mfile = line.split("\t", 4)
margs, mret = JavaUtil.parse_signature(msig)
method = Method(
id=int(mid, 0),
file=mfile,
classname=mclass,
name=mname,
args = margs,
ret = mret)
self.methods[method.id] = method
#print(f" - {method}")
def _parse_binary_section(self, trace_data):
#Parse binary call log
magic, version, offset, start_wall_time, record_size = struct.unpack("<4sHHQH", trace_data.read(18))
if magic != b"SLOW":
raise Exception("Invalid magic prefix of binary data")
if version != 3:
raise Exception(f"Expected version 3, got {version}")
#print(f"offset={offset}, start_wall_time={start_wall_time}, record_size={record_size}")
if offset < 18:
raise Exception("Offset smaller than header size! Should be at least 18")
trace_data.read(offset - 18)
while True:
try:
blob = trace_data.read(record_size)
if blob == b"":
break
if self.attrs["clock"] == "dual":
tid, mid, cputime_delta, walltime_delta = struct.unpack("<HIII", blob)
else:
tid, mid, walltime_delta = struct.unpack("<HII", blob)
cputime_delta = walltime_delta
record = TraceRecord(
thread=self.threads.get(tid),
method=self.methods.get(mid & ~3),
action=MethodAction(mid & 3),
cpu_time=cputime_delta,
wall_time=walltime_delta + start_wall_time
)
#print(record)
self.trace.append(record)
except EOFError:
break
def main(filename):
parser = TraceParser(open(filename, "rb"))
traces_by_thread = {
thread: []
for thread in list(parser.threads.values()) + [None]
}
for record in parser.trace:
traces_by_thread[record.thread].append(record)
for thread, thread_traces in traces_by_thread.items():
if not thread_traces:
continue
print()
print(f"Thread {'Unknown thread' if thread is None else thread.name} {{")
level = 1
for trace in thread_traces:
if trace.action == MethodAction.Enter:
print(f"[{trace.cpu_time*1e-6:06.03f}] {' ' * level}{trace.method} {{")
level += 1
else:
level -= 1
print(f"[{trace.cpu_time*1e-6:06.03f}] {' ' * level}}} // {trace.method}")
print(f"}} // Thread {'Unknown thread' if thread is None else thread.name} {{")
main(sys.argv[1])
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment