Custom HomeAssistant auth provider
import logging | |
import jwt | |
from urllib.parse import urlparse | |
import urllib.request | |
from collections import OrderedDict | |
from typing import Any, Dict, Optional, cast | |
import voluptuous as vol | |
from homeassistant.exceptions import HomeAssistantError | |
from homeassistant.core import callback | |
from . import AuthProvider, AUTH_PROVIDER_SCHEMA, AUTH_PROVIDERS, LoginFlow | |
from ..models import Credentials, UserMeta | |
_LOGGER = logging.getLogger(__name__) | |
CONF_PUBLIC_KEY = "public_key" | |
CONF_COOKIE_NAME = "cookie_name" | |
CONF_AUDIENCE = "audience" | |
CONF_USERNAME_CLAIM_KEY = "username_claim_key" | |
CONFIG_SCHEMA = AUTH_PROVIDER_SCHEMA.extend( | |
{ | |
vol.Required(CONF_PUBLIC_KEY): str, | |
vol.Required(CONF_AUDIENCE): str, | |
vol.Optional(CONF_COOKIE_NAME, default="AccessToken"): str, | |
vol.Optional(CONF_USERNAME_CLAIM_KEY, default="preferred_username"): str | |
} | |
) | |
class InvalidAuthError(HomeAssistantError): | |
"""Raised when submitting invalid authentication.""" | |
@AUTH_PROVIDERS.register("access_token") | |
class AccessTokenAuthProvider(AuthProvider): | |
"""Logs in users from an access token stored in the cookie""" | |
DEFAULT_TITLE = "Access Token" | |
def __init__(self, *args, **kwargs) -> None: | |
super().__init__(*args, **kwargs) | |
async def async_login_flow(self, context: Optional[Dict]) -> LoginFlow: | |
"""Return a flow to login.""" | |
assert context is not None | |
cookie_name = self.config[CONF_COOKIE_NAME] | |
cookies = context.get("cookies") | |
access_token = None | |
if cookie_name in cookies: | |
access_token = cookies[cookie_name] | |
return AccessTokenLoginFlow(self, access_token) | |
async def async_validate_access(self, access_token: str) -> None: | |
"""Validate an access token""" | |
user = None | |
key = self.config[CONF_PUBLIC_KEY] | |
audience = self.config[CONF_AUDIENCE] | |
if access_token is None: | |
_LOGGER.info("Tried to authenticate when no access token was provided.") | |
raise InvalidAuthError("No access token present") | |
else: | |
try: | |
claims = jwt.decode(access_token, key, audience=audience) | |
except jwt.exceptions.InvalidTokenError: | |
raise InvalidAuthError("Invalid access token") | |
return claims | |
async def async_get_or_create_credentials( | |
self, flow_result: Dict[str, str] | |
) -> Credentials: | |
"""Get credentials based on the flow result.""" | |
# Extracts the username from the JWT claims. We default to the "sub" claim, | |
# which should always be set. | |
username = flow_result.get(self.config[CONF_USERNAME_CLAIM_KEY], flow_result["sub"]) | |
for credential in await self.async_credentials(): | |
if credential.data["username"] == username: | |
return credential | |
# Create new credentials. | |
return self.async_create_credentials({"username": username}) | |
async def async_user_meta_for_credentials( | |
self, credentials: Credentials | |
) -> UserMeta: | |
"""Return extra user metadata for credentials. | |
Will be used to populate info when creating a new user. | |
""" | |
username = credentials.data["username"] | |
return UserMeta(name=username, is_active=True) | |
class AccessTokenLoginFlow(LoginFlow): | |
"""Handler for the login flow.""" | |
def __init__( | |
self, | |
auth_provider: AccessTokenAuthProvider, | |
access_token: None, | |
) -> None: | |
"""Initialize the login flow""" | |
super().__init__(auth_provider) | |
self._access_token = access_token | |
async def async_step_init( | |
self, user_input: Optional[Dict[str, str]] = None | |
) -> Dict[str, Any]: | |
"""Handle the step of the form.""" | |
errors = {} | |
if user_input is not None: | |
try: | |
result = await cast( | |
AccessTokenAuthProvider, self._auth_provider | |
).async_validate_access(self._access_token) | |
except InvalidAuthError: | |
errors["base"] = "invalid_auth" | |
if not errors: | |
return await self.async_finish(result) | |
return self.async_show_form( | |
step_id="init", data_schema=vol.Schema(OrderedDict()), errors=errors | |
) |
homeassistant: | |
auth_providers: | |
- type: access_token | |
key_domain: https://auth.mydomain.com | |
audience: https://homeassistant.mydomain.com | |
username_claim_key: email | |
public_key: !secret jwt_public_key |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment