Skip to content

Instantly share code, notes, and snippets.

@sfan5
Created November 2, 2016 22:10
Show Gist options
  • Save sfan5/3c04c625453ec95c6a6df4d7e2c05885 to your computer and use it in GitHub Desktop.
Save sfan5/3c04c625453ec95c6a6df4d7e2c05885 to your computer and use it in GitHub Desktop.
Removes every occurrence of one node type from a whole Minetest world by remapping it to "air" (SQLite3 backend only)
#!/usr/bin/env python3
import sys
import os.path
import struct
import zlib
import sqlite3
class BufferConsumer():
    """Forward-only cursor over an in-memory ``bytes`` buffer.

    Every ``read*``/``skip`` call advances an internal offset, which
    ``tell()`` reports and ``remaining()`` measures against the end.
    """

    def __init__(self, buf):
        # Raised explicitly (not via ``assert``) so the check survives
        # ``python -O``. AssertionError is kept as the exception type because
        # the block-processing loop in this file catches exactly that.
        if not isinstance(buf, bytes):
            raise AssertionError("BufferConsumer requires a bytes object")
        self.buf = buf
        self.off = 0

    def remaining(self):
        """Return the number of bytes not yet consumed."""
        return len(self.buf) - self.off

    def tell(self):
        """Return the current offset from the start of the buffer."""
        return self.off

    def _require(self, length):
        """Validate that `length` bytes can be consumed from the cursor."""
        if length < 0:
            # A negative length would silently move the cursor backwards.
            raise ValueError("length must be non-negative")
        if length > self.remaining():
            raise EOFError()

    def skip(self, length):
        """Advance past `length` bytes; raise EOFError if too few remain."""
        self._require(length)
        self.off += length

    def read(self, length):
        """Return the next `length` bytes; raise EOFError if too few remain."""
        self._require(length)
        ret = self.buf[self.off:self.off + length]
        self.off += length
        return ret

    def read_a(self, length):
        """Read `length` bytes and decode as ASCII, dropping invalid bytes."""
        return self.read(length).decode("ascii", "ignore")

    def read_s(self, fmt):
        """Unpack a network-byte-order ``struct`` format; returns a tuple."""
        fmt = "!" + fmt
        return struct.unpack(fmt, self.read(struct.calcsize(fmt)))

    def read_z(self):
        """Decompress one zlib stream starting at the cursor.

        The compressed length is not stored anywhere, so the decompressor is
        handed everything that remains and the cursor is advanced by the
        number of bytes it actually consumed (total minus ``unused_data``).

        Raises:
            EOFError: the buffer ends before the zlib stream is complete.
            zlib.error: the data at the cursor is not a valid zlib stream.
        """
        d = zlib.decompressobj()
        avail = self.remaining()
        # memoryview avoids copying the whole tail of the buffer just to
        # feed it to the decompressor.
        with memoryview(self.buf) as view:
            ret = d.decompress(view[self.off:])
        ret += d.flush()
        if not d.eof:
            # decompressobj does not complain about truncated input by
            # itself; without this check a cut-off stream would be returned
            # as if it were complete.
            raise EOFError("truncated zlib stream")
        self.off += avail - len(d.unused_data)
        return ret
def process_block(data, to_replace):
    """Rename `to_replace` to "air" in one serialized MapBlock.

    The block is parsed only far enough to locate its name-id mapping
    section. That section is rebuilt with matching entries renamed; every
    byte before and after it is copied through unchanged.

    Args:
        data: the serialized MapBlock as ``bytes``.
        to_replace: node name (``str``) whose mapping entries become "air".

    Returns:
        A ``(changed, new_data)`` tuple: ``changed`` is 1 if at least one
        mapping entry was renamed and 0 otherwise; ``new_data`` is the
        re-serialized block.

    Raises:
        AssertionError: the block has an unsupported version or a field does
            not hold the expected value, or bytes are left over after parsing.
        EOFError: the data ends before parsing is complete.
        zlib.error: a compressed section is not valid zlib data.
    """
    def expect(cond, what):
        # Explicit raise instead of ``assert`` so validation is not stripped
        # by ``python -O``. AssertionError is kept because the caller in this
        # file catches exactly that type to skip undecodable blocks.
        if not cond:
            raise AssertionError(what)

    buf = BufferConsumer(data)
    ver, _, _, _ = buf.read_s("BBBB")
    expect(ver >= 22, "unsupported MapBlock version %d" % ver)
    buf.read_z()  # node data
    buf.read_z()  # metadata

    # Version-specific section between metadata and the record list below.
    if ver == 23:
        buf.skip(1)
    elif ver == 24:
        ver2, length = buf.read_s("BH")
        expect(ver2 == 1, "unexpected section version %d" % ver2)
        buf.skip(length * 10)

    # Versioned list of variable-length records (presumably the static
    # objects of the Minetest world format -- confirm against its docs).
    ver2, count = buf.read_s("BH")
    expect(ver2 == 0, "unexpected record list version %d" % ver2)
    for _ in range(count):
        buf.read_s("Biii")
        (length,) = buf.read_s("H")
        buf.skip(length)
    buf.read_s("I")  # 32-bit field preceding the mapping (presumably timestamp)

    # Name-id mapping: the only section that is rewritten.
    copybefore = buf.tell()
    ver2, count = buf.read_s("BH")
    expect(ver2 == 0, "unexpected name-id mapping version %d" % ver2)
    parts = [struct.pack("!BH", ver2, count)]
    trig = False
    for _ in range(count):
        nid, nlen = buf.read_s("HH")
        raw = buf.read(nlen)
        # Decode only for the comparison. Entries that do not match are
        # written back byte-for-byte, so names containing non-ASCII bytes
        # are no longer mangled by a lossy decode/encode round trip.
        if raw.decode("ascii", "ignore") == to_replace:
            raw = b"air"
            trig = True
        parts.append(struct.pack("!HH", nid, len(raw)))
        parts.append(raw)
    copyafter = buf.tell()

    # Trailing section is parsed only to verify the block ends where expected.
    if ver >= 25:
        length, count = buf.read_s("BH")
        expect(length == 10, "unexpected trailing record size %d" % length)
        for _ in range(count):
            buf.read_s("HII")
    expect(buf.remaining() == 0, "trailing bytes after MapBlock")

    return (1 if trig else 0), data[:copybefore] + b"".join(parts) + data[copyafter:]
def main():
    """Command-line entry point: replace one node name in every MapBlock.

    Expects ``<map dir> <node name>`` in ``sys.argv``. Opens
    ``<map dir>/map.sqlite``, runs ``process_block`` on every row of the
    ``blocks`` table, writes back the blocks that changed, commits once at
    the end and prints a summary. Exits with status 1 on bad usage or when
    the database file is missing.
    """
    if len(sys.argv) < 3:
        print("Usage: blacknode.py <map dir> <node name>")
        sys.exit(1)
    db_path = os.path.join(sys.argv[1], "map.sqlite")
    if not os.path.isfile(db_path):
        print("Map database doesn't exist?!")
        sys.exit(1)

    nodename = sys.argv[2]
    c_total, c_rem = 0, 0
    conn = sqlite3.connect(db_path)
    try:
        # Collect all positions first: SQLite leaves it undefined whether a
        # running SELECT sees rows modified on the same connection, so the
        # table is not updated while its cursor is still being iterated.
        positions = [row[0] for row in conn.execute("SELECT pos FROM blocks")]
        for pos in positions:
            blockdata = conn.execute(
                "SELECT data FROM blocks WHERE pos = ?", (pos,)
            ).fetchone()[0]
            try:
                ret, blockdata = process_block(blockdata, nodename)
            except (AssertionError, EOFError, zlib.error):
                # Truncated or non-zlib payloads are just as undecodable as
                # a failed format check; skip them instead of aborting.
                print("Skipping a MapBlock because we can't decode it..")
                continue
            if ret:
                # Blocks without the node are left untouched on disk.
                conn.execute(
                    "UPDATE blocks SET data = ? WHERE pos = ?", (blockdata, pos)
                )
            c_total += 1
            c_rem += ret
        conn.commit()
    finally:
        # Closing without a commit discards pending changes, so an error
        # part-way through leaves the database as it was.
        conn.close()
    print("Removed node from %d of total %d MapBlocks" % (c_rem, c_total))


if __name__ == "__main__":
    main()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment