Skip to content

Instantly share code, notes, and snippets.

Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save darrenjrobinson/34dc0925724426823c79f46397d950b9 to your computer and use it in GitHub Desktop.
Save darrenjrobinson/34dc0925724426823c79f46397d950b9 to your computer and use it in GitHub Desktop.
Interactive Authentication to Microsoft Graph using MSAL with Python and Delegated Permissions. See associated blogpost https://blog.darrenjrobinson.com/interactive-authentication-to-microsoft-graph-using-msal-with-python-and-delegated-permissions/
import msal
import jwt
import json
import sys
import requests
from datetime import datetime
from msal_extensions import *
# Microsoft Azure PowerShell Client ID
clientID = '1950a258-227b-4e31-a9cf-717495945fc2'
scopes= ["https://management.azure.com/user_impersonation"]
username = input('Enter your Azure AD username: ')
tenantID = username.split('@')[1]
authority = 'https://login.microsoftonline.com/' + tenantID
result = None
tokenExpiry = None
accounts = None
myAccount = None
def msal_persistence(location, fallback_to_plaintext=False):
"""Build a suitable persistence instance based your current OS"""
if sys.platform.startswith('win'):
return FilePersistenceWithDataProtection(location)
if sys.platform.startswith('darwin'):
return KeychainPersistence(location, "my_service_name", "my_account_name")
return FilePersistence(location)
def msal_cache_accounts(clientID, authority):
# Accounts
persistence = msal_persistence("token_cache.bin")
print("Is this MSAL persistence cache encrypted?", persistence.is_encrypted)
cache = PersistedTokenCache(persistence)
app = msal.PublicClientApplication(client_id=clientID, authority=authority, token_cache=cache)
accounts = app.get_accounts()
return accounts
def msal_delegated_refresh(clientID, scopes, authority, account):
persistence = msal_persistence("token_cache.bin")
cache = PersistedTokenCache(persistence)
app = msal.PublicClientApplication(
client_id=clientID, authority=authority, token_cache=cache)
result = app.acquire_token_silent_with_error(
scopes=scopes, account=account)
return result
def msal_delegated_refresh_force(clientID, scopes, authority, account):
persistence = msal_persistence("token_cache.bin")
cache = PersistedTokenCache(persistence)
app = msal.PublicClientApplication(
client_id=clientID, authority=authority, token_cache=cache)
result = app.acquire_token_silent_with_error(
scopes=scopes, account=account, force_refresh=True)
return result
def msal_delegated_interactive_flow(scopes, prompt=None, login_hint=None, domain_hint=None, claims_challenge=None, timeout=None, port=None, extra_scopes_to_consent=None):
print("Initate an Interactive Flow (auth via Browser) to get AAD Access and Refresh Tokens.")
persistence = msal_persistence("token_cache.bin")
cache = PersistedTokenCache(persistence)
app = msal.PublicClientApplication(client_id=clientID, authority=authority, token_cache=cache)
result = app.acquire_token_interactive(scopes=scopes, prompt=None, login_hint=login_hint, domain_hint=domain_hint )
return result
def msal_jwt_expiry(accessToken):
decodedAccessToken = jwt.decode(accessToken, verify=False)
accessTokenFormatted = json.dumps(decodedAccessToken, indent=2)
# Token Expiry
tokenExpiry = datetime.fromtimestamp(int(decodedAccessToken['exp']))
print("Token Expires at: " + str(tokenExpiry))
return tokenExpiry
def msgraph_request(resource, requestHeaders):
# Request
results = requests.get(resource, headers=requestHeaders).json()
return results
accounts = msal_cache_accounts(clientID, authority)
if accounts:
for account in accounts:
if account['username'] == username:
myAccount = account
print("Found account in MSAL Cache: " + account['username'])
print("Attempting to obtain a new Access Token using the Refresh Token")
result = msal_delegated_refresh(clientID, scopes, authority, myAccount)
if result is None:
# Get a new Access Token using the Interactive Flow
print("Interactive Authentication required to obtain a new Access Token.")
result = msal_delegated_interactive_flow(scopes=scopes, domain_hint=tenantID, login_hint=username)
else:
# No accounts found in the local MSAL Cache
# Trigger interactive authentication flow
print("First authentication for " +account['username'])
result = msal_delegated_interactive_flow(scopes=scopes, domain_hint=tenantID, login_hint=username)
if result:
if result["access_token"]:
msal_jwt_expiry(result["access_token"])
# Query AAD for tenants the AAD User is Federated too
requestHeaders = {'Authorization': 'Bearer ' + result["access_token"],'Content-Type': 'application/json'}
queryResults = msgraph_request("https://management.azure.com/tenants?api-version=2020-01-01",requestHeaders)
print(json.dumps(queryResults, indent=2))
else:
# User not found in MSAL Cache
# Trigger interactive authentication flow
print("First authentication for " +account['username'])
result = msal_delegated_interactive_flow(scopes=scopes, domain_hint=tenantID, login_hint=username)
if result["access_token"]:
msal_jwt_expiry(result["access_token"])
# Query AAD for tenants the AAD User is Federated too
requestHeaders = {'Authorization': 'Bearer ' + result["access_token"],'Content-Type': 'application/json'}
queryResults = msgraph_request("https://management.azure.com/tenants?api-version=2020-01-01",requestHeaders)
print(json.dumps(queryResults, indent=2))
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment