Skip to content

Instantly share code, notes, and snippets.

@sfan5 sfan5/mapblock-parse.py
Last active May 9, 2017

Embed
What would you like to do?
parses serialized MapBlock
#!/usr/bin/env python3
import sys
import struct
import binascii
import zlib
import getopt
BS = 16 # MapBlock side length (not to be confused with the BS for visuals)
# Byte-order prefix that BufferConsumer.read_s prepends to every struct format;
# "!" selects network (big-endian) order with standard sizes and no padding.
ENDIAN = "!"
class BufferConsumer():
    """Cursor over a ``bytes`` object that is consumed front to back.

    Attributes:
        buf: the wrapped bytes.
        off: index in ``buf`` of the next unread byte.
    """

    def __init__(self, buf):
        """Wrap *buf* and start reading at offset 0.

        Raises:
            TypeError: if *buf* is not a ``bytes`` instance.
        """
        # An explicit check survives ``python -O`` (asserts do not) and also
        # accepts subclasses of bytes.
        if not isinstance(buf, bytes):
            raise TypeError("buf must be bytes, not %s" % type(buf).__name__)
        self.buf = buf
        self.off = 0

    def _check(self, length):
        """Validate that *length* bytes can be consumed from the cursor."""
        if length < 0:
            # A negative length would silently move the cursor backwards.
            raise ValueError("negative length: %d" % length)
        if length > self.remaining():
            raise EOFError()

    def remaining(self):
        """Return the number of bytes that have not been consumed yet."""
        return len(self.buf) - self.off

    def skip(self, length):
        """Advance the cursor by *length* bytes without returning them.

        Raises:
            EOFError: if fewer than *length* bytes remain.
            ValueError: if *length* is negative.
        """
        self._check(length)
        self.off += length

    def read(self, length):
        """Consume and return the next *length* bytes.

        Raises:
            EOFError: if fewer than *length* bytes remain.
            ValueError: if *length* is negative.
        """
        self._check(length)
        ret = self.buf[self.off:self.off + length]
        self.off += length
        return ret

    def read_a(self, length):
        """Consume *length* bytes and return them decoded as ASCII.

        Bytes that are not valid ASCII are dropped from the result.
        """
        return self.read(length).decode("ascii", "ignore")

    def read_al(self, nl='\n'):
        """Consume one line and return it as ASCII text without the terminator.

        *nl* is the (ASCII) line terminator; it is consumed but not returned.

        Raises:
            EOFError: if the terminator does not occur in the remaining data.
                All remaining bytes are consumed in that case.
        """
        terminator = nl.encode("ascii")
        end = self.buf.find(terminator, self.off)
        if end < 0:
            self.off = len(self.buf)
            raise EOFError()
        line = self.buf[self.off:end].decode("ascii", "ignore")
        self.off = end + len(terminator)
        return line

    def read_s(self, fmt):
        """Unpack the ``struct`` format *fmt* (prefixed with ENDIAN) from the buffer.

        Returns the tuple produced by ``struct.unpack``.
        """
        fmt = ENDIAN + fmt
        return struct.unpack(fmt, self.read(struct.calcsize(fmt)))

    def read_z(self):
        """Decompress one zlib stream starting at the cursor and return its data.

        The cursor ends up on the first byte following the compressed stream.
        """
        d = zlib.decompressobj()
        # memoryview avoids copying the whole tail of the buffer.
        ret = d.decompress(memoryview(self.buf)[self.off:])
        ret += d.flush()
        # unused_data holds whatever followed the end of the zlib stream, so
        # its length tells how far the stream reached into the buffer.
        self.off = len(self.buf) - len(d.unused_data)
        return ret
def deflag(val, tbl):
    """Return the names of the flags set in *val*, joined by " | ".

    *tbl* maps a bit mask to its display name; names are emitted in the
    iteration order of *tbl*. An empty string is returned if no flag matches.
    """
    return " | ".join(name for flag, name in tbl.items() if val & flag)
def decombine_pos(n):
    """Split a combined node index into its (x, y, z) components.

    The index is laid out as ``z * BS**2 + y * BS + x``.
    """
    # divmod is equivalent to the shift/mask form for BS = 16 and stays
    # correct if BS is ever not a power of two.
    z, n = divmod(n, BS**2)
    y, x = divmod(n, BS)
    return (x, y, z)
# Current nesting depth of the printed outline; changed only through ol().
ilevel = 0


def o(a1, *args):
    """Print one outline entry at the current indentation level.

    *a1* is a %-style format string and *args* are its arguments. Each level
    of ``ilevel`` indents the entry by two spaces.
    """
    print(" " * ilevel * 2 + "- " + a1 % args)


def ol(l):
    """Change the outline indentation level by *l* (may be negative)."""
    global ilevel
    ilevel += l
def usage():
    """Print the command-line help to stdout and exit with status 1."""
    print("%s [options] [hex file.txt]" % sys.argv[0])
    print("-m Display contents of metadata")
    print("-M <file> Write uncompressed metadata to file")
    # sys.exit is always available; the bare exit() helper is only installed
    # by the site module.
    sys.exit(1)
def optval(name):
    """Return the value given for option *name*, or None if it was not passed.

    Looks the option up in the module-level ``opts`` list of
    ``(option, value)`` pairs produced by ``getopt.getopt``; the first match
    wins.
    """
    for key, value in opts:
        if key == name:
            return value
    return None
def optvalb(name):
    """Return True if option *name* was passed on the command line."""
    return optval(name) is not None
# Parse the command line; invalid options print the error and the usage text.
try:
    opts, args = getopt.getopt(sys.argv[1:], "hmM:")
except getopt.GetoptError as e:
    print(e)
    usage()

if optvalb("-h"):
    usage()

# The hex dump comes from the file named by the first positional argument,
# or from stdin when none is given. Surrounding whitespace (including a
# trailing "\r\n") is removed because unhexlify rejects it.
if args:
    with open(args[0], "r") as f:
        data = binascii.unhexlify(f.read().strip())
else:
    sys.stderr.write("Feed me hex data from sqlite [e.g. SELECT HEX(data) FROM blocks WHERE pos = -1234455;]\n")
    data = binascii.unhexlify(sys.stdin.read().strip())
buf = BufferConsumer(data)
def decode_mapmeta(buf):
    """Decode the uncompressed metadata section and print a summary of it.

    *buf* is a BufferConsumer positioned at the start of the metadata. For
    each entry the position, the variable names with their value sizes and
    the size of the trailing inventory text are printed; variable values are
    skipped and the inventory text is consumed without being parsed.

    Raises:
        AssertionError: if the metadata version is not 0, 1 or 2.
        EOFError: if the data ends prematurely.
    """
    ver = buf.read_s("B")[0]
    if ver == 0:
        o("Metadata version: 0 (empty)")
        return
    assert(ver in (1, 2))
    o("Metadata version: %d", ver)
    count = buf.read_s("H")[0]
    o("Metadata count: %d", count)
    ol(1)
    for _ in range(count):
        pos, num_vars = buf.read_s("HI")
        px, py, pz = decombine_pos(pos)
        o("Position: %d, %d, %d", px, py, pz)
        o("Variable count: %d", num_vars)
        ol(1)
        for _ in range(num_vars):
            # Each variable: u16 name length, name, u32 value length, value.
            len1 = buf.read_s("H")[0]
            name = buf.read_a(len1)
            len2 = buf.read_s("I")[0]
            buf.skip(len2)
            # The "private" marker byte only exists from version 2 on.
            if ver >= 2:
                priv = buf.read_s("B")[0] == 1
            else:
                priv = False
            o("'%s' / (%d bytes)%s", name, len2, " (private)" if priv else "")
        ol(-1)
        # The inventory is line-based text terminated by an "EndInventory"
        # line; only its size is reported.
        ulen = buf.remaining()
        while buf.read_al() != "EndInventory":
            pass
        ulen = ulen - buf.remaining()
        o("Inventory data (%d bytes)", ulen)
    ol(-1)
# Display names for the static object type byte; unknown values print as None.
STATIC_OBJECT_TYPES = {
    1: "ACTIVEOBJECT_TYPE_TEST",
    2: "ACTIVEOBJECT_TYPE_ITEM", # deprecated
    3: "ACTIVEOBJECT_TYPE_RAT",
    4: "ACTIVEOBJECT_TYPE_OERKKI1",
    5: "ACTIVEOBJECT_TYPE_FIREFLY",
    6: "ACTIVEOBJECT_TYPE_MOBV2", # deprecated end
    7: "ACTIVEOBJECT_TYPE_LUAENTITY",
    100: "ACTIVEOBJECT_TYPE_PLAYER ", # should never appear (not stored)
    101: "ACTIVEOBJECT_TYPE_GENERIC", # should never appear (client-side)
}

# ---- Header -----------------------------------------------------------------
o("MapBlock (%d bytes)", len(data))
ol(1)

ver, flags = buf.read_s("BB")
assert(ver >= 22)
o("Version: %d", ver)
o("Flags: %s", deflag(flags, {
    0x01: "is_underground",
    0x02: "day_night_differs",
    0x04: "lighting_expired", # deprecated
    0x08: "generated",
}))
if ver >= 27:
    lighting_complete = buf.read_s("H")[0]
    o("Lighting complete: 0x%04x", lighting_complete)
content_width, params_width = buf.read_s("BB")
assert(content_width in (1, 2))
assert(params_width == 2)
o("Content/Params width: %d / %d", content_width, params_width)

# ---- Compressed node data -----------------------------------------------------
# The compressed size is derived from how far read_z() moved the cursor.
ulen = buf.remaining()
nodedata = buf.read_z()
ulen = ulen - buf.remaining()
assert(len(nodedata) == BS**3 * (content_width + params_width))
o("Compressed node data (%d bytes, %d bytes uncompressed)", ulen, len(nodedata))

# ---- Compressed map metadata --------------------------------------------------
ulen = buf.remaining()
mapmeta = buf.read_z()
if optvalb("-M"):
    with open(optval("-M"), "wb") as f:
        f.write(mapmeta)
ulen = ulen - buf.remaining()
o("Compressed map metadata (%d bytes, %d bytes uncompressed)", ulen, len(mapmeta))
if optvalb("-m"):
    ol(1)
    decode_mapmeta(BufferConsumer(mapmeta))
    ol(-1)

# ---- Version-specific data preceding the static objects -----------------------
if ver == 23:
    buf.skip(1)
    o("Unused data (1 byte)")
elif ver == 24:
    ver2, length = buf.read_s("BH")
    assert(ver2 == 1)
    o("Node timers version: %d", ver2)
    o("Node timers data (%d bytes)", length*10)
    buf.skip(length*10) # TODO: display contents of these

# ---- Static objects -----------------------------------------------------------
ver2, count = buf.read_s("BH")
assert(ver2 == 0)
o("Static objects version: %d", ver2)
o("Static objects count: %d", count)
ol(1)
for _ in range(count):
    stype, px, py, pz = buf.read_s("Biii")
    px /= 1000; py /= 1000; pz /= 1000 # decode F1000
    px /= 10; py /= 10; pz /= 10 # floatToInt() with BS=10
    o("Type: %s", STATIC_OBJECT_TYPES.get(stype))
    o("Position: %.2f, %.2f, %.2f", px, py, pz)
    length = buf.read_s("H")[0]
    o("Data (%d bytes)", length)
    ol(1)
    if stype == 7: # luaentity
        ver2, nlen = buf.read_s("BH")
        assert(ver2 == 1)
        name = buf.read_a(nlen)
        o("Entity name: '%s'", name)
        # 3 = the version byte plus the u16 name length read above.
        buf.skip(length - 3 - nlen)
    else:
        buf.skip(length)
    ol(-1)
ol(-1)

# ---- Timestamp ----------------------------------------------------------------
ts = buf.read_s("I")[0]
o("Timestamp: %s", "undefined" if ts == 0xffffffff else str(ts))

# ---- Node/ID mapping ----------------------------------------------------------
ver2, count = buf.read_s("BH")
assert(ver2 == 0)
o("Node/ID mapping version: %d", ver2)
o("Node/ID mapping count: %d", count)
ol(1)
for _ in range(count):
    nid, nlen = buf.read_s("HH")
    name = buf.read_a(nlen)
    o("0x%04x / '%s'", nid, name)
ol(-1)

# ---- Node timers (stored at the end from version 25 on) -----------------------
if ver >= 25:
    length, count = buf.read_s("BH")
    assert(length == 10)
    o("Node timers length: %d", length)
    o("Node timers count: %d", count)
    ol(1)
    for _ in range(count):
        pos, timeout, elapsed = buf.read_s("HII")
        px, py, pz = decombine_pos(pos)
        timeout /= 1000; elapsed /= 1000 # decode F1000
        o("Position: %d, %d, %d", px, py, pz)
        o("Timeout / Elapsed: %.2f / %.2f", timeout, elapsed)
    ol(-1)

ol(-1)
# Everything must have been consumed; leftovers mean the block was misparsed.
assert(buf.remaining() == 0)
#include <iostream>
#include <cstdio>
#include <cstdlib>
#include <cstdint>
#include <leveldb/db.h>
// Compilation:
// $ g++ -std=c++11 leveldb_read.cpp -o leveldb_read -lleveldb -O2
// Short aliases for the fixed-width 64-bit integer types used by the
// key computation in getBlockAsInteger().
typedef uint64_t u64;
typedef int64_t s64;
// Triple of signed 16-bit coordinates; main() fills one from the command
// line and passes it to getBlockAsInteger().
struct v3s16 { int16_t X, Y, Z; };
// Render a signed 64-bit integer as its decimal text form.
inline std::string i64tos(s64 i) {
    std::string text = std::to_string(i);
    return text;
}
// Pack a block position into one signed 64-bit value:
// Z * 0x1000000 + Y * 0x1000 + X, evaluated in unsigned 64-bit arithmetic
// and converted to s64 on return.
s64 getBlockAsInteger(const v3s16 &pos) {
    const u64 z = static_cast<u64>(pos.Z);
    const u64 y = static_cast<u64>(pos.Y);
    const u64 x = static_cast<u64>(pos.X);
    return z * 0x1000000 + y * 0x1000 + x;
}
int main(int argc, char *argv[])
{
if(argc < 5) {
std::cerr << "Usage: leveldb_read <world path> <x> <y> <z>" << std::endl;
return 1;
}
leveldb::DB *database;
leveldb::Status status;
status = leveldb::DB::Open(leveldb::Options(), std::string(argv[1]) + "/map.db", &database);
if(!status.ok()) {
std::cerr << "Error:" << status.ToString() << std::endl;
return 1;
}
v3s16 pos;
std::string data;
pos.X = atoi(argv[2]);
pos.Y = atoi(argv[3]);
pos.Z = atoi(argv[4]);
status = database->Get(leveldb::ReadOptions(), i64tos(getBlockAsInteger(pos)), &data);
if(!status.ok())
return 0;
for(int i = 0; i < data.size(); i++)
printf("%02X", (int) ((uint8_t*) data.c_str())[i]);
printf("\n");
return 0;
}
#!/bin/bash -e
# Run the MapBlock parser on every block whose stored data is larger than
# SIZE_THRESHOLD bytes, largest first.
SQLITE_FILE=./worlds/test/map.sqlite
SIZE_THRESHOLD=5000
MBPARSE=./mapblock-parse.py
####################
# Paths are quoted so that values containing spaces keep working, and
# `read -r` stops backslashes in the input from being interpreted.
sqlite3 "$SQLITE_FILE" \
    "SELECT pos FROM blocks WHERE LENGTH(data) > $SIZE_THRESHOLD ORDER BY LENGTH(data) DESC;" \
    | while read -r pos; do
    printf -- "------------- pos = %12s -------------\n" "$pos"
    sqlite3 "$SQLITE_FILE" "SELECT HEX(data) FROM blocks WHERE pos = $pos;" \
        | python3 "$MBPARSE"
done
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
You can’t perform that action at this time.