Skip to content

Instantly share code, notes, and snippets.

@aayla-secura
Created January 19, 2021 00:18
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save aayla-secura/45ef6caeb7fe43056a8e4c39236c56e4 to your computer and use it in GitHub Desktop.
Save aayla-secura/45ef6caeb7fe43056a8e4c39236c56e4 to your computer and use it in GitHub Desktop.
Brute-force a JWT signed with a shared key
#!/usr/bin/env python3
#############################################################
# @AaylaSecura1138, github.com/aayla-secura
# Modify and distribute as you wish
#############################################################
import logging
import jwt
import sys
import argparse
from concurrent.futures import ThreadPoolExecutor, as_completed
def split_list(full_list, size):
    """Yield consecutive chunks of ``full_list`` holding at most ``size`` items.

    Args:
        full_list: Sequence supporting ``len()`` and slicing.
        size: Maximum number of items per chunk; must be positive.

    Yields:
        Slices of ``full_list`` in their original order. The final chunk may
        be shorter than ``size``. An empty ``full_list`` yields nothing.

    Raises:
        ValueError: If ``size`` is not positive. Because this is a generator,
            the error surfaces when iteration starts.
    """
    # A zero or negative step would never advance through the list.
    if size <= 0:
        raise ValueError('size must be a positive integer, got {!r}'.format(size))
    for start in range(0, len(full_list), size):
        yield full_list[start:start + size]
def try_pwd(token, pwd, cracked):
    """Check whether ``pwd`` is the shared key that signed ``token``.

    Args:
        token: Encoded JWT to test.
        pwd: Candidate signing key.
        cracked: Dict mapping already cracked tokens to their key. It is
            shared between workers and updated in place on success.

    Returns:
        True if ``pwd`` verifies the token's signature (or equals the key
        already recorded for the token), False if the signature does not
        match. Any other error raised by ``jwt.decode`` propagates.
    """
    # Use the dict handed in by the caller (not a module-level global) so the
    # function does not depend on how the calling script names its state.
    if token in cracked:
        return pwd == cracked[token]
    try:
        jwt.decode(token, pwd)
    except jwt.exceptions.InvalidSignatureError:
        return False
    except (jwt.exceptions.ExpiredSignatureError,
            jwt.exceptions.ImmatureSignatureError,
            jwt.exceptions.InvalidIssuedAtError,
            jwt.exceptions.InvalidAudienceError):
        # PyJWT validates these claims only after the signature has been
        # verified, so reaching this point means the key is correct even
        # though the token itself is expired / not yet valid / has an
        # audience we did not supply.
        pass
    cracked[token] = pwd
    return True
def read_file(fname):
    """Return the text of ``fname`` with leading/trailing newlines removed.

    Args:
        fname: Path of the file to read.

    Returns:
        The file contents as a string, or None if the file could not be
        opened or read (the reason is logged as an error).
    """
    # open() itself is the most likely call to fail (missing file, bad
    # permissions), so it has to live inside the try block as well.
    try:
        with open(fname, 'r') as f:
            return f.read().strip('\n')
    except IOError as e:
        logger.error('Cannot read {}: {}'.format(fname, e))
        return None
class LogHandler(object):
    """Mixin that limits a logging handler to a band of levels.

    Concrete subclasses provide the class attributes ``_min_level`` and
    ``_max_level``. Records whose level lies outside that inclusive range
    are dropped; all others are passed to the next ``emit`` in the MRO.
    """

    def emit(self, record):
        """Forward ``record`` only if its level is inside the class's band."""
        handler_cls = type(self)
        if handler_cls._min_level <= record.levelno <= handler_cls._max_level:
            super().emit(record)
class StreamHandler(LogHandler, logging.StreamHandler):
    """``logging.StreamHandler`` combined with the ``LogHandler`` level filter."""
class ErrorStreamHandler(StreamHandler):
    """Stream handler for problem reports: WARNING up to and including CRITICAL."""

    _min_level = logging.WARNING
    # The band ends at CRITICAL rather than ERROR; otherwise CRITICAL records
    # fall outside every handler's range and are silently discarded.
    _max_level = logging.CRITICAL
class InfoStreamHandler(StreamHandler):
    """Stream handler that lets through INFO records only."""

    _min_level = _max_level = logging.INFO
class DebugStreamHandler(StreamHandler):
    """Stream handler that lets through DEBUG records only."""

    _min_level = _max_level = logging.DEBUG
########## Setup
# Name the logger after this tool (the previous name was left over from an
# unrelated script).
logger = logging.getLogger('JWT brute-forcer')
# Level 1 lets every record through the logger; the handlers below decide
# what is actually written, each one emitting only its own band of levels.
logger.setLevel(1)
# Handler class -> stream it writes to.
log_levels = {
    ErrorStreamHandler: sys.stderr,
    InfoStreamHandler: sys.stdout,
    DebugStreamHandler: sys.stderr,
}
for cls, out in log_levels.items():
    handler = cls(out)
    handler.setLevel(1)
    logger.addHandler(handler)
########## Cmdline
def _positive_int(value):
    """argparse ``type`` callable: parse ``value`` as an integer above zero."""
    try:
        number = int(value)
    except ValueError:
        raise argparse.ArgumentTypeError(
            'invalid integer value: {!r}'.format(value)) from None
    if number <= 0:
        raise argparse.ArgumentTypeError(
            'must be a positive integer, got {}'.format(number))
    return number


parser = argparse.ArgumentParser(
    formatter_class=argparse.ArgumentDefaultsHelpFormatter,
    description='Brute-force a JWT signed with a shared key.')
parser.add_argument(
    '-j', '--jwt-file', metavar='FILE', required=True,
    help='File containing the JWTs, one per line.')
parser.add_argument(
    '-p', '--pass-file', metavar='FILE', required=True,
    help='File containing a list of passwords to try, one per line.')
# Both numeric options must be positive: a thread pool needs at least one
# worker, and a chunk size of zero would never advance through the
# password list.
parser.add_argument(
    '-t', '--threads', type=_positive_int, metavar='INTEGER', default=5,
    help='Number of threads to use.')
parser.add_argument(
    '-s', '--chunk-size', type=_positive_int, metavar='INTEGER', default=100,
    help=('Try <chunk_size> passwords against each token at a time. '
          'Larger values incur the overhead of creating unnecessary '
          'threads.'))
args = parser.parse_args()
########## Read files
# read_file() returns None (after logging the error) when reading fails and
# an empty string when the file has no content; both end the run with exit
# status 2, but only the empty case still needs a message here.
passwords = read_file(args.pass_file)
if not passwords:
    if passwords is not None:
        logger.error('No passwords found in {}'.format(args.pass_file))
    sys.exit(2)
# Drop the '\r' left behind by CRLF line endings; everything else in a line
# is kept verbatim because it may be part of the password.
passwords = [line.rstrip('\r') for line in passwords.split('\n')]

tokens = read_file(args.jwt_file)
if tokens is not None:
    # Surrounding whitespace is never part of a JWT, and blank lines would
    # otherwise be handed to the decoder as empty tokens.
    tokens = [line.strip() for line in tokens.split('\n')]
    tokens = [token for token in tokens if token]
if not tokens:
    if tokens is not None:
        logger.error('No JWTs found in {}'.format(args.jwt_file))
    sys.exit(2)
########## Brute force
# All remaining tokens are tried in parallel against one chunk of passwords
# at a time, so a token cracked in one chunk is skipped in the next.
with ThreadPoolExecutor(max_workers=args.threads) as executor:
    tokens_cracked = {}
    announced = set()        # tokens whose key has already been printed
    reported_errors = set()  # (token, error type, message) already logged
    for pwd_chunk in split_list(passwords, args.chunk_size):
        remaining = [token for token in tokens
                     if token not in tokens_cracked]
        if not remaining:
            # Every token has a key; no point walking the rest of the list.
            break
        futures = {executor.submit(try_pwd, token, pwd, tokens_cracked):
                   (token, pwd)
                   for token in remaining
                   for pwd in pwd_chunk}
        for future in as_completed(futures):
            token, pwd = futures[future]
            try:
                found = future.result()
            except Exception as e:
                # A failure for one token/password pair (e.g. a malformed
                # token) must not abort the attempts for all the others.
                # Identical errors are logged once per token to keep the
                # output readable.
                error_key = (token, type(e).__name__, str(e))
                if error_key not in reported_errors:
                    reported_errors.add(error_key)
                    logger.error('Error while testing token {}: {}: {}'.format(
                        token, type(e).__name__, e))
                continue
            # A duplicated password in the list would otherwise report the
            # same hit more than once.
            if found and token not in announced:
                announced.add(token)
                logger.info(
                    'Found passphrase for token {}: {}'.format(token, pwd))
# Keep a failing exit status when any attempt raised an error.
if reported_errors:
    sys.exit(1)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment