Skip to content

Instantly share code, notes, and snippets.

@ants
Created December 1, 2015 10:42
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save ants/83aae98380f0fa09c43e to your computer and use it in GitHub Desktop.
Save ants/83aae98380f0fa09c43e to your computer and use it in GitHub Desktop.
import os, struct, re
def cached_property(accessor):
    """Turn a zero-argument method into a read-only property computed once.

    The first access calls ``accessor(obj)`` and stores the result on the
    instance under ``_cached_<accessor name>``; later accesses return the
    stored value without calling ``accessor`` again.

    The returned property carries the accessor's docstring so that
    ``help()`` and introspection still describe the attribute.
    """
    cache_name = '_cached_%s' % accessor.__name__

    def property_cacher(obj):
        try:
            return getattr(obj, cache_name)
        except AttributeError:
            # Not computed yet for this instance: compute and remember it.
            value = accessor(obj)
            setattr(obj, cache_name, value)
            return value

    return property(property_cacher, doc=accessor.__doc__)
class Cluster(object):
    """A database cluster directory rooted at ``path``.

    Databases are looked up as sub-directories of ``<path>/base``.

    ``page_size`` is the size in bytes of one on-disk page. It defaults to
    0x2000 (8 KiB) and can be overridden for clusters that use another page
    size.
    """

    def __init__(self, path, page_size=0x2000):
        self.path = path
        self.page_size = page_size

    def database(self, oid):
        """Return a ``Database`` handle for the database with this OID."""
        return Database(self, oid)

    @property
    def databases(self):
        """Yield the name of every entry in ``<path>/base``, unfiltered."""
        for filename in os.listdir(os.path.join(self.path, 'base')):
            yield filename
class Database(object):
    """One database inside a cluster, identified by its OID.

    Its files live in ``<cluster path>/base/<oid>``.
    """

    def __init__(self, cluster, oid):
        self.oid = oid
        self.cluster = cluster

    @property
    def path(self):
        """Directory that holds this database's files."""
        base_dir = os.path.join(self.cluster.path, 'base')
        return os.path.join(base_dir, str(self.oid))

    def filenode(self, oid):
        """Return a ``FileNode`` handle for the given filenode OID."""
        return FileNode(self, oid)

    @property
    def filenodes(self):
        """Yield the directory entries whose names are purely numeric."""
        for entry in os.listdir(self.path):
            if not re.match('[0-9]+$', entry):
                # Skips names such as "<n>_fsm", "<n>.1" or "PG_VERSION".
                continue
            yield entry
class FileNode(object):
    """One relation file (``<database path>/<oid>``), read page by page.

    The file is opened lazily on the first ``page()`` call and stays open
    until ``close()`` is called.
    """

    def __init__(self, db, oid):
        self.db = db
        self.oid = oid
        self._fd = None
        # Kept for compatibility; nothing in this class reads it (the size
        # is cached by the ``size`` property instead).
        self._size = None

    @property
    def path(self):
        """Filesystem path of the file."""
        return os.path.join(self.db.path, str(self.oid))

    @cached_property
    def size(self):
        """File size in bytes, measured once and then cached."""
        return os.path.getsize(self.path)

    @cached_property
    def num_pages(self):
        """Number of whole pages in the file (cached)."""
        # Floor division: a trailing partial page is not counted, and the
        # result stays an int on Python 3 as well as Python 2.
        return self.size // self.db.cluster.page_size

    def open(self):
        """(Re)open the underlying file for binary reading."""
        self.close()
        # Pages are raw binary data; text mode would corrupt or reject them.
        self._fd = open(self.path, 'rb')

    def close(self):
        """Close the underlying file if it is open; safe to call repeatedly."""
        if self._fd is not None:
            self._fd.close()
            self._fd = None

    def page(self, index):
        """Return page number ``index`` as a ``Page``, or None if out of range."""
        if index < 0 or index >= self.num_pages:
            return None
        if not self._fd:
            self.open()
        page_size = self.db.cluster.page_size
        self._fd.seek(index * page_size)
        data = self._fd.read(page_size)
        return Page(self, index, data)
def outLSN(v):
    """Format an LSN as ``<high part in hex>/<low 32 bits as 8 hex digits>``."""
    high_word = v >> 32
    low_word = v & 0xFFFFFFFF
    return "%x/%08x" % (high_word, low_word)
def outXID(x):
    """Format an XID as 8 hex digits followed by its decimal value in parentheses."""
    hex_and_decimal = (x, x)
    return "%08x (%d)" % hex_and_decimal
# (mask, name) pairs used to describe a page header's flags field.
# The last entry matches every bit outside the three named flags, so any
# unexpected bit shows up as "!EXTRA GARBAGE!".
pg_flag_defs = [
    (0x0001, "PD_HAS_FREE_LINES"),
    (0x0002, "PD_PAGE_FULL"),
    (0x0004, "PD_ALL_VISIBLE"),
    (~(0x0001 | 0x0002 | 0x0004), "!EXTRA GARBAGE!"),
]
def flags_to_text(flags, defs):
    """Return the comma-separated names of all entries in ``defs`` that match.

    ``defs`` is an iterable of ``(mask, name)`` pairs; a name is included when
    ``flags`` shares at least one set bit with its mask. Names appear in the
    order of ``defs``. The result is "" when nothing matches.
    """
    matched = []
    for mask, label in defs:
        if flags & mask:
            matched.append(label)
    return ", ".join(matched)
# Page layout version number -> release range labelled with that version.
page_version_numbers = {
    0: '<7.3',
    1: '7.3-7.4',
    2: '8.0',
    3: '8.1-8.2',
    4: '8.3+',
}


def page_version_to_text(ver):
    """Return ``"<ver> (<release range>)"``, flagging unknown versions as invalid."""
    try:
        release = page_version_numbers[ver]
    except KeyError:
        release = "!!! INVALID !!!"
    return "%d (%s)" % (ver, release)
def warning(cond, msg):
    """Return ``msg`` wrapped as `` !!! msg !!!`` when ``cond`` is truthy, else ""."""
    if not cond:
        return ""
    return " !!! %s !!!" % msg
def info(cond, msg):
    """Return ``msg`` prefixed with one space when ``cond`` is truthy, else ""."""
    if not cond:
        return ""
    return " %s" % msg
def group_entries(seq, key=lambda x: x):
    """Collapse runs of consecutive items with equal keys.

    Yields ``((first_index, last_index), value)`` for each maximal run of
    adjacent items in ``seq`` whose ``key(item)`` values compare equal.
    ``value`` is the last item of the run. An empty ``seq`` yields nothing.

    A private sentinel marks "no previous key", so a run whose key is None
    (including one at the very start of ``seq``) is reported with correct
    indices instead of being dropped or given a start index of None.
    """
    no_key = object()
    prev_key = no_key
    prev_value = None
    cur_start = None
    i = 0
    for i, cur_value in enumerate(seq):
        cur_key = key(cur_value)
        if prev_key is no_key or cur_key != prev_key:
            if cur_start is not None:
                # The previous run ended on the item just before this one.
                yield (cur_start, i - 1), prev_value
            cur_start = i
        prev_key = cur_key
        prev_value = cur_value
    if cur_start is not None:
        yield (cur_start, i), prev_value
class Page(object):
    """One page of a filenode with its 24-byte header decoded.

    ``data`` is the raw page image. The header is unpacked in the machine's
    native byte order into ``lsn``, ``tli``, ``flags``, ``pd_lower``,
    ``pd_upper``, ``pd_special``, ``pd_pagesize_version`` and
    ``pd_prune_xid``.

    Raises ``struct.error`` if ``data`` holds fewer than 24 bytes.
    """

    def __init__(self, filenode, index, data):
        self.filenode = filenode
        self.index = index
        self.data = data
        lsn_a, lsn_b, self.tli, self.flags, self.pd_lower, self.pd_upper, \
            self.pd_special, self.pd_pagesize_version, self.pd_prune_xid = \
            struct.unpack("IIHHHHHHI", data[0:24])
        # The LSN is stored as two 32-bit words, high word first.
        self.lsn = (lsn_a << 32) + lsn_b

    @property
    def page_size(self):
        """Page size recorded in the header (high byte of pd_pagesize_version)."""
        return self.pd_pagesize_version & 0xFF00

    @property
    def page_version(self):
        """Layout version recorded in the header (low byte of pd_pagesize_version)."""
        return self.pd_pagesize_version & 0xFF

    @property
    def line_pointers(self):
        """Yield a ``LinePointer`` for each 4-byte slot between the header and pd_lower."""
        for i in range(24, self.pd_lower, 4):
            raw = self.data[i:i + 4]
            if len(raw) < 4:
                # pd_lower points past the data we actually have (corrupt
                # header or short read); stop rather than fail to unpack.
                break
            yield LinePointer(raw)

    def tuple(self, lp):
        """Return the ``Tuple`` stored at the location ``lp`` points to."""
        return Tuple(self.data[lp.lp_off:lp.lp_off + lp.lp_len])

    def debug(self):
        """Return a multi-line, human-readable dump of the page."""
        cluster_page_size = self.filenode.db.cluster.page_size
        parts = [
            "Page 0x%06x of filenode %d:\n" % (self.index, self.filenode.oid),
            " LSN: %s\n" % outLSN(self.lsn),
            " TLI: %d\n" % self.tli,
            " Flags: %02x (%s)\n" % (self.flags, flags_to_text(self.flags, pg_flag_defs)),
            " Space: 0x%04x - 0x%04x (%d bytes)%s%s\n" % (
                self.pd_lower, self.pd_upper, self.pd_upper - self.pd_lower,
                warning(self.pd_lower > self.pd_upper, "Negative free space"),
                info(self.pd_special < cluster_page_size,
                     "Special: 0x%04x (%d bytes)" % (
                         self.pd_special,
                         cluster_page_size - self.pd_special)),
            ),
            " Page size: %d bytes Version: %s\n" % (
                self.page_size, page_version_to_text(self.page_version)),
            " Prune XID: %s\n" % outXID(self.pd_prune_xid),
            " Tuples:\n",
        ]
        # Identical consecutive line pointers are printed once as a range.
        for (lp_s, lp_e), lp in group_entries(self.line_pointers, lambda lp: lp.data):
            label = "%d-%d" % (lp_s, lp_e) if lp_s != lp_e else lp_s
            parts.append(" %7s: %s\n" % (label, lp.debug()))
            if lp.lp_len and lp.lp_off:
                try:
                    parts.append("%s\n" % self.tuple(lp).debug(indent=' '))
                except struct.error:
                    # The pointer refers to fewer bytes than a tuple header
                    # needs; report it and keep dumping the rest of the page.
                    parts.append(" %s\n" % warning(True, "Tuple header truncated"))
        return "".join(parts)
# Line pointer state value -> name. The list position is the state value.
lp_flag_defs = dict(enumerate([
    'LP_UNUSED',
    'LP_NORMAL',
    'LP_REDIRECT',
    'LP_DEAD',
]))
class LinePointer(object):
    """A decoded 4-byte line pointer (item identifier) from a page.

    The 32-bit item packs three bit fields, lowest bits first:
    ``lp_off`` (15 bits), ``lp_flags`` (2 bits) and ``lp_len`` (15 bits).
    ``data`` is read as two native 16-bit halves, so the flags straddle them:
    the top bit of the first half is the low flag bit and the bottom bit of
    the second half is the high flag bit.

    NOTE(review): this decoding assumes a little-endian bit-field layout;
    confirm before using it on pages written by a big-endian server.

    Raises ``struct.error`` if ``data`` is not exactly 4 bytes long.
    """

    def __init__(self, data):
        self.data = data
        low_half, high_half = struct.unpack('HH', data)
        self.lp_off = low_half & 0x7FFF
        self.lp_flags = (low_half >> 15) | ((high_half & 1) << 1)
        # lp_len sits in bits 1-15 of the second half. It must be shifted
        # down; merely clearing bit 0 would report twice the real length.
        self.lp_len = high_half >> 1

    def debug(self):
        """Return a one-line description of the pointer, with sanity warnings."""
        return "%s: %04x-%04x (%dB)%s%s" % (
            lp_flag_defs[self.lp_flags],
            self.lp_off, self.lp_off + self.lp_len, self.lp_len,
            warning((self.LP_UNUSED or self.LP_REDIRECT) and self.lp_len, "lp_len != 0"),
            warning(self.LP_NORMAL and not self.lp_len, "No storage"),
        )
def register_enum_getters(cls, flags, flag_attr):
    """Attach one boolean property per ``(value, name)`` pair to ``cls``.

    Property ``name`` is true when the instance attribute ``flag_attr``
    equals ``value``.
    """
    def make_getter(expected):
        # A factory gives each getter its own ``expected`` binding.
        def getter(instance):
            return getattr(instance, flag_attr) == expected
        return getter

    for flag_value, flag_name in flags:
        setattr(cls, flag_name, property(make_getter(flag_value)))
def register_flag_getters(cls, flags, flag_attr):
    """Attach one boolean property per ``(mask, name)`` pair to ``cls``.

    Property ``name`` is true when the instance attribute ``flag_attr`` has
    at least one bit in common with ``mask``.
    """
    def make_getter(mask):
        # A factory gives each getter its own ``mask`` binding.
        def getter(instance):
            return bool(getattr(instance, flag_attr) & mask)
        return getter

    for flag_value, flag_name in flags:
        setattr(cls, flag_name, property(make_getter(flag_value)))
# Give LinePointer one boolean property per lp_flag_defs entry (LP_UNUSED,
# LP_NORMAL, LP_REDIRECT, LP_DEAD); each is true when lp_flags equals that
# entry's value. LinePointer.debug() relies on these properties.
register_enum_getters(LinePointer, lp_flag_defs.items(), 'lp_flags')
class Tuple(object):
    """Decoded header of one heap tuple.

    ``data`` is the raw tuple image starting at its header. The header is
    unpacked in native byte order using the layout the original source
    labels "8.1": t_xmin, t_cmin, t_xmax, t_cid, a 6-byte ctid, t_infomask2,
    t_infomask and t_hoff, 27 bytes in total. The layout labelled "8.3" has
    no separate t_cmin field (23 bytes: ``"IIIHHHHHB"`` over ``data[0:23]``)
    and is not handled here.

    Raises ``struct.error`` if ``data`` holds fewer than 27 bytes.
    """

    def __init__(self, data):
        self.data = data
        # The ctid block number is stored as two 16-bit halves, high half
        # first. Reading it as one native 32-bit integer would swap the
        # halves on little-endian machines, so unpack them separately.
        self.t_xmin, self.t_cmin, self.t_xmax, self.t_cid, \
            ctid_block_hi, ctid_block_lo, ctid_t, self.t_infomask2, \
            self.t_infomask, self.t_hoff = struct.unpack("IIIIHHHHHB", data[0:27])
        ctid_p = (ctid_block_hi << 16) | ctid_block_lo
        # The same header field holds either a command id or xvac.
        self.t_xvac = self.t_cid
        self.t_ctid = ctid_p, ctid_t

    @property
    def num_attrs(self):
        """Attribute count: the low 11 bits of t_infomask2."""
        return self.t_infomask2 & 0x07FF

    @property
    def oid(self):
        """OID stored in the last 4 bytes of the header, just before t_hoff.

        Only meaningful when HEAP_HASOID is set.
        """
        return struct.unpack('I', self.data[self.t_hoff - 4:self.t_hoff])[0]

    def debug(self, indent=''):
        """Return a three-line description, each line prefixed by ``indent``."""
        oid_text = (" oid: %s" % self.oid) if self.HEAP_HASOID else ''
        s = (
            "%sxmin: %-20s cmin: %-20s xmax: %-20s cid: %s\n"
            "%sctid: %-20s attrs: %-19s%s\n"
            "%sFlags: info: %s info2: %s"
        ) % (
            indent, outXID(self.t_xmin), self.t_cmin, outXID(self.t_xmax), self.t_cid,
            indent, self.t_ctid, self.num_attrs, oid_text,
            indent, flags_to_text(self.t_infomask, t_infomask_defs).replace('HEAP_', ''),
            flags_to_text(self.t_infomask2, t_infomask2_defs).replace('HEAP_', ''),
        )
        return s
# (mask, name) pairs describing the bits of a tuple header's t_infomask.
# HEAP_XMAX_SHR_LOCK is not a bit of its own: it is the combination
# HEAP_XMAX_EXCL_LOCK | HEAP_XMAX_KEYSHR_LOCK (0x0040 | 0x0010 = 0x0050).
# NOTE: flags_to_text() and register_flag_getters() report an entry when ANY
# bit of its mask is set, so this combined entry is also reported when only
# one of the two lock bits is present.
t_infomask_defs = [
    (0x0001, 'HEAP_HASNULL'),
    (0x0002, 'HEAP_HASVARWIDTH'),
    (0x0004, 'HEAP_HASEXTERNAL'),
    (0x0008, 'HEAP_HASOID'),
    (0x0010, 'HEAP_XMAX_KEYSHR_LOCK'),
    (0x0020, 'HEAP_COMBOCID'),
    (0x0040, 'HEAP_XMAX_EXCL_LOCK'),
    (0x0080, 'HEAP_XMAX_LOCK_ONLY'),
    (0x0040 | 0x0010, 'HEAP_XMAX_SHR_LOCK'),
    (0x0100, 'HEAP_XMIN_COMMITTED'),
    (0x0200, 'HEAP_XMIN_INVALID'),
    (0x0400, 'HEAP_XMAX_COMMITTED'),
    (0x0800, 'HEAP_XMAX_INVALID'),
    (0x1000, 'HEAP_XMAX_IS_MULTI'),
    (0x2000, 'HEAP_UPDATED'),
    (0x4000, 'HEAP_MOVED_OFF'),
    (0x8000, 'HEAP_MOVED_IN'),
]
# Give Tuple one boolean property per t_infomask_defs entry (for example
# Tuple.HEAP_HASOID, which Tuple.debug() reads); each is true when t_infomask
# has any bit of that entry's mask set.
register_flag_getters(Tuple, t_infomask_defs, 't_infomask')
# (mask, name) pairs describing the flag bits of a tuple header's t_infomask2.
# The low 11 bits (0x07FF) hold the attribute count (see Tuple.num_attrs) and
# the top three bits are the named flags. Everything else (0x1800 within the
# 16-bit field) is unexpected and reported as "!EXTRA GARBAGE!".
t_infomask2_defs = [
    (0x2000, 'HEAP_KEYS_UPDATED'),
    (0x4000, 'HEAP_HOT_UPDATED'),
    (0x8000, 'HEAP_ONLY_TUPLE'),
    (~(0x07FF | 0xE000), '!EXTRA GARBAGE!'),
]
# Give Tuple one boolean property per t_infomask2_defs entry; each is true
# when t_infomask2 has any bit of that entry's mask set.
register_flag_getters(Tuple, t_infomask2_defs, 't_infomask2')
# Command-line entry point:
#   script CLUSTER_PATH DATABASE_OID FILENODE_OID [PAGE_INDEX]
# Dumps every page of the filenode, or only PAGE_INDEX when it is given.
if __name__ == '__main__':
    import sys

    if len(sys.argv) < 4:
        sys.stderr.write(
            "usage: %s CLUSTER_PATH DATABASE_OID FILENODE_OID [PAGE_INDEX]\n"
            % sys.argv[0])
        sys.exit(2)

    c = Cluster(sys.argv[1])
    db = c.database(int(sys.argv[2]))
    fn = db.filenode(int(sys.argv[3]))
    # print(...) with a single argument behaves the same on Python 2 and 3.
    print(fn)
    if len(sys.argv) <= 4:
        # int() keeps range() working even if num_pages comes back as a float.
        for i in range(int(fn.num_pages)):
            print(fn.page(i).debug())
    else:
        page_index = int(sys.argv[4])
        page = fn.page(page_index)
        if page is None:
            # fn.page() returns None for an index beyond the end of the file.
            sys.exit("page %d is out of range (file has %d pages)"
                     % (page_index, fn.num_pages))
        print(page.debug())
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment