Created
July 22, 2022 15:09
-
-
Save ml31415/b4d00251e76afe35358070564864287a to your computer and use it in GitHub Desktop.
Toolkit for repairing and converting a ZODB file with all kinds of errors
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
""" | |
Tool to verify, update, migrate and pack the ZODB database. Ignores and/or fixes possible read errors in the process.
usage: zodbtool {verify,repair,convert,pack} [{verify,repair,convert,pack} ...] | |
[-h] [-f FILE | -c CONFIG] [-D] [-o OID] [-p] [--zlib] [--convert-py3] | |
[--encoding ENCODING] [--encoding-fallback [ENCODING_FALLBACKS ...]] [-v] | |
Verifies, converts, migrates or packs a ZODB database. | |
positional arguments: | |
{verify,repair,convert,pack} | |
actions to be performed | |
options: | |
-h, --help show this help message and exit | |
-f FILE, --file FILE load storage from given file | |
-c CONFIG, --config CONFIG | |
load storage from config file | |
-D, --debug pause to debug broken pickles | |
-o OID, --oid OID oid to start working | |
-p, --pack pack the database after inspection | |
--zlib use zlibstorage | |
--convert-py3 convert the database from python2 to python3 | |
--encoding ENCODING used for decoding pickled strings in py3 | |
--encoding-fallback [ENCODING_FALLBACKS ...] | |
Older databases may have other encoding stored than 'utf-8', like latin1. If an
encoding error occurs, fallback to the given encodings and issue a warning. | |
-v, --verbose more verbose output | |
""" | |
import argparse | |
import copyreg | |
import logging | |
import os | |
import pdb | |
import pickle | |
import six | |
import struct | |
import sys | |
import time | |
import zlib | |
from collections import defaultdict | |
from contextlib import closing | |
from ZODB.FileStorage import FileStorage, packed_version as FS_packed_version | |
from ZODB.FileStorage.FileStorage import FileStorageFormatError, read_index | |
from ZODB.FileStorage.format import DATA_HDR, DATA_HDR_LEN, CorruptedDataError, u64, DataHeader | |
from ZODB.FileStorage.fspack import FileStoragePacker, GC | |
from ZODB.blob import BlobStorage | |
from ZODB.broken import PersistentBroken | |
from ZODB.config import storageFromFile | |
from ZODB.Connection import TransactionMetaData | |
from ZODB.interfaces import IStorageCurrentRecordIteration, IStorageIteration, IStorageUndoable | |
from ZODB.POSException import POSKeyError | |
from ZODB.serialize import PersistentUnpickler, PersistentPickler, referencesf | |
from ZODB.utils import oid_repr, repr_to_oid, load_current | |
from zodbpickle import pickle as zodbpickle | |
from zodbupdate.main import format_renames, load_renames | |
from zodbupdate.utils import get_zodb_magic, DEFAULT_PROTOCOL | |
from zodbupdate import serialize, convert as zodbconvert | |
# Need to import all modules of the project present in the database | |
##### import all all serialized classes here ##### | |
# Module-level logger shared by all classes and helper functions below.
logger = logging.getLogger('zodbtool')
class FTGC(GC):
    """Garbage collector that skips over records it cannot deserialize."""

    def findrefs(self, pos):
        """Return a list of oids referenced as of packtime."""
        unreadable = (pickle.UnpicklingError, zodbpickle.UnpicklingError,
                      CorruptedDataError, zlib.error)
        try:
            refs = GC.findrefs(self, pos)
        except unreadable as exc:
            # An unreadable record contributes no references; warn and move on.
            logger.warning('Skipping unreadable record: {} {}'.format(type(exc).__name__, str(exc)))
            refs = []
        return refs
class FTFileStoragePacker(FileStoragePacker):
    """Fault-tolerant packer."""

    def __init__(self, storage, referencesf, stop, gc=True):
        # Let the base class set up file handles and bookkeeping first,
        # then swap in the fault-tolerant garbage collector.
        FileStoragePacker.__init__(self, storage, referencesf, stop, gc)
        ft_gc = FTGC(self._file, self.file_end, self._stop, gc, referencesf)
        self.gc = ft_gc
class FTFileStorage(FileStorage):
    """Fault-tolerant FileStorage, which just reports any possibly found versioned objects and other issues instead of crashing."""

    def _read_data_header(self, pos, oid=None, _file=None):
        """Read the data record header at ``pos`` and return an FTDataHeader.

        Raises CorruptedDataError when the header is truncated or belongs
        to a different oid than the one expected.
        """
        fh = self._file if _file is None else _file
        fh.seek(pos)
        raw = fh.read(DATA_HDR_LEN)
        if len(raw) != DATA_HDR_LEN:
            raise CorruptedDataError(oid, raw, pos)
        header = FTDataHeader(*struct.unpack(DATA_HDR, raw))
        if oid is not None and oid != header.oid:
            raise CorruptedDataError(oid, raw, pos)
        if not header.plen:
            # Zero payload length: the record backpoints to older data.
            header.back = u64(fh.read(8))
        return header

    def reload_index(self):
        """Rebuild the in-memory index from the storage file and persist it."""
        index, tindex = self._newIndexes()
        self._initIndex(index, tindex)
        self._pos, self._oid, tid = read_index(
            self._file, self._file_name, index, tindex,
            read_only=self._is_read_only)
        self._save_index()

    @staticmethod
    def packer(storage, referencesf, stop, gc):
        """Pack ``storage`` with the fault-tolerant packer.

        Returns ``(opos, index)`` on success, or None when packing
        produced no result.
        """
        p = FTFileStoragePacker(storage, referencesf, stop, gc)
        try:
            opos = p.pack()
            return None if opos is None else (opos, p.index)
        finally:
            p.close()
class FTDataHeader(DataHeader):
    """Fault-tolerant DataHeader.

    A non-zero version length is only reported as a warning instead of
    aborting, since versions are unsupported anyway.
    """

    def __init__(self, oid, tid, prev, tloc, vlen, plen):
        if vlen:
            logger.warning("Non-zero version length. Versions aren't supported. OID: {}".format(oid_repr(oid)))
        self.oid, self.tid = oid, tid
        self.prev, self.tloc = prev, tloc
        self.plen = plen
        # Backpointer; filled in by the reader when plen == 0.
        self.back = 0
def ft_storage_iter(storage, start_at=None, debug=False, error_callback=None):
    """Fault-tolerant iterator for all records within a storage.

    Yields ``(oid, tid, data_file)`` tuples where ``data_file`` is a
    BytesIO over the raw record data.  Records that cannot be read are
    skipped; when ``error_callback`` is given it is invoked as
    ``error_callback(oid, exception)`` (plus ``tid=`` for zlib errors)
    for each skipped record.  With ``debug=True``, a pdb session is
    opened at the point of failure instead.
    """
    if isinstance(storage, BlobStorage):
        # If we've got a BlobStorage wrapper, let's
        # actually iterate through the storage it wraps.
        storage = storage._BlobStorage__storage
    # Wrappers such as ZlibStorage expose the wrapped storage as ``.base``.
    base_storage = getattr(storage, 'base', storage)

    def _next_oid(oid):
        """Determine the next oid without loading a possibly broken object."""
        next_oid = struct.pack(">Q", struct.unpack(">Q", oid)[0] + 1)
        try:
            return base_storage._index.minKey(next_oid)
        except ValueError:  # "empty tree" error
            return None

    if IStorageCurrentRecordIteration.providedBy(storage):
        next_oid = repr_to_oid(start_at if start_at else '0x00')
        while next_oid is not None:
            # Try to handle common issues
            try:
                oid, tid, data, next_oid = storage.record_iternext(next_oid)
                yield oid, tid, six.BytesIO(data)
            except POSKeyError as e:
                # Record is missing entirely; report it and step past it.
                if debug:
                    logger.exception('Missing record ({}): {}'.format(oid_repr(next_oid), str(e)))
                    pdb.set_trace()
                if error_callback:
                    error_callback(next_oid, e)
                next_oid = _next_oid(next_oid)
            except zlib.error as e:
                # Compressed payload is corrupt.  Try to recover the tid from
                # the unwrapped base storage so a caller can overwrite the record.
                if debug:
                    logger.exception('Broken zlib compressed record ({}): {}'.format(oid_repr(next_oid), str(e)))
                    pdb.set_trace()
                if error_callback:
                    try:
                        tid = storage.base.record_iternext(next_oid)[1]
                    except Exception:
                        tid = None
                    error_callback(next_oid, e, tid=tid)
                next_oid = _next_oid(next_oid)
            except OSError as e:
                # Something might be wrong with the stored file offset
                if debug:
                    logger.exception("Broken record found ({}): {}".format(oid_repr(next_oid), str(e)))
                    pdb.set_trace()
                if error_callback:
                    error_callback(next_oid, e)
                next_oid = _next_oid(next_oid)
            except Exception as e:
                if debug:
                    logger.exception("Could not open oid ({}): {}".format(oid_repr(next_oid), str(e)))
                    pdb.set_trace()
                else:
                    # This is likely something serious and should not pass unhandled
                    raise
                # Keep this in case while after debugging the loop goes on
                next_oid = _next_oid(next_oid)
    elif (IStorageIteration.providedBy(storage) and
            (not IStorageUndoable.providedBy(storage) or
             not storage.supportsUndo())):
        # If we can't iterate only through the recent records,
        # iterate on all. Of course doing a pack before help :).
        for transaction_ in storage.iterator():
            for rec in transaction_:
                # This contains not a bit of fault-tolerance, but better than nothing
                yield rec.oid, rec.tid, six.BytesIO(rec.data)
    else:
        raise TypeError("Don't know how to iterate through this storage type")
class FTUpdater(object):
    """Fault-tolerant version of zodbupdate.update.Updater.

    Walks all current records via ``ft_storage_iter``, runs the
    zodbupdate rename/repickle processor on each and stores the result
    back, committing in batches.  Records that cannot be unpickled are
    replaced by ``PersistentBroken`` placeholders instead of aborting.
    """

    # Commit after this many stored records to keep transactions bounded.
    TRANSACTION_COUNT = 100000

    def __init__(
            self, storage, dry=False, renames=None, decoders=None,
            start_at='0x00', debug=False, repickle_all=False,
            pickle_protocol=DEFAULT_PROTOCOL,
            encoding='ASCII'):
        self.dry = dry
        self.storage = storage
        self.processor = serialize.ObjectRenamer(
            renames=renames,
            decoders=decoders,
            pickle_protocol=pickle_protocol,
            repickle_all=repickle_all,
            encoding=encoding,
        )
        self._protocol = pickle_protocol
        self.start_at = start_at
        self.debug = debug

    def _broken_marker(self, persistent_id=lambda obj: None):
        """Create a broken persistent object, that can replace unreadable records."""
        broken = PersistentBroken()
        output_file = six.BytesIO()
        p = PersistentPickler(persistent_id, output_file, self._protocol)
        # A ZODB record is two consecutive pickles: class metadata, then state.
        p.dump((broken.__class__, None))
        p.dump(broken.__getstate__())
        output_file.truncate()
        return output_file

    def _error_callback(self, oid, error, tid=None):
        """Insert a broken marker object into the storage."""
        logger.warning('Replacing not loadable object ({}): {} {}'.format(oid_repr(oid), type(error).__name__, str(error)))
        bm = self._broken_marker()
        # Stores under the currently open batch transaction (set in __call__).
        self.storage.store(oid, tid, bm.getvalue(), '', self.__transaction)

    def __new_transaction(self):
        # Begin a fresh two-phase-commit transaction on the storage.
        t = TransactionMetaData()
        self.storage.tpc_begin(t)
        t.note(six.u('Batch updated factory references.'))
        return t

    def __commit_transaction(self, t, changed, commit_count, oid=None):
        # Be verbose with the commit messages, so that operations in case of an error
        # can be easily resumed from the last committed oid
        if self.dry or not changed:
            logger.info(
                'Dry run selected or no changes, '
                'aborting transaction. (#{} oid {})'.format(commit_count, oid_repr(oid)))
            self.storage.tpc_abort(t)
        else:
            logger.info('Committing changes (#{} oid {}).'.format(commit_count, oid_repr(oid)))
            self.storage.tpc_vote(t)
            self.storage.tpc_finish(t)

    def __call__(self):
        """Process every record, committing every TRANSACTION_COUNT stores."""
        commit_count = 0
        oid = 0  # placeholder so the except clause below can always log something
        try:
            record_count = 0
            self.__transaction = self.__new_transaction()
            for oid, tid, data_file in ft_storage_iter(self.storage, self.start_at, error_callback=self._error_callback):
                try:
                    new = self.processor.rename(data_file)
                except (pickle.UnpicklingError, zodbpickle.UnpicklingError, CorruptedDataError) as e:
                    logger.warning('Replacing corrupted object ({}): {} {}'.format(oid_repr(oid), type(e).__name__, str(e)))
                    new = self._broken_marker()
                if new is None:
                    # The processor made no changes to this record; nothing to store.
                    continue
                self.storage.store(oid, tid, new.getvalue(), '', self.__transaction)
                record_count += 1
                if record_count > self.TRANSACTION_COUNT:
                    record_count = 0
                    commit_count += 1
                    self.__commit_transaction(self.__transaction, True, commit_count, oid=oid)
                    self.__transaction = self.__new_transaction()
            commit_count += 1
            # Final commit; aborts instead when no record was stored in this batch.
            self.__commit_transaction(self.__transaction, record_count != 0, commit_count, oid=oid)
        except Exception as e:
            logger.exception("Unexpected failure while loading oid {}: {} {}".format(oid_repr(oid), type(e).__name__, str(e)))
            if self.debug:
                pdb.post_mortem()
            sys.exit()
def verify_record(oid, data_file, persistent_load=lambda ref: None):
    """Fully unpickle a single record to prove it is readable.

    A ZODB record consists of two consecutive pickles: the class
    metadata followed by the object state; both must load cleanly.
    ``oid`` is accepted for interface symmetry but not used here.
    Returns the unpickled object state.
    """
    unpickler = PersistentUnpickler(None, persistent_load, data_file)
    # Load (and discard) the class metadata pickle; it must parse before
    # the state pickle can be read.  The original kept unused locals here.
    unpickler.load()
    return unpickler.load()
def verify(storage, start_at=None, debug=False):
    """Try to fully load every record and summarize all failures.

    Iterates fault-tolerantly over the storage, collects each load
    failure grouped by exception text, and logs a final report so the
    broken oids can be repaired afterwards.
    """
    if start_at:
        logger.info('Verifying stored objects starting from {} ...'.format(start_at))
    else:
        logger.info('Verifying all stored objects ...')
    issues = defaultdict(list)

    def error_callback(oid, error, tid=None):
        logger.warning('Skipping unreadable record ({}): {} {}'.format(oid_repr(oid), type(error).__name__, str(error)))
        issues["{}: {}".format(type(error).__name__, str(error))].append(oid_repr(oid))

    count = -1  # stays -1 when the storage yields no records at all
    for count, (oid, tid, data_file) in enumerate(
            ft_storage_iter(storage, start_at, debug=debug, error_callback=error_callback)):
        try:
            verify_record(oid, data_file)
        except (pickle.UnpicklingError, zodbpickle.UnpicklingError) as e:
            if debug:
                logger.exception("Unpickleable record ({}): {} {}".format(oid_repr(oid), type(e).__name__, str(e)))
                pdb.set_trace()
            else:
                error_callback(oid, e)
                # BUG FIX: logging uses %-style lazy formatting; the original
                # passed a str.format "{}" template, so the data never showed.
                logger.debug("Unreadable data:\n%s", repr(data_file.getvalue()))
        except Exception as e:
            if debug:
                logger.exception("Unloadable record found ({}): {} {}".format(oid_repr(oid), type(e).__name__, str(e)))
                pdb.set_trace()
            else:
                error_callback(oid, e)
                logger.debug("Unreadable data:\n%s", repr(data_file.getvalue()))
    error_count = sum(len(oids) for oids in issues.values())
    logger.info("\nScanned {} records. Found {} records that could not be loaded.".format(count + 1, error_count))
    if error_count:
        msg = ""
        # Report the most frequent failure kinds first.
        for key in sorted(issues, key=lambda k: len(issues[k]), reverse=True):
            oids = issues[key]
            msg += "{} ({} oids): {}\n".format(key, len(oids), ' '.join(oids))
        logger.info("Exceptions, how often they happened and which oids are affected:\n\n{}".format(msg))
def convert(storage, start_at='0x00', convert_py3=True, encoding=None, encoding_fallbacks=None, debug=False):
    """Run the zodbupdate rename/convert rules over every record in ``storage``."""
    if six.PY2 and encoding:
        raise AssertionError('Unpickling with a default encoding is only supported in Python 3.')
    renames = load_renames()
    decoders = {}
    repickle_all = False
    pickle_protocol = DEFAULT_PROTOCOL
    if convert_py3:
        # Python 3 conversion: force protocol 3 and rewrite every record,
        # picking up the zodbupdate py2->py3 decoders and renames.
        pickle_protocol = 3
        repickle_all = True
        decoders.update(zodbconvert.load_decoders(encoding_fallbacks=encoding_fallbacks))
        renames.update(zodbconvert.default_renames())
    updater = FTUpdater(
        storage,
        dry=False,
        renames=renames,
        decoders=decoders,
        start_at=start_at,
        debug=debug,
        repickle_all=repickle_all,
        pickle_protocol=pickle_protocol,
        encoding=encoding)
    rules = updater.processor.get_rules(implicit=True, explicit=True)
    implicit_renames = format_renames(rules)
    if implicit_renames:
        logger.info('Applying rename rules:\n{}'.format(implicit_renames))
    logger.info('Converting storage ...')
    try:
        updater()
    except Exception as e:
        logger.exception('Stopped processing due to: {} {}'.format(type(e).__name__, str(e)))
        if debug:
            pdb.set_trace()
        else:
            sys.exit()
def reload_index(storage):
    """Rebuild the index of the (possibly wrapped) base storage from disk."""
    logger.info('Restoring index from storage ...')
    base = getattr(storage, 'base', storage)
    base.reload_index()
def pack(storage):
    """Pack the storage up to the current point in time."""
    logger.info('Packing storage ...')
    pack_time = time.time()
    storage.pack(pack_time, referencesf)
def repair(storage, oid, replacement=None, debug=False):
    """Replace the current record of ``oid`` with a placeholder object.

    ``oid`` is given in repr form (e.g. '0x01').  Only records that
    cannot be loaded are replaced; a record that loads successfully
    aborts the operation.  ``replacement`` defaults to a fresh
    PersistentBroken instance.
    """
    # BUG FIX: the original bound this string to the name ``oid_repr``,
    # shadowing the imported ZODB.utils.oid_repr function inside this scope.
    oid_str = oid
    oid = repr_to_oid(oid)
    try:
        old_data, old_tid = load_current(storage, oid)
    except POSKeyError:
        logger.info("No previous object with oid {} found".format(oid_str))
        old_tid = None
    except zlib.error as e:
        # Compressed payload is broken; fetch the raw record from the
        # unwrapped base storage so we still get a valid tid to overwrite.
        logger.info("Specified object ({}) could not be decompressed: {}".format(oid_str, str(e)))
        old_data, old_tid = load_current(storage.base, oid)
        logger.debug("Indecompressible data:\n{}".format(repr(old_data)))
    else:
        try:
            obj = verify_record(oid, six.BytesIO(old_data))
        except (pickle.UnpicklingError, zodbpickle.UnpicklingError):
            # Overwriting such broken objects is what we want to do
            pass
        except Exception:
            if debug:
                pdb.set_trace()
            else:
                raise
        else:
            logger.info("Object could be successfully loaded: {}".format(repr(obj)))
            logger.info("Aborting replacement!")
            return
    logger.info("Replacing stored object ({}) with a place holder".format(oid_str))
    if replacement is None:
        replacement = PersistentBroken()
        replacement._p_oid = oid

    def persistent_id(ref):
        # No outgoing references are serialized for the placeholder.
        return None

    # See zc.zodbgc for reference
    buffer = six.BytesIO()
    p = PersistentPickler(persistent_id, buffer, 1)
    p.dump((replacement.__class__, None))
    p.dump(replacement.__getstate__())
    t = TransactionMetaData()
    t.note('Replace broken record')
    try:
        storage.tpc_begin(t)
        storage.store(oid, old_tid, buffer.getvalue(), '', t)
        storage.tpc_vote(t)
        storage.tpc_finish(t)
    except Exception:
        if debug:
            pdb.set_trace()
        else:
            raise
def get_storage(fpath, zodbconfig, read_only=True, zlib=False, check_version=True):
    """Open and return a storage from a file path or a ZODB config file.

    Exactly one of ``fpath`` and ``zodbconfig`` must be given.  With
    ``zlib=True`` the FileStorage is wrapped in a ZlibStorage (the
    parameter name shadows the zlib module, but only the flag is used
    here).  ``check_version`` compares the file's magic bytes against
    the running interpreter's FileStorage version.
    """
    if fpath and zodbconfig:
        raise AssertionError('Exactly one of --file or --config must be given.')
    if fpath and check_version:
        # FileStorages created under a different major python version carry
        # different magic bytes; refuse to open such a file.
        zodb_magic = get_zodb_magic(fpath)
        if zodb_magic != FS_packed_version:
            # BUG FIX: typo "needts" in the user-facing message.
            sys.exit('Verification needs to run with the same python version that the database was created with')
    logger.info('Loading storage ...')
    if fpath:
        try:
            storage = FTFileStorage(fpath, read_only=read_only)
        except FileStorageFormatError:
            sys.exit("Can not open a storage created with another python version.")
        if zlib:
            from zc.zlibstorage import ZlibStorage
            storage = ZlibStorage(storage)
    elif zodbconfig:
        with open(zodbconfig) as config:
            storage = storageFromFile(config)
    else:
        raise AssertionError('Exactly one of --file or --config must be given.')
    return storage
def get_parser():
    """Build and return the command line argument parser for zodbtool."""
    parser = argparse.ArgumentParser(
        prog="zodbtool",
        description="Verifies, converts, migrates or packs a ZODB database.",
    )
    parser.add_argument(
        "action",
        choices=["verify", "repair", "convert", "pack"],
        nargs='+',
        help="actions to be performed",
    )
    # --file and --config are alternative ways to locate the storage.
    exclusive_group = parser.add_mutually_exclusive_group()
    exclusive_group.add_argument(
        "-f", "--file",
        help="load storage from given file",
    )
    exclusive_group.add_argument(
        "-c", "--config",
        help="load storage from config file",
    )
    parser.add_argument(
        "-D", "--debug",
        action="store_true",
        dest="debug",
        help="pause to debug broken pickles",
    )
    parser.add_argument(
        "-o", "--oid",
        action="store",
        dest="oid",
        help="oid to start working",
    )
    parser.add_argument(
        "--zlib",
        action="store_true",
        dest="zlib",
        help="use zlibstorage",
    )
    parser.add_argument(
        "--convert-py3",
        action="store_true",
        dest="convert_py3",
        help="convert the database from python2 to python3",
    )
    parser.add_argument(
        "--encoding",
        dest="encoding",
        default="utf8",
        help="used for decoding pickled strings in py3",
    )
    parser.add_argument(
        "--encoding-fallback",
        dest="encoding_fallbacks",
        nargs="*",
        # BUG FIX: the concatenated help string was missing a space after
        # "latin1." and rendered as "latin1.If an encoding error ...".
        help="Older databases may have other encoding stored than 'utf-8', like latin1. "
             "If an encoding error occurs, fallback to the given encodings "
             "and issue a warning.",
    )
    parser.add_argument(
        '-v', '--verbose',
        action="store_true",
        help="more verbose output",
    )
    return parser
def patch_copyreg_reconstructor():
    """Monkey-patch copyreg._reconstructor to cope with some pesky data types."""

    def _reconstructor(cls, base, state):
        """Replacement for copyreg._reconstructor to handle some pesky data types."""
        if base is object:
            # Plain object base: state carries nothing for construction.
            return object.__new__(cls)
        if cls.__name__ == '_ElementStringResult' and isinstance(state, str):
            return cls(state, 'utf8')
        obj = base.__new__(cls, state)
        if base.__init__ != object.__init__:
            base.__init__(obj, state)
        return obj

    # Make sure, it gets found in the copyreg module and not here by ObjectRenamer
    _reconstructor.__module__ = 'copyreg'
    copyreg._reconstructor = _reconstructor
def main():
    """Command line entry point: parse arguments and run the requested actions."""
    patch_copyreg_reconstructor()
    args = get_parser().parse_args()
    logging.basicConfig(level=logging.DEBUG if args.verbose else logging.INFO)
    # Capture the output from zodbupdate.serialize
    serialize.logger = logger
    if 'verify' in args.action:
        with closing(get_storage(args.file, args.config, read_only=True, zlib=args.zlib)) as storage:
            verify(storage, start_at=args.oid, debug=args.debug)
    if 'repair' in args.action:
        with closing(get_storage(args.file, args.config, read_only=False, zlib=args.zlib)) as storage:
            repair(storage, oid=args.oid, debug=args.debug)
    if 'convert' in args.action:
        if not args.file:
            sys.exit("Storage file for conversion needs to be defined using parameter -f")
        if args.convert_py3:
            if not six.PY3:
                sys.exit("Conversion to python3 readable format should be done with python3")
            # Magic bytes need to be converted at the end when running in Python 2
            # but at the beginning when running in Python 3 so that FileStorage
            # doesn't complain.
            fpath = args.file
            fpath_index = fpath + '.index'
            if os.path.exists(fpath_index):
                # The stale index predates the conversion and would mislead FileStorage.
                os.rename(fpath_index, fpath_index + '.bak')
                logger.info("Storage index file {} was renamed in order not to cause issues with the conversion".format(fpath_index))
            zodbconvert.update_magic_data_fs(fpath)
        # check_version=False: the magic bytes were just rewritten above.
        with closing(get_storage(args.file, args.config, read_only=False, zlib=args.zlib, check_version=False)) as storage:
            convert(storage,
                    start_at=args.oid,
                    convert_py3=args.convert_py3,
                    encoding=args.encoding,
                    encoding_fallbacks=args.encoding_fallbacks,
                    debug=args.debug)
            # Rebuild the index while the storage is still open; the old
            # index file was renamed away above.
            reload_index(storage)
    if 'pack' in args.action:
        with closing(get_storage(args.file, args.config, read_only=False, zlib=args.zlib)) as storage:
            pack(storage)
if __name__ == '__main__':
    try:
        main()
    except AssertionError as e:
        # Argument validation failures are reported without a traceback.
        sys.exit(str(e))
    except KeyboardInterrupt:
        # Allow a clean Ctrl-C abort.
        pass
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment