Skip to content

Instantly share code, notes, and snippets.

@yohanesyuen
Created April 3, 2021 16:04
Show Gist options
  • Save yohanesyuen/95ba4117656f0b32f06d10dce6b9ede8 to your computer and use it in GitHub Desktop.
Save yohanesyuen/95ba4117656f0b32f06d10dce6b9ede8 to your computer and use it in GitHub Desktop.
import secrets
import os
import sys
import argparse
import sqlite3
import time
import hashlib
from hash_hunter.models import Session, HashChain
from threading import Thread
from queue import Queue
import logging
class Singleton(type):
_instances = {}
def __call__(cls, *args, **kwargs):
if cls not in cls._instances:
cls._instances[cls] = super(Singleton, cls).__call__(*args, **kwargs)
return cls._instances[cls]
class DatabaseManager(object, metaclass=Singleton):
def __init__(self):
self.cache = {}
self._session = self.get_session()
self.load_cache()
def load_cache(self):
session = self._session
chains = session.query(HashChain).all()
for chain in chains:
self.cache[chain.last] = chain
def add_chain(self, chain: HashChain):
cache = self.cache
session = self._session
session.add(chain)
session.commit()
cache[chain.last] = chain
def query(self, digest):
if digest in self.cache:
return self.cache[digest]
else:
return None
def delete(self, chain):
session = self._session
session.query(HashChain).filter(HashChain.last == chain.last).delete(synchronize_session=False)
session.expire_all()
del self.cache[chain.last]
def get_session(self):
if not hasattr(self, '_session'):
self._session = Session()
return self._session
def hash_str(s):
m = hashlib.sha256()
m.update(s.encode('utf8'))
return m.hexdigest()
def get_new_start_str():
return '{:x}'.format(secrets.randbits(256))
def generate_chain(start, iterations=2**8-1):
v = start
checkpoint = iterations // 5
for i in range(iterations):
if i % (checkpoint) == 0:
pct_complete = (i / iterations) * 100
logging.debug(f'{pct_complete} %')
v = hash_str(v)
return start, v, iterations
def crack(v):
logging.debug(f'Cracking {v}')
db = DatabaseManager()
h = hash_str(v)
chain = None
iterations = 2**23-1
checkpoint = iterations // 5
for i in range(iterations):
if i % (checkpoint) == 0:
pct_complete = (i / iterations) * 100
logging.debug(f'{pct_complete} %')
if h in db.cache.keys():
chain = db.cache[h]
break
h = hash_str(h)
if chain:
print(f'Found chain: {repr(chain)}')
plaintext = chain.start
for i in range(iterations):
h = hash_str(plaintext)
if plaintext == v:
print(f'Plaintext: {plaintext}\nHash: {h}')
plaintext = h
else:
print(f'No chains found for {v}')
class Task(Thread):
def __init__(self):
Thread.__init__(self)
self.running = False
self.complete = False
def run(self):
if hasattr(self, '_run'):
self._run()
else:
raise NotImplementedError()
def join(self):
self.running = False
while not self.complete:
time.sleep(5)
super().join()
class CleanupThread(Task):
def __init__(self):
super().__init__()
def run(self):
db = DatabaseManager()
has_dups = Queue()
logging.debug('Cleaning database...')
for last, chain in db.cache.items():
logging.debug(f'Checking {chain.start} -> {chain.last}')
current = chain.start
for i in range(chain.iterations):
if db.query(current) is not None:
has_dups.put(chain)
current = hash_str(current)
while has_dups.qlen() > 0:
logging.debug(f'Deleting {chain.start} -> {chain.last}')
db.delete(has_dups.get())
logging.debug(f'Deleted {chain.start} -> {chain.last}')
self.complete = True
class ChainGenerationThread(Task):
def __init__(self, queue):
super().__init__()
self.queue = queue
def run(self):
self.running = True
while self.running:
logging.debug('Generating chain...')
start, end, iterations = generate_chain(get_new_start_str(), 2**23-1)
chain = HashChain(
start=start,
last=end,
iterations=iterations)
logging.debug('Generated chain: ' + str(chain))
self.queue.put(chain)
self.complete = True
class ChainAdderThread(Task):
def __init__(self, queue: Queue):
super().__init__()
self.queue = queue
def run(self):
self.running = True
while self.running or not self.queue.empty():
logging.debug('Queue Polling...')
if self.queue.empty():
time.sleep(5)
continue
chain = self.queue.get()
db = DatabaseManager()
logging.debug('Adding chain: ' + str(chain))
db.add_chain(chain)
logging.debug('Added chain: ' + str(chain))
self.complete = True
def mt_generate():
queue = Queue()
threads = []
try:
logging.debug('Creating threads...')
# for i in range(2):
# threads.append(ChainGenerationThread(queue))
# threads.append(ChainAdderThread(queue))
threads.append(CleanupThread())
logging.debug('Starting threads...')
for t in threads:
t.start()
while True:
time.sleep(1)
except KeyboardInterrupt:
logging.debug('Joining threads...')
for t in threads:
t.join()
def main(args):
logging.basicConfig(level=logging.DEBUG, format='%(threadName)s %(message)s')
v = 'cdea5f0088c4f3bee5611db0e79c552d8728f1b5c260da9d49abb38756646855'
action = sys.argv.pop(1)
if action == 'crack':
crack(sys.argv.pop(1))
elif action == 'generate':
mt_generate()
else:
print('Unknown action: %s' % action)
if __name__ == '__main__':
main(sys.argv)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment