Skip to content

Instantly share code, notes, and snippets.

@sidoh
Created September 30, 2019 08:11
Show Gist options
  • Save sidoh/479708a8f5a314075b573a9982e333ec to your computer and use it in GitHub Desktop.
Save sidoh/479708a8f5a314075b573a9982e333ec to your computer and use it in GitHub Desktop.
Custom HomeAssistant auth provider
import logging
import jwt
from urllib.parse import urlparse
import urllib.request
from collections import OrderedDict
from typing import Any, Dict, Optional, cast
import voluptuous as vol
from homeassistant.exceptions import HomeAssistantError
from homeassistant.core import callback
from . import AuthProvider, AUTH_PROVIDER_SCHEMA, AUTH_PROVIDERS, LoginFlow
from ..models import Credentials, UserMeta
_LOGGER = logging.getLogger(__name__)

# Option names accepted in this provider's configuration entry.
CONF_PUBLIC_KEY = "public_key"  # key handed to jwt.decode to verify the token
CONF_COOKIE_NAME = "cookie_name"  # name of the cookie carrying the token
CONF_AUDIENCE = "audience"  # audience value the token is checked against
CONF_USERNAME_CLAIM_KEY = "username_claim_key"  # claim used as the username

# Provider schema: the base auth-provider schema plus the options above.
# The key and audience are mandatory; the cookie name and the username claim
# have defaults.
CONFIG_SCHEMA = AUTH_PROVIDER_SCHEMA.extend(
    {
        vol.Required(CONF_PUBLIC_KEY): str,
        vol.Required(CONF_AUDIENCE): str,
        vol.Optional(CONF_COOKIE_NAME, default="AccessToken"): str,
        vol.Optional(CONF_USERNAME_CLAIM_KEY, default="preferred_username"): str,
    }
)
class InvalidAuthError(HomeAssistantError):
    """Signal that the access token is missing or failed verification."""
@AUTH_PROVIDERS.register("access_token")
class AccessTokenAuthProvider(AuthProvider):
    """Log users in from a JWT access token stored in a request cookie.

    The token is verified with the configured public key and audience. The
    username is read from the configured claim, falling back to ``sub``.
    """

    DEFAULT_TITLE = "Access Token"

    # Signature algorithms accepted when verifying a token. Only asymmetric
    # algorithms are listed because the provider is configured with a public
    # key; the list is fixed here so it never depends on the token header.
    _ALLOWED_ALGORITHMS = [
        "RS256",
        "RS384",
        "RS512",
        "PS256",
        "PS384",
        "PS512",
        "ES256",
        "ES384",
        "ES512",
    ]

    async def async_login_flow(self, context: Optional[Dict]) -> LoginFlow:
        """Return a login flow seeded with the token found in the cookies.

        ``context`` is expected to carry a ``"cookies"`` mapping. When the
        context, the mapping, or the configured cookie is missing, the flow
        is created with ``None`` and validation later reports invalid auth.
        """
        # NOTE(review): presumably the caller injects the request cookies
        # into the flow context - confirm against the login view.
        cookies = (context or {}).get("cookies") or {}
        access_token = cookies.get(self.config[CONF_COOKIE_NAME])
        return AccessTokenLoginFlow(self, access_token)

    async def async_validate_access(
        self, access_token: Optional[str]
    ) -> Dict[str, Any]:
        """Verify the access token and return its decoded claims.

        Raises:
            InvalidAuthError: if no token was supplied or it fails
                verification (signature, audience, expiry, key mismatch).
        """
        if not access_token:
            _LOGGER.info("Tried to authenticate when no access token was provided.")
            raise InvalidAuthError("No access token present")

        try:
            return jwt.decode(
                access_token,
                self.config[CONF_PUBLIC_KEY],
                audience=self.config[CONF_AUDIENCE],
                algorithms=self._ALLOWED_ALGORITHMS,
            )
        except (
            jwt.exceptions.InvalidTokenError,
            # Raised when the token's algorithm does not fit the configured
            # key type; treat it as a failed login rather than a crash.
            jwt.exceptions.InvalidKeyError,
        ) as err:
            _LOGGER.warning("Access token rejected: %s", err)
            raise InvalidAuthError("Invalid access token") from err

    async def async_get_or_create_credentials(
        self, flow_result: Dict[str, str]
    ) -> Credentials:
        """Return the credentials matching the token claims, creating them if new.

        ``flow_result`` holds the decoded claims. The username comes from the
        configured claim; the ``sub`` claim is only consulted when that claim
        is absent, so a token without ``sub`` still works if the configured
        claim is present.

        Raises:
            KeyError: if neither the configured claim nor ``sub`` is present.
        """
        username = flow_result.get(self.config[CONF_USERNAME_CLAIM_KEY])
        if username is None:
            username = flow_result["sub"]

        for credential in await self.async_credentials():
            if credential.data["username"] == username:
                return credential

        # No stored credentials for this username yet.
        return self.async_create_credentials({"username": username})

    async def async_user_meta_for_credentials(
        self, credentials: Credentials
    ) -> UserMeta:
        """Return extra user metadata for credentials.

        Will be used to populate info when creating a new user.
        """
        return UserMeta(name=credentials.data["username"], is_active=True)
class AccessTokenLoginFlow(LoginFlow):
    """Login flow that authenticates with a token captured before the flow began."""

    def __init__(
        self,
        auth_provider: AccessTokenAuthProvider,
        access_token: Optional[str],
    ) -> None:
        """Store the token to validate once the form is submitted.

        ``access_token`` is ``None`` when no token was available.
        """
        super().__init__(auth_provider)
        self._access_token = access_token

    async def async_step_init(
        self, user_input: Optional[Dict[str, str]] = None
    ) -> Dict[str, Any]:
        """Handle the single step of the form.

        The first call (``user_input`` is ``None``) shows an empty form. On
        submission the stored token is validated: success finishes the flow
        with the validation result, failure re-shows the form with an
        ``invalid_auth`` error.
        """
        errors: Dict[str, str] = {}

        if user_input is not None:
            provider = cast(AccessTokenAuthProvider, self._auth_provider)
            try:
                claims = await provider.async_validate_access(self._access_token)
            except InvalidAuthError:
                errors["base"] = "invalid_auth"
            else:
                return await self.async_finish(claims)

        # The form has no fields; submitting it only triggers validation.
        return self.async_show_form(
            step_id="init", data_schema=vol.Schema(OrderedDict()), errors=errors
        )
# Example configuration.yaml entry enabling the access-token auth provider.
homeassistant:
  auth_providers:
    - type: access_token
      # NOTE(review): key_domain is not declared in the provider's
      # CONFIG_SCHEMA and is never read by the provider code - confirm
      # whether it is still needed.
      key_domain: https://auth.mydomain.com
      audience: https://homeassistant.mydomain.com
      username_claim_key: email
      public_key: !secret jwt_public_key
@jaecktec
Copy link

Hey, could you add a note on how I can register a custom auth provider? :)

@HenryLoenwind
Copy link

Just a warning: Back up your HA before fiddling with this. If it starts up with a failing auth provider, it will trash your configuration by removing all your integrations and helpers.

Don't ask me how I know this...

Also, I couldn't get it to recognise my provider either. It only complained that there was no such provider, not that there was something wrong with it.

@jaecktec
Copy link

I fiddled with this in a local setup. The goal was to extract the auth information from Cloudflare Tunnels. However, in the end I gave up, since I saw no way to support API keys, which I needed for my Garmin watch.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment