Skip to content

Instantly share code, notes, and snippets.

@m33x
Last active November 28, 2019 20:26
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save m33x/9632d4c47483e5a6768865a485e71f40 to your computer and use it in GitHub Desktop.
Save m33x/9632d4c47483e5a6768865a485e71f40 to your computer and use it in GitHub Desktop.
Markov model to predict next char(s) based on a given prefix
#!/usr/bin/env pypy
# -*- coding: utf-8 -*-
''' This script loads the training data and predicts the most likely next passwords
:author: Maximilian Golla
:contact: maximilian.golla@rub.de
:version: 0.0.1, 2019-11-28
'''
# Load external modules
from configs.configure import *
def worker(length):
    '''Load the training data for one password length from disk.

    Builds an NGramCreator from the global CONFIG for the given ``length``,
    loads its "ip_list", "cp_list" and "ep_list" tables (in that order) and
    appends the loaded creator to the module-level MARKOV_MODELS list.
    predict() runs this function as a thread target, once per configured
    length.

    :param length: password length this model is responsible for
    '''
    settings = {
        "name": CONFIG.NAME,
        "alphabet": CONFIG.ALPHABET,
        "ngram_size": CONFIG.NGRAM_SIZE,
        "training_file": "input/"+CONFIG.TRAINING_FILE,
        "length": length,
        "progress_bar": CONFIG.PROGRESS_BAR
    }
    creator = NGramCreator(settings)

    # Each table is announced in the debug log right before it is loaded
    load_steps = (
        ("ip_list", "Thread: {} - ip_list load() ..."),
        ("cp_list", "Thread: {} - cp_list load() ..."),
        ("ep_list", "Thread: {} - ep_list load() ..."),
    )
    for table_name, announcement in load_steps:
        logging.debug(announcement.format(length))
        creator.load(table_name)

    logging.debug("Thread: {} - Loading done ...".format(length))
    MARKOV_MODELS.append(creator)
def _select_correct_markov_model(pw_length, markov_models):
    '''Select the Markov model that matches a password length.

    Every length has its own model, so the correct model has to be selected
    for every password.

    :param pw_length: length of the password that is going to be scored
    :param markov_models: non-empty list of models exposing a ``length`` attribute
    :return: the last model whose ``length`` equals ``pw_length``; if no model
        matches, the first model in the list is returned as a fallback
    :raises IndexError: if ``markov_models`` is empty
    '''
    if not markov_models:
        # Without this guard the fallback lookup below fails with an opaque
        # "list index out of range"; keep the exception type, explain the cause.
        raise IndexError(
            "No Markov models loaded, cannot select a model for length {}".format(pw_length)
        )
    result = markov_models[0]  # Fallback solution, if there is no model for the selected length
    for model in markov_models:
        if model.length == pw_length:
            result = model
    return result
def _password_probability(ngram_creator, line):
    '''Multiply the ip_list, ep_list and cp_list entries that apply to ``line``.

    :param ngram_creator: loaded model providing ``ngram_size``, ``ip_list``,
        ``ep_list``, ``cp_list`` and the ``_n2iIP`` / ``_n2iCP`` index helpers
    :param line: candidate password to score
    :return: product of all looked-up values
    '''
    ngram_size = ngram_creator.ngram_size
    # Leading (ngram_size - 1) characters index into ip_list
    ip = line[:ngram_size - 1]
    pw_prob = ngram_creator.ip_list[ngram_creator._n2iIP(ip)]
    # Trailing (ngram_size - 1) characters index into ep_list
    ep = line[len(line) - (ngram_size - 1):]
    pw_prob = pw_prob * ngram_creator.ep_list[ngram_creator._n2iIP(ep)]
    # Every window of ngram_size characters, moved one position at a time,
    # indexes into cp_list
    for start in range(len(line) - ngram_size + 1):
        cp = line[start:start + ngram_size]
        pw_prob = pw_prob * ngram_creator.cp_list[ngram_creator._n2iCP(cp)]
    return pw_prob


def predict(pw_prefix):
    '''Predict the next likely passwords, given a prefix.

    Loads one model per entry in CONFIG.LENGTHS from disk (via worker()),
    selects the model for ``len(pw_prefix) + 1``, scores the prefix extended
    by every character of the model's alphabet and prints the (up to) five
    best candidates as ``(password, probability)`` tuples.

    Exits the process with status -1 if the selected model does not have the
    required length.

    :param pw_prefix: password prefix that gets extended by one character
    '''
    # worker() appends to this module-level list, so start from a clean one
    global MARKOV_MODELS
    MARKOV_MODELS = []
    threads = []
    for length in CONFIG.LENGTHS:
        # Using threads is not beneficial, because it's a disk intensive task
        thread = Thread(target=worker, args=(length,))
        thread.start()
        threads.append(thread)
    # Wait for all threads to finish
    for thread in threads:
        thread.join()
    logging.debug("Training loaded from disk ...")
    logging.debug("Number of Markov models: "+str(len(MARKOV_MODELS)))

    # Determine correct model; +1 for the character that gets predicted
    ngram_creator = _select_correct_markov_model(len(pw_prefix) + 1, MARKOV_MODELS)

    results = {}  # Maintain all probs, and sort them at the end
    # Generate all possible next passwords
    for c in ngram_creator.alphabet:
        line = pw_prefix + c
        if len(line) != ngram_creator.length:  # Important to prevent generating "passwor", or "iloveyo", or "babygir"
            sys.stderr.write("\x1b[1;%dm" % (31) + "Info: No Markov model for this length: {} {}\n".format(len(line),line) + "\x1b[0m")
            sys.exit(-1)
        if ngram_creator._is_in_alphabet(line):  # Filter non-printable
            results[line] = _password_probability(ngram_creator, line)

    # Sort all possible probs
    results_sorted = sorted(results.items(), key=lambda kv: kv[1], reverse=True)

    # Output
    print("Given '{}' the top 5 most likely predictions are:".format(pw_prefix))
    # Slice instead of indexing 0..4: there may be fewer than five candidates
    # (small alphabet, or candidates rejected by the alphabet filter)
    for prediction in results_sorted[:5]:
        print(prediction)
def main(pw_prefix="passwor"):
    '''Create the global configuration and print predictions for a prefix.

    :param pw_prefix: password prefix to extend by one character; defaults to
        the original demo value "passwor"

    Exits with status 1 on Ctrl+C or on any other exception, after printing a
    message ("User canceled" on stdout, or the error in red on stderr).
    '''
    global CONFIG
    try:
        CONFIG = Configure({"name":"My Config"})
        # Worked example for the default prefix "passwor" with NGRAM_SIZE = 4
        # PASS: passwor?
        # INTR: ^passwor?$
        #
        # IP: pas
        # CP1: pass
        # CP2: assw
        # CP3: sswo
        # CP4: swor
        # CP5: wor?
        # EP: or?
        #
        # Example output:
        # Given 'passwor' the top 5 most likely predictions are:
        # ('password', 5.217937750136535e-07)
        # ('passwork', 5.0743563645302264e-09)
        # ('passwort', 1.311252190776853e-09)
        # ('passworm', 2.1270776175931523e-10)
        # ('passwore', 1.196025357549521e-10)
        predict(pw_prefix)
    except KeyboardInterrupt:
        print('User canceled')
        sys.exit(1)
    except Exception as e:
        # Top-level boundary: report the error in red on stderr and fail
        sys.stderr.write("\x1b[1;%dm" % (31) + "Error: {}\n".format(e) + "\x1b[0m")
        sys.exit(1)
if __name__ == '__main__':
    # Script entry point: print a start timestamp, run main(), then print
    # a completion timestamp.
    started_at = datetime.datetime.now()
    print("{0}: {1:%Y-%m-%d %H:%M:%S}\n".format("Start", started_at))
    print("Press Ctrl+C to shutdown")
    main()
    finished_at = datetime.datetime.now()
    print("{0}: {1:%Y-%m-%d %H:%M:%S}".format("Done", finished_at))
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment