Skip to content

Instantly share code, notes, and snippets.

@charbonnierg
Last active September 25, 2022 10:01
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save charbonnierg/c1da34d4184a47bbb585c20db87ec3d1 to your computer and use it in GitHub Desktop.
Save charbonnierg/c1da34d4184a47bbb585c20db87ec3d1 to your computer and use it in GitHub Desktop.
OIDC Authenticator
"""
Two classes:
- OIDCAuthenticator
- OIDCClientAuthenticator
Dependencies:
- PyJWT: API to decode JWT tokens
- cryptography: Cryptography algorithms
- pydantic: JSON validation library
- httpx: Async HTTP client
Default well-known: https://lemur-16.cloud-iam.com/auth/realms/quara-demo/.well-known/openid-configuration
Allowed Grant types:
- authorization_code
- refresh_token
- password
- client_credentials
Additional helper:
- Browser login
- Decode token
- Decode access token
- Decode ID token
"""
import types
import typing as t
import urllib.parse
import httpx
import jwt
import jwt.algorithms
import jwt.exceptions
from pydantic import BaseModel, Extra, Field
ScopeType = t.Union[str, t.List[str], t.Tuple[str], t.Set[str]]
WELL_KNOWN = "https://lemur-16.cloud-iam.com/auth/realms/quara-demo/.well-known/openid-configuration"
class UntrustedGrant(BaseModel, extra=Extra.allow):
"""JSON data received from OIDC token endpoint"""
access_token: str
id_token: t.Optional[str] = None
refresh_token: t.Optional[str] = None
class BaseJWT(BaseModel, extra=Extra.allow):
"""Common fields between OIDC ID Token and OAuth2 Access Token"""
acr: str
aud: str
azp: str
email: str
email_verified: bool
exp: int
family_name: str
given_name: str
iat: int
iss: str
jti: str
name: str
preferred_username: str
session_state: str
sid: str
sub: str
typ: str
class Access(BaseModel, extra=Extra.allow):
"""Access holds a list of roles"""
roles: t.List[str] = []
class AccessToken(BaseJWT):
"""Access token received from OIDC provider"""
typ: t.Literal["Bearer"]
allowed_origins: t.List[str] = Field(alias="allowed-origins")
realm_access: Access
resource_access: t.Dict[str, Access]
scope: str
class IDToken(BaseJWT):
"""ID token received from OIDC provider"""
typ: t.Literal["ID"]
at_hash: str
auth_time: int
class Grant(BaseModel, extra=Extra.ignore):
"""Validated data received from the OIDC token endpoint"""
access_token: AccessToken
id_token: t.Optional[IDToken] = None
refresh_token: t.Optional[str] = None
def write_scope(scope: t.Optional[ScopeType]) -> str:
"""Write scopes as a space-delimited string."""
if isinstance(scope, str):
scope = scope.split(" ")
elif scope is None:
scope = scope = []
else:
scope = list(scope)
if "offline_access" not in scope:
scope.insert(0, "offline_access")
if "email" not in scope:
scope.insert(0, "email")
if "profile" not in scope:
scope.insert(0, "profile")
if "openid" not in scope:
scope.insert(0, "openid")
return " ".join(scope)
class OIDCAuthenticator:
"""An OIDCAuthenticator communicates with an OIDC provider to perform one of the
following grants:
- Password grant
- Client credentials grant
- Authorization code grant
- Refresh grant
"""
def __init__(
self,
well_known_uri: str = WELL_KNOWN,
) -> None:
"""Create a new OIDCAuthenticator instance.
Arguments:
well_known_uri: An URL pointing to well known OIDC configuration.
"""
self.client = httpx.AsyncClient()
self.default_algorithms = jwt.algorithms.get_default_algorithms()
self.well_known_uri = well_known_uri
self.well_known: t.Dict[str, t.Any] = {}
self._issuer_public_key: t.Optional[t.Any] = None
self._algorithm: t.Optional[str] = None
def __getitem__(self, client_id: str) -> "OIDCClientAuthenticator":
return OIDCClientAuthenticator(self, client_id)
async def __aenter__(self) -> "OIDCAuthenticator":
await self.start()
return self
async def __aexit__(
self,
exc_type: t.Optional[BaseException] = None,
exc: t.Optional[BaseException] = None,
tb: t.Optional[types.TracebackType] = None,
) -> None:
await self.stop()
return None
async def start(self) -> None:
"""Start the OIDCAuthenticator."""
well_known = await self.client.get(self.well_known_uri)
well_known.raise_for_status()
self.well_known.update(well_known.json())
await self.refresh_issuer_public_key()
async def stop(self) -> None:
"""Stop the OIDCAuthenticator."""
await self.client.aclose()
def get_url(self, endpoint: str) -> str:
"""Get the URl for given endpoint.
Arguments:
endpoint: a valid endpoint name from the OpenID Connect Discovery 1.0 specification
Examples:
>>> async with OIDCAuthenticator() as auth:
>>> auth_url = auth.get_url("authorization_endpoint")
>>> token_url = auth.get_url("token_endpoint")
"""
try:
return str(self.well_known[endpoint])
except KeyError:
raise KeyError(f"No URL configured for endpoint: {endpoint}")
def get_authorization_url(
self,
client_id: str,
redirect_uri: str,
scope: t.Optional[ScopeType] = None,
state: t.Optional[str] = None,
response_type: str = "code",
) -> str:
"""
Get authorization URL to redirect the resource owner to.
https://tools.ietf.org/html/rfc6749#section-4.1.1
Arguments:
client_id: OIDC client ID
redirect_uri: Absolute URL of the client where the user-agent will be redirected to.
scope: Space delimited list of strings, or iterable of strings.
state: An opaque value used by the client to maintain state between the request and callback.
response_type: Use "code" to perform authorization grant, or "token" to perform implicit grant.
Return:
URL to redirect the resource owner to.
"""
optional_parameters: t.Dict[str, t.Any] = {}
if state:
optional_parameters["state"] = state
params = urllib.parse.urlencode(
{
"client_id": client_id,
"response_type": response_type,
"redirect_uri": redirect_uri,
"scope": write_scope(scope),
**optional_parameters,
}
)
url = self.get_url("authorization_endpoint")
return "{}?{}".format(url, params)
def get_client(
self,
client_id: str,
client_secret: t.Optional[str] = None,
access_token_audience: t.Optional[str] = None,
id_token_audience: t.Optional[str] = None,
) -> "OIDCClientAuthenticator":
"""Get an OIDCAuthenticator client dedicated to a single OIDC client."""
return OIDCClientAuthenticator(
self,
client_id=client_id,
client_secret=client_secret,
access_token_audience=access_token_audience,
id_token_audience=id_token_audience,
)
async def refresh_issuer_public_key(self) -> None:
"""Fetch the issuer public key from remote OIDC provider."""
jwks_url = self.get_url("jwks_uri")
response = await self.client.get(jwks_url)
response.raise_for_status()
jwks: t.List[t.Dict[str, t.Any]] = response.json()["keys"]
for key in jwks:
if "alg" in key and key["alg"] in self.default_algorithms:
self._algorithm = key["alg"]
self._issuer_public_key = self.default_algorithms[
self._algorithm
].from_jwk(key)
break
else:
raise KeyError(
f"No public found with supported algorithm ({list(self.default_algorithms)})"
)
def get_algorithm(self) -> str:
"""Get algorithm used to validate JWT"""
if self._algorithm:
return self._algorithm
raise ValueError(
"Algorithm is not configured. Make sure OIDCAuthenticator is started."
)
def get_issuer_public_key(self) -> t.Any:
"""Get public key used to validate JWT."""
if self._issuer_public_key:
return self._issuer_public_key
raise ValueError(
"Issuer public key is not configured. Make sure OIDCAuthenticator is started."
)
def decode_token(
self,
token: str,
audience: t.Optional[str] = None,
verify_signature: bool = True,
verify_audience: bool = True,
) -> t.Dict[str, t.Any]:
"""Decode a JWT using issuer public key.
By default JWT signature is verified and audience is verified.
Arguments:
token: The token to decode as a string.
audience: Value to check for audience claim. Ignored when verify_audience is False.
verify_signature: Do not verify the JWT signature when False. True by default.
verify_audience: Do no verify the access token audience when False. True by default.
Returns:
A dictionary holding fields found within JWT.
"""
key = self.get_issuer_public_key()
algorithm = self.get_algorithm()
options: t.Dict[str, t.Any] = {"verify_signature": verify_signature}
if verify_audience:
options["verify_aud"] = True
else:
options["verify_aud"] = False
return jwt.decode(
token,
key=key,
algorithms=[algorithm],
audience=audience,
options=options,
)
def decode_access_token(
self,
token: str,
audience: t.Optional[str] = None,
verify_signature: bool = True,
verify_audience: bool = True,
) -> AccessToken:
"""Decode an OIDC access token using issuer public key.
Access token hold informations regarding user identity, just like ID tokens,
but they also hold authorization information such as:
- realm access (roles)
- resource access (roles)
- scopes
Arguments:
token: The token to decode as a string.
audience: Value to check for audience claim. Ignored when verify_audience is False.
verify_signature: Do not verify the JWT signature when False. True by default.
verify_audience: Do no verify the access token audience when False. True by default.
Returns:
An AccessToken instance.
"""
return AccessToken.parse_obj(
self.decode_token(
token,
audience=audience,
verify_signature=verify_signature,
verify_audience=verify_audience,
)
)
def decode_id_token(
self,
token: str,
audience: t.Optional[str] = None,
verify_signature: bool = True,
verify_audience: bool = True,
) -> IDToken:
"""Decode an OIDC ID token using issuer public key.
ID tokens can be used to prove that user is AUTHENTICATED.
It holds information regarding IDENTITY of the user.
They should never be used for authorization purpose.
Use the access token instead.
Arguments:
token: The token to decode as a string.
audience: Value to check for audience claim. Ignored when verify_audience is False.
verify_signature: Do not verify the JWT signature when False. True by default.
verify_audience: Do no verify the access token audience when False. True by default.
Returns:
An IDToken instance.
"""
return IDToken.parse_obj(
self.decode_token(
token,
audience=audience,
verify_signature=verify_signature,
verify_audience=verify_audience,
)
)
async def oidc_token_request(
self,
client_id: str,
grant_type: str,
scope: t.Optional[ScopeType] = None,
access_token_audience: t.Optional[str] = None,
id_token_audience: t.Optional[str] = None,
verify_audience: bool = True,
verify_signature: bool = True,
**kwargs: t.Any,
) -> Grant:
"""Send a request to the token endpoint to obtain an access token, an ID token, and optionally a refresh token.
Arguments:
client_id: OIDC client ID.
grant_type: One of "authorization_code", "refresh_token", "password", "client_credentials".
scope: Space delimited list of strings, or iterable of strings.
access_token_audience: Value to check for access token audience claim. Ignored when verify_audience is False.
id_token_audience: Value to check for id token audience claim. Ignored when verify_audience is False.
verify_signature: Do not verify the JWT signature when False. True by default.
verify_audience: Do no verify the access token audience when False. True by default.
kwargs: Extra arguments specific to each grant type.
Returns:
A Grant instance holding access token.
"""
scope = write_scope(scope)
response = await self.client.post(
self.well_known["token_endpoint"],
data={
"client_id": client_id,
"grant_type": grant_type,
"scope": scope,
**kwargs,
},
)
response.raise_for_status()
untrusted = UntrustedGrant.parse_obj(response.json())
return Grant(
access_token=self.decode_access_token(
untrusted.access_token,
audience=access_token_audience,
verify_audience=verify_audience,
verify_signature=verify_signature,
),
id_token=self.decode_id_token(
untrusted.id_token,
audience=id_token_audience,
verify_audience=verify_audience,
verify_signature=verify_signature,
)
if untrusted.id_token
else None,
refresh_token=untrusted.refresh_token,
)
async def oidc_password_grant(
self,
client_id: str,
username: str,
password: str,
scope: t.Optional[ScopeType] = None,
access_token_audience: t.Optional[str] = None,
id_token_audience: t.Optional[str] = None,
verify_audience: bool = True,
verify_signature: bool = True,
) -> Grant:
"""Send a password grant request to the token endpoint to obtain an access token, an ID token, and optionally a refresh token.
Arguments:
client_id: OIDC client ID.
username: Name of user.
password: Password of user.
scope: Space delimited list of strings, or iterable of strings.
access_token_audience: Value to check for access token audience claim. Ignored when verify_audience is False.
id_token_audience: Value to check for id token audience claim. Ignored when verify_audience is False.
verify_signature: Do not verify the JWT signature when False. True by default.
verify_audience: Do no verify the access token audience when False. True by default.
Returns:
A Grant instance holding access token.
"""
return await self.oidc_token_request(
client_id=client_id,
grant_type="password",
scope=scope,
username=username,
password=password,
access_token_audience=access_token_audience,
id_token_audience=id_token_audience,
verify_audience=verify_audience,
verify_signature=verify_signature,
)
async def oidc_authorization_code_grant(
self,
client_id: str,
code: str,
redirect_uri: str,
state: t.Optional[str] = None,
access_token_audience: t.Optional[str] = None,
id_token_audience: t.Optional[str] = None,
verify_audience: bool = True,
verify_signature: bool = True,
) -> Grant:
"""Send an authorization code grant request to the token endpoint to obtain an access token, an ID token, and optionally a refresh token.
This method is mostly useful when an HTTP server is listening for requests.
Arguments:
client_id: OIDC client ID.
code: The value of authorization code received as request query param.
redirect_uri: The exact redirect URI used when generating the authorization URL visited to obtain authorization code.
state: An opaque value used by the client to maintain state between the request and callback.
access_token_audience: Value to check for access token audience claim. Ignored when verify_audience is False.
id_token_audience: Value to check for id token audience claim. Ignored when verify_audience is False.
verify_signature: Do not verify the JWT signature when False. True by default.
verify_audience: Do no verify the access token audience when False. True by default.
Returns:
A Grant instance holding access token.
"""
optional_parameters: t.Dict[str, t.Any] = {}
if state:
optional_parameters["state"] = state
return await self.oidc_token_request(
client_id=client_id,
grant_type="authorization_code",
code=code,
redirect_uri=redirect_uri,
access_token_audience=access_token_audience,
id_token_audience=id_token_audience,
verify_audience=verify_audience,
verify_signature=verify_signature,
**optional_parameters,
)
async def oidc_refresh_token_grant(
self,
client_id: str,
refresh_token: str,
scope: t.Optional[ScopeType] = None,
access_token_audience: t.Optional[str] = None,
id_token_audience: t.Optional[str] = None,
verify_audience: bool = True,
verify_signature: bool = True,
) -> Grant:
"""Send a refresh token grant request to the token endpoint to obtain an access token, an ID token, and optionally a refresh token.
This method should not be used for used for access tokens retrieved from client_credentials grant according to RFC6749 (Section 4.4.3).
Arguments:
client_id: OIDC client ID.
refresh_token: The value of the refresh token.
scope: Space delimited list of strings, or iterable of strings.
access_token_audience: Value to check for access token audience claim. Ignored when verify_audience is False.
id_token_audience: Value to check for id token audience claim. Ignored when verify_audience is False.
verify_signature: Do not verify the JWT signature when False. True by default.
verify_audience: Do no verify the access token audience when False. True by default.
Returns:
A Grant instance holding access token.
"""
return await self.oidc_token_request(
client_id=client_id,
grant_type="refresh_token",
refresh_token=refresh_token,
scope=scope,
access_token_audience=access_token_audience,
id_token_audience=id_token_audience,
verify_audience=verify_audience,
verify_signature=verify_signature,
)
async def oidc_client_credentials_grant(
self,
client_id: str,
client_secret: str,
scope: t.Optional[ScopeType] = None,
access_token_audience: t.Optional[str] = None,
id_token_audience: t.Optional[str] = None,
verify_audience: bool = True,
verify_signature: bool = True,
) -> Grant:
"""Send a client credential grant request to the token endpoint to obtain an access token, an ID token, and optionally a refresh token.
Arguments:
client_id: OIDC client ID.
client_secret: The value of the OIDC client secret.
scope: Space delimited list of strings, or iterable of strings.
access_token_audience: Value to check for access token audience claim. Ignored when verify_audience is False.
id_token_audience: Value to check for id token audience claim. Ignored when verify_audience is False.
verify_signature: Do not verify the JWT signature when False. True by default.
verify_audience: Do no verify the access token audience when False. True by default.
Returns:
A Grant instance holding access token.
"""
return await self.oidc_token_request(
client_id=client_id,
grant_type="client_credentials",
client_secret=client_secret,
scope=scope,
access_token_audience=access_token_audience,
id_token_audience=id_token_audience,
verify_audience=verify_audience,
verify_signature=verify_signature,
)
async def oidc_browser_login(
self,
client_id: str,
scope: t.Optional[ScopeType] = None,
state: t.Optional[str] = None,
access_token_audience: t.Optional[str] = None,
id_token_audience: t.Optional[str] = None,
verify_audience: bool = True,
verify_signature: bool = True,
bind: str = "localhost:8000",
) -> Grant:
"""Log in using authorization code grant flow.
- Start an HTTP server in background
- Generate an authorization URl according to well-known URI, client ID and scope
- Open a navigator and visit the authorization URL
- User can log in using OIDC provider login page
- User is redirected to a page served by the temporary HTTP server
- User access token is retrieved from query parameters by the HTTP server
- Stop the HTTP server
- Return user access token
Arguments:
client_id: OIDC client ID.
scope: Space delimited list of strings, or iterable of strings.
state: An opaque value used by the client to maintain state between the request and callback.
access_token_audience: Value to check for access token audience claim. Ignored when verify_audience is False.
id_token_audience: Value to check for id token audience claim. Ignored when verify_audience is False.
verify_signature: Do not verify the JWT signature when False. True by default.
verify_audience: Do no verify the access token audience when False. True by default.
"""
import asyncio
import webbrowser
from fastapi.exceptions import HTTPException
from fastapi.requests import Request
from fastapi.responses import PlainTextResponse
from fastapi.routing import APIRouter
from quara.containers.http import HTTPContainer
prefix = "/oauth"
bind = "localhost:8000"
redirect_uri = f"http://{bind}{prefix}"
router = APIRouter(prefix=prefix)
future: "asyncio.Future[Grant]" = asyncio.get_running_loop().create_future()
@router.get("/")
async def extract_token(request: Request) -> PlainTextResponse:
try:
if "error" in request.query_params:
raise HTTPException(
500, detail=request.query_params.get("error_description")
)
grant = await self.oidc_authorization_code_grant(
client_id=client_id,
code=request.query_params["code"],
redirect_uri=redirect_uri,
state=state,
access_token_audience=access_token_audience,
id_token_audience=id_token_audience,
verify_audience=verify_audience,
verify_signature=verify_signature,
)
except BaseException as err:
future.set_exception(err)
raise
else:
future.set_result(grant)
return PlainTextResponse("Successfully logged in. You can close this page.")
container = HTTPContainer(
http_routers=[router],
config={"server_settings": {"bind": bind, "log_level": "critical"}},
)
async with container:
url = self.get_authorization_url(
client_id=client_id, redirect_uri=redirect_uri, scope=scope, state=state
)
webbrowser.open_new(url)
return await future
class OIDCClientAuthenticator:
def __init__(
self,
OIDCAuthenticator: OIDCAuthenticator,
client_id: str,
client_secret: t.Optional[str] = None,
access_token_audience: t.Optional[str] = None,
id_token_audience: t.Optional[str] = None,
verify_audience: bool = True,
verify_signature: bool = True,
) -> None:
self.OIDCAuthenticator = OIDCAuthenticator
self.client_id = client_id
self.client_secret = client_secret or None
self.access_token_audience = access_token_audience or "account"
self.id_token_audience = id_token_audience or self.client_id
self.verify_audience = verify_audience
self.verify_signature = verify_signature
def set_access_token_audience(self, audience: str) -> "OIDCClientAuthenticator":
self.access_token_audience = audience
return self
def set_id_token_audience(self, audience: str) -> "OIDCClientAuthenticator":
self.id_token_audience = audience
return self
def set_client_secret(self, secret: str) -> "OIDCClientAuthenticator":
self.client_secret = secret
return self
def decode_access_token(
self,
token: str,
audience: t.Optional[str] = None,
verify_signature: t.Optional[bool] = None,
verify_audience: t.Optional[bool] = None,
) -> AccessToken:
"""Decode an access token using issuer public key.
Access token hold informations regarding user identity, just like ID tokens,
but they also hold authorization information such as:
- realm access (roles)
- resource access (roles)
- scopes
By default JWT signature is verified and audience is verified.
"""
audience = audience or self.access_token_audience
if verify_audience is None:
verify_audience = self.verify_audience
if verify_signature is None:
verify_signature = self.verify_signature
return self.OIDCAuthenticator.decode_access_token(
token=token,
audience=audience,
verify_signature=verify_signature,
verify_audience=verify_audience,
)
def decode_id_token(
self,
token: str,
audience: t.Optional[str] = None,
verify_signature: t.Optional[bool] = None,
verify_audience: t.Optional[bool] = None,
) -> IDToken:
"""Decode an ID token using issuer public key.
ID tokens can be used to prove that user is AUTHENTICATED.
It holds information regarding IDENTITY of the user.
They should never be used for authorization purpose.
Use the access token instead.
By default JWT signature is verified and audience is verified.
"""
audience = audience or self.id_token_audience
if verify_audience is None:
verify_audience = self.verify_audience
if verify_signature is None:
verify_signature = self.verify_signature
return self.OIDCAuthenticator.decode_id_token(
token=token,
audience=audience,
verify_signature=verify_signature,
verify_audience=verify_audience,
)
async def oidc_password_grant(
self,
username: str,
password: str,
scope: t.Optional[ScopeType] = None,
access_token_audience: t.Optional[str] = None,
id_token_audience: t.Optional[str] = None,
verify_audience: t.Optional[bool] = None,
verify_signature: t.Optional[bool] = None,
) -> Grant:
"""Send a password grant request to the token endpoint to obtain an access token, an ID token, and optionally a refresh token.
Arguments:
username: Name of user.
password: Password of user.
scope: Space delimited list of strings, or iterable of strings.
Returns:
A Grant instance holding access token.
"""
access_token_audience = access_token_audience or self.access_token_audience
id_token_audience = id_token_audience or self.id_token_audience
if verify_audience is None:
verify_audience = self.verify_audience
if verify_signature is None:
verify_signature = self.verify_signature
return await self.OIDCAuthenticator.oidc_password_grant(
client_id=self.client_id,
username=username,
password=password,
scope=scope,
access_token_audience=access_token_audience,
id_token_audience=id_token_audience,
verify_audience=verify_audience,
verify_signature=verify_signature,
)
async def oidc_authorization_code_grant(
self,
code: str,
redirect_uri: str,
state: t.Optional[str] = None,
access_token_audience: t.Optional[str] = None,
id_token_audience: t.Optional[str] = None,
verify_audience: t.Optional[bool] = None,
verify_signature: t.Optional[bool] = None,
) -> Grant:
"""Send an authorization code grant request to the token endpoint to obtain an access token, an ID token, and optionally a refresh token.
This method is mostly useful when an HTTP server is listening for requests.
Arguments:
code: The value of authorization code received as request query param.
redirect_uri: The exact redirect URI used when generating the authorization URL visited to obtain authorization code.
Returns:
A Grant instance holding access token.
"""
access_token_audience = access_token_audience or self.access_token_audience
id_token_audience = id_token_audience or self.id_token_audience
if verify_audience is None:
verify_audience = self.verify_audience
if verify_signature is None:
verify_signature = self.verify_signature
return await self.OIDCAuthenticator.oidc_authorization_code_grant(
client_id=self.client_id,
code=code,
redirect_uri=redirect_uri,
state=state,
access_token_audience=access_token_audience,
id_token_audience=id_token_audience,
verify_audience=verify_audience,
verify_signature=verify_signature,
)
async def oidc_refresh_token_grant(
self,
refresh_token: str,
scope: t.Optional[ScopeType] = None,
access_token_audience: t.Optional[str] = None,
id_token_audience: t.Optional[str] = None,
verify_audience: t.Optional[bool] = None,
verify_signature: t.Optional[bool] = None,
) -> Grant:
"""Send a refresh token grant request to the token endpoint to obtain an access token, an ID token, and optionally a refresh token.
This method should not be used for used for access tokens retrieved from client_credentials grant according to RFC6749 (Section 4.4.3).
Arguments:
refresh_token: The value of the refresh token.
scope: Space delimited list of strings, or iterable of strings.
Returns:
A Grant instance holding access token.
"""
access_token_audience = access_token_audience or self.access_token_audience
id_token_audience = id_token_audience or self.id_token_audience
if verify_audience is None:
verify_audience = self.verify_audience
if verify_signature is None:
verify_signature = self.verify_signature
return await self.OIDCAuthenticator.oidc_refresh_token_grant(
client_id=self.client_id,
refresh_token=refresh_token,
scope=scope,
access_token_audience=access_token_audience,
id_token_audience=id_token_audience,
verify_audience=verify_audience,
verify_signature=verify_signature,
)
async def oidc_client_credentials_grant(
self,
client_secret: t.Optional[str] = None,
scope: t.Optional[ScopeType] = None,
access_token_audience: t.Optional[str] = None,
id_token_audience: t.Optional[str] = None,
verify_audience: t.Optional[bool] = None,
verify_signature: t.Optional[bool] = None,
) -> Grant:
"""Send a client credential grant request to the token endpoint to obtain an access token, an ID token, and optionally a refresh token.
Arguments:
client_id: OIDC client ID.
client_secret: The value of the OIDC client secret.
scope: Space delimited list of strings, or iterable of strings.
Returns:
A Grant instance holding access token.
"""
secret = client_secret or self.client_secret
if secret is None:
raise ValueError("Client secret must be provided")
access_token_audience = access_token_audience or self.access_token_audience
id_token_audience = id_token_audience or self.id_token_audience
if verify_audience is None:
verify_audience = self.verify_audience
if verify_signature is None:
verify_signature = self.verify_signature
return await self.OIDCAuthenticator.oidc_client_credentials_grant(
client_id=self.client_id,
client_secret=secret,
scope=scope,
access_token_audience=access_token_audience,
id_token_audience=id_token_audience,
verify_audience=verify_audience,
verify_signature=verify_signature,
)
async def oidc_browser_login(
self,
scope: t.Optional[ScopeType] = None,
state: t.Optional[str] = None,
access_token_audience: t.Optional[str] = None,
id_token_audience: t.Optional[str] = None,
verify_audience: t.Optional[bool] = None,
verify_signature: t.Optional[bool] = None,
bind: str = "localhost:8000",
) -> Grant:
"""Log in using authorization code grant flow.
- Start an HTTP server in background
- Generate an authorization URl according to well-known URI, client ID and scope
- Open a navigator and visit the authorization URL
- User can log in using OIDC provider login page
- User is redirected to a page served by the temporary HTTP server
- User access token is retrieved from query parameters by the HTTP server
- Stop the HTTP server
- Return user access token
Arguments:
scope: Optional OIDC scope.
"""
access_token_audience = access_token_audience or self.access_token_audience
id_token_audience = id_token_audience or self.id_token_audience
if verify_audience is None:
verify_audience = self.verify_audience
if verify_signature is None:
verify_signature = self.verify_signature
return await self.OIDCAuthenticator.oidc_browser_login(
client_id=self.client_id,
scope=scope,
state=state,
access_token_audience=access_token_audience,
id_token_audience=id_token_audience,
verify_audience=verify_audience,
verify_signature=verify_signature,
bind=bind,
)
if __name__ == "__main__":
# Create an OIDCAuthenticator
auth = OIDCAuthenticator(well_known_uri=WELL_KNOWN)
# Create a client
client = auth.get_client("quara-cli")
async def main() -> Grant:
"""Test to obtain OIDC JWT using OIDCAuthenticator client."""
async with auth:
# Login using a browser
grant = await client.oidc_browser_login()
# Login using user/pass
# grant = await client.oidc_password_grant(
# username="USERNAME", password="PASSWORD"
# )
# Login using client credentials
# grant = await client.oidc_client_credentials_grant(client_secret="SECRET")
# Login using refresh token
assert grant.refresh_token
return await client.oidc_refresh_token_grant(
refresh_token=grant.refresh_token
)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment