Skip to content

Instantly share code, notes, and snippets.

@khenidak
Created October 6, 2018 16:06
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save khenidak/cf3d769b3ab4fdb4eb9eb7a143d94a8d to your computer and use it in GitHub Desktop.
Save khenidak/cf3d769b3ab4fdb4eb9eb7a143d94a8d to your computer and use it in GitHub Desktop.
# simple linux tracepoint parser that converts
# /sys/kernel/debug/tracing/events/<sub sys>/<tracepoint>/format
# into a meaningful json object for further processing.
# each event is has subsystem, id, fields, fmt (original fmt in format file)
# each field has name, size, is_array, array_size, is_pointer
# stderr has max tracepoint record size, subsystem, name
# run with `python linux_tracepoint_parser.py | jq`
import os
from pathlib import Path
import sys
import jsonpickle
all_events = list()
eventsDir = '/sys/kernel/debug/tracing/events/'
formatFileName = 'format'
class event:
def __init__(self, pathObject):
self.fields = {}
self.path = str(pathObject.absolute())
self.subsystem = pathObject.parts[len(pathObject.parts) - 2]
self.event_id = ""
self.name = ""
# short cuts
def _set_name(self, name):
self.name = name
def _set_event_id(self, event_id):
self.event_id = event_id
def _add_event_field(self, line):
parts = line.split('\t')
ef = event_field()
ef.field_def = line
# all fields are terminated with ;
# field:unsigned char common_flags
field_name_dt = parts[0].split(':')[1]
field_name_dt = field_name_dt[:-1]
field_name_dt = field_name_dt.split(' ')
field_name = field_name_dt[-1]
if '[' in field_name:
ef.array_length = field_name[field_name.find("[")+1:
field_name.find("]")]
ef.is_array = True
ef.field_name = field_name.split('[')[0]
else:
ef.field_name = field_name
ef.data_type = " ".join(field_name_dt[:-1])
if '*' in ef.data_type:
ef.is_pointer = True
# offset
ef.offset = parts[1].split(':')[1]
ef.offset = ef.offset[:-1]
# size
ef.size = parts[2].split(':')[1]
ef.size = ef.size[:-1]
# is_signed
signed = parts[3].split(':')[1]
signed = signed[:-1]
ef.is_signed = bool(int(signed))
self.fields[ef.field_name] = ef
def _line_processor(self, line):
return {
'nam': lambda o, l: (o._set_name(l.split(':')[1])), # name:
'ID:': lambda o, l: (o._set_event_id(l.split(': ')[1])), # ID:
'for': lambda o, l: (), # format:
'fie': lambda o, l: (o._add_event_field(l)),
}.get(line, None)
def parse(self, lines):
self._fmt = lines
for line in lines.splitlines():
line = line.lstrip()
processor = self._line_processor(line[:3])
if processor is not None:
processor(self, line)
class event_field:
def __init__(self):
self.field_name = ""
self.is_array = False
self.array_length = 0
self.is_pointer = False
self.field_def = ""
self.size = 0
def walk_events(path):
for dirName in os.listdir(path):
p = os.path.join(path, dirName)
if os.path.isdir(p):
format_file = Path(os.path.join(p, formatFileName))
if format_file.is_file():
with open(str(format_file.resolve()), mode='r') as fd:
lines = fd.read()
e = event(format_file)
e.parse(lines)
all_events.append(e)
else:
print('dir name: %s is not an event' % dirName)
def walk_subsystems():
for dirName in os.listdir(eventsDir):
p = os.path.join(eventsDir, dirName)
if os.path.isdir(p):
walk_events(p)
def main():
walk_subsystems()
all_as_json = jsonpickle.encode(all_events)
print(all_as_json)
max_record_size = 0
max_record_name = ""
for e in all_events:
this_record_size = 0
for f in e.feilds:
this_record_size = this_record_size + f.size
if max_record_size < this_record_size:
max_record_size = this_record_size
max_record_name = 'Subsystem:%s Event:%s Size: %d' % e.subsystem, e.name, max_record_size
# sys.stderr.write('max record: %s\n' % max_record_name)
# sys.stderr.write('total event count: %d\n' % len(all_events))
if __name__ == "__main__":
main()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment