Skip to content

Instantly share code, notes, and snippets.

@MatthewWilkes
Created August 20, 2018 22:04
Show Gist options
  • Star 1 You must be signed in to star a gist
  • Fork 2 You must be signed in to fork a gist
  • Save MatthewWilkes/f1231ea94188ed160634aa83a8f84dbd to your computer and use it in GitHub Desktop.
Save MatthewWilkes/f1231ea94188ed160634aa83a8f84dbd to your computer and use it in GitHub Desktop.
A custom recovery script for ZODB FileStorage files
# ZODB recovery script that 'rebases' transactions onto an older copy of the same
# FileStorage. This is useful when a FileStorage is lightly corrupted, the
# corruption goes undetected, and the storage remains in production, garnering
# new transactions.
#
# The corruption in the storage may have prevented various transactions from
# completing and prevents packing or incremental backups, so a backup taken before
# the corruption happened is used as the base onto which the newer transactions
# are grafted. This only works if all the corruption affects transactions covered
# by the backup; corruption that affects the transactions being appended is not
# recoverable in this fashion.
#
# (c) Matthew Wilkes, 2018. GPLv2.
import binascii
import math
import os
import shutil
import sys
import time

from ZODB.FileStorage import FileStorage
from ZODB.FileStorage.format import FileStorageFormatter
from ZODB.fsIndex import fsIndex
from ZODB.fstools import TxnHeader
from ZODB.serialize import referencesf
# --- Configuration -----------------------------------------------------------
# The newest state of the database; this file may be corrupted. Transactions
# after the boundary are read from here.
new_fs = '/mnt/zeo/blob/filestorage/Data.fs'
# A known good state, taken from an old backup. It is copied verbatim to
# restore_fs before the newer transactions are appended.
old_fs = '/mnt/zeo/bak/for_recovery/mwilkes/weekly_datafs_old/Data.fs.old'
# Output filename: the patched storage is built (and edited in place) here.
restore_fs = '/mnt/zeo/bak/for_recovery/mwilkes/patched-20180818.fs'
# Id of the last transaction common to old_fs and new_fs, as raw bytes.
# binascii.unhexlify is used rather than str.decode('hex') because the latter
# only exists on Python 2.
boundary_txn = binascii.unhexlify('03c66e29bfc50dbb')
# True: copy old_fs to restore_fs, append the newer transactions, then rewrite
# pointers from the graft point. False: skip the copy and rewrite the pointers
# of an already existing restore_fs from its first transaction.
SHOULD_COPY = True
# Last whole percentage of the rewrite pass that has been reported on stdout.
last_progress = 0
if SHOULD_COPY:
    print("Starting initial copy")
    # The old storage needs no modification at all, so it is duplicated
    # wholesale into the restore file. shutil does the work so that the bulk
    # copy is not driven chunk by chunk from this script.
    shutil.copy(old_fs, restore_fs)
    print("Copied old fs")
# Graft the transactions that follow boundary_txn in new_fs onto restore_fs,
# then rewrite the data record headers of the grafted region so that their
# transaction pointer (tloc) and previous-revision pointer (prev) are valid
# inside restore_fs.
#
# The restore file is opened unbuffered because the rewrite pass does lots of
# small edits and re-reads, which would invalidate a buffer on every step.
with open(restore_fs, 'rb+', 0) as restore_file:
    if SHOULD_COPY:
        # Position at the end of the restore file, so the writes below append
        restore_file.seek(0, os.SEEK_END)
        with open(new_fs, 'rb') as new_file:
            # Scan through transactions looking for the known good one.
            # We could equally do this by using the last transaction, but
            # this method allows us to pick different graft points.
            reached_boundary = None
            current = TxnHeader(new_file, 4)
            while current is not None:
                if current.tid == boundary_txn:
                    # Offset of whatever follows the boundary transaction:
                    # its position, plus its length, plus the 8 byte trailing
                    # length field. Computed directly so that a boundary
                    # which is the newest transaction (next_txn() is None)
                    # does not crash.
                    reached_boundary = current._pos + current.length + 8
                    print("Reached boundary txn")
                    break
                current = current.next_txn()
            if reached_boundary is None:
                # Without this check the copy would silently start from
                # offset 0 and append the whole (corrupted) file.
                raise RuntimeError(
                    "Boundary transaction %r not found in %s"
                    % (boundary_txn, new_fs)
                )
            new_file.seek(reached_boundary)
            # Copy a megabyte over at a time, as that's the default
            # buffer size; one dot is printed per chunk.
            for chunk in iter(lambda: new_file.read(2 ** 20), b''):
                restore_file.write(chunk)
                sys.stdout.write(".")
                sys.stdout.flush()

    print("Rewriting data header pointers")
    restore_file.seek(0, os.SEEK_END)
    final_pos = restore_file.tell()
    formatter = FileStorageFormatter()
    formatter._file = restore_file
    # There is a check that timestamps are increasing; seed the last seen
    # timestamp with the all-zero tid so the first transaction passes it.
    # Bytes are used (not the int 0) so the comparison also works on Python 3.
    formatter.ltid = b'\x00' * 8
    # Maps each oid to the offset of its most recent rewritten data record
    index = fsIndex()
    if SHOULD_COPY:
        # Rewrite from the graft point.
        # NOTE(review): reached_boundary is an offset measured in new_fs. It
        # is only the graft point in restore_fs if old_fs ends exactly at
        # that same offset - confirm for the storages being recovered.
        rewrite_offset = reached_boundary
    else:
        # Rewrite from the start of the file
        rewrite_offset = 4
    if rewrite_offset > final_pos:
        raise RuntimeError(
            "Rewrite offset %d lies beyond the end of %s (%d bytes)"
            % (rewrite_offset, restore_fs, final_pos)
        )
    # An offset equal to the file size means there is nothing to rewrite
    current = None
    if rewrite_offset < final_pos:
        current = TxnHeader(restore_file, rewrite_offset)
    while current is not None:
        # Show one . for each 1% of the file processed
        current_progress = math.floor(
            (current._pos / float(final_pos)) * 100.0
        )
        if current_progress > last_progress:
            sys.stdout.write("." * int(current_progress - last_progress))
            sys.stdout.flush()
            last_progress = current_progress
        # Verify the txn header
        formatted_txn = formatter._read_txn_header(current._pos)
        formatter.checkTxn(formatted_txn, current._pos)
        # Find the current transaction boundary, so we know when we have
        # processed all data records
        next_txn = current._pos + current.length + 8
        # Rewrite the internal data headers
        data_offset = current.get_data_offset()
        # Continue looping over data records until we exceed the end of the
        # transaction. The trailing 8 bytes is the length of the transaction,
        # to allow backtracking. That is the relevant boundary.
        while data_offset < (next_txn - 8):
            current_data = formatter._read_data_header(data_offset)
            # Point the record at the transaction that now contains it
            current_data.tloc = current._pos
            # Recalculate the previous-revision pointer: the last record seen
            # for this oid during this pass, or 0 if there was none
            current_data.prev = index.get(current_data.oid, 0)
            # Overwrite the data header in place
            restore_file.seek(data_offset)
            restore_file.write(current_data.asString())
            # Check that our new header is valid
            formatter.checkData(
                formatted_txn, current._pos, current_data, data_offset
            )
            # Index this oid position, so we can fix future prev pointers
            index[current_data.oid] = data_offset
            # Advance to the next data record
            data_offset = data_offset + current_data.recordlen()
        # Advance to the next transaction record
        current = current.next_txn()
# Finish the line of progress dots
print("")
print("Writing index")
# Open the patched file as a regular FileStorage so that it can be packed
fs = FileStorage(restore_fs)
try:
    print("Packing FileStorage")
    # Pack up to the current time
    fs.pack(time.time(), referencesf)
finally:
    # Shut the storage down cleanly even if packing fails
    fs.close()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment