Skip to content

Instantly share code, notes, and snippets.

@qrkourier
Last active May 21, 2024 15:10
Show Gist options
  • Save qrkourier/b9cacf765b2d62817672bc7e6be6bdc3 to your computer and use it in GitHub Desktop.
Save qrkourier/b9cacf765b2d62817672bc7e6be6bdc3 to your computer and use it in GitHub Desktop.
parse or verify a Ziti enrollment token as JWT
#!/usr/bin/env python
# This script parses and attempts to verify the signature of a Ziti JWT token and prints the header, payload, and
# analysis of the token.
import argparse
import json
import logging
import os
import ssl
from datetime import datetime
from urllib.parse import urlparse
import chardet
import jwt
from OpenSSL import crypto
def detect_encoding(file_path):
with open(file_path, 'rb') as file:
data = file.read()
return chardet.detect(data)['encoding']
def get_file_content_or_string(s):
if os.path.isfile(s):
with open(s, 'rb') as file:
return file.read().decode(detect_encoding(s), 'strict').strip()
else:
return s
def parse_verify_jwt(token, key: str = None):
claimset = jwt.decode(
jwt=token,
key=key,
algorithms=["ES256", "RS256"],
options={
"verify_signature": True if key else False,
"verify_aud": False,
}
)
return claimset
def fetch_issuer_pubkey(iss: str):
url = urlparse(iss)
# Get the server's certificate
cert = ssl.get_server_certificate((url.hostname, url.port or 443), timeout=3)
# Get the public key from the certificate
x509 = crypto.load_certificate(crypto.FILETYPE_PEM, cert)
pubkey = crypto.dump_publickey(crypto.FILETYPE_PEM, x509.get_pubkey())
return pubkey
# Create the parser
parser = argparse.ArgumentParser(description="Process some integers.")
# Add the positional arguments
parser.add_argument('token', type=str, help='The token to verify (required) as a string or file path')
parser.add_argument('pubkey', type=str, nargs='?', default=None, help='The client API\'s pubkey (optional) as a string or file path')
# Add the --verbose option
parser.add_argument('-v', '--verbose', action='store_true', help='Set log level to DEBUG')
# Parse the arguments
args = parser.parse_args()
# Create a custom logger
logger = logging.getLogger(__name__)
# Create handlers
c_handler = logging.StreamHandler()
c_handler.setLevel(logging.INFO)
# Create formatters and add it to handlers
c_format = logging.Formatter('%(name)s - %(levelname)s - %(message)s')
c_handler.setFormatter(c_format)
# Add handlers to the logger
logger.addHandler(c_handler)
# Set up logging
logging.basicConfig(level=logging.DEBUG if args.verbose else logging.INFO)
token = get_file_content_or_string(args.token)
if args.pubkey:
pubkey = get_file_content_or_string(args.pubkey)
else:
claimset = parse_verify_jwt(token)
try:
pubkey = fetch_issuer_pubkey(claimset["iss"])
except TimeoutError as e:
logger.debug(f"failed to fetch issuer pubkey: {e}")
pubkey = None
if pubkey:
logger.debug(f"verifying '{token}' with pubkey '{pubkey}'")
else:
logger.debug(f"parsing '{token}' without a pubkey for signature verification")
header = jwt.get_unverified_header(token)
try:
claimset = parse_verify_jwt(token, pubkey)
if pubkey:
signature_valid = True
else:
signature_valid = False
except jwt.exceptions.InvalidSignatureError:
signature_valid = False
if 'em' in claimset:
if claimset['em'] == 'ott':
enrollment_method = 'one-time token for an identity from the built-in edge signer CA'
elif claimset['em'] == 'ottca':
enrollment_method = 'one-time token for a an identity from a trusted external CA'
elif claimset['em'] == 'ca':
enrollment_method = 'reusable token for auto-creating an identity from a trusted external CA'
elif claimset['em'] == 'erott':
enrollment_method = 'one-time token for a router'
else:
enrollment_method = 'unknown'
analysis = {
"signature_valid": signature_valid,
"enrollment_method": enrollment_method
}
if 'exp' in claimset:
analysis['expiration'] = datetime.fromtimestamp(claimset['exp']).isoformat()
print(
json.dumps({
"header": header,
"payload": claimset,
"analysis": analysis
}, indent=4)
)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment