This should be enough to get you started. It's quite basic, but it handles getting you an auth token and supports refreshing it afterwards.
-
-
Save jhodges10/bd87ea0d7484abdb4dbc0eab26cbc238 to your computer and use it in GitHub Desktop.
Get and refresh OAuth credentials for v4
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
# This is not a real credential id | |
OAUTH_WEB_APP_CREDENTIAL_ID=b0a201b3ac588195ccfbf64f25833dd4 | |
# This is not a real client secret | |
OAUTH_WEB_APP_CLIENT_SECRET=p8e-XzFPWIsqazlRgETASNgWj0dctEfk9wHF |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import os | |
import urllib.parse | |
import secrets | |
import requests | |
import webbrowser | |
import http.server | |
import socketserver | |
import threading | |
import ssl | |
import tempfile | |
from urllib.parse import urlparse, parse_qs | |
from dotenv import load_dotenv, find_dotenv | |
# Read settings from the .env.local file located by find_dotenv.
load_dotenv(find_dotenv(".env.local"))

# Endpoints used by the OAuth flow and the scopes requested from them.
AUTHORIZATION_URL = "https://ims-na1.adobelogin.com/ims/authorize/v2"
TOKEN_URL = "https://ims-na1.adobelogin.com/ims/token/v3"
OAUTH_SCOPES = "openid profile additional_info.roles offline_access"

# OAuth client credentials; each is None when its variable is not set.
CLIENT_ID = os.environ.get("OAUTH_WEB_APP_CREDENTIAL_ID")
CLIENT_SECRET = os.environ.get("OAUTH_WEB_APP_CLIENT_SECRET")

# Address of the local HTTPS server that receives the OAuth redirect.
HOST = "localhost"
PORT = 8000
REDIRECT_URI = f"https://{HOST}:{PORT}/callback"

# State shared between the callback handler and the main thread: the handler
# fills in the code/state and sets the event once the redirect has arrived.
auth_code = None
auth_state = None
server_closed = threading.Event()
class OAuthCallbackHandler(http.server.BaseHTTPRequestHandler):
    """Receive the browser redirect that ends the authorization step.

    On a redirect carrying both ``code`` and ``state`` the values are stored
    in the module-level ``auth_code`` / ``auth_state`` variables and
    ``server_closed`` is set so the waiting thread can continue.

    On a redirect carrying ``error`` (for example when the user denies
    access) the error is printed and ``server_closed`` is set as well, so the
    waiting thread does not block forever; ``auth_code`` and ``auth_state``
    are left untouched in that case.
    """

    def do_GET(self):
        """Dispatch a GET request on ``/callback`` or ``/``; 404 otherwise."""
        global auth_code, auth_state

        # Parse the request target once and reuse both parts.
        parsed = urlparse(self.path)
        query_params = parse_qs(parsed.query)

        if parsed.path not in ("/callback", "/"):
            self._respond(404, b"Not found.")
            return

        if "code" in query_params and "state" in query_params:
            auth_code = query_params["code"][0]
            auth_state = query_params["state"][0]
            self._respond(
                200,
                b"Authentication successful! You can close this window and return to the command line.",
            )
            # Signal that the server can stop.
            server_closed.set()
            return

        if "error" in query_params:
            # The authorization server reported a failure instead of a code.
            error = query_params["error"][0]
            description = query_params.get("error_description", [""])[0]
            print(f"Authorization failed: {error} {description}".rstrip())
            # The provider-supplied text is deliberately not echoed into the
            # HTML response; only a fixed message is sent to the browser.
            self._respond(
                400,
                b"Error: Authorization failed. You can close this window and return to the command line.",
            )
            # Release the waiting thread; no code will ever arrive.
            server_closed.set()
            return

        self._respond(400, b"Error: Missing required parameters.")

    def _respond(self, status, body):
        """Send ``body`` (bytes) as a ``text/html`` response with ``status``."""
        self.send_response(status)
        self.send_header("Content-type", "text/html")
        self.end_headers()
        self.wfile.write(body)
def generate_self_signed_cert():
    """Generate a self-signed certificate and private key for HTTPS.

    The ``openssl`` command-line tool is invoked to write a certificate and
    an unencrypted RSA key for ``HOST`` into two temporary files.

    Returns:
        A ``(cert_path, key_path)`` tuple of file paths. The files are not
        deleted automatically; the caller is expected to remove them.

    Raises:
        OSError: If ``openssl`` cannot be started (e.g. it is not installed).
        subprocess.CalledProcessError: If ``openssl`` exits with an error.
    """
    # Local import so this function does not depend on the file's import block.
    import subprocess

    # Reserve two file names; the files are closed again immediately so that
    # openssl can write to them.
    with tempfile.NamedTemporaryFile(delete=False) as cert_file:
        cert_path = cert_file.name
    with tempfile.NamedTemporaryFile(delete=False) as key_file:
        key_path = key_file.name

    # An argument list (no shell) avoids quoting problems with the paths.
    command = [
        "openssl", "req", "-x509",
        "-newkey", "rsa:4096",
        "-nodes",
        "-out", cert_path,
        "-keyout", key_path,
        "-days", "365",
        "-subj", f"/CN={HOST}",
        "-addext", f"subjectAltName=DNS:{HOST}",
    ]

    try:
        # check=True turns a failed openssl run into an exception instead of
        # silently leaving two empty files behind.
        subprocess.run(command, check=True)
    except (OSError, subprocess.CalledProcessError):
        # Best-effort cleanup of the unusable files before re-raising.
        for path in (cert_path, key_path):
            try:
                os.unlink(path)
            except OSError:
                pass
        raise

    return cert_path, key_path
def start_https_server():
    """Serve HTTPS requests on ``HOST:PORT`` until ``server_closed`` is set.

    A self-signed certificate is generated, the listening socket is wrapped
    with TLS, and requests are handled one at a time by
    ``OAuthCallbackHandler``. When the loop ends -- or when anything fails
    along the way -- the server socket is closed, the temporary certificate
    files are removed, and ``server_closed`` is set so that a thread waiting
    on it is never left blocked by a server that could not start.
    """
    # Generate self-signed certificate
    cert_path, key_path = generate_self_signed_cert()

    httpd = None
    try:
        # ssl.wrap_socket() no longer exists in Python 3.12+; an SSLContext
        # is the supported way to wrap a server socket.
        context = ssl.SSLContext(ssl.PROTOCOL_TLS_SERVER)
        context.load_cert_chain(certfile=cert_path, keyfile=key_path)

        # Create the server and wrap its listening socket with TLS
        httpd = socketserver.TCPServer((HOST, PORT), OAuthCallbackHandler)
        httpd.socket = context.wrap_socket(httpd.socket, server_side=True)

        print(f"HTTPS Server started at https://{HOST}:{PORT}")

        # Handle one request at a time until the callback handler signals
        while not server_closed.is_set():
            httpd.handle_request()
    finally:
        if httpd is not None:
            httpd.server_close()

        # Remove each temporary file independently so that a failure on one
        # does not leave the other behind.
        for path in (cert_path, key_path):
            try:
                os.unlink(path)
            except OSError as e:
                print(f"Error removing temporary files: {e}")

        # Unblock anyone waiting on the event, including after a failure.
        server_closed.set()
def main():
    """Run the authorization-code flow and print the resulting tokens.

    Steps: start the local HTTPS callback server in a background thread, open
    the browser on the authorization URL, wait for the redirect, verify the
    ``state`` value, then exchange the authorization code for tokens at
    ``TOKEN_URL``. Problems are reported with a printed message and an early
    return.
    """
    import time

    # Without credentials the authorization URL would carry the literal
    # text "None", so stop early with a clear message instead.
    if not CLIENT_ID or not CLIENT_SECRET:
        print(
            "Error: OAUTH_WEB_APP_CREDENTIAL_ID and OAUTH_WEB_APP_CLIENT_SECRET "
            "must be set in .env.local or the environment."
        )
        return

    # Generate random state for security
    state = secrets.token_hex(16)

    # Construct the authorization URL
    auth_params = {
        "client_id": CLIENT_ID,
        "redirect_uri": REDIRECT_URI,
        "response_type": "code",
        "scope": OAUTH_SCOPES,
        "state": state,
    }
    auth_url = f"{AUTHORIZATION_URL}?{urllib.parse.urlencode(auth_params)}"

    # Start the HTTPS server in a separate (daemon) thread
    server_thread = threading.Thread(target=start_https_server)
    server_thread.daemon = True
    server_thread.start()

    # Give the server a moment to start
    print("Starting HTTPS server...")
    time.sleep(1)

    # Open the user's browser to the authorization URL
    print(f"Opening browser to authorize: {auth_url}")
    webbrowser.open(auth_url)
    print("Waiting for authorization...")

    # Wait for the callback to be received
    server_closed.wait()

    # The event can be set without a code having been stored, so check first.
    if auth_code is None:
        print("Error: No authorization code was received.")
        return

    # Verify state parameter
    if auth_state != state:
        print("Error: State parameter doesn't match!")
        return

    # Exchange authorization code for access token
    token_params = {
        "grant_type": "authorization_code",
        "client_id": CLIENT_ID,
        "client_secret": CLIENT_SECRET,
        "code": auth_code,
        "redirect_uri": REDIRECT_URI,
    }
    print("Exchanging authorization code for access token...")
    try:
        # A timeout keeps the script from hanging on an unresponsive endpoint.
        response = requests.post(TOKEN_URL, data=token_params, timeout=30)
    except requests.RequestException as e:
        print(f"Error getting token: {e}")
        return

    if response.status_code != 200:
        print(f"Error getting token: {response.status_code}")
        print(response.text)
        return

    token_data = response.json()
    access_token = token_data.get("access_token")
    refresh_token = token_data.get("refresh_token")
    print("\n\nAuthentication successful!\n")
    print(f"Access token:\n\n{access_token}\n")
    if refresh_token:
        print(f"Refresh token:\n\n{refresh_token}\n")
    # You might want to save these tokens for later use
    # save_tokens(access_token, refresh_token)


if __name__ == "__main__":
    main()
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
[tool.poetry] | |
name = "python-cli-sample" | |
version = "0.1.0" | |
description = "Code sample script for fetching a \"OAuth Web App Credential (private)\"" | |
[tool.poetry.dependencies] | |
python = "^3.9" | |
requests-oauthlib = "^2.0.0" | |
python-dotenv = "^1.0.1" | |
requests = "^2.32.3" | |
aiohttp = "^3.11.15" | |
aiofiles = "^24.1.0" | |
[build-system] | |
requires = ["poetry-core"] | |
build-backend = "poetry.core.masonry.api" |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import os | |
import argparse | |
import requests | |
import time | |
import base64 | |
from dotenv import load_dotenv, find_dotenv | |
def load_environment():
    """Load ``.env.local`` and return the token-refresh configuration.

    Returns:
        A ``(token_url, client_id, client_secret)`` tuple. The two credential
        values come from the ``OAUTH_WEB_APP_CREDENTIAL_ID`` and
        ``OAUTH_WEB_APP_CLIENT_SECRET`` environment variables and are ``None``
        when the corresponding variable is not set.
    """
    load_dotenv(find_dotenv(".env.local"))

    return (
        # URL the refresh request is sent to
        "https://ims-na1.adobelogin.com/ims/token/v3",
        # Client ID and client secret from the environment
        os.getenv("OAUTH_WEB_APP_CREDENTIAL_ID"),
        os.getenv("OAUTH_WEB_APP_CLIENT_SECRET"),
    )
def refresh_access_token(refresh_token, token_url, client_id, client_secret):
    """Use a refresh token to obtain a new access token.

    The request is a form-encoded POST to ``token_url`` authenticated with an
    HTTP Basic header built from ``client_id`` and ``client_secret``.

    Args:
        refresh_token: The refresh token to exchange.
        token_url: URL of the token endpoint.
        client_id: OAuth client ID.
        client_secret: OAuth client secret.

    Returns:
        ``(new_access_token, new_refresh_token)`` on success, where the
        refresh token falls back to the one passed in when the response does
        not contain a new one. ``(None, None)`` when the request fails, the
        endpoint answers with a non-200 status, or the body is not JSON.
    """
    # Create the Basic Authorization header
    credentials = f"{client_id}:{client_secret}"
    encoded_credentials = base64.b64encode(credentials.encode()).decode()
    headers = {
        "Authorization": f"Basic {encoded_credentials}",
        "Content-Type": "application/x-www-form-urlencoded",
    }
    token_params = {
        "grant_type": "refresh_token",
        "refresh_token": refresh_token,
    }

    print("Refreshing access token...")
    try:
        # A timeout keeps the script from hanging on an unresponsive endpoint.
        response = requests.post(
            token_url, headers=headers, data=token_params, timeout=30
        )
    except requests.RequestException as e:
        print(f"Error refreshing token: {e}")
        return None, None

    if response.status_code != 200:
        print(f"Error refreshing token: {response.status_code}")
        print(response.text)
        return None, None

    try:
        token_data = response.json()
    except ValueError:
        # A 200 response whose body is not JSON cannot be used.
        print("Error refreshing token: response was not valid JSON")
        print(response.text)
        return None, None

    # The lifetime is optional in the response; only report it when present.
    # It is treated as a number of seconds, as the original arithmetic did.
    expires_in = token_data.get("expires_in")
    if isinstance(expires_in, (int, float)):
        days_until_expiry = expires_in / (24 * 60 * 60)
        print(f"\nToken expires in {days_until_expiry:.1f} days")

    new_access_token = token_data.get("access_token")
    # Use new refresh token if provided, otherwise keep the old one
    new_refresh_token = token_data.get("refresh_token", refresh_token)

    print("\n\nToken refresh successful!\n")
    print(f"New access token:\n\n{new_access_token}\n")
    if new_refresh_token != refresh_token:
        print(f"New refresh token:\n\n{new_refresh_token}\n")
    return new_access_token, new_refresh_token
def main():
    """Command-line entry point: refresh an access token.

    Parses ``--refresh-token`` (required) and ``--access-token`` (accepted
    but not used), loads the configuration, and performs the refresh.

    Exits with the argument parser's error status when the client
    credentials are missing, and with status 1 when the refresh fails.
    """
    parser = argparse.ArgumentParser(description="Refresh OAuth access token")
    parser.add_argument("--refresh-token", required=True, help="OAuth refresh token")
    parser.add_argument(
        "--access-token",
        help="Current access token (not required for refresh, but can be provided for reference)",
    )
    args = parser.parse_args()

    # Load environment and configuration
    token_url, client_id, client_secret = load_environment()

    # Unset credentials would otherwise be sent as the literal "None:None".
    if not client_id or not client_secret:
        parser.error(
            "OAUTH_WEB_APP_CREDENTIAL_ID and OAUTH_WEB_APP_CLIENT_SECRET "
            "must be set in .env.local or the environment"
        )

    # Refresh the token
    new_access_token, new_refresh_token = refresh_access_token(
        args.refresh_token, token_url, client_id, client_secret
    )

    # Report failure through the exit status so calling scripts can detect it.
    if new_access_token is None:
        raise SystemExit(1)

    # You could save these tokens to a file or pass them back to another process
    # save_tokens(new_access_token, new_refresh_token)


if __name__ == "__main__":
    main()
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment