Skip to content

Instantly share code, notes, and snippets.

@philpennock
Created March 24, 2021 16:10
Show Gist options
  • Star 2 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save philpennock/09d49524ad98043ff11d8a40c2bb0d5a to your computer and use it in GitHub Desktop.
Save philpennock/09d49524ad98043ff11d8a40c2bb0d5a to your computer and use it in GitHub Desktop.
NATS CVE-2021-3127 account scanning tool to find exploit attempts
#!/usr/bin/env python3
"""jwt-audit: audit a packfile of JWTs for security issues.
See CVE-2021-3127 for more details.
"""
import argparse
import base64
import json
import os
import pathlib
import sys
from dataclasses import dataclass
import requests
SCRIPT_DIR = pathlib.Path(__file__).absolute().parent
class Error(Exception):
"""Base class for exceptions from audit."""
pass
class Exit(Exception):
"""Base class for exceptions which exit without a stack trace."""
pass
@dataclass
class ReadAccount:
lineno1: int
account_id: str
b64_jwt: str
class AuditAccount:
def __init__(self, read_account):
self.read_account = read_account
self.acct = self.read_account.account_id
self.problem_count = 0
def report(self, message):
print('{}: {}'.format(self.acct, message), file=sys.stderr)
self.problem_count += 1
def b64_json_decode(self, raw):
decode = raw + '=' * (4 - len(raw) % 4)
return json.loads(base64.urlsafe_b64decode(decode))
def payload_from_jwt(self, raw):
return self.b64_json_decode(raw.split('.', 2)[1])
def populate(self):
self.jwt = self.payload_from_jwt(self.read_account.b64_jwt)
def audit(self):
self.populate()
if self.jwt["sub"] != self.acct:
self.report(f'account does not match JWT subject {self.jwt["sub"]}')
if 'nats' not in self.jwt:
self.report(f'JWT missing "nats" top-level key')
return
if 'imports' not in self.jwt['nats']:
return
for impIndex0, Import in enumerate(self.jwt['nats']['imports']):
if 'token' not in Import:
continue
index = impIndex0 + 1
token = self.payload_from_jwt(Import['token'])
if token["type"] != 'activation':
self.report(f'import {index}: non-activation token {token["type"]!r}')
continue
if token["nats"]["type"] == 'stream':
# we don't audit these
continue
if token["nats"]["type"] != 'service':
self.report(f'import {index}: import not of type service but is {token["nats"]["type"]!r}')
if token["iss"] != Import["account"]:
self.report(f'import {index}: issuer {token["iss"]!r} != import-from account {Import["account"]!r}')
if token["nats"]["subject"] != Import["to"]:
self.report(f'import {index}: ABUSE: token grants {token["nats"]["subject"]!r} but used to access {Import["to"]!r}')
class AccountCollection:
def __init__(self, *, options):
self.options = options
if options.file:
self.data_reader = self.from_file(options.file)
elif options.url:
self.data_reader = self.from_url(options.url)
else:
raise Error('code bug: unhandled origin for AccountCollection')
def from_file(self, filename):
yield from self.from_opener(repr(filename), open(filename))
def from_url(self, url):
response = requests.get(url, stream=True)
iterator = [line.decode(response.encoding) for line in response.iter_lines()]
yield from self.from_opener('<'+url+'>', iterator)
def from_opener(self, label, opened):
for lineno0, line in enumerate(opened):
try:
line = line.strip()
if not line:
continue
acct, jwt = line.split('|', 1)
yield ReadAccount(lineno0+1, acct, jwt)
except ValueError as e:
l = lineno0 + 1
print(f'warning: {label} line {l}: {e}', file=sys.stderr)
def audit(self):
for index, ra in enumerate(self.data_reader):
AuditAccount(ra).audit()
def _main(args, argv0):
parser = argparse.ArgumentParser(
description=__doc__,
formatter_class=argparse.RawDescriptionHelpFormatter)
origin = parser.add_mutually_exclusive_group(required=True)
origin.add_argument('-f', '--file',
help='Audit a pack-file of JWTs')
origin.add_argument('--url',
help='Pull packfile of accounts from given URL')
parser.add_argument('-v', '--verbose',
action='count', default=0,
help='Be more verbose')
options = parser.parse_args(args=args)
accounts = AccountCollection(options=options)
accounts.audit()
if __name__ == '__main__':
argv0 = pathlib.Path(sys.argv[0]).name
if argv0.endswith('.py'):
argv0 = argv0[:-3]
try:
rv = _main(sys.argv[1:], argv0=argv0)
except Exit as e:
for arg in e.args:
print('{}: {}'.format(argv0, arg), file=sys.stderr)
sys.exit(1)
sys.exit(rv)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment