Skip to content

Instantly share code, notes, and snippets.

@KJTsanaktsidis
Created February 7, 2023 07:18
Show Gist options
  • Save KJTsanaktsidis/e02eb7145332cf8e0129addae7d541a8 to your computer and use it in GitHub Desktop.
Save KJTsanaktsidis/e02eb7145332cf8e0129addae7d541a8 to your computer and use it in GitHub Desktop.
# Bit layout constants applied to the first word ("flags") of a Ruby heap
# object, as read by ruby_value_flags().
# NOTE(review): the concrete values presumably mirror the headers of one
# specific Ruby build -- confirm against the interpreter being debugged.

# Object type tag: the low bits of the flags word (see ruby_value_type()).
RUBY_T_MASK = 0x1f
RUBY_T_STRING = 0x05
RUBY_T_ARRAY = 0x07

# Strings: when NOEMBED is clear the length lives in the flags word itself.
RSTRING_NOEMBED = 0x2000
RSTRING_EMBED_LEN_SHIFT = 14
RSTRING_EMBED_LEN_MASK = 0x1f << RSTRING_EMBED_LEN_SHIFT  # 0x7c000

# Arrays: when EMBED_FLAG is set the length lives in the flags word itself.
RARRAY_EMBED_FLAG = 0x2000
RARRAY_EMBED_LEN_SHIFT = 15
RARRAY_EMBED_LEN_MASK = 0x3 << RARRAY_EMBED_LEN_SHIFT  # 0x18000

# Symbol table lookup (see rb_id_to_serial() and get_id_serial_entry()).
RUBY_ID_SCOPE_SHIFT = 4
ID_ENTRY_UNIT = 512
ID_ENTRY_SIZE = 2

# Raw VALUE compared against by ruby_value_is_nil().
RUBY_Qnil = 0x8
def get_ruby_thread():
    """Find the Ruby thread that backs the currently selected gdb thread.

    Walks the ``living_threads`` list reachable from ``ruby_current_vm_ptr``
    and compares each entry's ``thread_id`` with the handle of
    ``gdb.selected_thread()``. The list is treated as circular: the walk
    stops when it arrives back at the list head.

    Pointers are rebuilt from raw bytes, which hard-codes 8-byte
    little-endian addresses.

    Returns:
        A ``gdb.Value`` of type ``rb_thread_t *`` for the matching thread, or
        ``None`` when no entry in the list matches.
    """
    this_thread_handle = int.from_bytes(gdb.selected_thread().handle(), "little")
    rb_vm_t = gdb.lookup_type('rb_vm_t')
    rb_thread_t = gdb.lookup_type('rb_thread_t')
    vm_ptr = gdb.Value(
        int(gdb.lookup_global_symbol('ruby_current_vm_ptr').value()).to_bytes(8, 'little'),
        rb_vm_t.pointer()
    )
    thread_list_head = vm_ptr['living_threads']['n']
    # The head node is embedded in the VM struct, not in a thread, so it only
    # serves as the end-of-list sentinel and is never decoded as a thread.
    head_addr = int(thread_list_head.address)
    # Byte offset of the embedded list node inside rb_thread_t.
    vmlt_offset = rb_thread_t['vmlt_node'].bitpos // 8

    node_ptr = thread_list_head['next']
    while int(node_ptr) not in (0, head_addr):
        # Step back from the embedded node to its enclosing rb_thread_t.
        # Convert to a plain int *before* subtracting: subtracting from a
        # gdb pointer value would scale the byte offset by the pointee size.
        thread_addr = int(node_ptr) - vmlt_offset
        thread_el = gdb.Value(thread_addr.to_bytes(8, 'little'), rb_thread_t.pointer())
        if int(thread_el['thread_id']) == this_thread_handle:
            return thread_el
        node_ptr = node_ptr['next']
    return None
def ruby_value_flags(value):
    """Return, as a Python int, the first ``unsigned long`` stored at *value*.

    *value* is converted with ``int()`` and treated as an 8-byte
    little-endian address; the word at that address is read from the
    inferior.
    NOTE(review): presumably *value* must be a heap object pointer rather
    than an immediate such as nil -- confirm with callers.
    """
    address_bytes = int(value).to_bytes(8, 'little')
    ulong_ptr_type = gdb.lookup_type('unsigned long').pointer()
    return int(gdb.Value(address_bytes, ulong_ptr_type).dereference())
def ruby_value_type(value):
    """Return the type tag of *value*: its flags word masked with RUBY_T_MASK."""
    return ruby_value_flags(value) & RUBY_T_MASK
def ruby_value_is_nil(value):
    """Return True when *value* is Python ``None`` or equals ``RUBY_Qnil``."""
    return value is None or int(value) == RUBY_Qnil
def ruby_array_length(value):
    """Return the element count of the Ruby array that *value* points at.

    When RARRAY_EMBED_FLAG is set in the flags word, the length is decoded
    from the flags themselves; otherwise it is read from the third
    ``unsigned long`` (index 2) of the object.

    *value* must be a ``gdb.Value`` (it is cast on the non-embedded path).
    """
    flags = ruby_value_flags(value)
    if not flags & RARRAY_EMBED_FLAG:
        words = value.cast(gdb.lookup_type('unsigned long').pointer())
        return int(words[2])
    embed_len_bits = RARRAY_EMBED_LEN_MASK >> RARRAY_EMBED_LEN_SHIFT
    return (flags >> RARRAY_EMBED_LEN_SHIFT) & embed_len_bits
def ruby_array_ptr(value):
    """Return an ``unsigned long *`` to the first element of a Ruby array.

    For an embedded array (RARRAY_EMBED_FLAG set) this is the address of
    the object's word at index 2; otherwise it is the pointer stored in the
    object's word at index 4.
    """
    flags = ruby_value_flags(value)
    ulong_ptr_type = gdb.lookup_type('unsigned long').pointer()
    words = value.cast(ulong_ptr_type)
    if flags & RARRAY_EMBED_FLAG:
        # Elements are stored inline, starting at word 2 of the object.
        return words[2].address.cast(ulong_ptr_type)
    # Word 4 holds the pointer to the separately stored elements.
    return words[4].cast(ulong_ptr_type)
def ruby_array_entry(value, index):
    """Return element *index* of the Ruby array *value*, or ``None``.

    A negative *index* counts back from the end of the array. ``None`` is
    returned when the array is empty or the (adjusted) index falls outside
    ``0 .. length - 1``.
    """
    length = ruby_array_length(value)
    ptr = ruby_array_ptr(value)
    if index < 0:
        index += length
    # An empty array has no valid index, so this single range test also
    # covers the length == 0 case.
    if not 0 <= index < length:
        return None
    return ptr[index]
def read_ruby_string_embedded(value, flags):
    """Decode a Ruby string whose bytes are stored inside the object itself.

    The length is taken from *flags*; the bytes are read starting two
    8-byte words past the address in *value* and decoded as UTF-8, with
    undecodable bytes replaced.
    """
    embed_len_bits = RSTRING_EMBED_LEN_MASK >> RSTRING_EMBED_LEN_SHIFT
    length = (flags >> RSTRING_EMBED_LEN_SHIFT) & embed_len_bits
    payload_addr = int(value) + 2 * 8
    chars = gdb.Value(
        payload_addr.to_bytes(8, 'little'),
        gdb.lookup_type('char').pointer()
    )
    return chars.string(encoding='utf-8', errors='replace', length=length)
def read_ruby_string_heap(value, flags):
    """Decode a Ruby string whose bytes are stored outside the object.

    The length is read as a ``long`` from the object's word at index 2 and
    the character pointer from the word at index 3 (8-byte words). The bytes
    are decoded as UTF-8 with undecodable bytes replaced.

    Args:
        value: address of the string object (anything ``int()`` accepts).
        flags: the object's flags word; unused here, accepted so the
            signature matches read_ruby_string_embedded().
    """
    length_value = gdb.Value(
        (int(value) + 2 * 8).to_bytes(8, 'little'),
        gdb.lookup_type('long').pointer()
    ).dereference()
    # Value.string() takes a plain integer length; convert explicitly so this
    # path passes the same kind of argument as the embedded reader does.
    length = int(length_value)
    ptr = gdb.Value(
        (int(value) + 3 * 8).to_bytes(8, 'little'),
        gdb.lookup_type('char').pointer().pointer()
    ).dereference()
    return ptr.string(encoding='utf-8', errors='replace', length=length)
def read_ruby_string(value):
    """Return the contents of the Ruby string at *value* as a Python str.

    Dispatches on the RSTRING_NOEMBED flag bit to the heap or embedded
    reader.
    """
    flags = ruby_value_flags(value)
    reader = read_ruby_string_heap if flags & RSTRING_NOEMBED else read_ruby_string_embedded
    return reader(value, flags)
def iseq_location_get_file(iseq):
    """Return the source path recorded in ``iseq->body->location.pathobj``.

    If the path object is an array, its first element is used instead. When
    the resulting object is not a string, a placeholder naming the observed
    type tag is returned.
    """
    target = iseq['body']['location']['pathobj']
    pathobj_type = ruby_value_type(target)
    if pathobj_type == RUBY_T_ARRAY:
        # It should be a two-element array, which is guaranteed to be
        # embedded. Pick out the first VALUE from the array.
        target = ruby_array_entry(target, 0)
        pathobj_type = ruby_value_type(target)
    if pathobj_type == RUBY_T_STRING:
        return read_ruby_string(target)
    return f'(unknown pathobj, type {pathobj_type})'
def iseq_location_get_label(iseq):
    """Return the string stored in ``iseq->body->location.label``."""
    label = iseq['body']['location']['label']
    return read_ruby_string(label)
def get_id_serial_entry(num, type_offset):
    """Look up the symbol-table slot for ID serial *num*.

    ``ruby_global_symbols.ids`` is read as an array of arrays: the outer
    index is ``num // ID_ENTRY_UNIT`` and the inner index is
    ``(num % ID_ENTRY_UNIT) * ID_ENTRY_SIZE + type_offset``.

    Returns:
        The selected inner-array element, or ``None`` when *num* is zero or
        greater than ``last_id``, the outer index is out of range, or the
        inner array is nil.
    """
    symbols = gdb.lookup_global_symbol('ruby_global_symbols').value()
    if num == 0 or num > int(symbols['last_id']):
        return None

    chunk_index = num // ID_ENTRY_UNIT
    ids = symbols['ids'].cast(gdb.lookup_type('unsigned long'))
    if chunk_index >= ruby_array_length(ids):
        return None

    chunk = ruby_array_entry(ids, chunk_index)
    if ruby_value_is_nil(chunk):
        return None

    slot = (num % ID_ENTRY_UNIT) * ID_ENTRY_SIZE
    return ruby_array_entry(chunk, slot + type_offset)
def rb_id_to_serial(rbid):
    """Convert a Ruby ID to its serial number.

    IDs greater than ``tLAST_OP_ID`` are shifted right by
    RUBY_ID_SCOPE_SHIFT; smaller IDs are returned unchanged.
    """
    last_op_id = int(gdb.lookup_static_symbol('tLAST_OP_ID').value())
    id_value = int(rbid)
    return id_value >> RUBY_ID_SCOPE_SHIFT if id_value > last_op_id else id_value
def ruby_id_to_str(rbid):
    """Return the name of the Ruby ID *rbid* as a Python str.

    The ID is converted to a serial, and the string found in slot 0 of its
    symbol-table entry is read. When the lookup yields no entry (``None``
    or nil), a placeholder of the form ``(unknown id N)`` is returned
    instead of attempting to read a string from an invalid address.
    """
    serial = rb_id_to_serial(rbid)
    entry = get_id_serial_entry(serial, 0)
    # get_id_serial_entry() returns None for unknown serials, and the slot
    # itself may hold nil; neither can be decoded as a string object.
    if ruby_value_is_nil(entry):
        return f'(unknown id {int(rbid)})'
    return read_ruby_string(entry)
def ruby_thread_backtrace(thread):
    """Print a Ruby-level backtrace for *thread*.

    Starts at ``thread->ec->cfp`` and steps one ``rb_control_frame_t`` at a
    time until reaching ``vm_stack + vm_stack_size * 8``. Frames with a
    non-null ``iseq`` are printed as ``<file> in `<label>'``; all other
    frames are treated as C functions and named via the method entry found
    at ``ep[-2]``.

    Args:
        thread: ``gdb.Value`` giving access to an ``ec`` field, e.g. the
            result of get_ruby_thread().

    Returns:
        None; all output goes to stdout via ``print``.
    """
    rb_control_frame_t = gdb.lookup_type('rb_control_frame_t')
    rb_callable_method_entry_t = gdb.lookup_type('rb_callable_method_entry_t')
    ec = thread['ec']
    cfp_ptr = ec['cfp']
    # vm_stack_size is multiplied by a hard-coded 8-byte slot size.
    end_cfp_addr = int(ec['vm_stack']) + int(ec['vm_stack_size'] * 8)
    print(f'cfp_ptr: {int(cfp_ptr)}, end ptr: {end_cfp_addr}')
    # Use "<" rather than "!=" so a misaligned or corrupt frame pointer
    # cannot step past the end address and loop forever.
    while int(cfp_ptr) < end_cfp_addr:
        cfp = cfp_ptr.dereference()
        if int(cfp['iseq']) != 0:
            # Ruby frame
            path = iseq_location_get_file(cfp['iseq'])
            label = iseq_location_get_label(cfp['iseq'])
            # Can do line numbers if we look at cfp->pc, but no big deal for now.
            print(f"{path} in `{label}'")
        else:
            # C frame
            method_entry = cfp['ep'][-2].cast(rb_callable_method_entry_t.pointer())
            if int(method_entry) == 0:
                # No method entry recorded for this frame; dereferencing a
                # null pointer would abort the whole backtrace.
                print("(cfunc) in `(unknown)'")
            else:
                method_id = method_entry['def']['original_id']
                method_name = ruby_id_to_str(method_id)
                print(f"(cfunc) in `{method_name}'")
        # Advance to the next control frame, one struct size higher in memory.
        cfp_ptr = gdb.Value(
            (int(cfp_ptr) + rb_control_frame_t.sizeof).to_bytes(8, 'little'),
            rb_control_frame_t.pointer()
        )
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment