Skip to content

Instantly share code, notes, and snippets.

Show Gist options
  • Star 4 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save darrenjrobinson/553ea10e304246ebfa1eac6dde0cf63b to your computer and use it in GitHub Desktop.
Save darrenjrobinson/553ea10e304246ebfa1eac6dde0cf63b to your computer and use it in GitHub Desktop.
Microsoft Graph using MSAL with Python and Delegated Permissions using a persistent local MSAL Cache. Associated Blogpost https://blog.darrenjrobinson.com/microsoft-graph-using-msal-with-python-and-delegated-permissions/
import msal
import jwt
import json
import sys
import requests
from datetime import datetime
from msal_extensions import *
graphURI = 'https://graph.microsoft.com'
tenantID = 'yourTenantID'
authority = 'https://login.microsoftonline.com/' + tenantID
clientID = 'yourAADRegisteredAppClientID'
scope = ["User.Read"]
username = 'yourAADUsername'
result = None
tokenExpiry = None
def msal_persistence(location, fallback_to_plaintext=False):
"""Build a suitable persistence instance based your current OS"""
if sys.platform.startswith('win'):
return FilePersistenceWithDataProtection(location)
if sys.platform.startswith('darwin'):
return KeychainPersistence(location, "my_service_name", "my_account_name")
return FilePersistence(location)
def msal_cache_accounts(clientID, authority):
# Accounts
persistence = msal_persistence("token_cache.bin")
print("Is this MSAL persistence cache encrypted?", persistence.is_encrypted)
cache = PersistedTokenCache(persistence)
app = msal.PublicClientApplication(
client_id=clientID, authority=authority, token_cache=cache)
accounts = app.get_accounts()
print(accounts)
return accounts
def msal_delegated_refresh(clientID, scope, authority, account):
persistence = msal_persistence("token_cache.bin")
cache = PersistedTokenCache(persistence)
app = msal.PublicClientApplication(
client_id=clientID, authority=authority, token_cache=cache)
result = app.acquire_token_silent_with_error(
scopes=scope, account=account)
return result
def msal_delegated_refresh_force(clientID, scope, authority, account):
persistence = msal_persistence("token_cache.bin")
cache = PersistedTokenCache(persistence)
app = msal.PublicClientApplication(
client_id=clientID, authority=authority, token_cache=cache)
result = app.acquire_token_silent_with_error(
scopes=scope, account=account, force_refresh=True)
return result
def msal_delegated_device_flow(clientID, scope, authority):
print("Initiate Device Code Flow to get an AAD Access Token.")
print("Open a browser window and paste in the URL below and then enter the Code. CTRL+C to cancel.")
persistence = msal_persistence("token_cache.bin")
cache = PersistedTokenCache(persistence)
app = msal.PublicClientApplication(client_id=clientID, authority=authority, token_cache=cache)
flow = app.initiate_device_flow(scopes=scope)
if "user_code" not in flow:
raise ValueError("Fail to create device flow. Err: %s" % json.dumps(flow, indent=4))
print(flow["message"])
sys.stdout.flush()
result = app.acquire_token_by_device_flow(flow)
return result
def msal_jwt_expiry(accessToken):
decodedAccessToken = jwt.decode(accessToken, verify=False)
accessTokenFormatted = json.dumps(decodedAccessToken, indent=2)
# Token Expiry
tokenExpiry = datetime.fromtimestamp(int(decodedAccessToken['exp']))
print("Token Expires at: " + str(tokenExpiry))
return tokenExpiry
def msgraph_request(resource, requestHeaders):
# Request
results = requests.get(resource, headers=requestHeaders).json()
return results
accounts = msal_cache_accounts(clientID, authority)
if accounts:
for account in accounts:
if account['username'] == username:
myAccount = account
print("Found account in MSAL Cache: " + account['username'])
print("Obtaining a new Access Token using the Refresh Token")
result = msal_delegated_refresh(clientID, scope, authority, myAccount)
if result is None:
# Get a new Access Token using the Device Code Flow
result = msal_delegated_device_flow(clientID, scope, authority)
else:
if result["access_token"]:
msal_jwt_expiry(result["access_token"])
else:
# Get a new Access Token using the Device Code Flow
result = msal_delegated_device_flow(clientID, scope, authority)
if result["access_token"]:
msal_jwt_expiry(result["access_token"])
# Query AAD Users based on voice query using DisplayName
print(graphURI + "/v1.0/me")
requestHeaders = {'Authorization': 'Bearer ' + result["access_token"],'Content-Type': 'application/json'}
queryResults = msgraph_request(graphURI + "/v1.0/me",requestHeaders)
print(json.dumps(queryResults, indent=2))
# Force Token Refresh
result = msal_delegated_refresh_force(clientID, scope, authority, myAccount)
if result is None:
# Get a new Access Token using the Device Code Flow
result = msal_delegated_device_flow(clientID, scope, authority)
else:
if result["access_token"]:
msal_jwt_expiry(result["access_token"])
print(json.dumps(queryResults, indent=2))
@tinmule
Copy link

tinmule commented Jun 6, 2021

Hello Darren.
You did a good useful job.
I'm working with your script https://blog.darrenjrobinson.com/microsoft-graph-using-msal-with-python-and-delegated-permissions/ for the purpose of getting and writing data to my microsoft todo instance.
For example: get all task lists with an endpoint https://graph.microsoft.com/v1.0/me/todo/lists

Step 1.
Set scope = ["Tasks.ReadWrite"]
Graph Explorer shows the correct result.
I insert the access token into the site and see scp,od,email, idtyp, etc.
The script outputs one default empty list that id doesn't match my default list id.

Step 2.
When I change my authority='https://login.microsoftonline.com/common' insted authority='https://login.microsoftonline.com/'+tenantID i see all my tasks lists.
However, now an error occurs:
File "D:\Programs\PyOne\lib\site-packages\jwt\api_jws.py", line 183, in _load
raise DecodeError('Not enough segments')
jwt.exceptions.DecodeError: Not enough segments

Checking the token for jwt.ms shows an empty result (as opposed to step 1) similar to the token generated by Graph Explorer.

Qusetion:
Is it possible to get the msal_jwt_expiry functionality in this case (Step 2)?

Thank You.
Sincerely, Vladimir.

@darrenjrobinson
Copy link
Author

Hi Vladimir,
What version of PyJWT are you using?

  • PyJWT (we will be using this to decode the Microsoft Graph Access Token)
  • You will need v1.7.1 of PyJWT as Version 2.x + of PyJWT has breaking changes for jwt.decode.

@tinmule
Copy link

tinmule commented Jun 7, 2021

Hi Darren
jwt.version
Python 3.8.5 (default, Sep 3 2020, 21:29:08) [MSC v.1916 64 bit (AMD64)]
Type 'copyright', 'credits' or 'license' for more information
IPython 7.19.0 -- An enhanced Interactive Python. Type '?' for help.
PyDev console: using IPython 7.19.0
Out[1]: '1.7.1'

@tinmule
Copy link

tinmule commented Jun 7, 2021

For step 1, the token contains 2 characters '.', for step 2, the token does not contain '. '. So api_jws.py/ _def load(self, jwt):/ the string signing_input, crypto_segment = jwt.rsplit(b'.', 1) raises an exception.

@darrenjrobinson
Copy link
Author

That is very weird.
I cannot reproduce what you are being returned from the STS with a badly formatted token.
Maybe something in the configuration of the Registered Application and the scopes you are requesting??

image

@tinmule
Copy link

tinmule commented Jun 8, 2021

https://docs.microsoft.com/en-us/azure/active-directory/develop/access-tokens
"Be aware, however, that the tokens you receive for a Microsoft API might not always be a JWT, and that you can't always decode them."
This seems to be the case.
Thank you for participating.

@ubalklen
Copy link

2 questions:

1 - The whole consent experience is bypassable, right? If the user is redirected to a browser, I can't see technical reasons this couldn't all be dealt directly by the application (although I can imagine legal/regulatory reasons). Am I missing something?

2 - I created a toy app using delegated permissions and after obtaining a token using only the "openid" scope, I'm able to upload and delete files in my OneDrive. Why is that? Shouldn't I have to send other scopes, such as Files.ReadWrite for this to work?

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment