Last active
August 3, 2023 15:22
-
-
Save willcl-ark/d1dd7d80d4581671aa38157960a87ba7 to your computer and use it in GitHub Desktop.
xor a Bitcoin Core blocks directory
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/env python3 | |
import argparse | |
import logging | |
import os | |
import random | |
import traceback | |
from functools import partial | |
from multiprocessing import Pool, cpu_count | |
# Root logger setup; the -v flag below may raise the level to DEBUG.
logging.basicConfig(format='%(levelname)s: %(message)s', level=logging.INFO)
# Name of the key file written into (and looked up in) the target directory.
XOR_KEY_NAME = "xor.key"
# Length of the generated XOR key, in bytes.
XOR_KEY_SIZE_BYTES = 8
# Block files can be 134MB each, opening one per CPU could use a lot of memory
# Read in chunks to reduce resource usage
DEFAULT_CHUNK_SIZE_BYTES = 8192
# All-zero key: XOR with this is the identity, i.e. files are effectively un-xored.
DEFAULT_XOR_KEY = b'\x00\x00\x00\x00\x00\x00\x00\x00'
def generate_xor_key(directory_path):
    """Create a fresh XOR key file in *directory_path* and return its bytes.

    If a key file already exists and holds a non-default (non-zero) key,
    raise FileExistsError so we never double-xor already-xored files.
    An existing all-zero key (a no-op xor) is overwritten with a real one.

    :param directory_path: directory to place/check the key file in
    :return: the new key as bytes of length XOR_KEY_SIZE_BYTES
    :raises FileExistsError: when a non-default key file is already present
    """
    xor_key_path = os.path.join(directory_path, XOR_KEY_NAME)
    if os.path.exists(xor_key_path):
        logging.info(f"Found xor key at {xor_key_path}")
        with open(xor_key_path, "rb") as key_file:
            # read(), not readline(): the key is binary and may contain a
            # 0x0a byte, which would silently truncate a readline() result.
            existing_key = key_file.read(XOR_KEY_SIZE_BYTES)
        if existing_key != DEFAULT_XOR_KEY:
            raise FileExistsError(f"xor key '{XOR_KEY_NAME}' already exists in {directory_path}. Won't xor already xored files :)")
        else:
            logging.info(f"Existing key uses default value of {DEFAULT_XOR_KEY.hex()} (no effective xor)")
    logging.info(f"Creating new xor key")
    # os.urandom is a CSPRNG; random.getrandbits is predictable and should
    # not be used to generate key material.
    key = os.urandom(XOR_KEY_SIZE_BYTES)
    with open(xor_key_path, "wb") as key_file:
        key_file.write(key)
    # Restrict the key file to the owner (no-op on platforms without POSIX modes).
    os.chmod(xor_key_path, 0o600)
    logging.info(f"Written new {XOR_KEY_SIZE_BYTES} byte xor key to {xor_key_path}")
    return key
def xor_file(filename, key, chunk_size):
    """XOR *filename* in place against the repeating *key*.

    The file is processed in chunks of *chunk_size* bytes to bound memory
    use. Each byte is xored with key[absolute_file_offset % len(key)], so
    the result is independent of the chunk size chosen (the original code
    restarted the key index at every chunk, which mis-aligned the key
    whenever chunk_size was not a multiple of the key length). Applying
    the same key twice restores the original file.

    :param filename: path of the file to transform in place
    :param key: xor key bytes (any non-empty length; previously fixed at 8)
    :param chunk_size: number of bytes to read/write per iteration
    """
    key_len = len(key)  # generalized: was hard-coded to 8
    with open(filename, "rb+") as f:
        offset = 0  # absolute offset of the start of the current chunk
        while chunk := f.read(chunk_size):
            xored_chunk = bytes(
                byte ^ key[(offset + i) % key_len] for i, byte in enumerate(chunk)
            )
            f.seek(offset)
            f.write(xored_chunk)
            offset += len(chunk)
    logging.debug(f"xor of {filename} complete")
def xor_dir(directory_path, chunk_size):
    """XOR every blk*.dat and rev*.dat file in *directory_path* in parallel.

    Generates (or validates) the directory's xor key first, then fans the
    per-file work out across one worker process per CPU.
    """
    key = generate_xor_key(directory_path)
    files_to_process = [
        name
        for name in os.listdir(directory_path)
        if name.startswith(("blk", "rev")) and name.endswith(".dat")
    ]
    logging.debug(f"files to xor: {files_to_process}")
    full_paths = [os.path.join(directory_path, name) for name in files_to_process]
    worker = partial(xor_file, key=key, chunk_size=chunk_size)
    with Pool(cpu_count()) as pool:
        pool.map(worker, full_paths)
if __name__ == "__main__":
    parser = argparse.ArgumentParser(prog="xor-blocks", description="XOR all blk*.dat and rev*.dat files in a directory.\nXOR key will be written to \"xor.key\" file in given directory.")
    parser.add_argument("directory", type=str, help="Directory containing the files to XOR.")
    parser.add_argument("-c", "--chunk_size", type=int, default=DEFAULT_CHUNK_SIZE_BYTES, help="Chunk size used for processing files. Decrease to reduce RAM usage.")
    parser.add_argument('-v', '--verbose', action='store_true', help='Enable debug logging.')
    args = parser.parse_args()

    # -v raises verbosity; the default matches basicConfig's INFO level.
    logging.getLogger().setLevel(logging.DEBUG if args.verbose else logging.INFO)
    logging.info(f"Using directory: {args.directory}")
    logging.info(f"Using chunk size: {args.chunk_size}")

    directory_path = os.path.abspath(args.directory)
    # isdir() is False for missing paths too, so a separate exists() check is redundant.
    if not os.path.isdir(directory_path):
        logging.error(f"Provided directory '{directory_path}' does not exist or is not a directory.")
        # SystemExit is a builtin; exit() is a site-module helper meant for the REPL
        # and may be absent (e.g. under python -S).
        raise SystemExit(1)
    try:
        xor_dir(directory_path, args.chunk_size)
    except FileExistsError as e:
        # Non-default key already present: refuse to double-xor.
        logging.error(e)
        raise SystemExit(1)
    except Exception:
        logging.error(f"Unexpected error\n{traceback.format_exc()}")
        raise SystemExit(1)
    logging.info(f"Successfully xored block and rev files in {args.directory}")
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment