Skip to content

Instantly share code, notes, and snippets.

@willcl-ark
Last active August 3, 2023 15:22
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save willcl-ark/d1dd7d80d4581671aa38157960a87ba7 to your computer and use it in GitHub Desktop.
Save willcl-ark/d1dd7d80d4581671aa38157960a87ba7 to your computer and use it in GitHub Desktop.
xor a Bitcoin Core blocks directory
#!/usr/bin/env python3
import argparse
import logging
import os
import random
import traceback
from functools import partial
from multiprocessing import Pool, cpu_count
# Root logger configuration: "<LEVEL>: <message>" lines, INFO and above.
logging.basicConfig(format='%(levelname)s: %(message)s', level=logging.INFO)
# File name of the key file; it is joined onto the target directory path.
XOR_KEY_NAME = "xor.key"
# Number of random bytes generated for a new key.
XOR_KEY_SIZE_BYTES = 8
# Block files can be 134MB each, opening one per CPU could use a lot of memory
# Read in chunks to reduce resource usage
DEFAULT_CHUNK_SIZE_BYTES = 8192
# All-zero key. An existing key file with exactly this content is treated as
# "no effective xor" and is replaced by a freshly generated key.
DEFAULT_XOR_KEY = b'\x00\x00\x00\x00\x00\x00\x00\x00'
def generate_xor_key(directory_path):
    """Create a new random xor key file in *directory_path* and return the key.

    If a key file named ``XOR_KEY_NAME`` already exists and contains anything
    other than the all-zero ``DEFAULT_XOR_KEY``, the function refuses to
    continue so that already-xored files are not xored a second time. An
    existing all-zero key (no effective xor) is overwritten.

    Args:
        directory_path: Directory in which the key file is read and written.

    Returns:
        The newly generated key, ``XOR_KEY_SIZE_BYTES`` bytes long.

    Raises:
        FileExistsError: If a key file with a non-default value already exists.
    """
    xor_key_path = os.path.join(directory_path, XOR_KEY_NAME)

    # EAFP: try to read the key instead of checking for existence first.
    try:
        with open(xor_key_path, "rb") as key_file:
            # The key is raw binary, so read all of it rather than "a line".
            existing_key = key_file.read()
    except FileNotFoundError:
        existing_key = None

    if existing_key is not None:
        logging.info(f"Found xor key at {xor_key_path}")
        if existing_key != DEFAULT_XOR_KEY:
            raise FileExistsError(f"xor key '{XOR_KEY_NAME}' already exists in {directory_path}. Won't xor already xored files :)")
        logging.info(f"Existing key uses default value of {DEFAULT_XOR_KEY.hex()} (no effective xor)")

    logging.info("Creating new xor key")
    # Take the key bytes straight from the operating system's random source.
    key = os.urandom(XOR_KEY_SIZE_BYTES)
    with open(xor_key_path, "wb") as key_file:
        key_file.write(key)
    # Restrict the key file to owner read/write.
    os.chmod(xor_key_path, 0o600)
    logging.info(f"Written new {XOR_KEY_SIZE_BYTES} byte xor key to {xor_key_path}")
    return key
def _xor_bytes(data, key, offset):
    """Return *data* xored with the repeating *key*; data[0] is at file *offset*."""
    key_len = len(key)
    # Rotate the key so that its first byte lines up with data[0].
    shift = offset % key_len
    rotated = key[shift:] + key[:shift]
    # Repeat the rotated key to cover the data, then xor both buffers as big
    # integers: one operation instead of a Python-level loop per byte.
    keystream = (rotated * (len(data) // key_len + 1))[:len(data)]
    xored = int.from_bytes(data, "big") ^ int.from_bytes(keystream, "big")
    return xored.to_bytes(len(data), "big")


def xor_file(filename, key, chunk_size):
    """Xor the contents of *filename* in place with the repeating *key*.

    The file is processed in chunks of *chunk_size* bytes to bound memory
    use. The key position is derived from the absolute file offset, so the
    result is the same for every chunk size (including sizes that are not a
    multiple of the key length). Applying the same key twice restores the
    original contents.

    Args:
        filename: Path of the file to rewrite in place.
        key: Non-empty ``bytes`` key; byte at file offset ``p`` is xored with
            ``key[p % len(key)]``.
        chunk_size: Positive number of bytes to read per iteration.

    Raises:
        ValueError: If *key* is empty or *chunk_size* is not positive.
    """
    if not key:
        raise ValueError("key must not be empty")
    if chunk_size < 1:
        # read(0) returns b"" immediately, which would silently skip the file.
        raise ValueError(f"chunk_size must be a positive number of bytes, got {chunk_size}")

    offset = 0
    with open(filename, "rb+") as f:
        while True:
            chunk = f.read(chunk_size)
            if not chunk:
                break
            # Step back to where this chunk started and overwrite it.
            f.seek(offset)
            f.write(_xor_bytes(chunk, key, offset))
            offset += len(chunk)
    logging.debug(f"xor of {filename} complete")
def xor_dir(directory_path, chunk_size):
    """Xor every ``blk*.dat`` and ``rev*.dat`` file in *directory_path* in place.

    A new key is generated (and written to the directory) via
    ``generate_xor_key``; the matching files are then xored in parallel using
    one worker process per CPU.

    NOTE: the key file is written before any file is processed, so an
    interruption part-way through leaves some files xored and others not.

    Args:
        directory_path: Directory containing the block and rev files.
        chunk_size: Positive number of bytes each worker reads at a time.

    Raises:
        ValueError: If *chunk_size* is not positive.
        FileExistsError: Propagated from ``generate_xor_key`` when a
            non-default key already exists.
    """
    # Validate before the key is written: a zero chunk size would otherwise
    # leave a fresh key on disk next to files that were never xored.
    if chunk_size < 1:
        raise ValueError(f"chunk_size must be a positive number of bytes, got {chunk_size}")

    key = generate_xor_key(directory_path)
    files_to_process = [
        name
        for name in os.listdir(directory_path)
        if name.startswith(("blk", "rev")) and name.endswith(".dat")
    ]
    logging.debug(f"files to xor: {files_to_process}")

    xor_file_with_key = partial(xor_file, key=key, chunk_size=chunk_size)
    paths = [os.path.join(directory_path, name) for name in files_to_process]
    with Pool(cpu_count()) as pool:
        # map() blocks until all files are done and re-raises worker errors.
        pool.map(xor_file_with_key, paths)
if __name__ == "__main__":
    # Command-line entry point: parse arguments, validate them, then xor the
    # block and rev files of the given directory. Exits with status 1 on error.
    parser = argparse.ArgumentParser(prog="xor-blocks", description="XOR all blk*.dat and rev*.dat files in a directory.\nXOR key will be written to \"xor.key\" file in given directory.")
    parser.add_argument("directory", type=str, help="Directory containing the files to XOR.")
    parser.add_argument("-c", "--chunk_size", type=int, default=DEFAULT_CHUNK_SIZE_BYTES, help="Chunk size used for processing files. Decrease to reduce RAM usage.")
    parser.add_argument('-v', '--verbose', action='store_true', help='Enable debug logging.')
    args = parser.parse_args()

    # Reject unusable chunk sizes up front, before anything is written.
    if args.chunk_size < 1:
        parser.error("chunk size must be a positive number of bytes")

    logging.getLogger().setLevel(logging.DEBUG if args.verbose else logging.INFO)
    logging.info(f"Using directory: {args.directory}")
    logging.info(f"Using chunk size: {args.chunk_size}")

    directory_path = os.path.abspath(args.directory)
    # isdir() is already False for paths that do not exist.
    if not os.path.isdir(directory_path):
        logging.error(f"Provided directory '{directory_path}' does not exist or is not a directory.")
        # SystemExit instead of exit(): exit() is only injected by the site module.
        raise SystemExit(1)

    try:
        xor_dir(directory_path, args.chunk_size)
    except FileExistsError as e:
        # Expected failure: a non-default key already exists in the directory.
        logging.error(e)
        raise SystemExit(1)
    except Exception:
        # Top-level boundary: report the full traceback, then fail.
        logging.error(f"Unexpected error\n{traceback.format_exc()}")
        raise SystemExit(1)
    logging.info(f"Successfully xored block and rev files in {args.directory}")
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment