Skip to content

Instantly share code, notes, and snippets.

@cfbolz
Created March 24, 2024 20:45
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save cfbolz/13cadcbbef321d93fc9790dff6f60a6a to your computer and use it in GitHub Desktop.
Save cfbolz/13cadcbbef321d93fc9790dff6f60a6a to your computer and use it in GitHub Desktop.
gdb commands for debugging pypy GC problems
import gdb
import sys
import re
import sys
import os.path
# the following is a hacked copy of pypy/tool/gdb_pypy.py
# Display limit shared by the pretty printers below: maximum number of
# characters shown for an rpy_string and maximum number of items shown for an
# rpython list/array; anything beyond that is abbreviated with "...".
MAX_DISPLAY_LENGTH = 100
def find_field_with_suffix(val, suffix):
    """
    Return ``val[field]``, where ``field`` is the only one whose name ends
    with ``suffix``.

    Anonymous fields (gdb reports their ``name`` as ``None``) can never match
    and are skipped instead of blowing up with an AttributeError.

    Raises KeyError if there is no such field, or more than one.
    """
    names = [
        field.name
        for field in val.type.fields()
        if field.name is not None and field.name.endswith(suffix)
    ]
    if not names:
        raise KeyError("cannot find field *%s" % suffix)
    if len(names) > 1:
        raise KeyError("too many matching fields: %s" % ', '.join(names))
    return val[names[0]]
def lookup(val, suffix):
    """
    Find the field whose name ends with ``suffix`` on ``val`` or, failing
    that, on its rpython base structs, which are reached by following the
    ``*_super`` field level by level.

    Raises KeyError (from the ``*_super`` search) once a level has neither a
    unique matching field nor a unique ``*_super`` field.
    """
    current = val
    while True:
        try:
            return find_field_with_suffix(current, suffix)
        except KeyError:
            # not available at this level: descend into the embedded base
            # struct and try again
            current = find_field_with_suffix(current, '_super')
class ReloadingCommand(gdb.Command):
    """ abstract baseclass for gdb commands that will re-load the file at every
    invocation of the command, to make interactive development simpler.

    Subclasses provide ``COMMAND_STRING`` (the name of the gdb command) and
    ``do_invoke(arg, from_tty)``, which returns the text to print (``str`` or
    utf-8 encoded ``bytes``), or ``None`` when there is nothing to print. """

    def __init__(self, gdb=None):
        # dependency injection, for tests
        if gdb is None:
            import gdb
        self.gdb = gdb
        gdb.Command.__init__(self, self.COMMAND_STRING, self.gdb.COMMAND_NONE)

    def invoke(self, arg, from_tty):
        """ entry point called by gdb: reload the module, run ``do_invoke``
        and print its result; failures are reported as a traceback. """
        # some magic code to automatically reload the python file while developing
        try:
            import crashgdb
            import importlib
            importlib.reload(crashgdb)
            # switch this instance over to the freshly loaded class so that
            # edited code is picked up by do_invoke
            self.__class__ = getattr(crashgdb, self.__class__.__name__)
            result = self.do_invoke(arg, from_tty)
            if result is None:
                # commands that print on their own return nothing; don't
                # emit a stray "None" line
                return
            if not isinstance(result, str):
                result = result.decode('utf-8')
            print(result)
        except Exception:
            # only real errors are reported here; KeyboardInterrupt (Ctrl-C
            # during a long heap walk) and SystemExit propagate to gdb
            import traceback
            traceback.print_exc()
class RPyType(ReloadingCommand):
    """
    Prints the RPython type of the expression.
    E.g.:
    (gdb) rpy_type l_v123
    GcStruct pypy.foo.Bar { super, inst_xxx, inst_yyy }

    The argument may also be a plain decimal integer, in which case it is
    used directly as the type id offset.
    """
    COMMAND_STRING = "rpy_type"

    # cache: gdb progspace (or None) -> TypeIdsMap
    prog2typeids = {}

    def do_invoke(self, arg, from_tty):
        """
        Return the description of the type of ``arg``, 'Forwarded' if the
        header carries the forwarding marker, or an error message if the
        type id offset is unknown.
        """
        try:
            offset = int(arg)
        except ValueError:
            obj = self.gdb.parse_and_eval(arg)
            if obj.type.code == self.gdb.TYPE_CODE_PTR:
                obj = obj.dereference()
            hdr = lookup(obj, '_gcheader')
            tid = hdr['h_tid']
            if tid == -42:  # forwarded?
                return 'Forwarded'
            # NOTE: sys.maxsize describes the Python running inside gdb, not
            # the debugged program
            if sys.maxsize < 2**32:
                offset = tid & 0xFFFF  # 32bit
            else:
                offset = tid & 0xFFFFFFFF  # 64bit
            offset = int(offset)  # convert from gdb.Value to python int
        typeids = self.get_typeids()
        if offset in typeids:
            return typeids[offset]
        else:
            return 'Cannot find the type with offset 0x%x' % offset

    def get_typeids(self):
        """
        Return the TypeIdsMap of the current progspace, loading it on first
        use and caching it in ``prog2typeids``.
        """
        try:
            progspace = self.gdb.current_progspace()
        except AttributeError:
            # gdb module without current_progspace(): use a single cache slot
            progspace = None
        try:
            return self.prog2typeids[progspace]
        except KeyError:
            typeids = self.load_typeids(progspace)
            self.prog2typeids[progspace] = typeids
            return typeids

    def load_typeids(self, progspace=None):
        """
        Returns a mapping offset --> description

        The zlib-compressed table is read out of the debugged program: the
        first ``long`` of ``gcd_inst_typeids_z`` is taken as the length of the
        data that follows it. gdb dumps these bytes into a temporary file,
        which is read back, decompressed and removed again.
        """
        import tempfile
        import zlib
        vname = 'pypy_g_rpython_memory_gctypelayout_GCData.gcd_inst_typeids_z'
        length = int(self.gdb.parse_and_eval('*(long*)%s' % vname))
        vstart = '(char*)(((long*)%s)+1)' % vname
        # mkstemp creates the file atomically (unlike the deprecated, racy
        # mktemp), so it also exists for the cleanup below when the dump fails
        fd, fname = tempfile.mkstemp()
        os.close(fd)
        try:
            self.gdb.execute('dump binary memory %s %s %s+%d' %
                             (fname, vstart, vstart, length))
            with open(fname, 'rb') as fobj:
                data = fobj.read()
            return TypeIdsMap(zlib.decompress(data).splitlines(True), self.gdb)
        finally:
            os.remove(fname)
class TypeIdsMap(object):
    """
    Lazy mapping from type id offset to type description.

    ``lines`` are the lines (bytes) of the typeids table, each of the form
    ``<member> <description>``. The offset of a line is only computed, by
    asking ``gdb`` for the position of ``pypy_g_typeinfo.<member>``, when a
    lookup needs it. Lookups binary-search the lines, which assumes that
    offsets grow with the line number.
    """

    def __init__(self, lines, gdb):
        self.lines = lines
        self.gdb = gdb
        # both caches are pre-seeded with the null typeid at line 0
        self.line2offset = {0: 0}
        self.offset2descr = {0: "(null typeid)"}

    def __getitem__(self, key):
        value = self.get(key)
        if value is None:
            raise KeyError(key)
        return value

    def __contains__(self, key):
        return self.get(key) is not None

    def _fetchline(self, linenum):
        """ return the offset of line ``linenum``, computing and caching it
        (together with its description) on first use """
        if linenum in self.line2offset:
            return self.line2offset[linenum]
        line = self.lines[linenum]
        member, descr = [x.strip() for x in line.split(None, 1)]
        if sys.maxsize < 2**32:
            TIDT = "int*"
        else:
            TIDT = "char*"
        expr = ("((%s)(&pypy_g_typeinfo.%s)) - (%s)&pypy_g_typeinfo"
                % (TIDT, member.decode("latin-1"), TIDT))
        offset = int(self.gdb.parse_and_eval(expr))
        self.line2offset[linenum] = offset
        self.offset2descr[offset] = descr
        return offset

    def get(self, offset, default=None):
        """ return the description for ``offset``, or ``default`` if no line
        of the table has that offset """
        # binary search through the lines, asking gdb to parse stuff lazily
        if offset in self.offset2descr:
            return self.offset2descr[offset]
        if not (0 < offset < sys.maxsize):
            return default
        low, high = 0, len(self.lines)
        while low < high:
            middle = (low + high) >> 1
            offsetmiddle = self._fetchline(middle)
            if offsetmiddle == offset:
                return self.offset2descr[offset]
            elif offsetmiddle < offset:
                low = middle + 1
            else:
                high = middle
        return default
def is_ptr(type, gdb):
    """ tell whether the gdb type ``type`` is a pointer type; ``gdb`` is the
    module providing TYPE_CODE_PTR, or None to use the real gdb module """
    if gdb is not None:
        gdbmod = gdb  # a fake one, passed in from the tests
    else:
        import gdb as gdbmod
    return type.code == gdbmod.TYPE_CODE_PTR
class RPyStringPrinter(object):
    """
    Pretty printer for rpython strings.
    Note that this pretty prints *pointers* to strings: this way you can do "p
    val" and see the nice string, and "p *val" to see the underlying struct
    fields
    """

    def __init__(self, val):
        self.val = val

    @classmethod
    def lookup(cls, val, gdb=None):
        """ return a printer for ``val`` if it is a pointer to an rpython
        string, else None """
        t = val.type
        if is_ptr(t, gdb) and t.target().tag == 'pypy_rpy_string0':
            return cls(val)
        return None

    @staticmethod
    def _char_to_str(c):
        """ convert one item of the character array to a 1-char string,
        falling back to ``repr(c)`` if no character code can be extracted """
        try:
            code = int(c)
        except (TypeError, ValueError):
            # it's a gdb.Value so it has "121 'y'" as repr
            try:
                code = int(str(c).split(" ")[0])
            except ValueError:
                # meh?
                return repr(c)
        if code < 0:
            # a byte >= 0x80 read through a signed char: map it back to its
            # unsigned value instead of giving up on it
            code &= 0xFF
        try:
            return chr(code)
        except (ValueError, OverflowError):
            return repr(c)

    def to_string(self):
        """ return the string as ``r'...'``, showing at most
        MAX_DISPLAY_LENGTH characters followed by '...' """
        chars = self.val['rs_chars']
        length = int(chars['length'])
        items = chars['items']
        res = [self._char_to_str(items[i])
               for i in range(min(length, MAX_DISPLAY_LENGTH))]
        if length > MAX_DISPLAY_LENGTH:
            res.append('...')
        string = ''.join(res)
        return 'r' + repr(string)
class RPyListPrinter(object):
    """
    Pretty printer for rpython lists and arrays.
    It is registered for *pointers* to lists: "p val" shows the readable
    repr, while "p *val" still shows the underlying struct fields.
    """

    # set while the items of a list are being converted, so that lists nested
    # inside are abbreviated as '...' instead of being expanded again
    recursive = False

    def __init__(self, val):
        self.val = val

    @classmethod
    def lookup(cls, val, gdb=None):
        """ return a printer for ``val`` if it is a pointer to an rpython list
        or array, else None """
        t = val.type
        if not is_ptr(t, gdb):
            return None
        tag = t.target().tag
        if tag is None or not re.match(r'pypy_(list|array)\d*', tag):
            return None
        return cls(val)

    def to_string(self):
        """ return ``r[item, ...] (len=N)`` for arrays and
        ``r[item, ...] (len=N, alloc=M)`` for lists, showing at most
        MAX_DISPLAY_LENGTH items """
        is_array = self.val.type.target().tag.startswith(r'pypy_array')
        if not self.val:
            return 'r(null_array)' if is_array else 'r(null_list)'
        if is_array:
            # arrays carry length and items directly
            length = int(self.val['length'])
            backing = self.val
            allocstr = ''
        else:
            # lists point to a backing array, which may be over-allocated
            length = int(self.val['l_length'])
            backing = self.val['l_items']
            allocstr = ', alloc=%d' % int(backing['length'])
        items = backing['items']
        if RPyListPrinter.recursive:
            str_items = '...'
        else:
            RPyListPrinter.recursive = True
            try:
                shown = min(length, MAX_DISPLAY_LENGTH)
                # str() of an item may recurse into this printer
                parts = [str(items[i]) for i in range(shown)]
                if length > MAX_DISPLAY_LENGTH:
                    parts.append("...")
                str_items = ', '.join(parts)
            finally:
                RPyListPrinter.recursive = False
        return 'r[%s] (len=%d%s)' % (str_items, length, allocstr)
# Instantiating the command class registers "rpy_type" with gdb; the instance
# is also used by RPyGCInfo to get at the typeids.
rpy_type_singleton = RPyType()
# Put the rpython printers in front of the already installed ones.
# NOTE(review): this runs on every (re)load of the module, so each load
# prepends another pair of lookups -- confirm that this is acceptable together
# with the reloading done by ReloadingCommand.
gdb.pretty_printers = [
    RPyStringPrinter.lookup,
    RPyListPrinter.lookup
] + gdb.pretty_printers
# ________________________________________________________________
# the rest is new stuff
class GCConstants:
    """ Namespace for the GC constants used by the commands below: the flag
    bits of the object header word (GCFLAG_*), the bits of a type's infobits
    (T_*) and the names of the GC states. """

    LONG_BIT = 64

    # the GC flags occupy the upper half of the header word, starting here
    first_gcflag = 1 << (LONG_BIT // 2)

    GCFLAG_TRACK_YOUNG_PTRS = first_gcflag << 0
    GCFLAG_NO_HEAP_PTRS = first_gcflag << 1
    GCFLAG_VISITED = first_gcflag << 2
    GCFLAG_HAS_SHADOW = first_gcflag << 3
    GCFLAG_FINALIZATION_ORDERING = first_gcflag << 4
    GCFLAG_EXTRA = first_gcflag << 5
    GCFLAG_HAS_CARDS = first_gcflag << 6
    # at least one card bit is set
    GCFLAG_CARDS_SET = first_gcflag << 7
    GCFLAG_VISITED_RMY = first_gcflag << 8
    GCFLAG_PINNED = first_gcflag << 9
    # deliberately the same bit as GCFLAG_PINNED
    GCFLAG_PINNED_OBJECT_PARENT_KNOWN = GCFLAG_PINNED
    GCFLAG_IGNORE_FINALIZER = first_gcflag << 10
    GCFLAG_SHADOW_INITIALIZED = first_gcflag << 11
    GCFLAG_DUMMY = first_gcflag << 12

    # name -> bit for every GCFLAG_* constant defined above, in definition
    # order
    gcflag_names = {
        flag_name: flag_bit
        for flag_name, flag_bit in locals().items()
        if flag_name.startswith("GCFLAG")
    }

    # layout of the infobits of a type
    T_MEMBER_INDEX = (1 << 16) - 1
    T_IS_VARSIZE = 1 << 16
    T_HAS_GCPTR_IN_VARSIZE = 1 << 17
    T_IS_GCARRAY_OF_GCPTR = 1 << 18
    T_IS_WEAKREF = 1 << 19
    # the type is a subclass of OBJECT
    T_IS_RPYTHON_INSTANCE = 1 << 20
    T_HAS_CUSTOM_TRACE = 1 << 21
    T_HAS_OLDSTYLE_FINALIZER = 1 << 22
    T_HAS_GCPTR = 1 << 24
    # first field is memory pressure field
    T_HAS_MEMORY_PRESSURE = 1 << 25
    T_ANY_SLOW_FLAG = (
        T_HAS_GCPTR_IN_VARSIZE | T_IS_GCARRAY_OF_GCPTR | T_HAS_CUSTOM_TRACE
    )

    # indexed by the numeric GC state
    gc_states = ['SCANNING', 'MARKING', 'SWEEPING', 'FINALIZING']
class RPyGCInfo(ReloadingCommand):
    """
    Prints GC info about the object the expression evaluates to: the
    description of its type, the names of the GC flags set in its header and
    the current state of the GC.
    E.g.:
    (gdb) rpy_gc_info l_v123
    """
    COMMAND_STRING = "rpy_gc_info"

    def do_invoke(self, arg, from_tty):
        """
        Return the report as one string, 'Forwarded' if the header carries
        the forwarding marker, or an error message if the type is unknown.
        """
        obj = self.gdb.parse_and_eval(arg)
        if obj.type.code == self.gdb.TYPE_CODE_PTR:
            obj = obj.dereference()
        hdr = lookup(obj, '_gcheader')
        tid = hdr['h_tid']
        if tid == -42:  # forwarded?
            return 'Forwarded'
        if sys.maxsize <= 2**32:
            # explicit check instead of an assert, which -O would strip
            raise NotImplementedError("rpy_gc_info only supports 64 bit atm")
        flags = int(tid)  # convert from gdb.Value to python int
        # the low 32 bits hold the type id offset, the flags sit above them
        offset = flags & 0xFFFFFFFF
        typeids = rpy_type_singleton.get_typeids()
        descr = typeids.get(offset)
        if descr is None:
            return 'Cannot find the type with offset 0x%x' % offset
        if isinstance(descr, bytes):
            # descriptions read from the typeids table are bytes, but the
            # entry for the null typeid is already a str
            descr = descr.decode('ascii', 'replace')
        result = [descr, "Flags set:"]
        for name, flag in GCConstants.gcflag_names.items():
            if flags & flag:
                result.append(name)
        gc_state = int(self.gdb.parse_and_eval("pypy_g_rpython_memory_gctypelayout_GCData->gcd_inst_gc->immgc_inst_gc_state"))
        if 0 <= gc_state < len(GCConstants.gc_states):
            state_name = GCConstants.gc_states[gc_state]
        else:
            # corrupted memory can hold anything; report it instead of
            # failing or silently indexing from the end of the list
            state_name = "unknown (%d)" % gc_state
        result.append("GC State: " + state_name)
        return "\n".join(result)
class RPyGCTest(ReloadingCommand):
    """
    Walks the heap looking for references to an object (very slow).

    (gdb) rpy_gc_search_heap
        print every (source, target) reference pair reachable from the GC
        roots
    (gdb) rpy_gc_search_heap EXPR
        report every object, reachable from the GC roots, that references the
        address EXPR evaluates to
    (gdb) rpy_gc_search_heap 0xTARGET 0xSTART
        like the above for the address 0xTARGET, but start the walk at the
        object at 0xSTART instead of at the GC roots
    """
    COMMAND_STRING = "rpy_gc_search_heap"

    def do_invoke(self, arg, from_tty):
        """ run the search and print the findings; returns None """
        if not arg:
            for source, target in walk_heap_reference_pairs():
                print(source, target)
            return None
        parts = arg.split()
        if arg.startswith("0x") and len(parts) == 2:
            # two hex addresses: what to look for and where to start
            intobj = int(parts[0], 16)
            startaddr = int(parts[1], 16)
            start = self.gdb.parse_and_eval("(struct pypy_object0*)0x%x" % startaddr)
            iterator = walk_heap_reference_pairs([start])
            print(f"searching for {hex(intobj)} starting from {hex(startaddr)} (very slow)")
        else:
            # anything else, including a single hex address, is an expression
            # for gdb to evaluate; the walk then starts at the GC roots
            obj = self.gdb.parse_and_eval(arg)
            intobj = int(obj)
            iterator = walk_heap_reference_pairs()
        for source, target in iterator:
            if int(target) == intobj:
                print(f"found reference to {target} in object at {source}")
        return None
def root_stack_walker():
    """ walk the shadow stack, which has somewhat complicated encoding

    The stack is walked from its top down to its base. An even, non-zero
    entry is a regular pointer and is yielded. An odd entry is a skip bitmask
    (possibly stored negated): after dropping its lowest bit, each of its
    bits tells whether the corresponding next entry has to be skipped.
    """
    addr = gdb.parse_and_eval("((void**)pypy_g_rpython_memory_gctypelayout_GCData->gcd_inst_root_stack_top)")
    base = gdb.parse_and_eval("((void**)pypy_g_rpython_memory_gctypelayout_GCData->gcd_inst_root_stack_base)")
    word_bits = addr.type.target().sizeof * 8
    skip = 0
    while addr != base:
        addr -= 1
        if skip & 1 == 0:
            content = addr.dereference()
            n = int(content)
            # gdb converts pointers to unsigned integers; reinterpret the
            # word as signed, otherwise a negated bitmask is never recognized
            # and a bogus skip mask results
            if n >= 1 << (word_bits - 1):
                n -= 1 << word_bits
            if n & 1 == 0:
                if content:  # non-0, non-odd: a regular ptr
                    yield content
            else:
                # odd number: a skip bitmask
                if n > 0:
                    skip = n
                else:
                    skip = -n
        skip >>= 1
def root_walker():
    """ yield all the GC roots: the static ones (the prebuilt root objects)
    first, then the content of the shadow stack """
    prebuilt_roots = gdb.parse_and_eval("pypy_g_rpython_memory_gctypelayout_GCData->gcd_inst_gc->immgc_inst_prebuilt_root_objects")
    for root in walk_addr_stack(prebuilt_roots):
        yield root
    for root in root_stack_walker():
        yield root
def walk_heap_reference_pairs(startlist=None):
    """ yields (source, target) pairs, one for every reference found while
    traversing the object graph.

    The traversal starts at the values in ``startlist`` or, if that is empty
    or None, at the GC roots. Every object is traced only once. Objects that
    cannot be traced are reported and skipped. Progress and a final summary
    are printed along the way. """
    import time
    started = time.time()
    if startlist:
        worklist = list(startlist)
    else:
        worklist = list(root_walker())
    numroots = len(worklist)
    objptr_type = gdb.parse_and_eval("(struct pypy_object0*)0").type
    visited = set()
    edge_count = 0
    while worklist:
        value = worklist.pop()
        if not value:
            # null pointer, nothing to trace
            continue
        address = int(value)
        if address in visited:
            continue
        visited.add(address)
        try:
            for target in walk_object(value.cast(objptr_type)):
                edge_count += 1
                if edge_count % 50000 == 0:
                    print(f"{edge_count} object graph edges traced")
                yield value, target
                worklist.append(target)
        except (gdb.MemoryError, ValueError) as e:
            print(f"error {e} ignored when tracing {value}")
    finished = time.time()
    print(f"seen {len(visited)} objects, from {numroots} roots in {round(finished - started, 2)} seconds")
def walk_addr_stack(obj):
    """ yield the pointers stored in an instance of the AddressStack class,
    which is a linked list of arrays ("chunks") of 1019 pointers each.

    Only the first chunk is partially filled: it holds used_in_last_chunk
    items. All the chunks after it are full. ``obj`` may be the instance or a
    pointer to it. """
    if obj.type.code == gdb.TYPE_CODE_PTR:
        obj = obj.dereference()
    count = lookup(obj, "used_in_last_chunk")
    chunk_ptr = lookup(obj, "inst_chunk")
    while True:
        chunk = chunk_ptr.dereference()
        items = lookup(chunk, "items")
        for i in range(count):
            yield items[i]
        chunk_ptr = lookup(chunk, "next")
        if not chunk_ptr:
            return
        # every chunk but the first one is full
        count = 1019
def walk_object(obj):
    """ yield the content of every GC pointer field of the object that
    ``obj`` (a pointer) points to.

    The low 32 bits of the header's h_tid are used as the byte offset of the
    object's type info inside pypy_g_typeinfo. The type info provides the
    byte offsets of the pointer fields (ofstoptrs). For a GC array of GC
    pointers the items are yielded as well.

    Raises ValueError for types with a custom trace function, which cannot
    be walked this way.

    NOTE(review): infobits carrying T_HAS_GCPTR_IN_VARSIZE or T_IS_WEAKREF
    get no special treatment here, so presumably pointers stored in other
    variable-sized parts are not reported -- confirm.
    """
    hdr = lookup(obj.dereference(), '_gcheader')
    tid = hdr['h_tid'] & 0xFFFFFFFF
    type_info = gdb.parse_and_eval("*(struct pypy_type_info0*)(((char*)&pypy_g_typeinfo) + %s)" % int(tid))
    infobits = lookup(type_info, "infobits")
    if infobits & GCConstants.T_HAS_CUSTOM_TRACE:
        raise ValueError("complicated case %s" % bin(infobits))
    ofstoptrs = lookup(type_info, "ofstoptrs").dereference()
    length = lookup(ofstoptrs, "length")
    items = lookup(ofstoptrs, "items")
    voidstarstartype = gdb.parse_and_eval("(void**)0").type
    charstartype = gdb.parse_and_eval("(char*)0").type
    longstartype = gdb.parse_and_eval("(long*)0").type
    # fixed part: the pointer fields sit at the byte offsets given by items
    charobj = obj.cast(charstartype)
    for index in range(length):
        yield (charobj + items[index]).cast(voidstarstartype).dereference()
    if infobits & GCConstants.T_IS_GCARRAY_OF_GCPTR:
        # seen as an array of words: word 1 is read as the number of items
        # and the items themselves start at word 2
        voidstarstarobj = obj.cast(voidstarstartype)
        length = obj.cast(longstartype)[1]
        for i in range(length):
            yield voidstarstarobj[i + 2]
# Instantiating the command classes registers the "rpy_gc_info" and
# "rpy_gc_search_heap" commands with gdb.
rpy_gc_info_singleton = RPyGCInfo()
rpy_gc_test_singleton = RPyGCTest()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment