Skip to content

Instantly share code, notes, and snippets.

@Japanuspus
Created May 25, 2020 12:45
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save Japanuspus/495112129beb372ae4504d2092d03894 to your computer and use it in GitHub Desktop.
Save Japanuspus/495112129beb372ae4504d2092d03894 to your computer and use it in GitHub Desktop.
Example of python CLI application using the device-code flow to authorize against login.microsoft.com
import requests
import json
from pathlib import Path
import time
import subprocess
import re
from datetime import datetime
import logging
import argparse
"""
Example of device-code-flow authorization
See https://docs.microsoft.com/en-us/azure/active-directory/develop/v2-oauth2-device-code
"""
logger = logging.getLogger(__name__)
log_levels = {n: getattr(logging, n) for n in ['DEBUG', 'INFO', 'WARN', 'ERROR']}
def get_endpoint_url(config, endpoint):
"""
See https://docs.microsoft.com/en-us/azure/active-directory/develop/active-directory-v2-protocols#endpoints
Tenant can be
- common
- consumers
- organisations
- <tenant-id>
"""
tenant = config.get('tenant', config['tenant_id'])
return f'https://login.microsoftonline.com/{tenant}/oauth2/v2.0/{endpoint}'
def get_device_authorization_response(config):
# using tenant here means I need to be a member of tenant
r = requests.post(
url=get_endpoint_url(config, 'devicecode'),
data={
"tenant": config["tenant_id"],
"client_id": config["application_id"],
"scope": ' '.join(config["scope"])
})
assert r.ok
return r.json()
def wait_for_auth_token(config, device_authorization_response):
token = None
print(device_authorization_response["message"])
while True:
time.sleep(device_authorization_response["interval"])
r = requests.post(
url=get_endpoint_url(config, 'token'),
data={
"tenant": config["tenant_id"],
"client_id": config["application_id"],
"device_code": device_authorization_response['device_code'],
"grant_type": "urn:ietf:params:oauth:grant-type:device_code"
})
if r.ok:
token = r.json()
print("Authentication successful")
break
json = r.json()
if json.get("error", None)=="authorization_pending":
print(".", end='', flush=True)
else:
msg = json.get('error_description', None)
if not msg:
msg = f"** Invalid response **: {r.headers} {r.text}"
print(f"Authentication failed: {msg}")
break
return token
def get_auth_token(config_path, open_browser=True):
config_path = Path(config_path)
token_path = config_path.parent / f"{config_path.stem}-token-{datetime.now().strftime('%Y-%m-%dT%H%M%S')}.json"
config = json.loads(config_path.read_text())
logger.debug(f"Config read from {config_path}: {config}")
device_authorization_response = get_device_authorization_response(config)
logger.debug(f"Device auth response: {device_authorization_response}")
if open_browser:
subprocess.run(f"rundll32 url.dll,FileProtocolHandler {device_authorization_response['verification_uri']}")
auth_token = wait_for_auth_token(config, device_authorization_response)
if auth_token:
logger.debug(f"Writing token to {token_path}")
token_path.write_text(json.dumps({'config': config, 'response': auth_token}))
def cli():
p = argparse.ArgumentParser()
p.add_argument('config_path')
p.add_argument('--no-browser', dest='open_browser', action='store_false', default=True)
p.add_argument('--log-level', default='INFO', choices=list(log_levels.keys()))
args = p.parse_args()
logging.basicConfig(level=log_levels[args.log_level])
get_auth_token(args.config_path, args.open_browser)
if __name__ == '__main__':
cli()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment