Multiprocessing brute-force script that searches for a CRC32 checksum which is still valid once that same value has been written into the file, i.e. a self-referential CRC32.
#!/usr/bin/python3 | |
import logging | |
import argparse | |
import sys | |
import binascii | |
import multiprocessing as mp | |
from time import sleep | |
SLEEP_TIME = 0.1 | |
CRC32_BYTES = 4 | |
CRC32_MAX = 2**32 | |
TEMPLATE = None | |
DESTINATION = None | |
CRC_QUEUE = None | |
def crc_pos_bytes_print(start_pos):
    """Describe the CRC slot starting at ``start_pos`` together with its neighbours.

    Reads the module-level ``TEMPLATE`` buffer and renders, as hex, the
    ``CRC32_BYTES`` bytes before the slot, the slot itself and the
    ``CRC32_BYTES`` bytes after it.

    Args:
        start_pos: Offset of the first byte of the slot inside ``TEMPLATE``.

    Returns:
        A string of the form
        ``"byte positions <start> to <end> (<before> >> <slot> << <after>)"``,
        where ``<end>`` is ``start_pos + CRC32_BYTES`` (the exclusive slice end).
    """
    end_pos = start_pos + CRC32_BYTES
    # Clamp at 0: a negative slice start would be counted from the END of the
    # buffer, so a slot near the beginning of the file would show unrelated
    # bytes (or nothing) as its "before" context.
    before_start = max(start_pos - CRC32_BYTES, 0)
    before_hex = TEMPLATE[before_start:start_pos].hex()
    slot_hex = TEMPLATE[start_pos:end_pos].hex()
    after_hex = TEMPLATE[end_pos:end_pos + CRC32_BYTES].hex()
    bytes_str = "{} >> {} << {}".format(before_hex, slot_hex, after_hex)
    return "byte positions {} to {} ({})".format(start_pos, end_pos, bytes_str)
def crc_queue_handler(start_crc):
    """Feed candidate CRC values into ``CRC_QUEUE`` in ascending order.

    Every integer from ``start_crc`` up to, but not including, ``CRC32_MAX``
    is put on the module-level queue.

    Args:
        start_crc: First candidate value to enqueue.
    """
    candidate = start_crc
    while candidate < CRC32_MAX:
        CRC_QUEUE.put(candidate)
        candidate += 1
def bruteforce_crc_process():
    """Worker loop: test candidate CRCs from ``CRC_QUEUE`` until one matches.

    For each candidate, the little-endian encoding of the value is written
    into the two ``CRC32_BYTES``-wide slots at ``qpos`` and ``pos`` of a copy
    of ``TEMPLATE``. The candidate matches when the CRC32 of that patched
    copy equals the candidate itself.

    Relies on the module-level names ``CRC_QUEUE``, ``TEMPLATE``, ``pos`` and
    ``qpos`` having been set before the worker starts.

    Returns:
        tuple: ``(crc, new_infile)`` - the matching CRC value and the patched
        file contents as a ``bytearray``. The function only returns on a
        match; otherwise it keeps pulling candidates from the queue.
    """
    while True:
        crc = CRC_QUEUE.get()
        if crc is None:
            # NOTE(review): nothing visible in this file enqueues None; the
            # branch is kept as a harmless "no work right now" guard.
            sleep(SLEEP_TIME)
            continue
        crc_bytes = crc.to_bytes(CRC32_BYTES, 'little')
        # Overwrite both slots in place on a private copy. Unlike slicing and
        # concatenating around the slots, this does not depend on qpos being
        # smaller than pos, and it keeps the length unchanged when the two
        # slots overlap or coincide.
        new_infile = bytearray(TEMPLATE)
        new_infile[qpos:qpos + CRC32_BYTES] = crc_bytes
        new_infile[pos:pos + CRC32_BYTES] = crc_bytes
        new_infile_crc = binascii.crc32(new_infile)
        if crc == new_infile_crc:
            return (crc, new_infile)
def crc_match(crc_and_new_infile):
    """Pool callback: persist the matching file and stop the search.

    Args:
        crc_and_new_infile: ``(crc, new_infile)`` pair as returned by
            ``bruteforce_crc_process`` - the matching CRC value and the
            patched file contents.

    Side effects:
        Writes ``new_infile`` to ``DESTINATION``, then terminates the worker
        pool and the queue-feeding process.
    """
    crc, new_infile = crc_and_new_infile
    print("CRC {} ({}) is a match!".format(crc, crc.to_bytes(4, 'little').hex()))
    try:
        # The context manager closes the file even if the write fails.
        with open(DESTINATION, 'wb') as outfile:
            outfile.write(new_infile)
        print("Infile with self-referencing CRC written to {}".format(DESTINATION))
    finally:
        # pool and crc_queue_handler_process are visible because they are
        # globals created in the __main__ section. They are stopped in a
        # finally block so a failed write cannot leave the main process
        # waiting on the pool forever.
        pool.terminate()
        crc_queue_handler_process.terminate()
if __name__ == "__main__":
    # Command line: infile outfile pos qpos [-s START_AT]
    #   pos / qpos - byte offsets of the two 4-byte slots that receive the CRC
    #   --start_at - first candidate CRC value to try
    parser = argparse.ArgumentParser()
    parser.add_argument('infile')
    parser.add_argument('outfile')
    parser.add_argument('pos', type=int)
    parser.add_argument('qpos', type=int)
    parser.add_argument('-s', '--start_at', type=int, default=0)
    args = parser.parse_args(sys.argv[1:])

    with open(args.infile, 'rb') as source:
        TEMPLATE = bytearray(source.read())
    if len(TEMPLATE) == 0:
        print("Infile contains no bytes")
        sys.exit(-1)

    pos = args.pos
    qpos = args.qpos
    start_crc = args.start_at

    # Validate the arguments before the outfile is touched (opening it below
    # truncates it), so a typo cannot wipe an existing file for nothing.
    for label, offset in (("pos", pos), ("qpos", qpos)):
        if offset < 0 or offset + CRC32_BYTES > len(TEMPLATE):
            print("{} {} does not leave room for a {}-byte CRC inside the {}-byte infile".format(
                label, offset, CRC32_BYTES, len(TEMPLATE)))
            sys.exit(-1)
    if abs(pos - qpos) < CRC32_BYTES:
        print("pos {} and qpos {} overlap".format(pos, qpos))
        sys.exit(-1)
    if not 0 <= start_crc < CRC32_MAX:
        print("start_at {} is outside the CRC32 range 0 to {}".format(start_crc, CRC32_MAX - 1))
        sys.exit(-1)

    DESTINATION = args.outfile
    try:
        # Fail early if the outfile cannot be written.
        with open(DESTINATION, 'w+'):
            pass
    except IOError:
        print("Cannot open {} outfile".format(DESTINATION))
        sys.exit(-1)

    print("Finding CRC for {}".format(crc_pos_bytes_print(pos)))
    print("Quoted CRC at {}".format(crc_pos_bytes_print(qpos)))
    print("Starting at CRC {}".format(start_crc))

    # The child processes read TEMPLATE, CRC_QUEUE, pos and qpos as module
    # globals that are only assigned inside this __main__ section. Only the
    # "fork" start method hands those values to the children, so request it
    # explicitly instead of relying on the platform default.
    try:
        ctx = mp.get_context("fork")
    except ValueError:
        print("The 'fork' multiprocessing start method is not available on this platform")
        sys.exit(-1)

    cpus = mp.cpu_count()
    # Bounded queue: the feeder blocks once `cpus` candidates are waiting.
    CRC_QUEUE = ctx.Queue(maxsize=cpus)
    crc_queue_handler_process = ctx.Process(target=crc_queue_handler, args=(start_crc,))
    crc_queue_handler_process.start()
    pool = ctx.Pool(cpus)

    def _report_worker_error(exc):
        """Surface a worker exception and stop the search instead of hanging."""
        print("Worker failed: {!r}".format(exc))
        pool.terminate()
        crc_queue_handler_process.terminate()

    # One long-running task per pool process; a single apply_async call would
    # keep only one worker busy no matter how large the pool is.
    for _ in range(cpus):
        pool.apply_async(bruteforce_crc_process, callback=crc_match,
                         error_callback=_report_worker_error)
    pool.close()
    pool.join()

    # Make sure the feeder does not outlive the pool (harmless if one of the
    # callbacks has already terminated it).
    crc_queue_handler_process.terminate()
    crc_queue_handler_process.join()
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment