-
-
Save ktaka-ccmp/915a3680f04f1704742f13e898ca668d to your computer and use it in GitHub Desktop.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/bin/bash
# Create a token-masked summary of an OAuth2 test results file.
#
# Usage: <script> results_file.json
#
# Reads the JSON array written by the Python tester and writes
# "<input-without-extension>_summary.json", a stream of one object per test
# containing response_mode, response_type, scope, the masked response
# parameters and the masked token-exchange result. Secret values
# (access_token, code, id_token, refresh_token) are replaced by
# "<name>_exists" placeholders so the summary can be shared.
set -euo pipefail

# Check if input file is provided
if [ $# -ne 1 ]; then
    echo "Usage: $0 results_file.json" >&2
    exit 1
fi

input_file=$1

# Fail early with a clear message instead of letting jq report the problem.
if [ ! -r "$input_file" ]; then
    echo "Error: cannot read input file: $input_file" >&2
    exit 1
fi

output_file="${input_file%.*}_summary.json"

# Only report success when jq actually succeeded; remove the partial output
# file otherwise so a failed run is not mistaken for a valid summary.
if ! jq '
    # Replace the value of every entry whose key is listed in $keys with
    # "<key>_exists"; all other entries pass through unchanged.
    def mask_keys($keys):
        with_entries(
            .key as $k
            | if any($keys[]; . == $k) then
                .value = "\($k)_exists"
              else
                .
              end
        );

    # Keys masked in query params, form params and the token exchange result.
    def mask_tokens:
        mask_keys(["access_token", "code", "id_token", "refresh_token"]);

    # Keys masked in parsed fragment params (refresh_token is left as is,
    # matching the raw-fragment masking below).
    def mask_fragment_params:
        mask_keys(["access_token", "code", "id_token"]);

    # Mask the first occurrence of each secret inside the raw fragment string.
    # A null fragment is passed through; sub leaves the string unchanged when
    # the pattern does not match.
    def mask_raw_fragment:
        if . != null then
            sub("access_token=[^&]*"; "access_token=access_token_exists")
            | sub("code=[^&]*"; "code=code_exists")
            | sub("id_token=[^&]*"; "id_token=id_token_exists")
        else . end;

    .[] | {
        response_mode,
        response_type,
        scope,
        response: {
            query_params: ((.response.query_params // {}) | mask_tokens),
            form_params: ((.response.form_params // {}) | mask_tokens),
            fragment_params: ((.response.fragment_params // {}) | mask_fragment_params),
            raw_fragment: (.response.raw_fragment | mask_raw_fragment)
        },
        token_exchange: ((.token_exchange // null) | if . != null then mask_tokens else . end)
    }' "$input_file" > "$output_file"; then
    echo "Error: jq failed to process $input_file" >&2
    rm -f "$output_file"
    exit 1
fi

echo "Created summary file: $output_file"
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| #!/usr/bin/env python3 | |
| import http.server | |
| import socketserver | |
| import urllib.parse | |
| import json | |
| import webbrowser | |
| import threading | |
| import time | |
| from datetime import datetime | |
| import argparse | |
| from typing import Dict, List | |
| import requests | |
def load_env(env_path='.env'):
    """Parse a simple ``KEY=VALUE`` file into a dictionary.

    Blank lines and lines starting with ``#`` are ignored. Each remaining
    line is split on the first ``=``; key and value are stripped of
    surrounding whitespace. A value wrapped in a matching pair of single or
    double quotes has the quotes removed. Lines without ``=`` are skipped
    with a warning instead of aborting the whole load.

    Args:
        env_path: Path of the file to read.

    Returns:
        A dict mapping keys to string values. If the file does not exist a
        warning is printed and an empty dict is returned.
    """
    config = {}
    try:
        with open(env_path, encoding='utf-8') as f:
            for raw_line in f:
                line = raw_line.strip()
                if not line or line.startswith('#'):
                    continue
                key, sep, value = line.partition('=')
                if not sep:
                    # No '=' at all: not an assignment, so skip it rather
                    # than crash on tuple unpacking.
                    print(f"Warning: ignoring malformed line in {env_path}: {line!r}")
                    continue
                value = value.strip()
                # Drop one pair of matching surrounding quotes so that
                # KEY="value" and KEY=value load identically.
                if len(value) >= 2 and value[0] == value[-1] and value[0] in ('"', "'"):
                    value = value[1:-1]
                config[key.strip()] = value
    except FileNotFoundError:
        print(f"Warning: {env_path} file not found")
    return config
class Config:
    """Static configuration for the OAuth2 flow tester.

    Values are read once, at class-definition time, from the file parsed by
    ``load_env()``; the two endpoint settings and the local port fall back to
    the defaults given below when the key is absent.
    """

    # Load environment variables
    env = load_env()

    # OAuth2 Configuration
    CLIENT_ID = env.get('OAUTH_CLIENT_ID')
    CLIENT_SECRET = env.get('OAUTH_CLIENT_SECRET')
    AUTH_ENDPOINT = env.get('OAUTH_AUTH_ENDPOINT', 'https://accounts.google.com/o/oauth2/v2/auth')
    TOKEN_ENDPOINT = env.get('OAUTH_TOKEN_ENDPOINT', 'https://oauth2.googleapis.com/token')
    PROXY_URL = env.get('OAUTH_PROXY_URL')
    LOCAL_PORT = int(env.get('OAUTH_LOCAL_PORT', '3000'))

    # Test combinations
    RESPONSE_TYPES = [
        "code",
        "token",
        "id_token",
        "code token",
        "code id_token",
        "token id_token",
        "code token id_token",
    ]
    RESPONSE_MODES = [
        "query",
        "fragment",
        "form_post",
    ]
    SCOPES = [
        "profile email",
        "openid profile email",
    ]

    @classmethod
    def get_redirect_uri(cls) -> str:
        """Return the redirect URI: ``PROXY_URL`` followed by ``/authorized``.

        A trailing slash on ``PROXY_URL`` is dropped so the result never
        contains ``//authorized``.

        Raises:
            ValueError: If ``PROXY_URL`` is unset or empty; without it the
                result would be the unusable string ``"None/authorized"``.
        """
        if not cls.PROXY_URL:
            raise ValueError("OAUTH_PROXY_URL is not set; cannot build the redirect URI")
        return f"{cls.PROXY_URL.rstrip('/')}/authorized"
class OAuth2Handler(http.server.SimpleHTTPRequestHandler):
    """HTTP handler that records OAuth2 redirect callbacks in class-level state.

    State is shared through class attributes so the code that started the
    flow can read it:

    * ``received_callback`` is set once a GET callback or a form POST has
      been recorded.
    * ``current_response`` holds the dict built by ``_create_response_data``
      for the most recent callback; a later POST to ``/fragment`` adds the
      URL fragment reported by the browser to that same dict.
    """

    received_callback = threading.Event()
    current_response = None

    def log_message(self, format, *args):
        """Suppress the base class's per-request logging."""
        pass

    def do_GET(self):
        """Record a GET request as a query-mode callback.

        Browsers request ``/favicon.ico`` on their own right after loading
        the callback page; recording that request would overwrite the real
        callback data, so it is answered with 404 and otherwise ignored.
        """
        if urllib.parse.urlparse(self.path).path == '/favicon.ico':
            self.send_response(404)
            self.end_headers()
            return
        self._handle_callback('query')

    def do_POST(self):
        """Route POSTs: ``/fragment`` carries a fragment report, anything else is a form_post callback."""
        if self.path == '/fragment':
            self._handle_fragment()
        else:
            self._handle_form_post()

    def _handle_form_post(self):
        """Parse a URL-encoded POST body and record it as the current callback."""
        try:
            content_length = int(self.headers.get('Content-Length', 0))
            post_data = self.rfile.read(content_length).decode('utf-8')
            form_params = urllib.parse.parse_qs(post_data)
            # Keep only the first value of each parameter.
            form_params = {k: v[0] for k, v in form_params.items()}
            response_data = self._create_response_data('POST', form_params=form_params)
            OAuth2Handler.current_response = response_data
            OAuth2Handler.received_callback.set()
            self._send_html_response("Form Post", response_data)
        except Exception as e:
            print(f"Error handling form post: {e}")
            self._send_error_response()

    def _handle_fragment(self):
        """Attach the URL fragment posted by the callback page to the current response.

        The body is JSON with a ``fragment`` member (the browser's
        ``window.location.hash``). The fragment is split into pairs first and
        percent-decoded afterwards via ``parse_qs``, so encoded ``&`` or ``=``
        inside a value cannot corrupt the parse, a name without ``=`` maps to
        an empty string, and ``+`` decodes to a space. As in the query and
        form handlers, the first value wins for a repeated name.
        """
        try:
            content_length = int(self.headers['Content-Length'])
            post_data = self.rfile.read(content_length)
            fragment_data = json.loads(post_data)
            if OAuth2Handler.current_response and fragment_data.get('fragment'):
                raw_fragment = fragment_data['fragment']
                OAuth2Handler.current_response['raw_fragment'] = raw_fragment
                parsed = urllib.parse.parse_qs(
                    raw_fragment.lstrip('#'), keep_blank_values=True
                )
                OAuth2Handler.current_response['fragment_params'] = {
                    k: v[0] for k, v in parsed.items()
                }
            self.send_response(200)
            self.end_headers()
        except Exception as e:
            print(f"Error parsing fragment: {e}")
            if OAuth2Handler.current_response:
                OAuth2Handler.current_response['fragment_params'] = {}
                OAuth2Handler.current_response['raw_fragment'] = None
            self.send_response(500)
            self.end_headers()

    def _handle_callback(self, source_type):
        """Record the query parameters of a GET request as the current callback.

        ``source_type`` is accepted for the caller's benefit but is not used.
        """
        try:
            parsed_path = urllib.parse.urlparse(self.path)
            params = urllib.parse.parse_qs(parsed_path.query)
            # Keep only the first value of each parameter.
            query_params = {k: v[0] for k, v in params.items()}
            response_data = self._create_response_data('GET', query_params=query_params)
            OAuth2Handler.current_response = response_data
            OAuth2Handler.received_callback.set()
            self._send_html_response("Callback", response_data)
        except Exception as e:
            print(f"Error handling callback: {e}")
            self._send_error_response()

    def _create_response_data(self, method, query_params=None, form_params=None):
        """Build the record stored for a callback request.

        Fragment fields start empty; ``_handle_fragment`` fills them in later.
        """
        return {
            'timestamp': datetime.now().isoformat(),
            'path': self.path,
            'method': method,
            'query_params': query_params or {},
            'form_params': form_params or {},
            'fragment_params': {},
            'raw_fragment': None,
            'headers': dict(self.headers),
            'proxy_info': {
                'forwarded_proto': self.headers.get('X-Forwarded-Proto', ''),
                'forwarded_for': self.headers.get('X-Forwarded-For', ''),
                'real_ip': self.headers.get('X-Real-IP', '')
            }
        }

    def _send_html_response(self, title, response_data):
        """Send an HTML page that shows ``response_data`` and reports the URL fragment.

        The page's script POSTs ``window.location.hash`` to ``/fragment`` (the
        fragment never reaches the server otherwise) and closes the window
        after three seconds. The JSON dump and title are HTML-escaped because
        ``response_data`` echoes request parameters and headers, which must
        not be interpreted as markup.
        """
        # Function-scope import keeps this block self-contained.
        import html

        safe_title = html.escape(title)
        safe_json = html.escape(json.dumps(response_data, indent=2))
        timestamp = response_data["timestamp"]

        self.send_response(200)
        self.send_header('Content-type', 'text/html')
        self.end_headers()
        response_html = f"""
        <html>
        <head>
        <title>OAuth2 {safe_title} Received</title>
        <style>
        body {{ font-family: Arial, sans-serif; margin: 2em; }}
        pre {{ background: #f5f5f5; padding: 1em; border-radius: 4px; }}
        </style>
        </head>
        <body>
        <h1>OAuth2 {safe_title} Received</h1>
        <pre>{safe_json}</pre>
        <script>
        if(window.location.hash) {{
            const hashData = {{
                timestamp: '{timestamp}',
                fragment: window.location.hash
            }};
            fetch('/fragment', {{
                method: 'POST',
                headers: {{'Content-Type': 'application/json'}},
                body: JSON.stringify(hashData)
            }})
            .then(response => response.ok ? console.log('Fragment sent') : console.error('Fragment send failed'))
            .catch(error => console.error('Failed to send fragment:', error));
        }}
        setTimeout(() => window.close(), 3000);
        </script>
        </body>
        </html>
        """
        self.wfile.write(response_html.encode())

    def _send_error_response(self):
        """Send a minimal 500 response."""
        self.send_response(500)
        self.send_header('Content-type', 'text/html')
        self.end_headers()
        self.wfile.write(b'Internal Server Error')
class _ReusableTCPServer(socketserver.TCPServer):
    """TCPServer that sets SO_REUSEADDR so the port can be rebound right after a previous run."""

    allow_reuse_address = True


class OAuth2TestServer:
    """Run the callback HTTP server on a background daemon thread."""

    def __init__(self, port: int):
        """Remember the port to listen on; nothing is bound until ``start()``."""
        self.port = port
        self.server = None
        self.server_thread = None

    def start(self):
        """Bind to all interfaces on ``self.port`` and serve requests in a daemon thread.

        Raises:
            Exception: Whatever binding or thread start-up raised; the error
                is printed first and then re-raised.
        """
        try:
            server = _ReusableTCPServer(("", self.port), OAuth2Handler)
            thread = threading.Thread(target=server.serve_forever)
            thread.daemon = True
            thread.start()
            # Publish the server only once its serving loop has been started,
            # so stop() never waits on a loop that was never run.
            self.server = server
            self.server_thread = thread
            print(f"Local server started at http://localhost:{self.port}")
            print(f"Accessible via reverse proxy at {Config.PROXY_URL}")
        except Exception as e:
            print(f"Error starting server: {e}")
            raise

    def stop(self):
        """Shut the server down and close its socket; a no-op if it was never started."""
        if self.server:
            try:
                self.server.shutdown()
                self.server.server_close()
            except Exception as e:
                print(f"Error stopping server: {e}")
def generate_auth_url(response_type: str, response_mode: str, scope: str) -> str:
    """Build the authorization request URL for one test combination.

    The query string carries the client id, the given response type, response
    mode and scope, the redirect URI from ``Config``, a fixed ``state`` and
    ``prompt=none``. A fixed ``nonce`` is appended when the response type
    mentions ``id_token``.
    """
    query_items = [
        ('client_id', Config.CLIENT_ID),
        ('response_type', response_type),
        ('response_mode', response_mode),
        ('scope', scope),
        ('redirect_uri', Config.get_redirect_uri()),
        ('state', 'xyz123'),
        ('prompt', 'none'),
    ]
    wants_id_token = 'id_token' in response_type
    if wants_id_token:
        query_items.append(('nonce', 'test_nonce'))

    encoded = urllib.parse.urlencode(query_items)
    return f"{Config.AUTH_ENDPOINT}?{encoded}"
def exchange_code_for_tokens(code: str) -> Dict:
    """Exchange an authorization code for tokens at ``Config.TOKEN_ENDPOINT``.

    Args:
        code: The authorization code received in the callback.

    Returns:
        The decoded JSON body of the token response on success. On a network
        failure, timeout, non-2xx status or undecodable body the error is
        printed and ``{'error': <message>}`` is returned instead, so one
        failed exchange does not abort the remaining tests.
    """
    token_data = {
        'client_id': Config.CLIENT_ID,
        'client_secret': Config.CLIENT_SECRET,
        'code': code,
        'grant_type': 'authorization_code',
        'redirect_uri': Config.get_redirect_uri()
    }
    try:
        # requests never times out by default; bound the wait so a stalled
        # token endpoint cannot hang the whole run.
        response = requests.post(Config.TOKEN_ENDPOINT, data=token_data, timeout=30)
        response.raise_for_status()
        return response.json()
    except (requests.RequestException, ValueError) as e:
        # ValueError covers JSON decoding errors on requests versions whose
        # decode error is not a RequestException subclass.
        print(f"Error exchanging code for tokens: {e}")
        return {'error': str(e)}
def test_flow(response_type: str, response_mode: str, scope: str) -> Dict:
    """Run one authorization attempt in the browser and collect the outcome.

    Resets the handler's shared state, opens the authorization URL and waits
    up to 60 seconds for a callback. When one arrives, an authorization code
    is looked for in the query, form and fragment parameters (in that order)
    and, if found, exchanged for tokens.

    Returns:
        A dict with the tested parameters and ``auth_url``, plus either
        ``response`` and ``token_exchange`` or, on timeout, ``error``.
    """
    for banner in (
        "\nTesting combination:",
        f"Response Type: {response_type}",
        f"Response Mode: {response_mode}",
        f"Scope: {scope}",
    ):
        print(banner)

    OAuth2Handler.received_callback.clear()
    OAuth2Handler.current_response = None

    auth_url = generate_auth_url(response_type, response_mode, scope)
    print(f"\nOpening URL:\n{auth_url}\n")
    webbrowser.open(auth_url)

    outcome = {
        'response_type': response_type,
        'response_mode': response_mode,
        'scope': scope,
        'auth_url': auth_url,
    }

    if not OAuth2Handler.received_callback.wait(timeout=60):
        print("\nTimeout waiting for callback")
        outcome['error'] = 'Timeout waiting for callback'
        return outcome

    # Wait for fragment processing
    time.sleep(1)
    captured = OAuth2Handler.current_response
    print("\nResponse data received:")
    print(f"Query params: {captured.get('query_params', {})}")
    print(f"Form params: {captured.get('form_params', {})}")
    print(f"Fragment params: {captured.get('fragment_params', {})}")

    # Look for code in all possible locations
    code = None
    for location in ('query', 'form', 'fragment'):
        params_key = f'{location}_params'
        if captured.get(params_key, {}).get('code'):
            code = captured[params_key]['code']
            print(f"Found code in {location} params")
            break

    if code:
        in_fragment = captured.get('fragment_params', {}).get('code')
        location_label = 'fragment_params' if in_fragment else 'query/form'
        print("\nExchanging code for tokens")
        print(f"Code location: {location_label}")
        print(f"Code: {code}")
        token_response = exchange_code_for_tokens(code)
    else:
        token_response = None
        print("\nNo code found to exchange")

    outcome['response'] = captured
    outcome['token_exchange'] = token_response
    return outcome
def main():
    """Run every response-mode / response-type / scope combination and save the results.

    The results file is rewritten after each test, so an interrupted run
    keeps everything collected up to that point. The local server is always
    stopped on exit.
    """
    parser = argparse.ArgumentParser(description='OAuth2 Flow Tester')
    parser.add_argument(
        '--port',
        type=int,
        default=Config.LOCAL_PORT,
        help='Local port to listen on (should match reverse proxy configuration)',
    )
    args = parser.parse_args()

    results_file = 'results_{}.json'.format(datetime.now().strftime('%Y%m%d%H%M'))
    server = OAuth2TestServer(args.port)
    try:
        server.start()
        results = []
        # Same nesting order as three nested loops: mode, then type, then scope.
        combinations = [
            (mode, rtype, scope)
            for mode in Config.RESPONSE_MODES
            for rtype in Config.RESPONSE_TYPES
            for scope in Config.SCOPES
        ]
        for mode, rtype, scope in combinations:
            results.append(test_flow(rtype, mode, scope))
            with open(results_file, 'w') as out:
                json.dump(results, out, indent=2)
            time.sleep(3)
        print(f"\nTesting completed. Results saved to {results_file}")
    except KeyboardInterrupt:
        print("\nShutting down...")
    finally:
        server.stop()


if __name__ == "__main__":
    main()
Author
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
OAuth2 Flow Testing & Analysis
A systematic analysis of OAuth2 authentication flows using Google's OAuth2 implementation, testing various combinations of response types and response modes.
Testing Methodology
Environment Setup
# OAuth2 Configuration (.env)
OAUTH_CLIENT_ID=your-client-id.apps.googleusercontent.com
OAUTH_CLIENT_SECRET=your-client-secret
OAUTH_AUTH_ENDPOINT=https://accounts.google.com/o/oauth2/v2/auth
OAUTH_TOKEN_ENDPOINT=https://oauth2.googleapis.com/token
OAUTH_PROXY_URL=https://7a6d-217-178-16-167.ngrok-free.app
OAUTH_LOCAL_PORT=3000

Test Parameters
Testing Script Overview
The Python script automatically:
Test Results Summary
Query Response Mode Results
Fragment Response Mode Results
Form Post Response Mode Results
Key Findings
1. Response Location Patterns
2. Token Delivery Behavior
3. Scope Impact
4. Code Exchange Pattern
Security Analysis
Most Recommended Flows
1. Authorization Code Flow with Form Post
Configuration:
`response_type=code`, `response_mode=form_post`
Security benefits:
Implementation benefits:
2. Authorization Code Flow with PKCE
Configuration:
`response_type=code` with PKCE
Security benefits:
Least Recommended Flows
1. Implicit Flow
Configuration:
`response_type=token` or `response_type=id_token`
Security concerns:
Functional limitations:
2. Hybrid Flow with Query Mode
Configuration:
`response_type=code token id_token`, `response_mode=query`
Security issues:
Complexity issues:
Data Processing Commands
Result Masking for Security
Filter Query Mode Results
Implementation Recommendations
Response Mode Comparison
`form_post` / `fragment` / `query`
Best Practices
Code vs Query/Fragment Acceptability
Question: Is it acceptable to use `response_type=code` with `response_mode=query` or `response_mode=fragment`?
Answer: ACCEPTABLE WITH MITIGATIONS
✅ Pros:
Mitigations:
Conclusion
The systematic testing revealed that OAuth2 flow selection significantly impacts both security and implementation complexity. Key takeaways:
The OAuth2 landscape continues evolving, but the fundamental principles revealed through this testing remain relevant: prioritize security, minimize token exposure, and thoroughly test your chosen configuration.