Skip to content

Instantly share code, notes, and snippets.

@ktaka-ccmp
Last active July 7, 2025 09:41
Show Gist options
  • Select an option

  • Save ktaka-ccmp/915a3680f04f1704742f13e898ca668d to your computer and use it in GitHub Desktop.

Select an option

Save ktaka-ccmp/915a3680f04f1704742f13e898ca668d to your computer and use it in GitHub Desktop.
#!/bin/bash
# Produce a token-masked summary of an OAuth2 test results file.
#
# Usage: <this script> results_file.json
#
# Reads the JSON array in results_file.json and writes one summary object per
# test result to "<results_file without its extension>_summary.json". Token
# and code values are replaced by "<name>_exists" placeholders so the summary
# can be shared without leaking credentials.

# Check if input file is provided
if [ $# -ne 1 ]; then
    echo "Usage: $0 results_file.json" >&2
    exit 1
fi

input_file=$1
output_file="${input_file%.*}_summary.json"

# Fail early with a clear message instead of letting jq create an empty summary.
if [ ! -r "$input_file" ]; then
    echo "Error: cannot read input file: $input_file" >&2
    exit 1
fi

if ! jq '
# Replace sensitive values in a params object with placeholders.
def mask_tokens:
  with_entries(
    if .key == "access_token" then
      .value = "access_token_exists"
    elif .key == "code" then
      .value = "code_exists"
    elif .key == "id_token" then
      .value = "id_token_exists"
    elif .key == "refresh_token" then
      .value = "refresh_token_exists"
    else
      .
    end
  );

# Same masking for parsed fragment parameters (no refresh_token case).
def mask_fragment_params:
  with_entries(
    if .key == "access_token" then
      .value = "access_token_exists"
    elif .key == "code" then
      .value = "code_exists"
    elif .key == "id_token" then
      .value = "id_token_exists"
    else
      .
    end
  );

# Mask values inside the raw "#a=b&c=d" fragment string; null passes through.
def mask_raw_fragment:
  if . != null then
    if contains("access_token=") then
      sub("access_token=[^&]*"; "access_token=access_token_exists")
    else . end |
    if contains("code=") then
      sub("code=[^&]*"; "code=code_exists")
    else . end |
    if contains("id_token=") then
      sub("id_token=[^&]*"; "id_token=id_token_exists")
    else . end
  else . end;

.[]| {
  response_mode,
  response_type,
  scope,
  response: {
    query_params: (.response.query_params // {} | mask_tokens),
    form_params: (.response.form_params // {} | mask_tokens),
    fragment_params: (.response.fragment_params // {} | mask_fragment_params),
    raw_fragment: (.response.raw_fragment | mask_raw_fragment)
  },
  token_exchange: (.token_exchange // null | if . != null then mask_tokens else . end)
}' "$input_file" > "$output_file"; then
    # Do not leave a truncated summary behind or report success after a failure.
    echo "Error: jq failed to process $input_file" >&2
    rm -f "$output_file"
    exit 1
fi

echo "Created summary file: $output_file"
#!/usr/bin/env python3
import argparse
import html
import http.server
import json
import socketserver
import threading
import time
import urllib.parse
import webbrowser
from datetime import datetime
from typing import Dict, List

import requests
def load_env(env_path='.env'):
    """Parse a simple ``KEY=VALUE`` file into a dict of strings.

    Blank lines and lines starting with ``#`` are ignored. Each remaining
    line is split on its first ``=``; key and value are stripped of
    surrounding whitespace. Lines that have no ``=`` (or nothing before it)
    are skipped instead of raising, so one malformed line cannot abort
    loading.

    Args:
        env_path: Path of the env file to read.

    Returns:
        Mapping of keys to values. Empty when the file does not exist, in
        which case a warning is printed.
    """
    config = {}
    try:
        with open(env_path, encoding='utf-8') as f:
            for line in f:
                line = line.strip()
                if not line or line.startswith('#'):
                    continue
                key, sep, value = line.partition('=')
                key = key.strip()
                if not sep or not key:
                    # Not a KEY=VALUE assignment; ignore rather than crash.
                    continue
                config[key] = value.strip()
    except FileNotFoundError:
        print(f"Warning: {env_path} file not found")
    return config
class Config:
    """Static configuration for the OAuth2 flow tester.

    Values are read once, when the class body executes, from the ``.env``
    file parsed by ``load_env()``.
    """

    # Load environment variables
    env = load_env()

    # OAuth2 Configuration
    CLIENT_ID = env.get('OAUTH_CLIENT_ID')
    CLIENT_SECRET = env.get('OAUTH_CLIENT_SECRET')
    AUTH_ENDPOINT = env.get('OAUTH_AUTH_ENDPOINT', 'https://accounts.google.com/o/oauth2/v2/auth')
    TOKEN_ENDPOINT = env.get('OAUTH_TOKEN_ENDPOINT', 'https://oauth2.googleapis.com/token')
    # Public base URL of the reverse proxy that forwards to the local server.
    PROXY_URL = env.get('OAUTH_PROXY_URL')
    LOCAL_PORT = int(env.get('OAUTH_LOCAL_PORT', '3000'))

    # Test combinations
    RESPONSE_TYPES = [
        "code",
        "token",
        "id_token",
        "code token",
        "code id_token",
        "token id_token",
        "code token id_token",
    ]
    RESPONSE_MODES = [
        "query",
        "fragment",
        "form_post",
    ]
    SCOPES = [
        "profile email",
        "openid profile email",
    ]

    @classmethod
    def get_redirect_uri(cls) -> str:
        """Return the redirect URI: the proxy base URL plus ``/authorized``.

        Raises:
            ValueError: If ``OAUTH_PROXY_URL`` is not configured. Without it
                the URI would be the literal string "None/authorized".
        """
        if not cls.PROXY_URL:
            raise ValueError(
                "OAUTH_PROXY_URL is not set; cannot build the redirect URI"
            )
        # Tolerate a trailing slash in the configured URL to avoid "//authorized".
        return f"{cls.PROXY_URL.rstrip('/')}/authorized"
class OAuth2Handler(http.server.SimpleHTTPRequestHandler):
    """HTTP handler that records OAuth2 redirect responses.

    GET requests are recorded as query-mode callbacks and POST requests as
    form_post callbacks. The page returned for a callback runs a script that
    POSTs ``window.location.hash`` to ``/fragment``, because a URL fragment
    is never sent to the server by the browser itself.

    State is kept in class attributes so the code that starts the flow can
    wait on ``received_callback`` and then read ``current_response``.
    """

    # Set as soon as a GET or form POST callback has been recorded.
    received_callback = threading.Event()
    # Data of the most recent callback (see _create_response_data), or None.
    current_response = None

    def log_message(self, format, *args):
        """Suppress the default per-request logging."""
        pass

    def do_GET(self):
        """Record a GET request as a query-mode callback."""
        if urllib.parse.urlparse(self.path).path == '/favicon.ico':
            # Browsers fetch the favicon right after loading the callback
            # page; recording it would overwrite the real callback data.
            self.send_response(404)
            self.end_headers()
            return
        self._handle_callback('query')

    def do_POST(self):
        """Dispatch POSTs: ``/fragment`` reports, otherwise form_post callbacks."""
        if self.path == '/fragment':
            self._handle_fragment()
        else:
            self._handle_form_post()

    def _handle_form_post(self):
        """Record a form-encoded POST body as the current callback."""
        try:
            content_length = int(self.headers.get('Content-Length', 0))
            post_data = self.rfile.read(content_length).decode('utf-8')
            form_params = urllib.parse.parse_qs(post_data)
            # Keep only the first value of each parameter.
            form_params = {k: v[0] for k, v in form_params.items()}
            response_data = self._create_response_data('POST', form_params=form_params)
            OAuth2Handler.current_response = response_data
            OAuth2Handler.received_callback.set()
            self._send_html_response("Form Post", response_data)
        except Exception as e:
            print(f"Error handling form post: {e}")
            self._send_error_response()

    @staticmethod
    def _parse_fragment(fragment):
        """Parse a ``#key=value&...`` fragment into a dict.

        Pairs are split before percent-decoding, so an encoded ``&`` or ``=``
        inside a value stays part of that value. A key without ``=`` maps to
        an empty string, and the first value wins for repeated keys (the same
        rule used for query and form parameters).
        """
        parsed = urllib.parse.parse_qs(fragment.lstrip('#'), keep_blank_values=True)
        return {k: v[0] for k, v in parsed.items()}

    def _handle_fragment(self):
        """Attach a fragment reported by the callback page to the current response.

        Expects a JSON body with a ``fragment`` key. Responds 200 on success;
        on any error responds 500 and resets the stored fragment fields.
        """
        try:
            content_length = int(self.headers.get('Content-Length', 0))
            post_data = self.rfile.read(content_length)
            fragment_data = json.loads(post_data)
            if OAuth2Handler.current_response and fragment_data.get('fragment'):
                raw_fragment = fragment_data['fragment']
                OAuth2Handler.current_response['raw_fragment'] = raw_fragment
                OAuth2Handler.current_response['fragment_params'] = self._parse_fragment(raw_fragment)
            self.send_response(200)
            self.end_headers()
        except Exception as e:
            print(f"Error parsing fragment: {e}")
            if OAuth2Handler.current_response:
                OAuth2Handler.current_response['fragment_params'] = {}
                OAuth2Handler.current_response['raw_fragment'] = None
            self.send_response(500)
            self.end_headers()

    def _handle_callback(self, source_type):
        """Record the query parameters of a GET request as the current callback.

        ``source_type`` is accepted for interface compatibility and is not
        used by this method.
        """
        try:
            parsed_path = urllib.parse.urlparse(self.path)
            params = urllib.parse.parse_qs(parsed_path.query)
            # Keep only the first value of each parameter.
            query_params = {k: v[0] for k, v in params.items()}
            response_data = self._create_response_data('GET', query_params=query_params)
            OAuth2Handler.current_response = response_data
            OAuth2Handler.received_callback.set()
            self._send_html_response("Callback", response_data)
        except Exception as e:
            print(f"Error handling callback: {e}")
            self._send_error_response()

    def _create_response_data(self, method, query_params=None, form_params=None):
        """Build the record stored for one callback request.

        Fragment fields start empty; ``_handle_fragment`` fills them in later
        if the browser reports a fragment.
        """
        return {
            'timestamp': datetime.now().isoformat(),
            'path': self.path,
            'method': method,
            'query_params': query_params or {},
            'form_params': form_params or {},
            'fragment_params': {},
            'raw_fragment': None,
            'headers': dict(self.headers),
            'proxy_info': {
                'forwarded_proto': self.headers.get('X-Forwarded-Proto', ''),
                'forwarded_for': self.headers.get('X-Forwarded-For', ''),
                'real_ip': self.headers.get('X-Real-IP', '')
            }
        }

    def _send_html_response(self, title, response_data):
        """Send the page that shows the captured data and reports the fragment."""
        self.send_response(200)
        self.send_header('Content-type', 'text/html')
        self.end_headers()
        # The dump contains request-controlled values (parameters, headers);
        # escape it so it cannot inject markup or script into the page.
        escaped_dump = html.escape(json.dumps(response_data, indent=2), quote=False)
        timestamp = response_data["timestamp"]
        response_html = f"""
<html>
<head>
<title>OAuth2 {title} Received</title>
<style>
body {{ font-family: Arial, sans-serif; margin: 2em; }}
pre {{ background: #f5f5f5; padding: 1em; border-radius: 4px; }}
</style>
</head>
<body>
<h1>OAuth2 {title} Received</h1>
<pre>{escaped_dump}</pre>
<script>
if(window.location.hash) {{
const hashData = {{
timestamp: '{timestamp}',
fragment: window.location.hash
}};
fetch('/fragment', {{
method: 'POST',
headers: {{'Content-Type': 'application/json'}},
body: JSON.stringify(hashData)
}})
.then(response => response.ok ? console.log('Fragment sent') : console.error('Fragment send failed'))
.catch(error => console.error('Failed to send fragment:', error));
}}
setTimeout(() => window.close(), 3000);
</script>
</body>
</html>
"""
        self.wfile.write(response_html.encode())

    def _send_error_response(self):
        """Send a minimal 500 response."""
        self.send_response(500)
        self.send_header('Content-type', 'text/html')
        self.end_headers()
        self.wfile.write(b'Internal Server Error')
class OAuth2TestServer:
    """Run the OAuth2 callback handler on a local port in a background thread."""

    def __init__(self, port: int):
        """Remember the port; nothing is bound until ``start()`` is called."""
        self.port = port
        self.server = None
        self.server_thread = None

    def start(self):
        """Bind the port and start serving in a daemon thread.

        Raises:
            Exception: Any error raised while binding or starting is printed
                and re-raised.
        """
        try:
            server = socketserver.TCPServer(
                ("", self.port), OAuth2Handler, bind_and_activate=False
            )
            # Must be set before binding. Allows an immediate rerun while the
            # previous run's socket is still in TIME_WAIT.
            server.allow_reuse_address = True
            try:
                server.server_bind()
                server.server_activate()
            except Exception:
                # Do not leak the socket when the port cannot be bound.
                server.server_close()
                raise
            self.server = server
            self.server_thread = threading.Thread(target=self.server.serve_forever)
            self.server_thread.daemon = True
            self.server_thread.start()
            print(f"Local server started at http://localhost:{self.port}")
            print(f"Accessible via reverse proxy at {Config.PROXY_URL}")
        except Exception as e:
            print(f"Error starting server: {e}")
            raise

    def stop(self):
        """Stop serving and close the listening socket; no-op if never started."""
        if self.server:
            try:
                self.server.shutdown()
                self.server.server_close()
            except Exception as e:
                print(f"Error stopping server: {e}")
def generate_auth_url(response_type: str, response_mode: str, scope: str) -> str:
    """Build the authorization request URL for one test combination.

    The request always carries the configured client id and redirect URI, a
    fixed ``state`` value and ``prompt=none``. A fixed ``nonce`` is appended
    when the response type contains ``id_token``.
    """
    # Added last so the parameter order of the resulting URL is unchanged.
    nonce_param = {'nonce': 'test_nonce'} if 'id_token' in response_type else {}
    query_string = urllib.parse.urlencode({
        'client_id': Config.CLIENT_ID,
        'response_type': response_type,
        'response_mode': response_mode,
        'scope': scope,
        'redirect_uri': Config.get_redirect_uri(),
        'state': 'xyz123',
        'prompt': 'none',
        **nonce_param,
    })
    return "{}?{}".format(Config.AUTH_ENDPOINT, query_string)
def exchange_code_for_tokens(code: str) -> Dict:
    """Exchange an authorization code for tokens at the token endpoint.

    Args:
        code: Authorization code received in the callback.

    Returns:
        The decoded JSON token response, or ``{'error': <message>}`` when the
        request fails, times out, returns an HTTP error status, or the body
        cannot be decoded.
    """
    try:
        token_data = {
            'client_id': Config.CLIENT_ID,
            'client_secret': Config.CLIENT_SECRET,
            'code': code,
            'grant_type': 'authorization_code',
            'redirect_uri': Config.get_redirect_uri()
        }
        # requests waits indefinitely without an explicit timeout; bound it so
        # one unresponsive endpoint cannot stall the whole test run.
        response = requests.post(Config.TOKEN_ENDPOINT, data=token_data, timeout=30)
        response.raise_for_status()
        return response.json()
    except Exception as e:
        # Best effort: record the failure in the results instead of aborting.
        print(f"Error exchanging code for tokens: {e}")
        return {'error': str(e)}
def test_flow(response_type: str, response_mode: str, scope: str) -> Dict:
    """Run one authorization flow in the browser and collect its outcome.

    Opens the authorization URL, waits up to 60 seconds for the handler to
    record a callback, and exchanges the authorization code if one was
    returned in the query, form or fragment parameters.

    Returns:
        A dict with the tested combination and ``auth_url``, plus either
        ``response`` and ``token_exchange`` or, on timeout, ``error``.
    """
    print("\nTesting combination:")
    print(f"Response Type: {response_type}")
    print(f"Response Mode: {response_mode}")
    print(f"Scope: {scope}")

    # Reset the shared handler state left over from the previous flow.
    OAuth2Handler.received_callback.clear()
    OAuth2Handler.current_response = None

    auth_url = generate_auth_url(response_type, response_mode, scope)
    print(f"\nOpening URL:\n{auth_url}\n")
    webbrowser.open(auth_url)

    summary = {
        'response_type': response_type,
        'response_mode': response_mode,
        'scope': scope,
        'auth_url': auth_url,
    }

    if not OAuth2Handler.received_callback.wait(timeout=60):
        print("\nTimeout waiting for callback")
        return {**summary, 'error': 'Timeout waiting for callback'}

    # Wait for fragment processing
    time.sleep(1)
    response_data = OAuth2Handler.current_response

    print("\nResponse data received:")
    print(f"Query params: {response_data.get('query_params', {})}")
    print(f"Form params: {response_data.get('form_params', {})}")
    print(f"Fragment params: {response_data.get('fragment_params', {})}")

    # Look for code in all possible locations; the first location wins.
    code = None
    for location, label in (
        ('query_params', 'query params'),
        ('form_params', 'form params'),
        ('fragment_params', 'fragment params'),
    ):
        candidate = response_data.get(location, {}).get('code')
        if candidate:
            code = candidate
            print(f"Found code in {label}")
            break

    token_response = None
    if code:
        in_fragment = response_data.get('fragment_params', {}).get('code')
        code_location = 'fragment_params' if in_fragment else 'query/form'
        print("\nExchanging code for tokens")
        print(f"Code location: {code_location}")
        print(f"Code: {code}")
        token_response = exchange_code_for_tokens(code)
    else:
        print("\nNo code found to exchange")

    return {**summary, 'response': response_data, 'token_exchange': token_response}
def main():
    """Run every response-mode / response-type / scope combination.

    Starts the local callback server, runs ``test_flow`` for each
    combination in ``Config`` and writes the collected results to
    ``results_<timestamp>.json``. The server is always stopped on exit.
    """
    parser = argparse.ArgumentParser(description='OAuth2 Flow Tester')
    parser.add_argument('--port', type=int, default=Config.LOCAL_PORT,
                        help='Local port to listen on (should match reverse proxy configuration)')
    args = parser.parse_args()

    timestamp = datetime.now().strftime('%Y%m%d%H%M')
    results_file = f'results_{timestamp}.json'

    server = OAuth2TestServer(args.port)
    try:
        server.start()
        results = []
        for response_mode in Config.RESPONSE_MODES:
            for response_type in Config.RESPONSE_TYPES:
                for scope in Config.SCOPES:
                    result = test_flow(response_type, response_mode, scope)
                    results.append(result)
                    # Rewrite the file after every flow so that interrupting
                    # a long run keeps the results gathered so far.
                    with open(results_file, 'w', encoding='utf-8') as f:
                        json.dump(results, f, indent=2)
                    # Pause between flows; the callback page closes its
                    # window after 3 seconds.
                    time.sleep(3)
        print(f"\nTesting completed. Results saved to {results_file}")
    except KeyboardInterrupt:
        print("\nShutting down...")
    finally:
        server.stop()


if __name__ == "__main__":
    main()
@ktaka-ccmp
Copy link
Copy Markdown
Author

ktaka-ccmp commented Jul 7, 2025

OAuth2 Flow Testing & Analysis

A systematic analysis of OAuth2 authentication flows using Google's OAuth2 implementation, testing various combinations of response types and response modes.

Testing Methodology

Environment Setup

# OAuth2 Configuration (.env)
OAUTH_CLIENT_ID=your-client-id.apps.googleusercontent.com
OAUTH_CLIENT_SECRET=your-client-secret

OAUTH_AUTH_ENDPOINT=https://accounts.google.com/o/oauth2/v2/auth
OAUTH_TOKEN_ENDPOINT=https://oauth2.googleapis.com/token

OAUTH_PROXY_URL=https://7a6d-217-178-16-167.ngrok-free.app
OAUTH_LOCAL_PORT=3000

Test Parameters

RESPONSE_TYPES = ['code', 'token', 'id_token', 'code token', 'code id_token', 'token id_token', 'code token id_token']
RESPONSE_MODES = ['query', 'fragment', 'form_post']
SCOPES = ['profile email', 'openid profile email']

Testing Script Overview

The Python script automatically:

  1. Generates authorization URLs for each combination
  2. Opens browser windows for user authentication
  3. Captures responses across query parameters, form data, and URL fragments
  4. Performs token exchange when authorization codes are present
  5. Logs comprehensive results for analysis

Test Results Summary

Query Response Mode Results

| Response Type | Scope | Response Location | Returned Tokens | Token Exchange |
|---|---|---|---|---|
| code | profile email | query params | code | access_token, id_token |
| code | openid profile email | query params | code | access_token, id_token |
| token | both scopes | fragment | access_token | none |
| id_token | both scopes | fragment | id_token | none |
| code token | both scopes | fragment | code, access_token | access_token, id_token |
| code id_token | both scopes | fragment | code, id_token | access_token, id_token |
| token id_token | both scopes | fragment | access_token, id_token | none |
| code token id_token | both scopes | fragment | code, access_token, id_token | access_token, id_token |

Fragment Response Mode Results

| Response Type | Scope | Response Location | Returned Tokens | Token Exchange |
|---|---|---|---|---|
| code | both scopes | query params | code | access_token, id_token |
| token | both scopes | fragment | access_token | none |
| id_token | both scopes | fragment | id_token | none |
| code token | both scopes | fragment | code, access_token | access_token, id_token |
| code id_token | both scopes | fragment | code, id_token | access_token, id_token |
| token id_token | both scopes | fragment | access_token, id_token | none |
| code token id_token | both scopes | fragment | code, access_token, id_token | access_token, id_token |

Form Post Response Mode Results

| Response Type | Scope | Response Location | Returned Tokens | Token Exchange |
|---|---|---|---|---|
| code | both scopes | form params | code | access_token, id_token |
| token | both scopes | form params | access_token | none |
| id_token | both scopes | form params | id_token | none |
| code token | both scopes | form params | code, access_token | access_token, id_token |
| code id_token | both scopes | form params | code, id_token | access_token, id_token |
| token id_token | both scopes | form params | access_token, id_token | none |
| code token id_token | both scopes | form params | code, access_token, id_token | access_token, id_token |

Key Findings

1. Response Location Patterns

  • Query/Fragment modes:
    • Single 'code' responses always come in query params
    • Token-containing responses always come in fragments
  • Form post mode:
    • All responses are consistently delivered in form params

2. Token Delivery Behavior

  • Multiple tokens are always delivered together in the same location
  • No token splitting across different response locations
  • Consistent behavior across all response modes

3. Scope Impact

  • Response patterns are identical for both "profile email" and "openid profile email" scopes
  • OpenID scope is always included in returned scope, even when not requested
  • Google expands scopes to include full URL versions

4. Code Exchange Pattern

  • Any flow that includes "code" triggers a token exchange
  • Token exchange always returns both access_token and id_token
  • Code exchange happens regardless of response_mode

Security Analysis

Most Recommended Flows

1. Authorization Code Flow with Form Post

Configuration: response_type=code, response_mode=form_post

Security benefits:

  • No tokens in URLs or browser history
  • Credentials don't appear in server logs
  • Backend token exchange enables client authentication
  • Authorization code is short-lived and single-use

Implementation benefits:

  • Simple to implement and debug
  • Reliable across all browsers
  • No client-side token handling required

2. Authorization Code Flow with PKCE

Configuration: response_type=code with PKCE

Security benefits:

  • Protection against code interception
  • Same benefits as basic code flow
  • Recommended for mobile/native apps

Least Recommended Flows

1. Implicit Flow

Configuration: response_type=token or response_type=id_token

Security concerns:

  • Tokens exposed in URL fragment
  • No client authentication
  • Tokens might be logged in browser history
  • Higher risk of token leakage

Functional limitations:

  • No refresh tokens
  • Shorter token lifetimes required

2. Hybrid Flow with Query Mode

Configuration: response_type=code token id_token, response_mode=query

Security issues:

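  • Tokens would be exposed in URL query parameters if the provider honored response_mode=query for token-bearing response types (in the tests above, Google returned them in the fragment instead)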
  • Higher risk of token leakage through logs
  • Multiple tokens in transit simultaneously

Complexity issues:

  • Hard to implement correctly
  • More points of failure
  • Difficult to debug

Data Processing Commands

Result Masking for Security

# Mask sensitive tokens in results
# Prints one summary object per test result with token and code values
# replaced by "<name>_exists" placeholders.
cat oauth2_test_results.json | jq '
# Replace sensitive values in a params object with placeholders.
def mask_tokens:
  with_entries(
    if .key == "access_token" then
      .value = "access_token_exists"
    elif .key == "code" then
      .value = "code_exists"
    elif .key == "id_token" then
      .value = "id_token_exists"
    else
      .
    end
  );

# Mask values inside the raw fragment string; null passes through unchanged.
def mask_fragment:
  if . != null then
    if contains("access_token=") then
      sub("access_token=[^&]*"; "access_token=access_token_exists")
    else . end |
    if contains("code=") then
      sub("code=[^&]*"; "code=code_exists")
    else . end |
    if contains("id_token=") then
      sub("id_token=[^&]*"; "id_token=id_token_exists")
    else . end
  else . end;

.[]| {
  response_mode,
  response_type,
  scope,
  response: {
    query_params: (.response.query_params // {} | mask_tokens),
    form_params: (.response.form_params // {} | mask_tokens),
    # The test script stores the raw fragment under "raw_fragment".
    fragment: (.response.raw_fragment | mask_fragment)
  }
}'

Filter Query Mode Results

# Extract query mode results only
# Values are NOT masked here; do not share this output.
cat oauth2_test_results.json | jq '
.[] | select(.response_mode == "query")
|{
  response_mode,
  response_type,
  scope,
  auth_url,
  response: {
    query_params: (.response.query_params // {}),
    form_params: (.response.form_params // {}),
    # The test script stores the raw fragment under "raw_fragment".
    fragment: (.response.raw_fragment)
  }
}'

Implementation Recommendations

Response Mode Comparison

| Response Mode | Security | Reliability | Use Case |
|---|---|---|---|
| form_post | ✅ Highest | ✅ Excellent | Web applications |
| fragment | ⚠️ Medium | ✅ Good | SPAs, mobile apps |
| query | ❌ Lowest | ✅ Good | Legacy systems only |

Best Practices

  1. Always use PKCE for code-based flows
  2. Implement state parameter for CSRF protection
  3. Use nonce parameter for replay protection
  4. Validate redirect URIs strictly
  5. Keep authorization codes short-lived (< 10 minutes)

Code vs Query/Fragment Acceptability

Question: Is it acceptable to use (response_type=code, response_mode=query or fragment)?

Answer: ACCEPTABLE WITH MITIGATIONS

Pros:

  • Still maintains core security feature (code is short-lived and single-use)
  • Widely supported across browsers and OAuth2 servers
  • Simple to implement and debug

⚠️ Considerations:

  • Authorization code exposed in URL
  • Could be logged by web servers or appear in browser history
  • BUT: Code is single-use and short-lived

Mitigations:

  • Always use PKCE
  • Keep code lifetime very short
  • Implement strict code replay protection
  • Consider upgrading to form_post when possible

Conclusion

The systematic testing revealed that OAuth2 flow selection significantly impacts both security and implementation complexity. Key takeaways:

  1. Form Post mode provides the best security profile for web applications
  2. Authorization code flows remain the gold standard for secure authentication
  3. Implicit flows should be avoided in favor of code-based alternatives
  4. Consistent testing is essential for understanding provider-specific behavior

The OAuth2 landscape continues evolving, but the fundamental principles revealed through this testing remain relevant: prioritize security, minimize token exposure, and thoroughly test your chosen configuration.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment