from urlparse import urlparse
from Crypto.Hash import MD2
import pandas as pd
import cookies as ck
import hackercodecs # noqa
import hashlib
import pyblake2
import urllib
import sha3
import mmh3
import mmhash
import base64
import base58
import zlib
import json
import re
from urllib import quote_plus
# DELIMITERS = re.compile('[&|\,]')
DELIMITERS = re.compile('[&|\,]|%s|%s' % (quote_plus("="), quote_plus("&")))
EXTENSION_RE = re.compile('\.[A-Za-z]{2,4}$')
ENCODING_LAYERS = 3
ENCODINGS_NO_ROT = ['base16', 'base32', 'base58', 'base64',
                    'urlencode', 'yenc', 'entity',
                    'deflate', 'zlib', 'gzip']
LIKELY_ENCODINGS = ['base16', 'base32', 'base58', 'base64',
                    'urlencode', 'yenc', 'entity']
HASHES = ['md2', 'md4', 'md5', 'sha1', 'sha256', 'sha224', 'sha384',
          'sha512', 'sha3_224', 'sha3_256', 'sha3_384', 'sha3_512', 'mmh2',
          'mmh2_unsigned', 'mmh3_32', 'mmh3_64_1', 'mmh3_64_2', 'mmh3_128',
          'ripemd160', 'whirlpool', 'blake2b', 'blake2s']


class Hasher():

    def __init__(self):
        # Define Supported hashes
        hashes = dict()
        hashes['md2'] = lambda x: self._get_md2_hash(x)
        hashes['md4'] = lambda x: self._get_hashlib_hash('md4', x)
        hashes['md5'] = lambda x: hashlib.md5(x).hexdigest()
        hashes['sha'] = lambda x: self._get_hashlib_hash('sha', x)
        hashes['sha1'] = lambda x: hashlib.sha1(x).hexdigest()
        hashes['sha256'] = lambda x: hashlib.sha256(x).hexdigest()
        hashes['sha224'] = lambda x: hashlib.sha224(x).hexdigest()
        hashes['sha384'] = lambda x: hashlib.sha384(x).hexdigest()
        hashes['sha512'] = lambda x: hashlib.sha512(x).hexdigest()
        hashes['sha3_224'] = lambda x: sha3.sha3_224(x).hexdigest()
        hashes['sha3_256'] = lambda x: sha3.sha3_256(x).hexdigest()
        hashes['sha3_384'] = lambda x: sha3.sha3_384(x).hexdigest()
        hashes['sha3_512'] = lambda x: sha3.sha3_512(x).hexdigest()
        hashes['mmh2'] = lambda x: str(mmhash.get_hash(x))
        hashes['mmh2_unsigned'] = lambda x: str(mmhash.get_unsigned_hash(x))
        hashes['mmh3_32'] = lambda x: str(mmh3.hash(x))
        hashes['mmh3_64_1'] = lambda x: str(mmh3.hash64(x)[0])
        hashes['mmh3_64_2'] = lambda x: str(mmh3.hash64(x)[1])
        hashes['mmh3_128'] = lambda x: str(mmh3.hash128(x))
        hashes['ripemd160'] = lambda x: self._get_hashlib_hash('ripemd160', x)
        hashes['whirlpool'] = lambda x: self._get_hashlib_hash('whirlpool', x)
        hashes['blake2b'] = lambda x: pyblake2.blake2b(x).hexdigest()
        hashes['blake2s'] = lambda x: pyblake2.blake2s(x).hexdigest()
        hashes['crc32'] = lambda x: str(zlib.crc32(x))
        hashes['adler32'] = lambda x: str(zlib.adler32(x))
        self._hashes = hashes
        self.hashes_and_checksums = self._hashes.keys()
        self.supported_hashes = HASHES

    def _get_hashlib_hash(self, name, string):
        """Use for hashlib hashes that don't have a shortcut"""
        hasher = hashlib.new(name)
        hasher.update(string)
        return hasher.hexdigest()

    def _get_md2_hash(self, string):
        """Compute md2 hash"""
        md2 = MD2.new()
        md2.update(string)
        return md2.hexdigest()

    def get_hash(self, hash_name, string):
        """Compute the desired hash"""
        return self._hashes[hash_name](string)
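
# Illustrative only (not part of the original gist): Hasher maps a hash name
# to a string digest of the input. Assuming the third-party hash modules
# imported above are installed, usage looks roughly like:
#
#   hasher = Hasher()
#   hasher.get_hash('md5', 'test@example.com')      # -> 32-char hex digest
#   hasher.get_hash('mmh3_32', 'test@example.com')  # -> signed int as string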


class Encoder():

    def __init__(self):
        # Define supported encodings
        encodings = dict()
        encodings['base16'] = lambda x: base64.b16encode(x)
        encodings['base32'] = lambda x: base64.b32encode(x)
        encodings['base58'] = lambda x: base58.b58encode(x)
        encodings['base64'] = lambda x: base64.b64encode(x)
        encodings['urlencode'] = lambda x: urllib.quote_plus(x)
        encodings['deflate'] = lambda x: self._compress_with_zlib('deflate', x)
        encodings['zlib'] = lambda x: self._compress_with_zlib('zlib', x)
        encodings['gzip'] = lambda x: self._compress_with_zlib('gzip', x)
        encodings['json'] = lambda x: json.dumps(x)
        encodings['binary'] = lambda x: x.encode('bin')
        encodings['entity'] = lambda x: x.encode('entity')
        encodings['rot1'] = lambda x: x.encode('rot1')
        encodings['rot10'] = lambda x: x.encode('rot10')
        encodings['rot11'] = lambda x: x.encode('rot11')
        encodings['rot12'] = lambda x: x.encode('rot12')
        encodings['rot13'] = lambda x: x.encode('rot13')
        encodings['rot14'] = lambda x: x.encode('rot14')
        encodings['rot15'] = lambda x: x.encode('rot15')
        encodings['rot16'] = lambda x: x.encode('rot16')
        encodings['rot17'] = lambda x: x.encode('rot17')
        encodings['rot18'] = lambda x: x.encode('rot18')
        encodings['rot19'] = lambda x: x.encode('rot19')
        encodings['rot2'] = lambda x: x.encode('rot2')
        encodings['rot20'] = lambda x: x.encode('rot20')
        encodings['rot21'] = lambda x: x.encode('rot21')
        encodings['rot22'] = lambda x: x.encode('rot22')
        encodings['rot23'] = lambda x: x.encode('rot23')
        encodings['rot24'] = lambda x: x.encode('rot24')
        encodings['rot25'] = lambda x: x.encode('rot25')
        encodings['rot3'] = lambda x: x.encode('rot3')
        encodings['rot4'] = lambda x: x.encode('rot4')
        encodings['rot5'] = lambda x: x.encode('rot5')
        encodings['rot6'] = lambda x: x.encode('rot6')
        encodings['rot7'] = lambda x: x.encode('rot7')
        encodings['rot8'] = lambda x: x.encode('rot8')
        encodings['rot9'] = lambda x: x.encode('rot9')
        encodings['yenc'] = lambda x: x.encode('yenc')
        self._encodings = encodings
        self.supported_encodings = self._encodings.keys()

    def _compress_with_zlib(self, compression_type, string, level=6):
        """Compress in one of the zlib supported formats: zlib, gzip, or
        deflate. For a description see:
        http://stackoverflow.com/a/22311297/6073564
        """
        if compression_type == 'deflate':
            compressor = zlib.compressobj(
                level, zlib.DEFLATED, -zlib.MAX_WBITS)
        elif compression_type == 'zlib':
            compressor = zlib.compressobj(
                level, zlib.DEFLATED, zlib.MAX_WBITS)
        elif compression_type == 'gzip':
            compressor = zlib.compressobj(
                level, zlib.DEFLATED, zlib.MAX_WBITS | 16)
        else:
            raise ValueError("Unsupported zlib compression format %s." %
                             compression_type)
        return compressor.compress(string) + compressor.flush()

    def encode(self, encoding, string):
        """Encode `string` in desired `encoding`"""
        return self._encodings[encoding](string)
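
# Illustrative only (not part of the original gist): Encoder wraps stdlib and
# codec-based encoders behind a single string-keyed interface. The rot*, yenc,
# entity, and bin codecs are assumed to be registered by the `hackercodecs`
# import above. A rough sketch:
#
#   encoder = Encoder()
#   encoder.encode('base64', 'test@example.com')
#   # -> 'dGVzdEBleGFtcGxlLmNvbQ=='
#   encoder.encode('urlencode', 'test@example.com')
#   # -> 'test%40example.com'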


class DecodeException(Exception):
    def __init__(self, message, error):
        super(DecodeException, self).__init__(message)
        self.error = error


class Decoder():

    def __init__(self):
        # Define supported encodings
        decodings = dict()
        decodings['base16'] = lambda x: base64.b16decode(x)
        decodings['base32'] = lambda x: base64.b32decode(x)
        decodings['base58'] = lambda x: base58.b58decode(x)
        decodings['base64'] = lambda x: base64.b64decode(x)
        decodings['urlencode'] = lambda x: urllib.unquote_plus(x)
        decodings['deflate'] = lambda x: self._decompress_with_zlib(
            'deflate', x)
        decodings['zlib'] = lambda x: self._decompress_with_zlib('zlib', x)
        decodings['gzip'] = lambda x: self._decompress_with_zlib('gzip', x)
        decodings['json'] = lambda x: json.loads(x)
        decodings['binary'] = lambda x: x.decode('bin')
        decodings['entity'] = lambda x: x.decode('entity')
        decodings['rot1'] = lambda x: x.decode('rot1')
        decodings['rot10'] = lambda x: x.decode('rot10')
        decodings['rot11'] = lambda x: x.decode('rot11')
        decodings['rot12'] = lambda x: x.decode('rot12')
        decodings['rot13'] = lambda x: x.decode('rot13')
        decodings['rot14'] = lambda x: x.decode('rot14')
        decodings['rot15'] = lambda x: x.decode('rot15')
        decodings['rot16'] = lambda x: x.decode('rot16')
        decodings['rot17'] = lambda x: x.decode('rot17')
        decodings['rot18'] = lambda x: x.decode('rot18')
        decodings['rot19'] = lambda x: x.decode('rot19')
        decodings['rot2'] = lambda x: x.decode('rot2')
        decodings['rot20'] = lambda x: x.decode('rot20')
        decodings['rot21'] = lambda x: x.decode('rot21')
        decodings['rot22'] = lambda x: x.decode('rot22')
        decodings['rot23'] = lambda x: x.decode('rot23')
        decodings['rot24'] = lambda x: x.decode('rot24')
        decodings['rot25'] = lambda x: x.decode('rot25')
        decodings['rot3'] = lambda x: x.decode('rot3')
        decodings['rot4'] = lambda x: x.decode('rot4')
        decodings['rot5'] = lambda x: x.decode('rot5')
        decodings['rot6'] = lambda x: x.decode('rot6')
        decodings['rot7'] = lambda x: x.decode('rot7')
        decodings['rot8'] = lambda x: x.decode('rot8')
        decodings['rot9'] = lambda x: x.decode('rot9')
        decodings['yenc'] = lambda x: x.decode('yenc')
        self._decodings = decodings
        self.supported_encodings = self._decodings.keys()

    def _decompress_with_zlib(self, compression_type, string, level=9):
        """Decompress one of the zlib supported formats: zlib, gzip, or
        deflate. For a description see:
        http://stackoverflow.com/a/22311297/6073564
        """
        if compression_type == 'deflate':
            return zlib.decompress(string, -zlib.MAX_WBITS)
        elif compression_type == 'zlib':
            return zlib.decompress(string, zlib.MAX_WBITS)
        elif compression_type == 'gzip':
            return zlib.decompress(string, zlib.MAX_WBITS | 16)
        else:
            raise ValueError("Unsupported zlib compression format %s." %
                             compression_type)

    def decode_error(self):
        """Catch-all error for all supported decoders"""

    def decode(self, encoding, string):
        """Decode `string` encoded by `encoding`"""
        try:
            return self._decodings[encoding](string)
        except Exception as e:
            raise DecodeException(
                'Error while trying to decode %s' % encoding,
                e
            )
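
# Illustrative only (not part of the original gist): Decoder is the inverse of
# Encoder; any failure inside an individual decoder is re-raised as
# DecodeException, so callers can treat "not this encoding" uniformly, e.g.:
#
#   decoder = Decoder()
#   decoder.decode('base64', 'dGVzdEBleGFtcGxlLmNvbQ==')
#   # -> 'test@example.com'
#   try:
#       decoder.decode('base16', 'not-hex!')
#   except DecodeException:
#       pass  # this string is not base16-encoded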


class LeakDetector():

    def __init__(self, search_strings, precompute_hashes=True, hash_set=None,
                 hash_layers=2, precompute_encodings=True, encoding_set=None,
                 encoding_layers=2, debugging=False):
        """LeakDetector searches URLs, POST bodies, and cookies for leaks.

        The detector is constructed with a set of search strings (given by
        the `search_strings` parameter). It has several methods to check for
        leaks containing these strings in URLs, POST bodies, and cookie
        header strings.

        Parameters
        ==========
        search_strings : list
            LeakDetector will search for leaks containing any item in this
            list.
        precompute_hashes : bool
            Set to `True` to include precomputed hashes in the candidate set.
        hash_set : list
            List of hash functions to use when building the set of candidate
            strings.
        hash_layers : int
            The detector will find instances of `search_string` iteratively
            hashed up to `hash_layers` times by any combination of supported
            hashes.
        precompute_encodings : bool
            Set to `True` to include precomputed encodings in the candidate
            set.
        encoding_set : list
            List of encodings to use when building the set of candidate
            strings.
        encoding_layers : int
            The detector will find instances of `search_string` iteratively
            encoded up to `encoding_layers` times by any combination of
            supported encodings.
        debugging : bool
            Set to `True` to enable verbose output.
        """
        self.search_strings = search_strings
        self._min_length = min([len(x) for x in search_strings])
        self._hasher = Hasher()
        self._hash_set = hash_set
        self._hash_layers = hash_layers
        self._encoder = Encoder()
        self._encoding_set = encoding_set
        self._encoding_layers = encoding_layers
        self._decoder = Decoder()
        self._precompute_pool = dict()
        # If hash/encoding sets aren't specified, use all available.
        if self._hash_set is None:
            self._hash_set = self._hasher.supported_hashes
        if self._encoding_set is None:
            self._encoding_set = self._encoder.supported_encodings
        self._build_precompute_pool(precompute_hashes, precompute_encodings)
        self._debugging = debugging

    def _compute_hashes(self, string, layers, prev_hashes=tuple()):
        """Compute all iterative hashes of `string` up to the specified
        number of `layers` and add them to the precompute pool"""
        for h in self._hasher.supported_hashes:
            hashed_string = self._hasher.get_hash(h, string)
            if hashed_string == string:  # skip no-ops
                continue
            hash_stack = (h,) + prev_hashes
            self._precompute_pool[hashed_string] = hash_stack
            if layers > 1:
                self._compute_hashes(hashed_string, layers-1, hash_stack)

    def _compute_encodings(self, string, layers, prev_encodings=tuple()):
        """Compute all iterative encodings of `string` up to the specified
        number of `layers` and add them to the precompute pool"""
        for enc in self._encoding_set:
            encoded_string = str(self._encoder.encode(enc, string))
            if encoded_string == string:  # skip no-ops
                continue
            encoding_stack = (enc,) + prev_encodings
            self._precompute_pool[encoded_string] = encoding_stack
            if layers > 1:
                self._compute_encodings(encoded_string, layers-1,
                                        encoding_stack)
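
    # Illustrative only (not part of the original gist): for a single seed
    # string the precompute pool maps each derived value back to the stack of
    # transforms that produced it, e.g. (assuming hash_layers >= 2):
    #
    #   pool[md5('a@b.com')]       == ('md5', 'a@b.com')
    #   pool[sha1(md5('a@b.com'))] == ('sha1', 'md5', 'a@b.com')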

    def _build_precompute_pool(self, precompute_hashes, precompute_encodings):
        """Build a pool of hashes for the given search strings"""
        seed_strings = list()
        for string in self.search_strings:
            seed_strings.append(string)
            if string.startswith('http'):
                continue
            all_lower = string.lower()
            if all_lower != string:
                seed_strings.append(string.lower())
            all_upper = string.upper()
            if all_upper != string:
                seed_strings.append(string.upper())
        strings = list()
        for string in seed_strings:
            strings.append(string)
            # If the search string appears to be an email address, we also
            # want to include just the username portion of the address, and
            # the address and username with any '.'s removed from the
            # username (since these are optional in Gmail).
            if '@' in string:
                parts = string.rsplit('@')
                if len(parts) == 2:
                    uname, domain = parts
                    strings.append(uname)
                    strings.append(re.sub('\.', '', uname))
                    strings.append(re.sub('\.', '', uname) + '@' + domain)
                # Domain searches have too many false positives
                # strings.append(parts[1])
                # strings.append(parts[1].rsplit('.', 1)[0])
            # The URL tokenizer strips file extensions. So if our search
            # string has a file extension, we should also search for a
            # stripped version.
            if re.match(EXTENSION_RE, string):
                strings.append(re.sub(EXTENSION_RE, '', string))
        for string in strings:
            self._precompute_pool[string] = (string,)
        self._min_length = min(
            [len(x) for x in self._precompute_pool.keys()])
        initial_items = self._precompute_pool.items()
        if precompute_hashes:
            for string, name in initial_items:
                self._compute_hashes(string, self._hash_layers, name)
        if precompute_encodings:
            for string, name in initial_items:
                self._compute_encodings(string, self._encoding_layers, name)

    def _split_on_delims(self, string, rv_parts, rv_named):
        """Splits a string on several delimiters"""
        if string == '':
            return
        parts = set(re.split(DELIMITERS, string))
        if '' in parts:
            parts.remove('')
        for part in parts:
            if part == '':
                continue
            count = part.count('=')
            if count != 1:
                rv_parts.add(part)
                if count == 0:
                    continue
            n, k = part.split('=', 1)
            if len(n) > 0 and len(k) > 0:
                rv_named.add((n, k))
            else:
                rv_parts.add(part)

    def check_if_in_precompute_pool(self, string):
        """Returns a tuple that lists the (possibly layered) hashes or
        encodings that result in input string
        """
        try:
            return self._precompute_pool[str(string)]
        except KeyError:
            return
        except (UnicodeDecodeError, UnicodeEncodeError):
            return

    def check_for_leak(self, string, layers=1, prev_encodings=tuple(),
                       prev=''):
        """Check if the given string contains a leak"""
        # Tokens shorter than the shortest search string can't contain a leak
        if len(string) < self._min_length:
            return
        # Check if direct hash or plaintext
        rv = self.check_if_in_precompute_pool(string)
        if rv is not None:
            return prev_encodings + rv
        # Try encodings
        for encoding in self._encoding_set:
            # multiple rots are unnecessary
            if encoding.startswith('rot') and prev.startswith('rot'):
                continue
            try:
                decoded = self._decoder.decode(encoding, string)
                if type(decoded) == int or type(decoded) == long:
                    decoded = str(decoded)
            except DecodeException:  # means this isn't the correct decoding
                continue
            if decoded == string:  # don't add no-ops
                continue
            if decoded is None:  # Empty decodings aren't useful
                continue
            encoding_stack = prev_encodings + (encoding,)
            if layers > 1:
                rv = self.check_for_leak(decoded, layers-1,
                                         encoding_stack, encoding)
                if rv is not None:
                    return rv
            else:
                rv = self.check_if_in_precompute_pool(decoded)
                if rv is not None:
                    return encoding_stack + rv
        return

    def _check_parts_for_leaks(self, tokens, parameters, nlayers):
        """Check token and parameter string parts for leaks"""
        leaks = list()
        for token in tokens:
            leak = self.check_for_leak(token, layers=nlayers)
            if leak is not None:
                leaks.append(leak)
        for name, value in parameters:
            leak = self.check_for_leak(value, layers=nlayers)
            if leak is not None:
                leaks.append(leak)
            leak = self.check_for_leak(name, layers=nlayers)
            if leak is not None:
                leaks.append(leak)
        return leaks

    def _split_url(self, url):
        """Split url path and query string on delimiters"""
        tokens = set()
        parameters = set()
        try:
            purl = urlparse(url)
        except ValueError:
            print "Can't parse url:", url
            return [], []
        path_parts = purl.path.split('/')
        for part in path_parts:
            if not part.endswith('.com'):
                part = re.sub(EXTENSION_RE, '', part)
            self._split_on_delims(part, tokens, parameters)
        self._split_on_delims(purl.query, tokens, parameters)
        self._split_on_delims(purl.fragment, tokens, parameters)
        return tokens, parameters
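
    # Illustrative only (not part of the original gist): a sketch of how a
    # URL is tokenized before leak checks. Given the made-up URL
    #   http://tracker.example/pixel.gif?uid=abc123&email=user%40example.com
    # `_split_url` would return roughly:
    #   tokens     -> set(['pixel'])
    #   parameters -> set([('uid', 'abc123'),
    #                      ('email', 'user%40example.com')])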

    def check_url(self, url, encoding_layers=3, substring_search=True):
        """Check if a given url contains a leak"""
        tokens, parameters = self._split_url(url)
        if self._debugging:
            print "URL tokens:"
            for token in tokens:
                print token
            print "\nURL parameters:"
            for key, value in parameters:
                print "Key: %s | Value: %s" % (key, value)
        return self._check_whole_and_parts_for_leaks(
            url, tokens, parameters, encoding_layers, substring_search)

    def _get_header_str(self, header_str, header_name):
        """Returns the header string parsed from `header_str`"""
        for item in json.loads(header_str):
            if item[0] == header_name:
                return item[1]
        return ""

    def _split_cookie(self, cookie_str, from_request=True):
        """Returns all parsed parts of the cookie names and values"""
        tokens = set()
        parameters = set()
        try:
            if from_request:
                cookies = ck.Cookies.from_request(cookie_str)
            else:
                cookies = ck.Cookies.from_response(
                    cookie_str, ignore_bad_cookies=True)
        except (ck.InvalidCookieError, UnicodeDecodeError, KeyError):
            return tokens, parameters  # return empty sets
        for cookie in cookies.values():
            self._split_on_delims(cookie.name, tokens, parameters)
            self._split_on_delims(cookie.value, tokens, parameters)
        return tokens, parameters

    def get_location_str(self, header_str):
        return self._get_header_str(header_str, "Location")

    def get_referrer_str(self, header_str):
        return self._get_header_str(header_str, "Referer")

    def get_cookie_str(self, header_str, from_request=True):
        if not header_str:
            return ""
        if from_request:
            header_name = 'Cookie'
        else:
            header_name = 'Set-Cookie'
        return self._get_header_str(header_str, header_name)

    def check_cookies(self, header_str, encoding_layers=3,
                      from_request=True, substring_search=True):
        """Check the cookies portion of the header string for leaks"""
        cookie_str = self.get_cookie_str(header_str, from_request)
        if not cookie_str:
            return list()
        tokens, parameters = self._split_cookie(
            header_str, from_request=from_request)
        return self._check_whole_and_parts_for_leaks(
            cookie_str, tokens, parameters, encoding_layers, substring_search)

    def check_location_header(self, location_str, encoding_layers=3,
                              substring_search=True):
        """Check the Location HTTP response header for leaks."""
        if location_str == '':
            return list()
        tokens, parameters = self._split_url(location_str)
        return self._check_whole_and_parts_for_leaks(
            location_str, tokens, parameters, encoding_layers,
            substring_search)

    def check_referrer_header(self, header_str, encoding_layers=3,
                              substring_search=True):
        """Check the Referer HTTP request header for leaks."""
        if header_str == '':
            return list()
        referrer_str = self.get_referrer_str(header_str)
        # We use this check instead of == ''
        # since _get_header_str may return None
        if not referrer_str:
            return list()
        # print "referrer_str", referrer_str
        tokens, parameters = self._split_url(referrer_str)
        return self._check_whole_and_parts_for_leaks(
            referrer_str, tokens, parameters, encoding_layers,
            substring_search)

    def _check_whole_and_parts_for_leaks(self, input_string, tokens,
                                         parameters, encoding_layers,
                                         substring_search):
        """Search an input string and its parts for leaks."""
        results = self._check_parts_for_leaks(tokens, parameters,
                                              encoding_layers)
        if substring_search:
            substr_results = self.substring_search(input_string, max_layers=2)
            # filter repeating results
            return list(set(results + substr_results))
        else:
            return results

    def substring_search(self, input_string, max_layers=None):
        """Do a substring search for all precomputed hashes/encodings

        `max_layers` limits the number of encoding/hashing layers used in the
        substring search (to limit time). The default is no limit (`None`).
        """
        if input_string is None or input_string == '':
            return list()
        try:
            input_string = input_string.encode('utf8')
        except (UnicodeDecodeError, UnicodeEncodeError):
            print "ERROR encoding %s" % input_string
            return list()
        leaks = list()
        for string, transform_stack in self._precompute_pool.items():
            if max_layers and len(transform_stack) > (max_layers + 1):
                continue
            if string in input_string:
                leaks.append(transform_stack)
        return leaks
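

if __name__ == '__main__':
    # Illustrative only (not part of the original gist): a minimal usage
    # sketch. It assumes the optional hashing/encoding dependencies imported
    # at the top of this file are installed, and uses a made-up email address
    # and tracker URL. Layer counts are kept at 1 so the precompute pool
    # stays small.
    email = 'test@example.com'
    detector = LeakDetector([email], hash_layers=1, encoding_layers=1)

    # Simulate a tracking pixel that receives the MD5 of the email address.
    leaky_url = ('http://tracker.example/collect?uid=%s'
                 % hashlib.md5(email).hexdigest())

    # Expected to report something like [('md5', 'test@example.com')].
    print detector.check_url(leaky_url, encoding_layers=1)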