"""Custom HomeAssistant auth provider."""
import logging
import jwt
from urllib.parse import urlparse
import urllib.request
from collections import OrderedDict
from typing import Any, Dict, Optional, cast
import voluptuous as vol
from homeassistant.exceptions import HomeAssistantError
from homeassistant.core import callback
from . import AuthProvider, AUTH_PROVIDER_SCHEMA, AUTH_PROVIDERS, LoginFlow
from ..models import Credentials, UserMeta
_LOGGER = logging.getLogger(__name__)

# Configuration keys accepted by this auth provider.
CONF_PUBLIC_KEY = "public_key"  # key passed to jwt.decode to verify the token
CONF_COOKIE_NAME = "cookie_name"  # name of the cookie that carries the token
CONF_AUDIENCE = "audience"  # audience value passed to jwt.decode
CONF_USERNAME_CLAIM_KEY = "username_claim_key"  # claim used as the username

# Validation schema for this provider's configuration entry.
# NOTE(review): the statement enclosing these schema entries was missing from
# the file; it is reconstructed here following the usual Home Assistant auth
# provider pattern (extend the base schema, reject unknown keys) -- confirm.
CONFIG_SCHEMA = AUTH_PROVIDER_SCHEMA.extend(
    {
        vol.Required(CONF_PUBLIC_KEY): str,
        vol.Required(CONF_AUDIENCE): str,
        vol.Optional(CONF_COOKIE_NAME, default="AccessToken"): str,
        vol.Optional(CONF_USERNAME_CLAIM_KEY, default="preferred_username"): str,
    },
    extra=vol.PREVENT_EXTRA,
)
class InvalidAuthError(HomeAssistantError):
    """Signal that the submitted authentication data was rejected."""
# NOTE(review): the registration decorator was not present in the file as
# received, although AUTH_PROVIDERS is imported and the example configuration
# uses `type: access_token` -- confirm the registered name.
@AUTH_PROVIDERS.register("access_token")
class AccessTokenAuthProvider(AuthProvider):
    """Log users in from a JWT access token stored in a cookie."""

    DEFAULT_TITLE = "Access Token"

    async def async_login_flow(self, context: Optional[Dict]) -> LoginFlow:
        """Return a flow to login.

        The token is read from ``context["cookies"]`` under the cookie name
        given by the ``cookie_name`` option. When the context carries no
        cookies, or the cookie is absent, the flow is created with ``None``
        as its token.
        """
        assert context is not None
        # A context without a "cookies" entry must not crash the lookup.
        cookies = context.get("cookies") or {}
        access_token = cookies.get(self.config[CONF_COOKIE_NAME])
        return AccessTokenLoginFlow(self, access_token)

    async def async_validate_access(
        self, access_token: Optional[str]
    ) -> Dict[str, Any]:
        """Validate an access token and return its decoded claims.

        Raises:
            InvalidAuthError: if no token was supplied or ``jwt.decode``
                rejects it.
        """
        if access_token is None:
            _LOGGER.warning(
                "Tried to authenticate when no access token was provided."
            )
            raise InvalidAuthError("No access token present")

        try:
            # NOTE(review): no `algorithms=[...]` is passed here. Recent PyJWT
            # releases require it and will reject every token without it;
            # pin the algorithm your identity provider signs with.
            claims = jwt.decode(
                access_token,
                self.config[CONF_PUBLIC_KEY],
                audience=self.config[CONF_AUDIENCE],
            )
        except jwt.exceptions.InvalidTokenError as err:
            raise InvalidAuthError("Invalid access token") from err
        return claims

    async def async_get_or_create_credentials(
        self, flow_result: Dict[str, str]
    ) -> Credentials:
        """Get credentials based on the flow result.

        ``flow_result`` holds the decoded JWT claims. The username is taken
        from the claim named by the ``username_claim_key`` option, falling
        back to the ``sub`` claim when that claim is absent.

        Raises:
            KeyError: if neither the configured claim nor ``sub`` is present.
        """
        username = flow_result.get(self.config[CONF_USERNAME_CLAIM_KEY])
        if username is None:
            # Only touch "sub" when it is actually needed, so a token that
            # carries the configured claim but no "sub" still works.
            username = flow_result["sub"]

        for credential in await self.async_credentials():
            if credential.data["username"] == username:
                return credential

        # No stored credential matches this username: create new credentials.
        return self.async_create_credentials({"username": username})

    async def async_user_meta_for_credentials(
        self, credentials: Credentials
    ) -> UserMeta:
        """Return extra user metadata for credentials.

        Will be used to populate info when creating a new user.
        """
        username = credentials.data["username"]
        return UserMeta(name=username, is_active=True)
class AccessTokenLoginFlow(LoginFlow):
    """Handler for the login flow."""

    def __init__(
        self,
        auth_provider: AccessTokenAuthProvider,
        access_token: Optional[str],
    ) -> None:
        """Initialize the login flow.

        ``access_token`` is the raw token taken from the request cookie, or
        ``None`` when the cookie was not present.
        """
        # The base class stores the provider; async_step_init reads it back
        # through self._auth_provider.
        super().__init__(auth_provider)
        self._access_token = access_token

    async def async_step_init(
        self, user_input: Optional[Dict[str, str]] = None
    ) -> Dict[str, Any]:
        """Handle the step of the form.

        Once the (empty) form has been submitted, the stored access token is
        validated: on success the flow finishes with the decoded claims, on
        failure the form is shown again with an ``invalid_auth`` error.
        """
        errors: Dict[str, str] = {}

        if user_input is not None:
            try:
                claims = await cast(
                    AccessTokenAuthProvider, self._auth_provider
                ).async_validate_access(self._access_token)
            except InvalidAuthError:
                errors["base"] = "invalid_auth"
            else:
                return await self.async_finish(claims)

        # The form has no fields; the token comes from the cookie instead.
        return self.async_show_form(
            step_id="init", data_schema=vol.Schema(OrderedDict()), errors=errors
        )
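# Example auth provider entry for Home Assistant's configuration.yaml.
# NOTE: the provider schema also marks `audience` as required.
#
#   - type: access_token
#     username_claim_key: email
#     public_key: !secret jwt_public_key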
