Skip to content

Instantly share code, notes, and snippets.

@tothi
Created August 28, 2022 20:52
Show Gist options
  • Star 4 You must be signed in to star a gist
  • Fork 1 You must be signed in to fork a gist
  • Save tothi/ad2575c29aa8880f8fb8ff48495e1afc to your computer and use it in GitHub Desktop.
Save tothi/ad2575c29aa8880f8fb8ff48495e1afc to your computer and use it in GitHub Desktop.
mitmproxy addon for handling OAuth access and refresh tokens automatically
# run: mitmproxy -k -p 8090 -s mitmproxy-jwt-refresh-addon.py
# set burp upstream proxy to localhost:8090
#
# use case:
# - application authorization is implemented by OAuth 2.0
# - testing is performed using Burp as primary and mitmproxy as upstream proxy
# - mitmproxy takes care of the Authorization tokens using this addon
# - user gets an access_token and a refresh_token during the 1st login (e.g. password login)
# - mitmproxy addon caches access_token and refresh_token
# - mitmproxy addon adds Authorization: Bearer [access_token from cache] header for every request
# - mitmproxy addon gets a new access_token before expiration using the OAuth refresh_token grant_type
#
# Without this upstream mitmproxy addon, Burp active scans (and other testing) would fail after the access_token expires
# (when the browser is not interacting with the application).
from mitmproxy import ctx
import json
import requests
import threading
import urllib3
# The background refresh request is sent with verify=False; silence the
# InsecureRequestWarning category so it is not emitted for that request.
urllib3.disable_warnings(urllib3.exceptions.InsecureRequestWarning)
# Token endpoint: responses for exactly this URL are parsed for tokens, and the
# background refresh request is POSTed to it. Adjust for the target application.
TOKEN_URL = "https://example.domain.com/openid-connect/token"
# Sent as the client_id form field of the refresh_token grant request.
CLIENT_ID = "example-client"
class JwtRefresh:
    """mitmproxy addon that caches OAuth 2.0 tokens and keeps them refreshed.

    Tokens are captured from any response whose URL equals TOKEN_URL. While an
    access_token is cached, it is injected as ``Authorization: Bearer ...``
    into every request passing through the proxy. A background timer calls
    TOKEN_URL with the refresh_token grant after half of ``expires_in`` has
    elapsed, so the cached access_token is replaced before it expires.
    """

    # Seconds to wait for TOKEN_URL before a background refresh is abandoned.
    _REQUEST_TIMEOUT = 30

    def __init__(self):
        self._refresh_token = None
        self._access_token = None
        self._interval = None  # seconds between refreshes (expires_in / 2)
        self._timer = None  # the single pending threading.Timer, if any

    @staticmethod
    def _parse_token_response(resp, fallback_refresh_token=None):
        """Parse a token endpoint response body without touching any state.

        Args:
            resp: JSON document (str or bytes) returned by the token endpoint.
            fallback_refresh_token: value to keep when the response carries no
                ``refresh_token`` (a refresh response is allowed to omit it).

        Returns:
            Tuple ``(access_token, refresh_token, interval)`` where
            ``interval`` is half of ``expires_in`` in seconds.

        Raises:
            ValueError: the body is not a JSON object, ``token_type`` is not
                "bearer" (compared case-insensitively), or ``expires_in`` is
                not numeric.
            KeyError: ``expires_in`` or ``access_token`` is missing.
            TypeError: the body or ``expires_in`` has an unusable type.
        """
        data = json.loads(resp)
        if not isinstance(data, dict):
            raise ValueError("token response is not a JSON object")
        token_type = data.get("token_type")
        if not isinstance(token_type, str) or token_type.lower() != "bearer":
            raise ValueError("unsupported token_type: %r" % (token_type,))
        # Refresh at half the lifetime to make sure it happens before expiration.
        interval = float(data["expires_in"]) / 2
        access_token = data["access_token"]
        refresh_token = data.get("refresh_token", fallback_refresh_token)
        return access_token, refresh_token, interval

    # extract tokens from the appropriate response
    def _update_tokens(self, resp):
        """Replace the cached tokens and refresh interval from a token response.

        The response is parsed completely before anything is assigned, so a
        malformed response raises (see ``_parse_token_response``) and leaves
        the cache untouched.
        """
        access_token, refresh_token, interval = self._parse_token_response(
            resp, self._refresh_token
        )
        self._interval = interval
        self._access_token = access_token
        self._refresh_token = refresh_token
        ctx.log("[+] JwtRefresh: Updated in-cache access_token and refresh_token.")

    # get a new access (and refresh) token using our stored refresh_token and start a background refresh task
    def _refresh_request(self):
        """Timer callback: run the refresh_token grant and schedule the next run.

        On any failure (network error, HTTP error status, malformed body) the
        error is logged, the cached tokens are kept as they are and no new
        timer is scheduled.
        """
        ctx.log("[*] JwtRefresh: Calling TOKEN_URL for refresh_token...")
        try:
            r = requests.post(
                TOKEN_URL,
                data={
                    "grant_type": "refresh_token",
                    "refresh_token": self._refresh_token,
                    "client_id": CLIENT_ID,
                },
                verify=False,
                timeout=self._REQUEST_TIMEOUT,
            )
            r.raise_for_status()
            self._update_tokens(r.content)
        except (requests.RequestException, ValueError, KeyError, TypeError) as exc:
            ctx.log("[!] JwtRefresh: Token refresh failed, refresh loop stopped: %s" % exc)
            return
        self._add_refresh_timer()

    # init refresh loop (make sure there is only one instance)
    def _add_refresh_timer(self):
        """(Re)start the refresh timer, cancelling a previously scheduled one."""
        if self._timer is not None:
            self._timer.cancel()
        self._timer = threading.Timer(self._interval, self._refresh_request)
        # Daemon thread: a pending refresh must not keep the process alive.
        self._timer.daemon = True
        self._timer.start()
        ctx.log("[*] JwtRefresh: Added %ds timer for refresh_token request." % self._interval)

    # get tokens from TOKEN_URL response and start a background refresh task
    def response(self, flow):
        """mitmproxy hook: capture tokens from responses to TOKEN_URL."""
        if flow.request.pretty_url != TOKEN_URL:
            return
        ctx.log("[*] JwtRefresh: Triggered TOKEN_URL. Extracting tokens from response...")
        try:
            self._update_tokens(flow.response.get_text())
        except (ValueError, KeyError, TypeError) as exc:
            # e.g. a failed login: keep whatever is cached and do not touch the timer.
            ctx.log("[!] JwtRefresh: Ignoring TOKEN_URL response without usable tokens: %s" % exc)
            return
        self._add_refresh_timer()

    # replace Auth header if we have the access_token
    def request(self, flow):
        """mitmproxy hook: inject the cached access_token into the request.

        NOTE(review): the header is set on every request, whatever its host --
        confirm the proxy only ever sees in-scope traffic.
        """
        if self._access_token is not None:
            flow.request.headers["Authorization"] = "Bearer %s" % self._access_token
            ctx.log("[*] JwtRefresh: Updated HTTP request, access_token from cache injected in Authorization header")

    def done(self):
        """mitmproxy hook: cancel the pending refresh timer on shutdown."""
        if self._timer is not None:
            self._timer.cancel()
            self._timer = None
# Module-level addon list: one JwtRefresh instance is created when the script
# is loaded (see the `-s` invocation in the header comment).
addons = [JwtRefresh()]
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment