Skip to content

Instantly share code, notes, and snippets.

Embed
What would you like to do?
Interactive authentication to Microsoft Graph using MSAL with Python and delegated permissions. See the associated blog post: https://blog.darrenjrobinson.com/interactive-authentication-to-microsoft-graph-using-msal-with-python-and-delegated-permissions/
import msal
import jwt
import json
import sys
import requests
from datetime import datetime
from msal_extensions import *
# Microsoft Azure PowerShell Client ID
clientID = '1950a258-227b-4e31-a9cf-717495945fc2'
# Delegated scope requested on the https://management.azure.com resource
scopes = ["https://management.azure.com/user_impersonation"]

# The username must look like user@domain: the domain part is used as the
# tenant when building the authority URL, so reject anything else up front
# instead of failing later with a bare IndexError.
username = input('Enter your Azure AD username: ').strip()
_upnParts = username.split('@')
if len(_upnParts) != 2 or not all(_upnParts):
    raise SystemExit(
        "Username must be in the form user@domain, got: " + repr(username))
tenantID = _upnParts[1]
authority = 'https://login.microsoftonline.com/' + tenantID

# Module-level state filled in by the main flow further down the script
result = None
tokenExpiry = None
accounts = None
myAccount = None
def msal_persistence(location, fallback_to_plaintext=False):
    """Return a token-cache persistence object chosen from ``sys.platform``.

    Args:
        location: Passed straight through to the selected persistence class.
        fallback_to_plaintext: Accepted for signature compatibility; this
            implementation never reads it.

    Returns:
        ``FilePersistenceWithDataProtection`` when the platform name starts
        with ``'win'``, ``KeychainPersistence`` when it starts with
        ``'darwin'``, and ``FilePersistence`` for everything else.
    """
    platformName = sys.platform
    if platformName.startswith('win'):
        backend = FilePersistenceWithDataProtection(location)
    elif platformName.startswith('darwin'):
        backend = KeychainPersistence(location, "my_service_name", "my_account_name")
    else:
        backend = FilePersistence(location)
    return backend
def msal_cache_accounts(clientID, authority):
    """List the accounts held in the persisted MSAL token cache.

    Builds a ``PublicClientApplication`` backed by ``token_cache.bin``,
    prints whether that persistence reports itself as encrypted, and
    returns whatever ``get_accounts()`` yields.

    Args:
        clientID: Application (client) ID handed to MSAL.
        authority: Authority URL handed to MSAL.
    """
    tokenStore = msal_persistence("token_cache.bin")
    print("Is this MSAL persistence cache encrypted?", tokenStore.is_encrypted)
    publicClient = msal.PublicClientApplication(
        client_id=clientID,
        authority=authority,
        token_cache=PersistedTokenCache(tokenStore),
    )
    return publicClient.get_accounts()
def msal_delegated_refresh(clientID, scopes, authority, account):
    """Attempt a silent token acquisition for ``account``.

    Args:
        clientID: Application (client) ID handed to MSAL.
        scopes: Scopes to request.
        authority: Authority URL handed to MSAL.
        account: Account entry to acquire the token for.

    Returns:
        The value returned by ``acquire_token_silent_with_error``.
    """
    publicClient = msal.PublicClientApplication(
        client_id=clientID,
        authority=authority,
        token_cache=PersistedTokenCache(msal_persistence("token_cache.bin")),
    )
    return publicClient.acquire_token_silent_with_error(
        scopes=scopes,
        account=account,
    )
def msal_delegated_refresh_force(clientID, scopes, authority, account):
    """Attempt a silent token acquisition with ``force_refresh=True``.

    Identical to a normal silent acquisition except that
    ``force_refresh=True`` is passed to MSAL.

    Args:
        clientID: Application (client) ID handed to MSAL.
        scopes: Scopes to request.
        authority: Authority URL handed to MSAL.
        account: Account entry to acquire the token for.

    Returns:
        The value returned by ``acquire_token_silent_with_error``.
    """
    publicClient = msal.PublicClientApplication(
        client_id=clientID,
        authority=authority,
        token_cache=PersistedTokenCache(msal_persistence("token_cache.bin")),
    )
    return publicClient.acquire_token_silent_with_error(
        scopes=scopes,
        account=account,
        force_refresh=True,
    )
def msal_delegated_interactive_flow(scopes, prompt=None, login_hint=None, domain_hint=None, claims_challenge=None, timeout=None, port=None, extra_scopes_to_consent=None):
    """Run MSAL's interactive (browser) flow and return its result.

    The client ID and authority are taken from the module-level ``clientID``
    and ``authority`` values; the token cache is the persisted
    ``token_cache.bin``.

    Args:
        scopes: Scopes to request.
        prompt, login_hint, domain_hint, claims_challenge, timeout, port,
        extra_scopes_to_consent: Forwarded unchanged to
            ``PublicClientApplication.acquire_token_interactive``. All default
            to ``None``.

    Returns:
        The value returned by ``acquire_token_interactive``.
    """
    print("Initate an Interactive Flow (auth via Browser) to get AAD Access and Refresh Tokens.")
    persistence = msal_persistence("token_cache.bin")
    cache = PersistedTokenCache(persistence)
    app = msal.PublicClientApplication(
        client_id=clientID, authority=authority, token_cache=cache)
    # Forward every caller-supplied option; previously ``prompt`` was
    # hard-coded to None and the trailing options were silently dropped.
    return app.acquire_token_interactive(
        scopes=scopes,
        prompt=prompt,
        login_hint=login_hint,
        domain_hint=domain_hint,
        claims_challenge=claims_challenge,
        timeout=timeout,
        port=port,
        extra_scopes_to_consent=extra_scopes_to_consent,
    )
def msal_jwt_expiry(accessToken):
    """Print and return the expiry time carried in an access token.

    The token is decoded WITHOUT signature verification; it is only read to
    display its ``exp`` claim and must not be used to make trust decisions.

    Args:
        accessToken: Encoded JWT access token.

    Returns:
        A naive ``datetime`` (local time) built from the ``exp`` claim.

    Raises:
        KeyError: If the decoded token has no ``exp`` claim.
    """
    # ``options={"verify_signature": False}`` is the supported way to skip
    # verification; the legacy ``verify=False`` keyword is not accepted by
    # PyJWT 2.x.
    decodedAccessToken = jwt.decode(
        accessToken, options={"verify_signature": False})
    # Token Expiry
    tokenExpiry = datetime.fromtimestamp(int(decodedAccessToken['exp']))
    print("Token Expires at: " + str(tokenExpiry))
    return tokenExpiry
def msgraph_request(resource, requestHeaders, timeout=30):
    """Issue a GET request and return the response body parsed as JSON.

    Args:
        resource: URL to request.
        requestHeaders: Headers dict sent with the request.
        timeout: Seconds to wait for the server before giving up. Without a
            timeout ``requests`` may block indefinitely on a stalled
            connection.

    Returns:
        The parsed JSON body of the response.
    """
    # Request
    response = requests.get(resource, headers=requestHeaders, timeout=timeout)
    return response.json()
def _print_tenants(tokenResult):
    """Print the token expiry, then fetch and print the tenants listing."""
    accessToken = tokenResult["access_token"]
    msal_jwt_expiry(accessToken)
    # Query AAD for tenants the AAD User is Federated too
    requestHeaders = {'Authorization': 'Bearer ' + accessToken, 'Content-Type': 'application/json'}
    queryResults = msgraph_request("https://management.azure.com/tenants?api-version=2020-01-01", requestHeaders)
    print(json.dumps(queryResults, indent=2))


accounts = msal_cache_accounts(clientID, authority)

# Pick the cached account that belongs to the entered username (if any).
# Usernames are compared case-insensitively so "User@Contoso.com" still
# matches a cache entry stored in lower case.
myAccount = next(
    (account for account in (accounts or [])
     if account.get('username', '').lower() == username.lower()),
    None,
)

if myAccount is not None:
    print("Found account in MSAL Cache: " + myAccount['username'])
    print("Attempting to obtain a new Access Token using the Refresh Token")
    result = msal_delegated_refresh(clientID, scopes, authority, myAccount)

    # The silent call may yield None or a dict without "access_token"
    # (an error response); either way fall back to the browser flow.
    if not result or "access_token" not in result:
        # Get a new Access Token using the Interactive Flow
        print("Interactive Authentication required to obtain a new Access Token.")
        result = msal_delegated_interactive_flow(scopes=scopes, domain_hint=tenantID, login_hint=username)
else:
    # User not found in MSAL Cache
    # Trigger interactive authentication flow
    print("First authentication for " + username)
    result = msal_delegated_interactive_flow(scopes=scopes, domain_hint=tenantID, login_hint=username)

if result and result.get("access_token"):
    _print_tenants(result)
else:
    # Report the failure instead of raising KeyError on a missing token
    _failure = result or {}
    _errorDetail = _failure.get("error_description") or _failure.get("error") or "no token returned"
    print("Authentication failed: " + str(_errorDetail))
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment