@ml31415 · Created July 22, 2022 15:09
Toolkit for repairing and converting a ZODB file with all kinds of errors
"""
Tool to verify, update, migrate and pack a ZODB database. Ignores and/or fixes possible read errors in the process.
usage: zodbtool {verify,repair,convert,pack} [{verify,repair,convert,pack} ...]
[-h] [-f FILE | -c CONFIG] [-D] [-o OID] [-p] [--zlib] [--convert-py3]
[--encoding ENCODING] [--encoding-fallback [ENCODING_FALLBACKS ...]] [-v]
Verifies, converts, migrates or packs a ZODB database.
positional arguments:
{verify,repair,convert,pack}
actions to be performed
options:
-h, --help show this help message and exit
-f FILE, --file FILE load storage from given file
-c CONFIG, --config CONFIG
load storage from config file
-D, --debug pause to debug broken pickles
-o OID, --oid OID oid to start working
-p, --pack pack the database after inspection
--zlib use zlibstorage
--convert-py3 convert the database from python2 to python3
--encoding ENCODING used for decoding pickled strings in py3
--encoding-fallback [ENCODING_FALLBACKS ...]
Older databases may have encodings other than 'utf-8' stored, like latin1. If an
encoding error occurs, fall back to the given encodings and issue a warning.
-v, --verbose more verbose output
"""
import argparse
import copyreg
import logging
import os
import pdb
import pickle
import six
import struct
import sys
import time
import zlib
from collections import defaultdict
from contextlib import closing
from ZODB.FileStorage import FileStorage, packed_version as FS_packed_version
from ZODB.FileStorage.FileStorage import FileStorageFormatError, read_index
from ZODB.FileStorage.format import DATA_HDR, DATA_HDR_LEN, CorruptedDataError, u64, DataHeader
from ZODB.FileStorage.fspack import FileStoragePacker, GC
from ZODB.blob import BlobStorage
from ZODB.broken import PersistentBroken
from ZODB.config import storageFromFile
from ZODB.Connection import TransactionMetaData
from ZODB.interfaces import IStorageCurrentRecordIteration, IStorageIteration, IStorageUndoable
from ZODB.POSException import POSKeyError
from ZODB.serialize import PersistentUnpickler, PersistentPickler, referencesf
from ZODB.utils import oid_repr, repr_to_oid, load_current
from zodbpickle import pickle as zodbpickle
from zodbupdate.main import format_renames, load_renames
from zodbupdate.utils import get_zodb_magic, DEFAULT_PROTOCOL
from zodbupdate import serialize, convert as zodbconvert
# All project modules whose classes are stored in the database need to be imported
##### import all serialized classes here #####
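# A purely hypothetical example of what such imports could look like:
#   from myproject import models  # noqa: F401  -- registers classes for unpickling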
logger = logging.getLogger('zodbtool')
class FTGC(GC):
def findrefs(self, pos):
"""Return a list of oids referenced as of packtime."""
try:
return GC.findrefs(self, pos)
except (pickle.UnpicklingError, zodbpickle.UnpicklingError, CorruptedDataError, zlib.error) as e:
logger.warning('Skipping unreadable record: {} {}'.format(type(e).__name__, str(e)))
return []
class FTFileStoragePacker(FileStoragePacker):
"""Fault-tolerant packer."""
def __init__(self, storage, referencesf, stop, gc=True):
FileStoragePacker.__init__(self, storage, referencesf, stop, gc)
self.gc = FTGC(self._file, self.file_end, self._stop, gc, referencesf)
class FTFileStorage(FileStorage):
"""Fault-tolerant FileStorage, which just reports any possibly found versioned objects and other issues instead of crashing."""
def _read_data_header(self, pos, oid=None, _file=None):
if _file is None:
_file = self._file
_file.seek(pos)
s = _file.read(DATA_HDR_LEN)
if len(s) != DATA_HDR_LEN:
raise CorruptedDataError(oid, s, pos)
h = FTDataHeader(*struct.unpack(DATA_HDR, s))
if oid is not None and oid != h.oid:
raise CorruptedDataError(oid, s, pos)
if not h.plen:
h.back = u64(_file.read(8))
return h
def reload_index(self):
index, tindex = self._newIndexes()
self._initIndex(index, tindex)
self._pos, self._oid, tid = read_index(self._file, self._file_name, index, tindex, read_only=self._is_read_only)
self._save_index()
@staticmethod
def packer(storage, referencesf, stop, gc):
p = FTFileStoragePacker(storage, referencesf, stop, gc)
try:
opos = p.pack()
if opos is None:
return None
return opos, p.index
finally:
p.close()
class FTDataHeader(DataHeader):
"""Fault-tolerant DataHeader"""
def __init__(self, oid, tid, prev, tloc, vlen, plen):
if vlen:
logger.warning("Non-zero version length. Versions aren't supported. OID: {}".format(oid_repr(oid)))
self.oid = oid
self.tid = tid
self.prev = prev
self.tloc = tloc
self.plen = plen
self.back = 0 # default
def ft_storage_iter(storage, start_at=None, debug=False, error_callback=None):
"""Fault-tolerant iterator for all records within a storage."""
if isinstance(storage, BlobStorage):
# If we've got a BlobStorage wrapper, let's
# actually iterate through the storage it wraps.
storage = storage._BlobStorage__storage
base_storage = getattr(storage, 'base', storage)
def _next_oid(oid):
"""Determine the next oid without loading a possibly broken object."""
next_oid = struct.pack(">Q", struct.unpack(">Q", oid)[0] + 1)
try:
return base_storage._index.minKey(next_oid)
except ValueError: # "empty tree" error
return None
if IStorageCurrentRecordIteration.providedBy(storage):
next_oid = repr_to_oid(start_at if start_at else '0x00')
while next_oid is not None:
# Try to handle common issues
try:
oid, tid, data, next_oid = storage.record_iternext(next_oid)
yield oid, tid, six.BytesIO(data)
except POSKeyError as e:
if debug:
logger.exception('Missing record ({}): {}'.format(oid_repr(next_oid), str(e)))
pdb.set_trace()
if error_callback:
error_callback(next_oid, e)
next_oid = _next_oid(next_oid)
except zlib.error as e:
if debug:
logger.exception('Broken zlib compressed record ({}): {}'.format(oid_repr(next_oid), str(e)))
pdb.set_trace()
if error_callback:
try:
tid = storage.base.record_iternext(next_oid)[1]
except Exception:
tid = None
error_callback(next_oid, e, tid=tid)
next_oid = _next_oid(next_oid)
except OSError as e:
# Something might be wrong with the stored file offset
if debug:
logger.exception("Broken record found ({}): {}".format(oid_repr(next_oid), str(e)))
pdb.set_trace()
if error_callback:
error_callback(next_oid, e)
next_oid = _next_oid(next_oid)
except Exception as e:
if debug:
logger.exception("Could not open oid ({}): {}".format(oid_repr(next_oid), str(e)))
pdb.set_trace()
else:
# This is likely something serious and should not pass unhandled
raise
# Keep advancing the oid so the loop can continue after debugging
next_oid = _next_oid(next_oid)
elif (IStorageIteration.providedBy(storage) and
(not IStorageUndoable.providedBy(storage) or
not storage.supportsUndo())):
# If we can't iterate through only the current records,
# iterate over all of them. Of course, packing beforehand helps :).
for transaction_ in storage.iterator():
for rec in transaction_:
# This has no fault tolerance at all, but it is better than nothing
yield rec.oid, rec.tid, six.BytesIO(rec.data)
else:
raise TypeError("Don't know how to iterate through this storage type")
class FTUpdater(object):
"""Fault-tolerant version of zodbupdate.update.Updater."""
TRANSACTION_COUNT = 100000
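# Number of records processed per transaction; committing in batches keeps
# memory usage bounded and allows resuming from the last logged oid.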
def __init__(
self, storage, dry=False, renames=None, decoders=None,
start_at='0x00', debug=False, repickle_all=False,
pickle_protocol=DEFAULT_PROTOCOL,
encoding='ASCII'):
self.dry = dry
self.storage = storage
self.processor = serialize.ObjectRenamer(
renames=renames,
decoders=decoders,
pickle_protocol=pickle_protocol,
repickle_all=repickle_all,
encoding=encoding,
)
self._protocol = pickle_protocol
self.start_at = start_at
self.debug = debug
def _broken_marker(self, persistent_id=lambda obj: None):
"""Create a broken persistent object, that can replace unreadable records."""
broken = PersistentBroken()
output_file = six.BytesIO()
p = PersistentPickler(persistent_id, output_file, self._protocol)
p.dump((broken.__class__, None))
p.dump(broken.__getstate__())
output_file.truncate()
return output_file
def _error_callback(self, oid, error, tid=None):
"""Insert a broken marker object into the storage."""
logger.warning('Replacing unloadable object ({}): {} {}'.format(oid_repr(oid), type(error).__name__, str(error)))
bm = self._broken_marker()
self.storage.store(oid, tid, bm.getvalue(), '', self.__transaction)
def __new_transaction(self):
t = TransactionMetaData()
self.storage.tpc_begin(t)
t.note(six.u('Batch updated factory references.'))
return t
def __commit_transaction(self, t, changed, commit_count, oid=None):
# Be verbose with the commit messages, so that after an error the
# operation can easily be resumed from the last committed oid
if self.dry or not changed:
logger.info(
'Dry run selected or no changes, '
'aborting transaction. (#{} oid {})'.format(commit_count, oid_repr(oid)))
self.storage.tpc_abort(t)
else:
logger.info('Committing changes (#{} oid {}).'.format(commit_count, oid_repr(oid)))
self.storage.tpc_vote(t)
self.storage.tpc_finish(t)
def __call__(self):
commit_count = 0
oid = 0
try:
record_count = 0
self.__transaction = self.__new_transaction()
for oid, tid, data_file in ft_storage_iter(self.storage, self.start_at, error_callback=self._error_callback):
try:
new = self.processor.rename(data_file)
except (pickle.UnpicklingError, zodbpickle.UnpicklingError, CorruptedDataError) as e:
logger.warning('Replacing corrupted object ({}): {} {}'.format(oid_repr(oid), type(e).__name__, str(e)))
new = self._broken_marker()
if new is None:
continue
self.storage.store(oid, tid, new.getvalue(), '', self.__transaction)
record_count += 1
if record_count > self.TRANSACTION_COUNT:
record_count = 0
commit_count += 1
self.__commit_transaction(self.__transaction, True, commit_count, oid=oid)
self.__transaction = self.__new_transaction()
commit_count += 1
self.__commit_transaction(self.__transaction, record_count != 0, commit_count, oid=oid)
except Exception as e:
logger.exception("Unexpected failure while loading oid {}: {} {}".format(oid_repr(oid), type(e).__name__, str(e)))
if self.debug:
pdb.post_mortem()
sys.exit()
def verify_record(oid, data_file, persistent_load=lambda ref: None):
"""Load both pickles of a record: the class metadata, then the object state."""
unpickler = PersistentUnpickler(None, persistent_load, data_file)
unpickler.load()  # class metadata (class, args)
return unpickler.load()  # object state; raises if the record is unreadable
def verify(storage, start_at=None, debug=False):
if start_at:
logger.info('Verifying stored objects starting from {} ...'.format(start_at))
else:
logger.info('Verifying all stored objects ...')
issues = defaultdict(list)
def error_callback(oid, error, tid=None):
logger.warning('Skipping unreadable record ({}): {} {}'.format(oid_repr(oid), type(error).__name__, str(error)))
issues["{}: {}".format(type(error).__name__, str(error))].append(oid_repr(oid))
count = -1
for count, (oid, tid, data_file) in enumerate(
ft_storage_iter(storage, start_at, debug=debug, error_callback=error_callback)):
try:
verify_record(oid, data_file)
except (pickle.UnpicklingError, zodbpickle.UnpicklingError) as e:
if debug:
logger.exception("Unpickleable record ({}): {} {}".format(oid_repr(oid), type(e).__name__, str(e)))
pdb.set_trace()
else:
error_callback(oid, e)
logger.debug("Unreadable data:\n{}", repr(data_file.getvalue()))
except Exception as e:
if debug:
logger.exception("Unloadable record found ({}): {} {}".format(oid_repr(oid), type(e).__name__, str(e)))
pdb.set_trace()
else:
error_callback(oid, e)
logger.debug("Unreadable data:\n{}", repr(data_file.getvalue()))
error_count = sum(len(oids) for oids in issues.values())
logger.info("\nScanned {} records. Found {} records that could not be loaded.".format(count + 1, error_count))
if error_count:
msg = ""
for key in sorted(issues, key=lambda k: len(issues[k]), reverse=True):
oids = issues[key]
msg += "{} ({} oids): {}\n".format(key, len(oids), ' '.join(oids))
logger.info("Exceptions, how often they happened and which oids are affected:\n\n{}".format(msg))
def convert(storage, start_at='0x00', convert_py3=True, encoding=None, encoding_fallbacks=None, debug=False):
if six.PY2 and encoding:
raise AssertionError('Unpickling with a default encoding is only supported in Python 3.')
decoders = {}
renames = load_renames()
repickle_all = False
pickle_protocol = DEFAULT_PROTOCOL
if convert_py3:
pickle_protocol = 3
repickle_all = True
decoders.update(
zodbconvert.load_decoders(
encoding_fallbacks=encoding_fallbacks
)
)
renames.update(zodbconvert.default_renames())
updater = FTUpdater(
storage,
dry=False,
renames=renames,
decoders=decoders,
start_at=start_at,
debug=debug,
repickle_all=repickle_all,
pickle_protocol=pickle_protocol,
encoding=encoding)
rename_rules = format_renames(updater.processor.get_rules(implicit=True, explicit=True))
if rename_rules:
logger.info('Applying rename rules:\n{}'.format(rename_rules))
logger.info('Converting storage ...')
try:
updater()
except Exception as e:
logger.exception('Stopped processing due to: {} {}'.format(type(e).__name__, str(e)))
if debug:
pdb.set_trace()
else:
sys.exit()
def reload_index(storage):
logger.info('Restoring index from storage ...')
getattr(storage, 'base', storage).reload_index()
def pack(storage):
logger.info('Packing storage ...')
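# Packing up to "now" drops all non-current revisions; referencesf is used
# by the (fault-tolerant) packer for garbage collection.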
storage.pack(time.time(), referencesf)
def repair(storage, oid, replacement=None, debug=False):
# Keep the readable oid string; the local name must not shadow the imported oid_repr() helper
oid_str = oid
oid = repr_to_oid(oid)
try:
old_data, old_tid = load_current(storage, oid)
except POSKeyError:
logger.info("No previous object with oid {} found".format(oid_str))
old_tid = None
except zlib.error as e:
logger.info("Specified object ({}) could not be decompressed: {}".format(oid_str, str(e)))
old_data, old_tid = load_current(storage.base, oid)
logger.debug("Data that could not be decompressed:\n{}".format(repr(old_data)))
else:
try:
obj = verify_record(oid, six.BytesIO(old_data))
except (pickle.UnpicklingError, zodbpickle.UnpicklingError):
# Overwriting such broken objects is what we want to do
pass
except Exception as e:
if debug:
pdb.set_trace()
else:
raise
else:
logger.info("Object could be successfully loaded: {}".format(repr(obj)))
logger.info("Aborting replacement!".format(repr(obj)))
return
logger.info("Replacing stored object ({}) with a place holder".format(oid_repr))
if replacement is None:
replacement = PersistentBroken()
replacement._p_oid = oid
def persistent_id(ref):
return None
# See zc.zodbgc for reference
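# Same two-pickle record layout as in FTUpdater._broken_marker above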
buffer = six.BytesIO()
p = PersistentPickler(persistent_id, buffer, 1)
p.dump((replacement.__class__, None))
p.dump(replacement.__getstate__())
t = TransactionMetaData()
t.note('Replace broken record')
try:
storage.tpc_begin(t)
storage.store(oid, old_tid, buffer.getvalue(), '', t)
storage.tpc_vote(t)
storage.tpc_finish(t)
except Exception:
if debug:
pdb.set_trace()
else:
raise
def get_storage(fpath, zodbconfig, read_only=True, zlib=False, check_version=True):
if fpath and zodbconfig:
raise AssertionError('Exactly one of --file or --config must be given.')
if fpath and check_version:
zodb_magic = get_zodb_magic(fpath)
if zodb_magic != FS_packed_version:
sys.exit('Verification needs to run with the same Python version that the database was created with')
logger.info('Loading storage ...')
if fpath:
try:
storage = FTFileStorage(fpath, read_only=read_only)
except FileStorageFormatError:
sys.exit("Can not open a storage created with another python version.")
if zlib:
from zc.zlibstorage import ZlibStorage
storage = ZlibStorage(storage)
elif zodbconfig:
with open(zodbconfig) as config:
storage = storageFromFile(config)
else:
raise AssertionError('Exactly one of --file or --config must be given.')
return storage
def get_parser():
parser = argparse.ArgumentParser(
prog="zodbtool",
description="Verifies, converts, migrates or packs a ZODB database.",
)
parser.add_argument(
"action",
choices=["verify", "repair", "convert", "pack"],
nargs='+',
help="actions to be performed"
)
exclusive_group = parser.add_mutually_exclusive_group()
exclusive_group.add_argument(
"-f", "--file",
help="load storage from given file",
)
exclusive_group.add_argument(
"-c", "--config",
help="load storage from config file",
)
parser.add_argument(
"-D", "--debug",
action="store_true",
dest="debug",
help="pause to debug broken pickles",
)
parser.add_argument(
"-o", "--oid",
action="store",
dest="oid",
help="oid to start working",
)
parser.add_argument(
"--zlib",
action="store_true",
dest="zlib",
help="use zlibstorage",
)
parser.add_argument(
"--convert-py3",
action="store_true",
dest="convert_py3",
help="convert the database from python2 to python3"
)
parser.add_argument(
"--encoding",
dest="encoding",
default="utf8",
help="used for decoding pickled strings in py3"
)
parser.add_argument(
"--encoding-fallback",
dest="encoding_fallbacks",
nargs="*",
help="Older databases may have other encoding stored than 'utf-8', like latin1."
"If an encoding error occurs, fallback to the given encodings "
"and issue a warning.",
)
parser.add_argument(
'-v', '--verbose',
action="store_true",
help="more verbose output"
)
return parser
def patch_copyreg_reconstructor():
def _reconstructor(cls, base, state):
"""Replacement for copyreg._reconstructor to handle some pesky data types."""
if base is object:
obj = object.__new__(cls)
elif cls.__name__ == '_ElementStringResult' and isinstance(state, str):
return cls(state, 'utf8')
else:
obj = base.__new__(cls, state)
if base.__init__ != object.__init__:
base.__init__(obj, state)
return obj
# Make sure it gets found in the copyreg module, and not here, by ObjectRenamer
_reconstructor.__module__ = 'copyreg'
copyreg._reconstructor = _reconstructor
del _reconstructor
def main():
patch_copyreg_reconstructor()
args = get_parser().parse_args()
logging.basicConfig(level=logging.DEBUG if args.verbose else logging.INFO)
# Capture the output from zodbupdate.serialize
serialize.logger = logger
if 'verify' in args.action:
with closing(get_storage(args.file, args.config, read_only=True, zlib=args.zlib)) as storage:
verify(storage, start_at=args.oid, debug=args.debug)
if 'repair' in args.action:
with closing(get_storage(args.file, args.config, read_only=False, zlib=args.zlib)) as storage:
repair(storage, oid=args.oid, debug=args.debug)
if 'convert' in args.action:
if not args.file:
sys.exit("Storage file for conversion needs to be defined using parameter -f")
if args.convert_py3:
if not six.PY3:
sys.exit("Conversion to python3 readable format should be done with python3")
# Magic bytes need to be converted at the end when running in Python 2
# but at the beginning when running in Python 3 so that FileStorage
# doesn't complain.
fpath = args.file
fpath_index = fpath + '.index'
if os.path.exists(fpath_index):
os.rename(fpath_index, fpath_index + '.bak')
logger.info("Storage index file {} was renamed in order not to cause issues with the conversion".format(fpath_index))
zodbconvert.update_magic_data_fs(fpath)
with closing(get_storage(args.file, args.config, read_only=False, zlib=args.zlib, check_version=False)) as storage:
convert(storage,
start_at=args.oid,
convert_py3=args.convert_py3,
encoding=args.encoding,
encoding_fallbacks=args.encoding_fallbacks,
debug=args.debug)
reload_index(storage)
if 'pack' in args.action:
with closing(get_storage(args.file, args.config, read_only=False, zlib=args.zlib)) as storage:
pack(storage)
if __name__ == '__main__':
try:
main()
except AssertionError as e:
sys.exit(str(e))
except KeyboardInterrupt:
pass