Skip to content

Instantly share code, notes, and snippets.

@jhodges10
Last active May 20, 2025 13:12
Show Gist options
  • Save jhodges10/bd87ea0d7484abdb4dbc0eab26cbc238 to your computer and use it in GitHub Desktop.
Save jhodges10/bd87ea0d7484abdb4dbc0eab26cbc238 to your computer and use it in GitHub Desktop.
Get and refresh OAuth credentials for v4
# This is not a real credential id
OAUTH_WEB_APP_CREDENTIAL_ID=b0a201b3ac588195ccfbf64f25833dd4
# This is not a real client secret
OAUTH_WEB_APP_CLIENT_SECRET=p8e-XzFPWIsqazlRgETASNgWj0dctEfk9wHF

Getting and refreshing tokens via a CLI

This should be enough to get you started. It's quite basic, but it handles getting an auth token and supports refreshing it afterwards.

import os
import urllib.parse
import secrets
import requests
import webbrowser
import http.server
import socketserver
import threading
import ssl
import tempfile
from urllib.parse import urlparse, parse_qs
from dotenv import load_dotenv, find_dotenv
# Populate the process environment from the .env.local file located by find_dotenv
load_dotenv(find_dotenv(".env.local"))

# Authorization and token endpoints, plus the scopes requested at sign-in
AUTHORIZATION_URL = "https://ims-na1.adobelogin.com/ims/authorize/v2"
TOKEN_URL = "https://ims-na1.adobelogin.com/ims/token/v3"
OAUTH_SCOPES = "openid profile additional_info.roles offline_access"

# OAuth client credentials taken from the environment (None when unset)
CLIENT_ID = os.getenv("OAUTH_WEB_APP_CREDENTIAL_ID")
CLIENT_SECRET = os.getenv("OAUTH_WEB_APP_CLIENT_SECRET")

# Address of the local HTTPS server that receives the OAuth redirect
HOST = "localhost"
PORT = 8000
REDIRECT_URI = "https://{}:{}/callback".format(HOST, PORT)

# Shared module-level state: the callback handler fills in the code and state,
# then sets the event so the waiting code knows the redirect has arrived
auth_code = auth_state = None
server_closed = threading.Event()
class OAuthCallbackHandler(http.server.BaseHTTPRequestHandler):
    """Handle the browser redirect that ends the OAuth authorization step.

    Results are published through the module-level ``auth_code`` and
    ``auth_state`` globals, and ``server_closed`` is set once the flow has
    reached a final outcome (success or an explicit error redirect).
    """

    def do_GET(self):
        """Process one GET request sent to the local callback server.

        - Paths other than ``/callback`` and ``/`` get a 404.
        - A request carrying both ``code`` and ``state`` stores them in the
          module globals, answers 200 and sets ``server_closed``.
        - A request carrying ``error`` (the authorization server reporting
          a denied or failed authorization) answers 400 and also sets
          ``server_closed`` so the waiting CLI does not block forever;
          ``auth_code`` and ``auth_state`` are left untouched.
        - Anything else answers 400 and keeps the server waiting.
        """
        global auth_code, auth_state

        parsed = urlparse(self.path)
        query_params = parse_qs(parsed.query)

        if parsed.path not in ("/callback", "/"):
            self._respond(404, b"Not found.")
            return

        if "code" in query_params and "state" in query_params:
            auth_code = query_params["code"][0]
            auth_state = query_params["state"][0]
            self._respond(
                200,
                b"Authentication successful! You can close this window and return to the command line.",
            )
            # Signal that we can stop the server
            server_closed.set()
            return

        if "error" in query_params:
            # The error value is only printed to the console, never echoed
            # into the HTML response.
            print(f"Authorization failed: {query_params['error'][0]}")
            self._respond(
                400,
                b"Error: Authorization failed or was denied. "
                b"You can close this window and return to the command line.",
            )
            # Final outcome: release the waiting thread without a code.
            server_closed.set()
            return

        self._respond(400, b"Error: Missing required parameters.")

    def _respond(self, status, body):
        """Send a complete response with the given status code and bytes body."""
        self.send_response(status)
        self.send_header("Content-type", "text/html")
        self.end_headers()
        self.wfile.write(body)
def generate_self_signed_cert():
    """Generate a self-signed certificate for the local HTTPS server.

    Runs the ``openssl`` command-line tool to write a certificate and an
    unencrypted private key for ``HOST`` into two temporary files.

    Returns:
        A ``(cert_path, key_path)`` tuple of file paths. The files are not
        removed automatically; the caller deletes them when done.

    Raises:
        OSError: If the ``openssl`` executable cannot be started.
        subprocess.CalledProcessError: If ``openssl`` exits with a non-zero
            status.
    """
    # Imported locally so this function does not rely on the file-level imports.
    import subprocess

    # Only the paths are needed; openssl overwrites the (empty) files itself.
    with tempfile.NamedTemporaryFile(delete=False) as cert_file:
        cert_path = cert_file.name
    with tempfile.NamedTemporaryFile(delete=False) as key_file:
        key_path = key_file.name

    # An argument list (no shell) keeps paths containing spaces or shell
    # metacharacters intact.
    command = [
        "openssl", "req", "-x509", "-newkey", "rsa:4096", "-nodes",
        "-out", cert_path,
        "-keyout", key_path,
        "-days", "365",
        "-subj", f"/CN={HOST}",
        "-addext", f"subjectAltName=DNS:{HOST}",
    ]
    try:
        subprocess.run(command, check=True)
    except (OSError, subprocess.CalledProcessError):
        # Do not leave unusable temp files behind when generation fails.
        for path in (cert_path, key_path):
            try:
                os.unlink(path)
            except OSError:
                pass
        raise
    return cert_path, key_path
def start_https_server():
    """Serve the OAuth callback over HTTPS until ``server_closed`` is set.

    Generates a temporary self-signed certificate, binds a TCP server on
    ``(HOST, PORT)`` with ``OAuthCallbackHandler``, and handles requests one
    at a time until the ``server_closed`` event is set. The server socket
    and the temporary certificate files are released even if serving fails.
    """
    cert_path, key_path = generate_self_signed_cert()
    try:
        # ssl.wrap_socket() no longer exists on Python 3.12+; an SSLContext
        # is the supported way to wrap the listening socket.
        context = ssl.SSLContext(ssl.PROTOCOL_TLS_SERVER)
        context.load_cert_chain(certfile=cert_path, keyfile=key_path)

        httpd = socketserver.TCPServer((HOST, PORT), OAuthCallbackHandler)
        try:
            httpd.socket = context.wrap_socket(httpd.socket, server_side=True)
            print(f"HTTPS Server started at https://{HOST}:{PORT}")
            # Run the server until the server_closed event is set
            while not server_closed.is_set():
                httpd.handle_request()
        finally:
            httpd.server_close()
    finally:
        # Clean up the temporary certificate files; try both even if one fails.
        for path in (cert_path, key_path):
            try:
                os.unlink(path)
            except OSError as e:
                print(f"Error removing temporary files: {e}")
def main():
    """Run the interactive authorization-code flow and print the tokens.

    Steps: start the local HTTPS callback server in a daemon thread, open
    the browser on the authorization URL, wait for the callback, verify the
    returned ``state``, then exchange the authorization code for tokens at
    ``TOKEN_URL`` and print them. Failures are reported on stdout and the
    function returns without raising.
    """
    # Imported locally so this function does not rely on the file-level imports.
    import time

    # Without credentials the authorization URL and token request are
    # guaranteed to be rejected, so fail early with a clear message.
    if not CLIENT_ID or not CLIENT_SECRET:
        print(
            "Error: OAUTH_WEB_APP_CREDENTIAL_ID and "
            "OAUTH_WEB_APP_CLIENT_SECRET must be set (e.g. in .env.local)."
        )
        return

    # Generate random state for security
    state = secrets.token_hex(16)

    # Construct the authorization URL
    auth_params = {
        "client_id": CLIENT_ID,
        "redirect_uri": REDIRECT_URI,
        "response_type": "code",
        "scope": OAUTH_SCOPES,
        "state": state,
    }
    auth_url = f"{AUTHORIZATION_URL}?{urllib.parse.urlencode(auth_params)}"

    # Start the HTTPS server in a separate thread
    server_thread = threading.Thread(target=start_https_server)
    server_thread.daemon = True
    server_thread.start()

    # Give the server a moment to start
    print("Starting HTTPS server...")
    time.sleep(1)

    # Open the user's browser to the authorization URL
    print(f"Opening browser to authorize: {auth_url}")
    webbrowser.open(auth_url)
    print("Waiting for authorization...")

    # Wait for the callback to be received
    server_closed.wait()

    # Verify state parameter
    if auth_state != state:
        print("Error: State parameter doesn't match!")
        return

    # Never send a token request without an authorization code.
    if auth_code is None:
        print("Error: No authorization code was received.")
        return

    # Exchange authorization code for access token
    token_params = {
        "grant_type": "authorization_code",
        "client_id": CLIENT_ID,
        "client_secret": CLIENT_SECRET,
        "code": auth_code,
        "redirect_uri": REDIRECT_URI,
    }
    print("Exchanging authorization code for access token...")
    try:
        # requests waits indefinitely unless a timeout is given.
        response = requests.post(TOKEN_URL, data=token_params, timeout=30)
    except requests.RequestException as exc:
        print(f"Error getting token: {exc}")
        return

    if response.status_code != 200:
        print(f"Error getting token: {response.status_code}")
        print(response.text)
        return

    token_data = response.json()
    access_token = token_data.get("access_token")
    refresh_token = token_data.get("refresh_token")
    print("\n\nAuthentication successful!\n")
    print(f"Access token:\n\n{access_token}\n")
    if refresh_token:
        print(f"Refresh token:\n\n{refresh_token}\n")
    # NOTE: this is the place to persist the tokens for later use.


if __name__ == "__main__":
    main()
[tool.poetry]
name = "python-cli-sample"
version = "0.1.0"
description = "Code sample script for fetching an \"OAuth Web App Credential (private)\""
[tool.poetry.dependencies]
python = "^3.9"
requests-oauthlib = "^2.0.0"
python-dotenv = "^1.0.1"
requests = "^2.32.3"
aiohttp = "^3.11.15"
aiofiles = "^24.1.0"
[build-system]
requires = ["poetry-core"]
build-backend = "poetry.core.masonry.api"
import os
import argparse
import requests
import time
import base64
from dotenv import load_dotenv, find_dotenv
def load_environment():
    """Collect the settings needed for a token refresh call.

    Loads the ``.env.local`` file located by ``find_dotenv`` and then reads
    the client credentials from the environment.

    Returns:
        A ``(token_url, client_id, client_secret)`` tuple. The two
        credential entries are ``None`` when their variables are unset.
    """
    env_file = find_dotenv(".env.local")
    load_dotenv(env_file)

    # Client ID first, client secret second, matching the return order
    credentials = [
        os.getenv(variable)
        for variable in (
            "OAUTH_WEB_APP_CREDENTIAL_ID",
            "OAUTH_WEB_APP_CLIENT_SECRET",
        )
    ]
    return ("https://ims-na1.adobelogin.com/ims/token/v3", *credentials)
def refresh_access_token(refresh_token, token_url, client_id, client_secret):
    """Use a refresh token to obtain a new access token.

    Posts a ``refresh_token`` grant to ``token_url``, authenticating with an
    HTTP Basic header built from ``client_id`` and ``client_secret``, and
    prints the outcome.

    Args:
        refresh_token: The refresh token to exchange.
        token_url: Token endpoint URL.
        client_id: OAuth client ID.
        client_secret: OAuth client secret.

    Returns:
        ``(new_access_token, new_refresh_token)`` on success, where the
        refresh token falls back to the one passed in when the response
        does not include a new one; ``(None, None)`` when the request fails
        or the endpoint answers with a non-200 status.
    """
    # Create the Basic Authorization header
    credentials = f"{client_id}:{client_secret}"
    encoded_credentials = base64.b64encode(credentials.encode()).decode()
    headers = {
        "Authorization": f"Basic {encoded_credentials}",
        "Content-Type": "application/x-www-form-urlencoded",
    }
    token_params = {
        "grant_type": "refresh_token",
        "refresh_token": refresh_token,
    }

    print("Refreshing access token...")
    try:
        # requests waits indefinitely unless a timeout is given.
        response = requests.post(
            token_url, headers=headers, data=token_params, timeout=30
        )
    except requests.RequestException as exc:
        print(f"Error refreshing token: {exc}")
        return None, None

    if response.status_code != 200:
        print(f"Error refreshing token: {response.status_code}")
        print(response.text)
        return None, None

    token_data = response.json()

    # The expiry report is informational only, so skip it (rather than
    # crash) when the response has no numeric expires_in.
    expires_in = token_data.get("expires_in")
    if isinstance(expires_in, (int, float)):
        days_until_expiry = expires_in / (24 * 60 * 60)
        print(f"\nToken expires in {days_until_expiry:.1f} days")

    new_access_token = token_data.get("access_token")
    # Use new refresh token if provided, otherwise keep the old one
    new_refresh_token = token_data.get("refresh_token", refresh_token)

    print("\n\nToken refresh successful!\n")
    print(f"New access token:\n\n{new_access_token}\n")
    if new_refresh_token != refresh_token:
        print(f"New refresh token:\n\n{new_refresh_token}\n")
    return new_access_token, new_refresh_token
def main():
    """Command-line entry point: refresh an access token from a refresh token.

    Reads ``--refresh-token`` (required) and ``--access-token`` (optional,
    accepted but not used) from the command line, loads the endpoint and
    client credentials, and performs the refresh.
    """
    cli = argparse.ArgumentParser(description="Refresh OAuth access token")
    cli.add_argument(
        "--refresh-token", required=True, help="OAuth refresh token"
    )
    cli.add_argument(
        "--access-token",
        help="Current access token (not required for refresh, but can be provided for reference)",
    )
    options = cli.parse_args()

    # Endpoint and client credentials come from the environment
    endpoint, app_id, app_secret = load_environment()

    # Perform the refresh; the results are printed by refresh_access_token
    new_access_token, new_refresh_token = refresh_access_token(
        options.refresh_token, endpoint, app_id, app_secret
    )
    # NOTE: new_access_token / new_refresh_token could be written to a file
    # or handed to another process from here.


if __name__ == "__main__":
    main()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment