Last active
May 1, 2023 13:59
-
-
Save darrenjrobinson/553ea10e304246ebfa1eac6dde0cf63b to your computer and use it in GitHub Desktop.
Microsoft Graph using MSAL with Python and Delegated Permissions using a persistent local MSAL Cache. Associated Blogpost https://blog.darrenjrobinson.com/microsoft-graph-using-msal-with-python-and-delegated-permissions/
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import msal | |
import jwt | |
import json | |
import sys | |
import requests | |
from datetime import datetime | |
from msal_extensions import * | |
# --- Script configuration ---------------------------------------------------
# The quoted values are placeholders; replace them with your own settings.

# Base URI that the Microsoft Graph request paths are appended to.
graphURI = "https://graph.microsoft.com"

# Tenant identifier; appended to the login endpoint to form the authority URL.
tenantID = "yourTenantID"
authority = "https://login.microsoftonline.com/{}".format(tenantID)

# Client ID of the registered application that requests tokens.
clientID = "yourAADRegisteredAppClientID"

# Delegated permission scopes requested for the access token.
scope = ["User.Read"]

# Username used to pick the matching account out of the MSAL token cache.
username = "yourAADUsername"

# Runtime state: `result` holds the latest token response once one is acquired.
result = None
# Module-level placeholder; not reassigned at module level by the code below.
tokenExpiry = None
def msal_persistence(location, fallback_to_plaintext=False):
    """Build a persistence instance suited to the current operating system.

    Windows gets ``FilePersistenceWithDataProtection``, macOS gets
    ``KeychainPersistence`` (with fixed service and account names), and any
    other platform gets ``FilePersistence``.

    Args:
        location: Passed through to the chosen persistence class.
        fallback_to_plaintext: Accepted for signature compatibility; this
            function does not consult it.

    Returns:
        The persistence instance for the detected platform.
    """
    platform_name = sys.platform
    if platform_name.startswith('win'):
        backend = FilePersistenceWithDataProtection(location)
    elif platform_name.startswith('darwin'):
        backend = KeychainPersistence(location, "my_service_name", "my_account_name")
    else:
        backend = FilePersistence(location)
    return backend
def msal_cache_accounts(clientID, authority):
    """Return the accounts found in the persisted MSAL token cache.

    Opens the ``token_cache.bin`` persistence, prints whether it reports
    itself as encrypted, builds a public client application on top of it and
    prints and returns the result of ``get_accounts()``.

    Args:
        clientID: Client ID handed to ``msal.PublicClientApplication``.
        authority: Authority URL handed to ``msal.PublicClientApplication``.

    Returns:
        Whatever ``PublicClientApplication.get_accounts()`` returns.
    """
    store = msal_persistence("token_cache.bin")
    print("Is this MSAL persistence cache encrypted?", store.is_encrypted)

    public_app = msal.PublicClientApplication(
        client_id=clientID,
        authority=authority,
        token_cache=PersistedTokenCache(store),
    )

    cached_accounts = public_app.get_accounts()
    print(cached_accounts)
    return cached_accounts
def msal_delegated_refresh(clientID, scope, authority, account):
    """Silently acquire a token for ``account`` using the persisted cache.

    Args:
        clientID: Client ID handed to ``msal.PublicClientApplication``.
        scope: Scopes to request.
        authority: Authority URL handed to ``msal.PublicClientApplication``.
        account: Cached account to acquire the token for.

    Returns:
        The value returned by ``acquire_token_silent_with_error``; callers in
        this script check it for ``None`` and for an ``"access_token"`` entry.
    """
    token_cache = PersistedTokenCache(msal_persistence("token_cache.bin"))
    client = msal.PublicClientApplication(
        client_id=clientID,
        authority=authority,
        token_cache=token_cache,
    )
    return client.acquire_token_silent_with_error(scopes=scope, account=account)
def msal_delegated_refresh_force(clientID, scope, authority, account):
    """Silently acquire a token for ``account`` with ``force_refresh=True``.

    Same call as the non-forced refresh helper, except that
    ``force_refresh=True`` is passed to ``acquire_token_silent_with_error``.

    Args:
        clientID: Client ID handed to ``msal.PublicClientApplication``.
        scope: Scopes to request.
        authority: Authority URL handed to ``msal.PublicClientApplication``.
        account: Cached account to acquire the token for.

    Returns:
        The value returned by ``acquire_token_silent_with_error``.
    """
    token_cache = PersistedTokenCache(msal_persistence("token_cache.bin"))
    client = msal.PublicClientApplication(
        client_id=clientID,
        authority=authority,
        token_cache=token_cache,
    )
    return client.acquire_token_silent_with_error(
        scopes=scope,
        account=account,
        force_refresh=True,
    )
def msal_delegated_device_flow(clientID, scope, authority):
    """Acquire a token interactively through the device code flow.

    Prints instructions, initiates the flow against a public client
    application backed by the persisted ``token_cache.bin`` cache, prints the
    flow's message and then waits in ``acquire_token_by_device_flow``.

    Args:
        clientID: Client ID handed to ``msal.PublicClientApplication``.
        scope: Scopes to request.
        authority: Authority URL handed to ``msal.PublicClientApplication``.

    Returns:
        The value returned by ``acquire_token_by_device_flow``.

    Raises:
        ValueError: If the initiated flow contains no ``"user_code"`` entry.
    """
    print("Initiate Device Code Flow to get an AAD Access Token.")
    print("Open a browser window and paste in the URL below and then enter the Code. CTRL+C to cancel.")

    token_cache = PersistedTokenCache(msal_persistence("token_cache.bin"))
    client = msal.PublicClientApplication(
        client_id=clientID,
        authority=authority,
        token_cache=token_cache,
    )

    device_flow = client.initiate_device_flow(scopes=scope)
    if "user_code" in device_flow:
        print(device_flow["message"])
        # Make sure the instructions are visible before the blocking call.
        sys.stdout.flush()
        return client.acquire_token_by_device_flow(device_flow)

    raise ValueError("Fail to create device flow. Err: %s" % json.dumps(device_flow, indent=4))
def msal_jwt_expiry(accessToken):
    """Print and return the expiry time carried in a JWT access token.

    The token is decoded without verifying its signature: it is only
    inspected to read the ``exp`` claim, not trusted for any decision.

    Args:
        accessToken: The encoded JWT access token.

    Returns:
        A ``datetime`` built from the ``exp`` claim with
        ``datetime.fromtimestamp`` (naive, local time).

    Raises:
        KeyError: If the decoded token has no ``exp`` claim.
        Any decoding error raised by ``jwt.decode`` for a malformed token.
    """
    # PyJWT 2.x removed the ``verify=False`` keyword (it now fails with a
    # "missing algorithms" error); the ``options`` form is the supported way
    # to skip signature verification.
    claims = jwt.decode(accessToken, options={"verify_signature": False})

    # Token Expiry
    tokenExpiry = datetime.fromtimestamp(int(claims['exp']))
    print("Token Expires at: " + str(tokenExpiry))
    return tokenExpiry
def msgraph_request(resource, requestHeaders, timeout=30):
    """Send a GET request and return the decoded JSON body.

    Args:
        resource: Full URL to request.
        requestHeaders: Headers to send with the request.
        timeout: Seconds to wait for the server before giving up. Without a
            timeout ``requests`` waits indefinitely, so a stalled connection
            would hang the script.

    Returns:
        The response body parsed by ``Response.json()``. The HTTP status is
        not checked here, so error responses are returned as parsed JSON too.
    """
    response = requests.get(resource, headers=requestHeaders, timeout=timeout)
    return response.json()
def _find_cached_account(cached_accounts, wanted_username):
    """Return the cached account whose 'username' matches, or None."""
    for cached in cached_accounts or []:
        if cached.get('username') == wanted_username:
            return cached
    return None


def _has_access_token(token_result):
    """Tell whether a token response carries a non-empty access token."""
    return bool(token_result) and bool(token_result.get("access_token"))


# Always bind these names so later steps never hit a NameError when the cache
# is empty or holds no account for the configured username.
result = None
accounts = msal_cache_accounts(clientID, authority)
myAccount = _find_cached_account(accounts, username)

if myAccount is not None:
    print("Found account in MSAL Cache: " + myAccount['username'])
    print("Obtaining a new Access Token using the Refresh Token")
    result = msal_delegated_refresh(clientID, scope, authority, myAccount)

if not _has_access_token(result):
    # No matching cached account, or the silent request returned nothing or an
    # error response: get a new Access Token using the Device Code Flow.
    result = msal_delegated_device_flow(clientID, scope, authority)

if not _has_access_token(result):
    # Stop with a readable message instead of a KeyError further down.
    sys.exit("Unable to obtain an access token: " + json.dumps(result, indent=2))

msal_jwt_expiry(result["access_token"])

# Query the signed-in user's own profile (/me) with the access token
print(graphURI + "/v1.0/me")
requestHeaders = {'Authorization': 'Bearer ' + result["access_token"], 'Content-Type': 'application/json'}
queryResults = msgraph_request(graphURI + "/v1.0/me", requestHeaders)
print(json.dumps(queryResults, indent=2))

# Force Token Refresh
if myAccount is None:
    # The device code flow above may have added the account to the persisted
    # cache, so look it up again before attempting a silent refresh.
    myAccount = _find_cached_account(msal_cache_accounts(clientID, authority), username)

if myAccount is None:
    print("No cached account for " + username + "; skipping the forced token refresh.")
else:
    result = msal_delegated_refresh_force(clientID, scope, authority, myAccount)
    if not _has_access_token(result):
        # Get a new Access Token using the Device Code Flow
        result = msal_delegated_device_flow(clientID, scope, authority)
    if _has_access_token(result):
        msal_jwt_expiry(result["access_token"])

# NOTE(review): this prints the results of the earlier /me query again; no new
# request is made with the refreshed token - confirm whether that is intended.
print(json.dumps(queryResults, indent=2))
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
2 questions:
1 - The whole consent experience is bypassable, right? If the user is redirected to a browser, I can't see any technical reason this couldn't all be dealt with directly by the application (although I can imagine legal/regulatory reasons). Am I missing something?
2 - I created a toy app using delegated permissions and after obtaining a token using only the "openid" scope, I'm able to upload and delete files in my OneDrive. Why is that? Shouldn't I have to send other scopes, such as Files.ReadWrite for this to work?