Last active
September 25, 2022 10:01
-
-
Save charbonnierg/c1da34d4184a47bbb585c20db87ec3d1 to your computer and use it in GitHub Desktop.
OIDC Authenticator
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
""" | |
Two classes: | |
- OIDCAuthenticator | |
- OIDCClientAuthenticator | |
Dependencies: | |
- PyJWT: API to decode JWT tokens | |
- cryptography: Cryptography algorithms | |
- pydantic: JSON validation library | |
- httpx: Async HTTP client | |
Default well-known: https://lemur-16.cloud-iam.com/auth/realms/quara-demo/.well-known/openid-configuration | |
Allowed Grant types: | |
- authorization_code | |
- refresh_token | |
- password | |
- client_credentials | |
Additional helper: | |
- Browser login | |
- Decode token | |
- Decode access token | |
- Decode ID token | |
""" | |
import types | |
import typing as t | |
import urllib.parse | |
import httpx | |
import jwt | |
import jwt.algorithms | |
import jwt.exceptions | |
from pydantic import BaseModel, Extra, Field | |
# Accepted representations of an OAuth2 scope: one space-delimited string, or a
# collection of individual scope names.
# ``t.Tuple[str, ...]`` (any length) is used instead of ``t.Tuple[str]``, which
# only describes a tuple holding exactly one scope name.
ScopeType = t.Union[str, t.List[str], t.Tuple[str, ...], t.Set[str]]
# Discovery document used when OIDCAuthenticator is created without an explicit
# well-known URI.
WELL_KNOWN = "https://lemur-16.cloud-iam.com/auth/realms/quara-demo/.well-known/openid-configuration"
class UntrustedGrant(BaseModel, extra=Extra.allow):
    """JSON data received from OIDC token endpoint.

    Tokens are stored as the raw strings found in the response: this model
    neither decodes nor verifies them. Keys not declared below are accepted
    as well (``extra=Extra.allow``).
    """

    # Encoded access token; the only mandatory key.
    access_token: str
    # Encoded ID token, None when the response has none.
    id_token: t.Optional[str] = None
    # Refresh token, None when the response has none.
    refresh_token: t.Optional[str] = None
class BaseJWT(BaseModel, extra=Extra.allow):
    """Common fields between OIDC ID Token and OAuth2 Access Token.

    Every field declared here is mandatory: validation fails when a decoded
    token lacks one of them. Claims not declared here are kept as extra
    attributes (``extra=Extra.allow``).
    """

    acr: str
    # RFC 7519 (section 4.1.3) allows "aud" to be either a single string or an
    # array of strings; accepting only ``str`` rejected multi-audience tokens.
    aud: t.Union[str, t.List[str]]
    azp: str
    email: str
    email_verified: bool
    # Expiration time claim.
    exp: int
    family_name: str
    given_name: str
    # Issued-at claim.
    iat: int
    # Issuer claim.
    iss: str
    # JWT ID claim.
    jti: str
    name: str
    preferred_username: str
    session_state: str
    sid: str
    # Subject claim.
    sub: str
    # Token type; narrowed to a literal value by the subclasses.
    typ: str
class Access(BaseModel, extra=Extra.allow):
    """Access holds a list of roles.

    Used by AccessToken for both ``realm_access`` and the values of
    ``resource_access``. Unknown keys are kept (``extra=Extra.allow``).
    """

    # Role names; an empty list when the key is absent.
    roles: t.List[str] = []
class AccessToken(BaseJWT):
    """Access token received from OIDC provider.

    Extends the claims of BaseJWT with authorization data: roles and scope.
    """

    # Only tokens whose "typ" claim is exactly "Bearer" validate.
    typ: t.Literal["Bearer"]
    # Populated from the "allowed-origins" claim (hyphenated in the token).
    allowed_origins: t.List[str] = Field(alias="allowed-origins")
    # Roles found under the "realm_access" claim.
    realm_access: Access
    # Roles found under the "resource_access" claim, one Access per key.
    resource_access: t.Dict[str, Access]
    # Raw "scope" claim, kept as a single string.
    scope: str
class IDToken(BaseJWT):
    """ID token received from OIDC provider.

    Extends the claims of BaseJWT with the ``at_hash`` and ``auth_time`` claims.
    """

    # Only tokens whose "typ" claim is exactly "ID" validate.
    typ: t.Literal["ID"]
    # "at_hash" claim, stored as received.
    at_hash: str
    # "auth_time" claim, stored as an integer.
    auth_time: int
class Grant(BaseModel, extra=Extra.ignore):
    """Validated data received from the OIDC token endpoint.

    Unlike UntrustedGrant, access and ID tokens are stored as decoded models
    rather than encoded strings. Undeclared keys are dropped
    (``extra=Extra.ignore``).
    """

    # Decoded access token.
    access_token: AccessToken
    # Decoded ID token, None when no ID token was issued.
    id_token: t.Optional[IDToken] = None
    # Refresh token kept as an opaque string; it is not decoded.
    refresh_token: t.Optional[str] = None
def write_scope(scope: "t.Optional[ScopeType]") -> str:
    """Write scopes as a space-delimited string.

    The ``openid``, ``profile``, ``email`` and ``offline_access`` scopes are
    always requested: each one missing from ``scope`` is inserted at the front
    of the result. The collection given by the caller is never mutated.

    Arguments:
        scope: Space delimited list of strings, iterable of strings, or None.

    Returns:
        The scope names joined with single spaces.
    """
    if scope is None:
        names: t.List[str] = []
    elif isinstance(scope, str):
        # split() without argument discards empty items, so "" or repeated
        # spaces no longer produce empty scope names (and stray spaces in the
        # returned string).
        names = scope.split()
    else:
        names = list(scope)
    # Each missing name is inserted at index 0, hence the reversed listing:
    # the final order starts with "openid".
    for required in ("offline_access", "email", "profile", "openid"):
        if required not in names:
            names.insert(0, required)
    return " ".join(names)
class OIDCAuthenticator: | |
"""An OIDCAuthenticator communicates with an OIDC provider to perform one of the | |
following grants: | |
- Password grant | |
- Client credentials grant | |
- Authorization code grant | |
- Refresh grant | |
""" | |
def __init__(
    self,
    well_known_uri: str = WELL_KNOWN,
) -> None:
    """Create a new OIDCAuthenticator instance.

    No request is sent here: the discovery document and the issuer public
    key stay empty until ``start()`` is awaited.

    Arguments:
        well_known_uri: An URL pointing to well known OIDC configuration.
    """
    # Discovery settings; the dictionary is filled by start().
    self.well_known_uri = well_known_uri
    self.well_known: t.Dict[str, t.Any] = {}
    # Signature verification material, set by refresh_issuer_public_key().
    self._algorithm: t.Optional[str] = None
    self._issuer_public_key: t.Optional[t.Any] = None
    # Algorithms PyJWT knows about, indexed by name.
    self.default_algorithms = jwt.algorithms.get_default_algorithms()
    # HTTP client shared by all requests; closed by stop().
    self.client = httpx.AsyncClient()
def __getitem__(self, client_id: str) -> "OIDCClientAuthenticator":
    """Return an authenticator bound to ``client_id`` (``auth["my-client"]``).

    Only the client ID is provided; every other setting keeps the default
    value of OIDCClientAuthenticator.
    """
    bound = OIDCClientAuthenticator(self, client_id)
    return bound
async def __aenter__(self) -> "OIDCAuthenticator":
    """Enter the async context: start the authenticator, then return it."""
    await self.start()
    return self
async def __aexit__(
    self,
    exc_type: t.Optional[t.Type[BaseException]] = None,
    exc: t.Optional[BaseException] = None,
    tb: t.Optional[types.TracebackType] = None,
) -> None:
    """Leave the async context: stop the authenticator.

    The three arguments describe the exception raised inside the ``async with``
    body, if any. They are not inspected, and None is returned, so such an
    exception keeps propagating.

    Arguments:
        exc_type: Class of the raised exception (a class, not an instance).
        exc: The raised exception instance.
        tb: Traceback of the raised exception.
    """
    await self.stop()
    return None
async def start(self) -> None:
    """Start the OIDCAuthenticator.

    Downloads the document found at ``well_known_uri``, merges it into
    ``self.well_known``, then loads the issuer public key.

    Raises:
        Whatever ``raise_for_status()`` raises when the provider answers
        with an error status.
    """
    response = await self.client.get(self.well_known_uri)
    response.raise_for_status()
    document = response.json()
    self.well_known.update(document)
    # Requires "jwks_uri" to be part of the document merged above.
    await self.refresh_issuer_public_key()
async def stop(self) -> None:
    """Stop the OIDCAuthenticator by closing its HTTP client."""
    http_client = self.client
    await http_client.aclose()
def get_url(self, endpoint: str) -> str:
    """Get the URL for given endpoint.

    The value is read from the discovery document loaded by ``start()`` and
    converted to ``str``.

    Arguments:
        endpoint: a valid endpoint name from the OpenID Connect Discovery 1.0 specification

    Returns:
        The URL configured for the endpoint.

    Raises:
        KeyError: The discovery document has no entry named ``endpoint``.

    Examples:
        async with OIDCAuthenticator() as auth:
            auth_url = auth.get_url("authorization_endpoint")
            token_url = auth.get_url("token_endpoint")
    """
    try:
        url = self.well_known[endpoint]
    except KeyError:
        # "from None": the original lookup error only repeats the key name,
        # so it is dropped from the traceback in favour of this message.
        raise KeyError(f"No URL configured for endpoint: {endpoint}") from None
    return str(url)
def get_authorization_url(
    self,
    client_id: str,
    redirect_uri: str,
    scope: t.Optional[ScopeType] = None,
    state: t.Optional[str] = None,
    response_type: str = "code",
) -> str:
    """Build the URL the resource owner must visit to grant access.

    See https://tools.ietf.org/html/rfc6749#section-4.1.1

    Arguments:
        client_id: OIDC client ID
        redirect_uri: Absolute URL of the client where the user-agent will be redirected to.
        scope: Space delimited list of strings, or iterable of strings.
        state: An opaque value used by the client to maintain state between the request and callback.
            Left out of the URL when empty or None.
        response_type: Use "code" to perform authorization grant, or "token" to perform implicit grant.

    Returns:
        The authorization endpoint URL followed by the encoded query string.
    """
    query: t.Dict[str, t.Any] = {
        "client_id": client_id,
        "response_type": response_type,
        "redirect_uri": redirect_uri,
        "scope": write_scope(scope),
    }
    if state:
        query["state"] = state
    encoded_query = urllib.parse.urlencode(query)
    endpoint = self.get_url("authorization_endpoint")
    return f"{endpoint}?{encoded_query}"
def get_client(
    self,
    client_id: str,
    client_secret: t.Optional[str] = None,
    access_token_audience: t.Optional[str] = None,
    id_token_audience: t.Optional[str] = None,
) -> "OIDCClientAuthenticator":
    """Get an authenticator dedicated to a single OIDC client.

    Arguments:
        client_id: OIDC client ID.
        client_secret: Optional secret of the OIDC client.
        access_token_audience: Optional audience expected in access tokens.
        id_token_audience: Optional audience expected in ID tokens.

    Returns:
        An OIDCClientAuthenticator sending its requests through this instance.
    """
    settings: t.Dict[str, t.Optional[str]] = {
        "client_secret": client_secret,
        "access_token_audience": access_token_audience,
        "id_token_audience": id_token_audience,
    }
    return OIDCClientAuthenticator(self, client_id=client_id, **settings)
async def refresh_issuer_public_key(self) -> None:
    """Fetch the issuer public key from remote OIDC provider.

    The key set found at the ``jwks_uri`` endpoint is scanned in order and
    the first usable key is kept, together with its algorithm. A key is
    usable when it is not published for another purpose than signing and
    when its ``alg`` is one of ``self.default_algorithms``.

    Raises:
        KeyError: ``jwks_uri`` is not configured, the response has no
            ``keys`` entry, or no usable key is found.
    """
    jwks_url = self.get_url("jwks_uri")
    response = await self.client.get(jwks_url)
    response.raise_for_status()
    jwks: t.List[t.Dict[str, t.Any]] = response.json()["keys"]
    for key in jwks:
        # "use" is optional in a JWK; when present, only "sig" keys can be
        # used to verify token signatures.
        if key.get("use", "sig") != "sig":
            continue
        algorithm = key.get("alg")
        if algorithm not in self.default_algorithms:
            continue
        # Parse the key BEFORE touching instance state: if from_jwk() fails,
        # the previously configured algorithm/key pair is left intact.
        public_key = self.default_algorithms[algorithm].from_jwk(key)
        self._algorithm = algorithm
        self._issuer_public_key = public_key
        return
    raise KeyError(
        f"No public key found with supported algorithm ({list(self.default_algorithms)})"
    )
def get_algorithm(self) -> str:
    """Get the name of the algorithm used to validate JWT.

    Raises:
        ValueError: No algorithm has been configured yet.
    """
    algorithm = self._algorithm
    if not algorithm:
        raise ValueError(
            "Algorithm is not configured. Make sure OIDCAuthenticator is started."
        )
    return algorithm
def get_issuer_public_key(self) -> t.Any:
    """Get public key used to validate JWT.

    Raises:
        ValueError: No key has been configured yet.
    """
    # Compare with None instead of relying on truthiness: the key is an
    # arbitrary object, and one that evaluates to False would wrongly be
    # reported as missing.
    if self._issuer_public_key is None:
        raise ValueError(
            "Issuer public key is not configured. Make sure OIDCAuthenticator is started."
        )
    return self._issuer_public_key
def decode_token(
    self,
    token: str,
    audience: t.Optional[str] = None,
    verify_signature: bool = True,
    verify_audience: bool = True,
) -> t.Dict[str, t.Any]:
    """Decode a JWT with the issuer public key and algorithm.

    Both the signature and the audience are verified unless told otherwise.
    The key and the algorithm must be configured even when signature
    verification is turned off.

    Arguments:
        token: The token to decode as a string.
        audience: Value to check for audience claim. Ignored when verify_audience is False.
        verify_signature: Skip the JWT signature verification when False. True by default.
        verify_audience: Skip the audience verification when False. True by default.

    Returns:
        A dictionary holding fields found within JWT.

    Raises:
        ValueError: Key or algorithm is not configured yet.
    """
    issuer_key = self.get_issuer_public_key()
    allowed_algorithms = [self.get_algorithm()]
    checks: t.Dict[str, t.Any] = {
        "verify_signature": verify_signature,
        # Always a real boolean, whatever truthy/falsy value was given.
        "verify_aud": bool(verify_audience),
    }
    return jwt.decode(
        token,
        key=issuer_key,
        algorithms=allowed_algorithms,
        audience=audience,
        options=checks,
    )
def decode_access_token(
    self,
    token: str,
    audience: t.Optional[str] = None,
    verify_signature: bool = True,
    verify_audience: bool = True,
) -> AccessToken:
    """Decode an OIDC access token using issuer public key.

    Access tokens carry identity information like ID tokens do, plus
    authorization information:
    - realm access (roles)
    - resource access (roles)
    - scopes

    Arguments:
        token: The token to decode as a string.
        audience: Value to check for audience claim. Ignored when verify_audience is False.
        verify_signature: Skip the JWT signature verification when False. True by default.
        verify_audience: Skip the audience verification when False. True by default.

    Returns:
        An AccessToken instance built from the decoded claims.
    """
    claims = self.decode_token(
        token,
        audience=audience,
        verify_signature=verify_signature,
        verify_audience=verify_audience,
    )
    return AccessToken.parse_obj(claims)
def decode_id_token(
    self,
    token: str,
    audience: t.Optional[str] = None,
    verify_signature: bool = True,
    verify_audience: bool = True,
) -> IDToken:
    """Decode an OIDC ID token using issuer public key.

    An ID token proves that the user is AUTHENTICATED and describes the
    IDENTITY of the user. It should never be used for authorization purpose:
    use the access token instead.

    Arguments:
        token: The token to decode as a string.
        audience: Value to check for audience claim. Ignored when verify_audience is False.
        verify_signature: Skip the JWT signature verification when False. True by default.
        verify_audience: Skip the audience verification when False. True by default.

    Returns:
        An IDToken instance built from the decoded claims.
    """
    claims = self.decode_token(
        token,
        audience=audience,
        verify_signature=verify_signature,
        verify_audience=verify_audience,
    )
    return IDToken.parse_obj(claims)
async def oidc_token_request(
    self,
    client_id: str,
    grant_type: str,
    scope: t.Optional[ScopeType] = None,
    access_token_audience: t.Optional[str] = None,
    id_token_audience: t.Optional[str] = None,
    verify_audience: bool = True,
    verify_signature: bool = True,
    **kwargs: t.Any,
) -> Grant:
    """Send a request to the token endpoint to obtain an access token, an ID token, and optionally a refresh token.

    The response is first parsed as untrusted data; the access token (and the
    ID token when present) is then decoded before a Grant is returned.

    Arguments:
        client_id: OIDC client ID.
        grant_type: One of "authorization_code", "refresh_token", "password", "client_credentials".
        scope: Space delimited list of strings, or iterable of strings.
        access_token_audience: Value to check for access token audience claim. Ignored when verify_audience is False.
        id_token_audience: Value to check for id token audience claim. Ignored when verify_audience is False.
        verify_audience: Skip the audience verification when False. True by default.
        verify_signature: Skip the JWT signature verification when False. True by default.
        kwargs: Extra form fields specific to each grant type.

    Returns:
        A Grant instance holding access token.

    Raises:
        KeyError: No token endpoint is configured.
    """
    form = {
        "client_id": client_id,
        "grant_type": grant_type,
        "scope": write_scope(scope),
        **kwargs,
    }
    # Resolved through get_url(), like the other endpoints, so that a missing
    # entry is reported with the same descriptive error.
    token_url = self.get_url("token_endpoint")
    response = await self.client.post(token_url, data=form)
    response.raise_for_status()
    untrusted = UntrustedGrant.parse_obj(response.json())
    access_token = self.decode_access_token(
        untrusted.access_token,
        audience=access_token_audience,
        verify_audience=verify_audience,
        verify_signature=verify_signature,
    )
    id_token = None
    if untrusted.id_token:
        id_token = self.decode_id_token(
            untrusted.id_token,
            audience=id_token_audience,
            verify_audience=verify_audience,
            verify_signature=verify_signature,
        )
    return Grant(
        access_token=access_token,
        id_token=id_token,
        refresh_token=untrusted.refresh_token,
    )
async def oidc_password_grant(
    self,
    client_id: str,
    username: str,
    password: str,
    scope: t.Optional[ScopeType] = None,
    access_token_audience: t.Optional[str] = None,
    id_token_audience: t.Optional[str] = None,
    verify_audience: bool = True,
    verify_signature: bool = True,
) -> Grant:
    """Request tokens from the token endpoint with the "password" grant type.

    Arguments:
        client_id: OIDC client ID.
        username: Name of user.
        password: Password of user.
        scope: Space delimited list of strings, or iterable of strings.
        access_token_audience: Value to check for access token audience claim. Ignored when verify_audience is False.
        id_token_audience: Value to check for id token audience claim. Ignored when verify_audience is False.
        verify_audience: Skip the audience verification when False. True by default.
        verify_signature: Skip the JWT signature verification when False. True by default.

    Returns:
        A Grant instance holding access token.
    """
    # Form fields specific to this grant type.
    credentials = {"username": username, "password": password}
    grant = await self.oidc_token_request(
        client_id=client_id,
        grant_type="password",
        scope=scope,
        access_token_audience=access_token_audience,
        id_token_audience=id_token_audience,
        verify_audience=verify_audience,
        verify_signature=verify_signature,
        **credentials,
    )
    return grant
async def oidc_authorization_code_grant(
    self,
    client_id: str,
    code: str,
    redirect_uri: str,
    state: t.Optional[str] = None,
    access_token_audience: t.Optional[str] = None,
    id_token_audience: t.Optional[str] = None,
    verify_audience: bool = True,
    verify_signature: bool = True,
) -> Grant:
    """Request tokens from the token endpoint with the "authorization_code" grant type.

    This method is mostly useful when an HTTP server is listening for requests.

    Arguments:
        client_id: OIDC client ID.
        code: The value of authorization code received as request query param.
        redirect_uri: The exact redirect URI used when generating the authorization URL visited to obtain authorization code.
        state: An opaque value used by the client to maintain state between the request and callback.
            Only sent when not empty.
        access_token_audience: Value to check for access token audience claim. Ignored when verify_audience is False.
        id_token_audience: Value to check for id token audience claim. Ignored when verify_audience is False.
        verify_audience: Skip the audience verification when False. True by default.
        verify_signature: Skip the JWT signature verification when False. True by default.

    Returns:
        A Grant instance holding access token.
    """
    extra_fields: t.Dict[str, t.Any] = {"state": state} if state else {}
    grant = await self.oidc_token_request(
        client_id=client_id,
        grant_type="authorization_code",
        code=code,
        redirect_uri=redirect_uri,
        access_token_audience=access_token_audience,
        id_token_audience=id_token_audience,
        verify_audience=verify_audience,
        verify_signature=verify_signature,
        **extra_fields,
    )
    return grant
async def oidc_refresh_token_grant(
    self,
    client_id: str,
    refresh_token: str,
    scope: t.Optional[ScopeType] = None,
    access_token_audience: t.Optional[str] = None,
    id_token_audience: t.Optional[str] = None,
    verify_audience: bool = True,
    verify_signature: bool = True,
) -> Grant:
    """Request tokens from the token endpoint with the "refresh_token" grant type.

    This method should not be used for access tokens retrieved from
    client_credentials grant according to RFC6749 (Section 4.4.3).

    Arguments:
        client_id: OIDC client ID.
        refresh_token: The value of the refresh token.
        scope: Space delimited list of strings, or iterable of strings.
        access_token_audience: Value to check for access token audience claim. Ignored when verify_audience is False.
        id_token_audience: Value to check for id token audience claim. Ignored when verify_audience is False.
        verify_audience: Skip the audience verification when False. True by default.
        verify_signature: Skip the JWT signature verification when False. True by default.

    Returns:
        A Grant instance holding access token.
    """
    request = self.oidc_token_request(
        client_id=client_id,
        grant_type="refresh_token",
        refresh_token=refresh_token,
        scope=scope,
        access_token_audience=access_token_audience,
        id_token_audience=id_token_audience,
        verify_audience=verify_audience,
        verify_signature=verify_signature,
    )
    return await request
async def oidc_client_credentials_grant(
    self,
    client_id: str,
    client_secret: str,
    scope: t.Optional[ScopeType] = None,
    access_token_audience: t.Optional[str] = None,
    id_token_audience: t.Optional[str] = None,
    verify_audience: bool = True,
    verify_signature: bool = True,
) -> Grant:
    """Request tokens from the token endpoint with the "client_credentials" grant type.

    Arguments:
        client_id: OIDC client ID.
        client_secret: The value of the OIDC client secret.
        scope: Space delimited list of strings, or iterable of strings.
        access_token_audience: Value to check for access token audience claim. Ignored when verify_audience is False.
        id_token_audience: Value to check for id token audience claim. Ignored when verify_audience is False.
        verify_audience: Skip the audience verification when False. True by default.
        verify_signature: Skip the JWT signature verification when False. True by default.

    Returns:
        A Grant instance holding access token.
    """
    request = self.oidc_token_request(
        client_id=client_id,
        grant_type="client_credentials",
        client_secret=client_secret,
        scope=scope,
        access_token_audience=access_token_audience,
        id_token_audience=id_token_audience,
        verify_audience=verify_audience,
        verify_signature=verify_signature,
    )
    return await request
async def oidc_browser_login(
    self,
    client_id: str,
    scope: t.Optional[ScopeType] = None,
    state: t.Optional[str] = None,
    access_token_audience: t.Optional[str] = None,
    id_token_audience: t.Optional[str] = None,
    verify_audience: bool = True,
    verify_signature: bool = True,
    bind: str = "localhost:8000",
) -> Grant:
    """Log in using authorization code grant flow.

    - Start an HTTP server in background
    - Generate an authorization URL according to well-known URI, client ID and scope
    - Open a navigator and visit the authorization URL
    - User can log in using OIDC provider login page
    - User is redirected to a page served by the temporary HTTP server
    - The authorization code is read from query parameters by the HTTP server
      and exchanged for tokens
    - Stop the HTTP server
    - Return the resulting grant

    Arguments:
        client_id: OIDC client ID.
        scope: Space delimited list of strings, or iterable of strings.
        state: An opaque value used by the client to maintain state between the request and callback.
            When given, the callback is rejected unless it carries the same value.
        access_token_audience: Value to check for access token audience claim. Ignored when verify_audience is False.
        id_token_audience: Value to check for id token audience claim. Ignored when verify_audience is False.
        verify_audience: Skip the audience verification when False. True by default.
        verify_signature: Skip the JWT signature verification when False. True by default.
        bind: "host:port" the temporary HTTP server listens on; also used to build the redirect URI.

    Returns:
        The Grant obtained from the authorization code.
    """
    # Imported here so the optional web dependencies are only needed by
    # callers of this helper.
    import asyncio
    import webbrowser

    from fastapi.exceptions import HTTPException
    from fastapi.requests import Request
    from fastapi.responses import PlainTextResponse
    from fastapi.routing import APIRouter
    from quara.containers.http import HTTPContainer

    prefix = "/oauth"
    # Built from the ``bind`` argument, which used to be overwritten with a
    # hard-coded "localhost:8000" and was therefore ignored.
    redirect_uri = f"http://{bind}{prefix}"
    router = APIRouter(prefix=prefix)
    # Resolved by the callback route, awaited at the end of this method.
    future: "asyncio.Future[Grant]" = asyncio.get_running_loop().create_future()

    @router.get("/")
    async def extract_token(request: Request) -> PlainTextResponse:
        try:
            if "error" in request.query_params:
                raise HTTPException(
                    500, detail=request.query_params.get("error_description")
                )
            # The provider must send back the state it was given; anything
            # else means the callback does not belong to this login attempt.
            if state and request.query_params.get("state") != state:
                raise HTTPException(400, detail="State mismatch")
            grant = await self.oidc_authorization_code_grant(
                client_id=client_id,
                code=request.query_params["code"],
                redirect_uri=redirect_uri,
                state=state,
                access_token_audience=access_token_audience,
                id_token_audience=id_token_audience,
                verify_audience=verify_audience,
                verify_signature=verify_signature,
            )
        except BaseException as err:
            # The route may be hit more than once (page reload): a future can
            # only be resolved once, so later outcomes are not recorded.
            if not future.done():
                future.set_exception(err)
            raise
        else:
            if not future.done():
                future.set_result(grant)
        return PlainTextResponse("Successfully logged in. You can close this page.")

    container = HTTPContainer(
        http_routers=[router],
        config={"server_settings": {"bind": bind, "log_level": "critical"}},
    )
    async with container:
        url = self.get_authorization_url(
            client_id=client_id, redirect_uri=redirect_uri, scope=scope, state=state
        )
        webbrowser.open_new(url)
        return await future
class OIDCClientAuthenticator: | |
def __init__(
    self,
    OIDCAuthenticator: OIDCAuthenticator,
    client_id: str,
    client_secret: t.Optional[str] = None,
    access_token_audience: t.Optional[str] = None,
    id_token_audience: t.Optional[str] = None,
    verify_audience: bool = True,
    verify_signature: bool = True,
) -> None:
    """Bind an OIDCAuthenticator to a single OIDC client.

    Arguments:
        OIDCAuthenticator: The authenticator every request is delegated to.
        client_id: OIDC client ID.
        client_secret: Secret of the OIDC client. An empty value is stored as None.
        access_token_audience: Audience expected in access tokens. "account" when empty.
        id_token_audience: Audience expected in ID tokens. The client ID when empty.
        verify_audience: Default for audience verification.
        verify_signature: Default for JWT signature verification.
    """
    self.OIDCAuthenticator = OIDCAuthenticator
    self.client_id = client_id
    self.client_secret = client_secret if client_secret else None
    # Verification defaults, used when a method call does not override them.
    self.verify_signature = verify_signature
    self.verify_audience = verify_audience
    # Audience defaults.
    self.access_token_audience = (
        access_token_audience if access_token_audience else "account"
    )
    self.id_token_audience = id_token_audience if id_token_audience else client_id
def set_access_token_audience(self, audience: str) -> "OIDCClientAuthenticator":
    """Replace the default access token audience.

    Returns:
        This instance, so that calls can be chained.
    """
    self.access_token_audience = audience
    return self
def set_id_token_audience(self, audience: str) -> "OIDCClientAuthenticator":
    """Replace the default ID token audience.

    Returns:
        This instance, so that calls can be chained.
    """
    self.id_token_audience = audience
    return self
def set_client_secret(self, secret: str) -> "OIDCClientAuthenticator":
    """Replace the stored client secret.

    Returns:
        This instance, so that calls can be chained.
    """
    self.client_secret = secret
    return self
def decode_access_token(
    self,
    token: str,
    audience: t.Optional[str] = None,
    verify_signature: t.Optional[bool] = None,
    verify_audience: t.Optional[bool] = None,
) -> AccessToken:
    """Decode an access token using issuer public key.

    Access tokens carry identity information like ID tokens do, plus
    authorization information:
    - realm access (roles)
    - resource access (roles)
    - scopes

    Arguments:
        token: The token to decode as a string.
        audience: Expected audience. The client default is used when empty.
        verify_signature: Overrides the client default when not None.
        verify_audience: Overrides the client default when not None.

    Returns:
        An AccessToken instance.
    """
    return self.OIDCAuthenticator.decode_access_token(
        token=token,
        audience=audience or self.access_token_audience,
        verify_signature=(
            self.verify_signature if verify_signature is None else verify_signature
        ),
        verify_audience=(
            self.verify_audience if verify_audience is None else verify_audience
        ),
    )
def decode_id_token(
    self,
    token: str,
    audience: t.Optional[str] = None,
    verify_signature: t.Optional[bool] = None,
    verify_audience: t.Optional[bool] = None,
) -> IDToken:
    """Decode an ID token using issuer public key.

    An ID token proves that the user is AUTHENTICATED and describes the
    IDENTITY of the user. It should never be used for authorization purpose:
    use the access token instead.

    Arguments:
        token: The token to decode as a string.
        audience: Expected audience. The client default is used when empty.
        verify_signature: Overrides the client default when not None.
        verify_audience: Overrides the client default when not None.

    Returns:
        An IDToken instance.
    """
    return self.OIDCAuthenticator.decode_id_token(
        token=token,
        audience=audience or self.id_token_audience,
        verify_signature=(
            self.verify_signature if verify_signature is None else verify_signature
        ),
        verify_audience=(
            self.verify_audience if verify_audience is None else verify_audience
        ),
    )
async def oidc_password_grant(
    self,
    username: str,
    password: str,
    scope: t.Optional[ScopeType] = None,
    access_token_audience: t.Optional[str] = None,
    id_token_audience: t.Optional[str] = None,
    verify_audience: t.Optional[bool] = None,
    verify_signature: t.Optional[bool] = None,
) -> Grant:
    """Request tokens for this client with the password grant.

    Arguments:
        username: Name of user.
        password: Password of user.
        scope: Space delimited list of strings, or iterable of strings.
        access_token_audience: Expected access token audience. The client default is used when empty.
        id_token_audience: Expected ID token audience. The client default is used when empty.
        verify_audience: Overrides the client default when not None.
        verify_signature: Overrides the client default when not None.

    Returns:
        A Grant instance holding access token.
    """
    return await self.OIDCAuthenticator.oidc_password_grant(
        client_id=self.client_id,
        username=username,
        password=password,
        scope=scope,
        access_token_audience=access_token_audience or self.access_token_audience,
        id_token_audience=id_token_audience or self.id_token_audience,
        verify_audience=(
            self.verify_audience if verify_audience is None else verify_audience
        ),
        verify_signature=(
            self.verify_signature if verify_signature is None else verify_signature
        ),
    )
async def oidc_authorization_code_grant(
    self,
    code: str,
    redirect_uri: str,
    state: t.Optional[str] = None,
    access_token_audience: t.Optional[str] = None,
    id_token_audience: t.Optional[str] = None,
    verify_audience: t.Optional[bool] = None,
    verify_signature: t.Optional[bool] = None,
) -> Grant:
    """Request tokens for this client with the authorization code grant.

    This method is mostly useful when an HTTP server is listening for requests.

    Arguments:
        code: The value of authorization code received as request query param.
        redirect_uri: The exact redirect URI used when generating the authorization URL visited to obtain authorization code.
        state: An opaque value used by the client to maintain state between the request and callback.
        access_token_audience: Expected access token audience. The client default is used when empty.
        id_token_audience: Expected ID token audience. The client default is used when empty.
        verify_audience: Overrides the client default when not None.
        verify_signature: Overrides the client default when not None.

    Returns:
        A Grant instance holding access token.
    """
    return await self.OIDCAuthenticator.oidc_authorization_code_grant(
        client_id=self.client_id,
        code=code,
        redirect_uri=redirect_uri,
        state=state,
        access_token_audience=access_token_audience or self.access_token_audience,
        id_token_audience=id_token_audience or self.id_token_audience,
        verify_audience=(
            self.verify_audience if verify_audience is None else verify_audience
        ),
        verify_signature=(
            self.verify_signature if verify_signature is None else verify_signature
        ),
    )
async def oidc_refresh_token_grant(
    self,
    refresh_token: str,
    scope: t.Optional[ScopeType] = None,
    access_token_audience: t.Optional[str] = None,
    id_token_audience: t.Optional[str] = None,
    verify_audience: t.Optional[bool] = None,
    verify_signature: t.Optional[bool] = None,
) -> Grant:
    """Request tokens for this client with the refresh token grant.

    This method should not be used for access tokens retrieved from
    client_credentials grant according to RFC6749 (Section 4.4.3).

    Arguments:
        refresh_token: The value of the refresh token.
        scope: Space delimited list of strings, or iterable of strings.
        access_token_audience: Expected access token audience. The client default is used when empty.
        id_token_audience: Expected ID token audience. The client default is used when empty.
        verify_audience: Overrides the client default when not None.
        verify_signature: Overrides the client default when not None.

    Returns:
        A Grant instance holding access token.
    """
    return await self.OIDCAuthenticator.oidc_refresh_token_grant(
        client_id=self.client_id,
        refresh_token=refresh_token,
        scope=scope,
        access_token_audience=access_token_audience or self.access_token_audience,
        id_token_audience=id_token_audience or self.id_token_audience,
        verify_audience=(
            self.verify_audience if verify_audience is None else verify_audience
        ),
        verify_signature=(
            self.verify_signature if verify_signature is None else verify_signature
        ),
    )
async def oidc_client_credentials_grant(
    self,
    client_secret: t.Optional[str] = None,
    scope: t.Optional[ScopeType] = None,
    access_token_audience: t.Optional[str] = None,
    id_token_audience: t.Optional[str] = None,
    verify_audience: t.Optional[bool] = None,
    verify_signature: t.Optional[bool] = None,
) -> Grant:
    """Send a client credential grant request to the token endpoint to obtain an access token, an ID token, and optionally a refresh token.

    Arguments:
        client_secret: The value of the OIDC client secret. The secret stored on this client is used when empty.
        scope: Space delimited list of strings, or iterable of strings.
        access_token_audience: Expected access token audience. The client default is used when empty.
        id_token_audience: Expected ID token audience. The client default is used when empty.
        verify_audience: Overrides the client default when not None.
        verify_signature: Overrides the client default when not None.

    Returns:
        A Grant instance holding access token.

    Raises:
        ValueError: No secret was given and none is stored on this client.
    """
    secret = client_secret or self.client_secret
    # "not secret" rather than "is None": set_client_secret("") stores an empty
    # string, which must be rejected here instead of being sent as a credential.
    if not secret:
        raise ValueError("Client secret must be provided")
    access_token_audience = access_token_audience or self.access_token_audience
    id_token_audience = id_token_audience or self.id_token_audience
    if verify_audience is None:
        verify_audience = self.verify_audience
    if verify_signature is None:
        verify_signature = self.verify_signature
    return await self.OIDCAuthenticator.oidc_client_credentials_grant(
        client_id=self.client_id,
        client_secret=secret,
        scope=scope,
        access_token_audience=access_token_audience,
        id_token_audience=id_token_audience,
        verify_audience=verify_audience,
        verify_signature=verify_signature,
    )
async def oidc_browser_login(
    self,
    scope: t.Optional[ScopeType] = None,
    state: t.Optional[str] = None,
    access_token_audience: t.Optional[str] = None,
    id_token_audience: t.Optional[str] = None,
    verify_audience: t.Optional[bool] = None,
    verify_signature: t.Optional[bool] = None,
    bind: str = "localhost:8000",
) -> Grant:
    """Log in with a web browser for this client (authorization code grant flow).

    The whole flow is delegated to the ``oidc_browser_login`` method of the
    underlying OIDCAuthenticator, called with this client's ID.

    Arguments:
        scope: Optional OIDC scope.
        state: An opaque value used by the client to maintain state between the request and callback.
        access_token_audience: Expected access token audience. The client default is used when empty.
        id_token_audience: Expected ID token audience. The client default is used when empty.
        verify_audience: Overrides the client default when not None.
        verify_signature: Overrides the client default when not None.
        bind: Forwarded unchanged to the underlying authenticator.

    Returns:
        The Grant returned by the underlying authenticator.
    """
    return await self.OIDCAuthenticator.oidc_browser_login(
        client_id=self.client_id,
        scope=scope,
        state=state,
        access_token_audience=access_token_audience or self.access_token_audience,
        id_token_audience=id_token_audience or self.id_token_audience,
        verify_audience=(
            self.verify_audience if verify_audience is None else verify_audience
        ),
        verify_signature=(
            self.verify_signature if verify_signature is None else verify_signature
        ),
        bind=bind,
    )
if __name__ == "__main__":
    # Only needed by the demo below, hence the local import.
    import asyncio

    # Create an OIDCAuthenticator
    auth = OIDCAuthenticator(well_known_uri=WELL_KNOWN)
    # Create a client
    client = auth.get_client("quara-cli")

    async def main() -> Grant:
        """Test to obtain OIDC JWT using OIDCAuthenticator client."""
        async with auth:
            # Login using a browser
            grant = await client.oidc_browser_login()
            # Login using user/pass
            # grant = await client.oidc_password_grant(
            #     username="USERNAME", password="PASSWORD"
            # )
            # Login using client credentials
            # grant = await client.oidc_client_credentials_grant(client_secret="SECRET")
            # Login using refresh token
            # Explicit check rather than ``assert``, which is removed when
            # Python runs with -O.
            if not grant.refresh_token:
                raise RuntimeError("No refresh token was returned by the OIDC provider")
            return await client.oidc_refresh_token_grant(
                refresh_token=grant.refresh_token
            )

    # main() used to be defined but never executed, so running the file did
    # nothing.
    asyncio.run(main())
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment