Created
September 30, 2019 08:11
-
-
Save sidoh/479708a8f5a314075b573a9982e333ec to your computer and use it in GitHub Desktop.
Custom HomeAssistant auth provider
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import logging | |
import jwt | |
from urllib.parse import urlparse | |
import urllib.request | |
from collections import OrderedDict | |
from typing import Any, Dict, Optional, cast | |
import voluptuous as vol | |
from homeassistant.exceptions import HomeAssistantError | |
from homeassistant.core import callback | |
from . import AuthProvider, AUTH_PROVIDER_SCHEMA, AUTH_PROVIDERS, LoginFlow | |
from ..models import Credentials, UserMeta | |
# Module-level logger, named after this module.
_LOGGER = logging.getLogger(__name__)

# Keys accepted in this provider's configuration entry.
CONF_PUBLIC_KEY = "public_key"  # key passed to jwt.decode to verify the token
CONF_COOKIE_NAME = "cookie_name"  # name of the cookie that carries the token
CONF_AUDIENCE = "audience"  # audience value passed to jwt.decode
CONF_USERNAME_CLAIM_KEY = "username_claim_key"  # claim used as the username
# Configuration schema for the "access_token" provider: the shared auth
# provider schema extended with the keys this provider reads.
CONFIG_SCHEMA = AUTH_PROVIDER_SCHEMA.extend(
    {
        # Mandatory: key used to verify the token, and the expected audience.
        vol.Required(CONF_PUBLIC_KEY): str,
        vol.Required(CONF_AUDIENCE): str,
        # Optional: where the token lives and which claim names the user.
        vol.Optional(CONF_COOKIE_NAME, default="AccessToken"): str,
        vol.Optional(CONF_USERNAME_CLAIM_KEY, default="preferred_username"): str,
    }
)
class InvalidAuthError(HomeAssistantError):
    """Signal that the access token was missing or failed validation."""
@AUTH_PROVIDERS.register("access_token")
class AccessTokenAuthProvider(AuthProvider):
    """Log users in from a JWT access token stored in a cookie.

    The token is read from the cookie named by ``cookie_name``, decoded with
    ``public_key`` and ``audience``, and the username is taken from the claim
    named by ``username_claim_key`` (falling back to ``sub``).
    """

    DEFAULT_TITLE = "Access Token"

    # Signature algorithms accepted when decoding a token. The verification
    # key is configured as a *public* key, so only asymmetric algorithms are
    # allowed. Passing an explicit list is required by PyJWT 2.x and stops the
    # token header from choosing the algorithm (e.g. "none" or an HMAC variant).
    ALLOWED_ALGORITHMS = (
        "RS256",
        "RS384",
        "RS512",
        "ES256",
        "ES384",
        "ES512",
        "PS256",
        "PS384",
        "PS512",
    )

    def __init__(self, *args, **kwargs) -> None:
        """Forward all arguments unchanged to the base provider."""
        super().__init__(*args, **kwargs)

    async def async_login_flow(self, context: Optional[Dict]) -> LoginFlow:
        """Return a flow to login.

        Looks up the configured cookie in ``context["cookies"]`` and hands its
        value (or ``None`` when absent) to the login flow.
        """
        assert context is not None

        cookie_name = self.config[CONF_COOKIE_NAME]
        # The context may carry no cookies at all (missing key or None);
        # treat that the same as "cookie not present" instead of crashing.
        cookies = context.get("cookies") or {}
        access_token = cookies[cookie_name] if cookie_name in cookies else None

        return AccessTokenLoginFlow(self, access_token)

    async def async_validate_access(
        self, access_token: Optional[str]
    ) -> Dict[str, Any]:
        """Validate an access token and return its decoded claims.

        Raises:
            InvalidAuthError: if no token was supplied or the token fails
                verification.
        """
        key = self.config[CONF_PUBLIC_KEY]
        audience = self.config[CONF_AUDIENCE]

        if access_token is None:
            _LOGGER.info("Tried to authenticate when no access token was provided.")
            raise InvalidAuthError("No access token present")

        try:
            return jwt.decode(
                access_token,
                key,
                algorithms=list(self.ALLOWED_ALGORITHMS),
                audience=audience,
            )
        except jwt.exceptions.InvalidTokenError as err:
            # Keep the original cause attached for debugging.
            raise InvalidAuthError("Invalid access token") from err

    async def async_get_or_create_credentials(
        self, flow_result: Dict[str, str]
    ) -> Credentials:
        """Get credentials based on the flow result.

        ``flow_result`` holds the decoded token claims. Existing credentials
        with a matching username are reused; otherwise new ones are created.
        """
        # Prefer the configured claim; only fall back to "sub" when it is
        # absent, so a token without "sub" still works if the claim is there.
        claim_key = self.config[CONF_USERNAME_CLAIM_KEY]
        if claim_key in flow_result:
            username = flow_result[claim_key]
        else:
            username = flow_result["sub"]

        for credential in await self.async_credentials():
            if credential.data["username"] == username:
                return credential

        # Create new credentials.
        return self.async_create_credentials({"username": username})

    async def async_user_meta_for_credentials(
        self, credentials: Credentials
    ) -> UserMeta:
        """Return extra user metadata for credentials.

        Will be used to populate info when creating a new user.
        """
        username = credentials.data["username"]
        return UserMeta(name=username, is_active=True)
class AccessTokenLoginFlow(LoginFlow):
    """Handler for the login flow."""

    def __init__(
        self,
        auth_provider: AccessTokenAuthProvider,
        access_token: Optional[str],
    ) -> None:
        """Initialize the login flow.

        ``access_token`` is the raw token taken from the cookie, or ``None``
        when the cookie was not present.
        """
        super().__init__(auth_provider)
        self._access_token = access_token

    async def async_step_init(
        self, user_input: Optional[Dict[str, str]] = None
    ) -> Dict[str, Any]:
        """Handle the step of the form.

        Without ``user_input`` an empty form is shown. Once the form is
        submitted the stored token is validated: on success the flow finishes
        with the decoded claims, on failure the form is shown again with an
        ``invalid_auth`` error.
        """
        errors: Dict[str, str] = {}

        if user_input is not None:
            provider = cast(AccessTokenAuthProvider, self._auth_provider)
            try:
                claims = await provider.async_validate_access(self._access_token)
            except InvalidAuthError:
                errors["base"] = "invalid_auth"
            else:
                # Only reached when validation succeeded, so claims is bound.
                return await self.async_finish(claims)

        return self.async_show_form(
            step_id="init", data_schema=vol.Schema(OrderedDict()), errors=errors
        )
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
homeassistant:
  auth_providers:
    - type: access_token
      # NOTE(review): key_domain is not one of the keys declared in the
      # provider's CONFIG_SCHEMA -- confirm it is accepted before relying on it.
      key_domain: https://auth.mydomain.com
      audience: https://homeassistant.mydomain.com
      username_claim_key: email
      public_key: !secret jwt_public_key
Just a warning: Back up your HA before fiddling with this. If it comes up with a failing auth provider, it will trash your configuration by removing all your integrations and helpers.
Don't ask me how I know this...
Also, I couldn't get it to recognise my provider either. It only complained that there was no such provider, not that there was something wrong with it.
I fiddled with this in a local setup. The goal was to extract the auth information from Cloudflare tunnels. However, in the end I gave up, since I saw no way to support API keys, which I needed for my Garmin watch.
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
Hey, could you add a note on how I can register a custom auth provider? :)