8 Commits

Author SHA1 Message Date
Christiaan Goossens
7cc960e4db Bump to rc3 (#249) 2026-04-15 12:08:36 +02:00
Christiaan Goossens
07c1e3a4c4 Fix regression of storeToken parameter (#248)
* Try a different method to set ?storeToken

* Formatting

* Only insert storeToken on web client & fix tests
2026-04-15 12:07:19 +02:00
Christiaan Goossens
0ca300c385 Add tests for other signing methods (#246)
* Add tests for other signing methods #151

* Add doc for list source
2026-04-14 15:29:06 +02:00
Christiaan Goossens
a9483e2038 Change build script to align with HACS (#245)
* Change build script to align with HACS

* Fix path typo
2026-04-14 14:42:13 +02:00
Christiaan Goossens
6f1d2bcb3f Switch to creating releases by tag (#244) 2026-04-14 14:26:02 +02:00
Christiaan Goossens
67f58a39aa Better tag matching (#243)
* Better tag matching

* Split PR and release flows

* Undo PR archiving
2026-04-14 14:17:09 +02:00
Christiaan Goossens
ddb2952e64 Release with autogenerated zip files (#242)
* Try autobuilding

* Typo fix

* Entire components dir

* Directly upload zip
2026-04-14 13:55:09 +02:00
Christiaan Goossens
baf3ac6b5a Fixes for known bugs in v1.0.0-rc1 (#241)
* Fix #238 for same-site cookies

* Redirect in Python + bump to rc2
2026-04-14 09:43:58 +02:00
20 changed files with 602 additions and 166 deletions

21
.github/workflows/build-pr.yaml vendored Normal file
View File

@@ -0,0 +1,21 @@
name: Build pull request artifact
on:
pull_request:
jobs:
build:
runs-on: ubuntu-latest
steps:
- uses: actions/checkout@v6
- name: Build
run: scripts/build
- name: Upload Artifact
uses: actions/upload-artifact@v7
with:
path: ./hass-oidc-auth.zip
archive: false

27
.github/workflows/release.yaml vendored Normal file
View File

@@ -0,0 +1,27 @@
name: Build and create draft release
on:
push:
tags:
- v*.*.*
jobs:
release:
runs-on: ubuntu-latest
permissions:
contents: write
steps:
- uses: actions/checkout@v6
- name: Build
run: scripts/build
- name: Create or update draft release with ZIP
uses: softprops/action-gh-release@v3
with:
draft: true
fail_on_unmatched_files: true
generate_release_notes: true
files: ./hass-oidc-auth.zip

3
.gitignore vendored
View File

@@ -111,5 +111,6 @@ dmypy.json
.pytest_logs.log
# Build NPM
node_modules
custom_components/auth_oidc/static/style.css

View File

@@ -157,6 +157,6 @@ async def _setup_oidc_provider(hass: HomeAssistant, my_config: dict, display_nam
_LOGGER.info("Registered OIDC views")
# Inject OIDC code into the frontend for /auth/authorize for automatic redirect
await OIDCInjectedAuthPage.inject(hass)
await OIDCInjectedAuthPage.inject(hass, force_https)
return True

View File

@@ -8,9 +8,7 @@ from typing import Any, Dict
DEFAULT_TITLE = "OpenID Connect (SSO)"
DOMAIN = "auth_oidc"
REPO_ROOT_URL = (
"https://github.com/christiaangoossens/hass-oidc-auth/tree/v1.0.0-rc1"
)
REPO_ROOT_URL = "https://github.com/christiaangoossens/hass-oidc-auth/tree/v1.0.0-rc3"
## ===
## Config keys

View File

@@ -61,11 +61,9 @@ class OIDCFinishView(HomeAssistantView):
if "?" in redirect_uri:
separator = "&"
# Redirect to this new URL for login
new_url = (
redirect_uri + separator + "storeToken=true&skip_oidc_redirect=true"
)
raise web.HTTPFound(location=new_url)
# Redirect to this new URL for login, make sure to skip OIDC to prevent loops
redirect_uri = f"{redirect_uri}{separator}skip_oidc_redirect=true"
raise web.HTTPFound(location=redirect_uri)
# Check if we can link this device
linked = await self.oidc_provider.async_link_state_to_code(

View File

@@ -1,12 +1,18 @@
"""Injected authorization page, replacing the original"""
import base64
import logging
from functools import partial
from homeassistant.components.http import HomeAssistantView, StaticPathConfig
from homeassistant.core import HomeAssistant
from urllib.parse import quote, unquote
from aiohttp import web
from aiofiles import open as async_open
from homeassistant.components.http import HomeAssistantView, StaticPathConfig
from homeassistant.core import HomeAssistant
from .welcome import PATH as WELCOME_PATH
from ..tools.helpers import get_url
PATH = "/auth/authorize"
_LOGGER = logging.getLogger(__name__)
@@ -18,7 +24,7 @@ async def read_file(path: str) -> str:
return await f.read()
async def frontend_injection(hass: HomeAssistant) -> None:
async def frontend_injection(hass: HomeAssistant, force_https: bool) -> None:
"""Inject new frontend code into /auth/authorize."""
router = hass.http.app.router
frontend_path = None
@@ -61,7 +67,7 @@ async def frontend_injection(hass: HomeAssistant) -> None:
frontend_code = await read_file(frontend_path)
# Inject JS and register that route
injection_js = "<script src='/auth/oidc/static/injection.js?v=4'></script>"
injection_js = "<script src='/auth/oidc/static/injection.js?v=6'></script>"
frontend_code = frontend_code.replace("</body>", f"{injection_js}</body>")
await hass.http.async_register_static_paths(
@@ -80,7 +86,7 @@ async def frontend_injection(hass: HomeAssistant) -> None:
)
# If everything is succesful, register a fake view that just returns the modified HTML
hass.http.register_view(OIDCInjectedAuthPage(frontend_code))
hass.http.register_view(OIDCInjectedAuthPage(frontend_code, force_https))
_LOGGER.info("Performed OIDC frontend injection")
@@ -91,18 +97,49 @@ class OIDCInjectedAuthPage(HomeAssistantView):
url = PATH
name = "auth:oidc:authorize_page"
def __init__(self, html: str) -> None:
def __init__(self, html: str, force_https: bool) -> None:
"""Initialize the injected auth page."""
self.html = html
self.force_https = force_https
@staticmethod
async def inject(hass: HomeAssistant) -> None:
async def inject(hass: HomeAssistant, force_https: bool) -> None:
"""Inject the OIDC auth page into the frontend."""
try:
await frontend_injection(hass)
await frontend_injection(hass, force_https)
except Exception as e: # pylint: disable=broad-except
_LOGGER.error("Failed to inject OIDC auth page: %s", e)
async def get(self, _) -> web.Response:
"""Return the screen"""
@staticmethod
def _should_do_oidc_redirect(req: web.Request) -> bool:
"""Check if we should redirect to the OIDC flow."""
# Set when we return from finish
if req.query.get("skip_oidc_redirect") == "true":
return False
# Set whenever you directly do /?skip_oidc_redirect=true,
# for example when you click the "other" button on the welcome screen
redirect_uri = req.query.get("redirect_uri")
if not redirect_uri:
return False
# Handle both encoded and plain redirect_uri values.
decoded_redirect_uri = unquote(redirect_uri)
return "skip_oidc_redirect=true" not in decoded_redirect_uri
def _get_welcome_redirect_location(self, req: web.Request) -> str:
"""Build the welcome URL for the injected auth page redirect."""
encoded_current_url = quote(
base64.b64encode(str(req.url).encode("utf-8")).decode("ascii")
)
return get_url(
f"{WELCOME_PATH}?redirect_uri={encoded_current_url}",
self.force_https,
)
async def get(self, req: web.Request) -> web.Response:
"""Return the original page or redirect into the OIDC flow."""
if self._should_do_oidc_redirect(req):
raise web.HTTPFound(location=self._get_welcome_redirect_location(req))
return web.Response(text=self.html, content_type="text/html")

View File

@@ -1,8 +1,9 @@
"""Welcome route to show the user the OIDC login button and give instructions."""
from ast import List
import base64
import binascii
from urllib.parse import urlparse, parse_qs, unquote
from urllib.parse import urlparse, parse_qs, unquote, urlencode
from aiohttp import web
from homeassistant.components.http import HomeAssistantView
from ..tools.helpers import error_response, get_url, template_response
@@ -30,13 +31,46 @@ class OIDCWelcomeView(HomeAssistantView):
self.force_https = force_https
self.has_other_auth_providers = has_other_auth_providers
def determine_if_mobile(self, redirect_uri: str) -> bool:
"""Determine if the client is a mobile client based on the redirect_uri."""
oauth2_url = urlparse(redirect_uri)
client_id = parse_qs(oauth2_url.query).get("client_id")
async def _process_url(self, redirect_uri: str) -> List[str, bool]:
"""Processes the redirect URI to determine if we need setTokens and if this is mobile."""
# decodeURIComponent(btoa(...)) -> unquote first, then base64 decode
redirect_uri = base64.b64decode(unquote(redirect_uri), validate=True).decode(
"utf-8"
)
# If the client_id starts with https://home-assistant.io/ we assume it's a mobile client
return bool(client_id and client_id[0].startswith("https://home-assistant.io/"))
oauth2_url = urlparse(redirect_uri)
oauth2_query = parse_qs(oauth2_url.query)
client_id = oauth2_query.get("client_id")[0]
original_redirect_uri = oauth2_query.get("redirect_uri")[0]
# If the client_id starts with https://home-assistant.io/
# we assume it's a mobile client
# Android = https://home-assistant.io/Android,
# iOS = https://home-assistant.io/iOS
is_mobile = client_id.startswith("https://home-assistant.io/")
# Check if we appear to be signing in to the web version,
# for which we want to store tokens.
# We don't want to set storeTokens on sign-in to Google for instance
base_url = get_url("/", self.force_https)
is_web_client = original_redirect_uri.startswith(base_url)
if is_web_client:
# Adjust the original_redirect_uri to include the storeTokens parameter
separator = "?"
if "?" in original_redirect_uri:
separator = "&"
original_redirect_uri = f"{original_redirect_uri}{separator}storeToken=true"
oauth2_query.update({"redirect_uri": original_redirect_uri})
# Create new redirect_uri with the updated query parameters
new_oauth2_url = oauth2_url._replace(
query=urlencode(oauth2_query, doseq=True)
)
redirect_uri = new_oauth2_url.geturl()
return redirect_uri, is_mobile
async def get(self, req: web.Request) -> web.Response:
"""Receive response."""
@@ -44,23 +78,26 @@ class OIDCWelcomeView(HomeAssistantView):
# Get the query parameter with the redirect_uri
redirect_uri = req.query.get("redirect_uri")
# If set, determine if this is a mobile client based on the redirect_uri,
# otherwise assume it's not mobile
# Do some processing on the redirect_uri to correct it
# and determine if this is a mobile client.
if redirect_uri:
try:
# decodeURIComponent(btoa(...)) -> unquote first, then base64 decode
redirect_uri = base64.b64decode(
unquote(redirect_uri), validate=True
).decode("utf-8")
is_mobile = self.determine_if_mobile(redirect_uri)
except (binascii.Error, UnicodeDecodeError, ValueError):
redirect_uri, is_mobile = await self._process_url(redirect_uri)
except (
binascii.Error,
UnicodeDecodeError,
ValueError,
KeyError,
TypeError,
):
return await error_response(
"Invalid redirect_uri, please restart login."
)
else:
# Backwards compatibility with older versions that directly go to /auth/oidc/welcome
# If not set, redirect back to the main page and assume that this is a web client
redirect_uri = get_url("/", self.force_https)
redirect_uri = get_url("/?storeToken=true", self.force_https)
is_mobile = False
# Create OIDC state with the redirect_uri so we can use it later in the flow

View File

@@ -16,8 +16,7 @@
"requirements": [
"aiofiles",
"jinja2",
"bcrypt",
"joserfc"
],
"version": "1.0.0-rc1"
"version": "1.0.0-rc3"
}

View File

@@ -6,7 +6,6 @@ import logging
from typing import Dict, Optional
import asyncio
import bcrypt
from homeassistant.auth import EVENT_USER_ADDED
from homeassistant.auth.providers import (
AUTH_PROVIDERS,
@@ -236,7 +235,7 @@ class OpenIDAuthProvider(AuthProvider):
# Keep cookie lifetime aligned with state lifetime in storage (5 minutes).
"set-cookie": f"{COOKIE_NAME}="
+ state_id
+ "; Path=/auth/; SameSite=Strict; HttpOnly; Max-Age=300"
+ "; Path=/auth/; SameSite=Lax; HttpOnly; Max-Age=300"
+ secure_flag,
}
@@ -367,14 +366,6 @@ class OpenIdLoginFlow(LoginFlow):
"""Handler for the login flow."""
async def _finalize_user(self, state_id: str) -> AuthFlowResult:
# Verify a dummy hash to make it last a bit longer
# as security measure (limits the amount of attempts you have in 5 min)
# Similar to what the HomeAssistant auth provider does
dummy = b"$2b$12$CiuFGszHx9eNHxPuQcwBWez4CwDTOcLTX5CbOpV6gef2nYuXkY7BO"
bcrypt.checkpw(b"foo", dummy)
# Actually look up the auth provider after,
# this doesn't take a lot of time (regardless of it's in there or not)
sub = await self._auth_provider.async_get_subject(state_id)
if sub:
return await self.async_finish(
@@ -396,11 +387,10 @@ class OpenIdLoginFlow(LoginFlow):
state_cookie = req.cookies.get(COOKIE_NAME)
if state_cookie:
_LOGGER.debug("State cookie found on login: %s", state_cookie)
try:
return await self._finalize_user(state_cookie)
except InvalidAuthError:
pass
return self.async_abort(reason="oidc_cookie_invalid")
# If no cookie is found, abort.
# User should either be redirected or start manually on the welcome

View File

@@ -1,58 +1,19 @@
/**
* OIDC Frontend Redirect injection script
* This script is injected because the 'hass-oidc-auth' custom component is active.
* hass-oidc-auth - UX script to automatically select the Home Assistant auth provider when the "Login aborted" message is shown.
*/
function attempt_oidc_redirect() {
// Get URL parameters
const urlParams = new URLSearchParams(window.location.search);
let authFlowElement = null
// Check if we have skip_oidc_redirect directly here
if (urlParams.get('skip_oidc_redirect') === 'true') {
// No console log because this is intended behavior
return;
}
const originalUrl = urlParams.get('redirect_uri');
if (!originalUrl) {
console.warn('[OIDC] No OAuth2 redirect_uri parameter found in the URL. Frontend redirect cancelled.');
return;
}
try {
// Parse the redirect URI
const redirectUrl = new URL(originalUrl);
// Check if redirect URI has a query parameter to stop OIDC injection
if (redirectUrl.searchParams.get('skip_oidc_redirect') === 'true') {
// No console log because this is intended behavior
return;
}
} catch (error) {
console.error('[OIDC] Invalid redirect_uri parameter:', error);
}
window.stop(); // Stop loading the current page before redirecting
// Redirect to the OIDC auth URL
const base64encodeUrl = btoa(window.location.href);
const oidcAuthUrl = '/auth/oidc/welcome?redirect_uri=' + encodeURIComponent(base64encodeUrl);
window.location.href = oidcAuthUrl;
}
function click_alternative_provider_instead() {
setTimeout(() => {
// Find ha-auth-flow
const authFlowElement = document.querySelector('ha-auth-flow');
function update() {
// Find ha-auth-flow
authFlowElement = document.querySelector('ha-auth-flow');
if (!authFlowElement) {
console.warn("[OIDC] ha-auth-flow element not found. Not automatically selecting HA provider.");
return;
}
// Check if the text "Login aborted" is present on the page
if (!authFlowElement.innerText.includes('Login aborted')) {
console.warn("[OIDC] 'Login aborted' text not found. Not automatically selecting HA provider.");
return;
}
@@ -60,7 +21,6 @@ function click_alternative_provider_instead() {
const authProviderElement = document.querySelector('ha-pick-auth-provider');
if (!authProviderElement) {
console.warn("[OIDC] ha-pick-auth-provider not found. Not automatically selecting HA provider.");
return;
}
@@ -72,11 +32,30 @@ function click_alternative_provider_instead() {
}
firstListItem.click();
}, 500);
}
// Run OIDC injection upon load
(() => {
attempt_oidc_redirect();
click_alternative_provider_instead();
})();
// Hide the content until ready
let ready = false
document.querySelector(".content").style.display = "none"
const observer = new MutationObserver((mutationsList, observer) => {
update();
if (!ready) {
ready = Boolean(authFlowElement)
if (ready) {
document.querySelector(".content").style.display = ""
}
}
})
observer.observe(document.body, { childList: true, subtree: true })
setTimeout(() => {
if (!ready) {
console.warn("[hass-oidc-auth]: Document was not ready after 300ms seconds, showing content anyway.")
}
// Force display the content
document.querySelector(".content").style.display = "";
}, 300)

File diff suppressed because one or more lines are too long

View File

@@ -1,6 +1,8 @@
{
"name": "OpenID Connect",
"name": "OpenID Connect/SSO Authentication",
"hide_default_branch": true,
"render_readme": true,
"homeassistant": "2025.11"
"homeassistant": "2025.11",
"zip_release": true,
"filename": "hass-oidc-auth.zip"
}

View File

@@ -9,7 +9,6 @@ license = "MIT"
dependencies = [
"aiofiles~=25.1",
"jinja2~=3.1",
"bcrypt~=5.0",
"joserfc~=1.6.0",
]
readme = "README.md"

10
scripts/build Executable file
View File

@@ -0,0 +1,10 @@
#! /bin/bash
# Build the plugin CSS
npm install --frozen-lockfile
npm run css
# Create zip from the custom_components/auth_oidc directory
# HACS wants only the contents of this dir in a zip
cd custom_components/auth_oidc/
zip -r ../../hass-oidc-auth.zip ./*

View File

@@ -93,7 +93,7 @@ async def test_provider_cookie_header_sets_secure_when_requested(hass: HomeAssis
provider = hass.auth.get_auth_providers(DOMAIN)[0]
cookie_header = provider.get_cookie_header("state-id", secure=True)["set-cookie"]
assert "SameSite=Strict" in cookie_header
assert "SameSite=Lax" in cookie_header
assert "HttpOnly" in cookie_header
assert "Secure" in cookie_header
@@ -342,5 +342,36 @@ async def test_login_with_invalid_cookie_aborts(hass: HomeAssistant):
result = await flow.async_step_init({})
assert result["type"] == FlowResultType.ABORT
assert result["reason"] == "oidc_cookie_invalid"
@pytest.mark.asyncio
async def test_login_with_no_cookie_aborts(hass: HomeAssistant):
"""Missing cookie should fail closed."""
await setup(
hass,
{
CLIENT_ID: "dummy",
DISCOVERY_URL: MockOIDCServer.get_discovery_url(),
FEATURES: {
FEATURES_AUTOMATIC_PERSON_CREATION: False,
FEATURES_AUTOMATIC_USER_LINKING: False,
},
},
True,
)
provider = hass.auth.get_auth_providers(DOMAIN)[0]
flow = await provider.async_login_flow({})
fake_request = SimpleNamespace(cookies={}, remote="127.0.0.1")
with patch(
"custom_components.auth_oidc.provider.http.current_request"
) as current_request:
current_request.get.return_value = fake_request
result = await flow.async_step_init({})
assert result["type"] == FlowResultType.ABORT
assert result["reason"] == "no_oidc_cookie_found"

View File

@@ -103,10 +103,10 @@ async def verify_back_redirect(client, expected_redirect_uri: str):
"""Verify that POST to finish without body redirects back to the original redirect_uri."""
resp_finish_post = await client.post("/auth/oidc/finish", allow_redirects=False)
assert resp_finish_post.status == 302
assert (
resp_finish_post.headers["Location"]
== unquote(expected_redirect_uri) + "&storeToken=true&skip_oidc_redirect=true"
)
location = resp_finish_post.headers["Location"]
assert location.startswith(unquote(expected_redirect_uri))
assert "skip_oidc_redirect=true" in location
async def listen_for_sse_events(

View File

@@ -23,6 +23,25 @@ from custom_components.auth_oidc.tools.oidc_client import (
http_raise_for_status,
)
# List from https://jose.authlib.org/en/guide/algorithms/#json-web-signature
ALL_ID_TOKEN_SIGNING_ALGORITHMS = (
"HS256",
"HS384",
"HS512",
"RS256",
"RS384",
"RS512",
"ES256",
"ES384",
"ES512",
"PS256",
"PS384",
"PS512",
"ES256K",
"Ed25519",
"Ed448",
)
def make_client(hass: HomeAssistant, **kwargs) -> OIDCClient:
"""Build an OIDC client with explicit defaults for unit testing."""
@@ -67,6 +86,59 @@ def make_signed_hs256_jwt(secret: str, claims: dict) -> str:
return jwt.encode({"alg": "HS256"}, claims, jwk_obj)
def build_real_signed_token(
algorithm: str, claims: dict, secret: str
) -> tuple[str, dict]:
"""Build a real signed token and matching JWKS payload for a given algorithm."""
if algorithm.startswith("HS"):
signing_key = jwk.import_key(
{
"kty": "oct",
"k": base64.urlsafe_b64encode(secret.encode()).decode().rstrip("="),
"alg": algorithm,
}
)
token = jwt.encode(
{"alg": algorithm}, claims, signing_key, algorithms=[algorithm]
)
return token, {"keys": []}
if algorithm in ("RS256", "RS384", "RS512", "PS256", "PS384", "PS512"):
key = jwk.generate_key(
"RSA", 2048, {"alg": algorithm, "use": "sig"}, private=True, auto_kid=True
)
elif algorithm in ("ES256", "ES384", "ES512", "ES256K"):
curve = {
"ES256": "P-256",
"ES384": "P-384",
"ES512": "P-521",
"ES256K": "secp256k1",
}[algorithm]
key = jwk.generate_key(
"EC", curve, {"alg": algorithm, "use": "sig"}, private=True, auto_kid=True
)
elif algorithm in ("Ed25519", "Ed448"):
key = jwk.generate_key(
"OKP",
algorithm,
{"alg": algorithm, "use": "sig"},
private=True,
auto_kid=True,
)
else:
raise ValueError(f"Unsupported test algorithm: {algorithm}")
kid = key.kid
token = jwt.encode(
{"alg": algorithm, "kid": kid},
claims,
key,
algorithms=[algorithm],
)
public_key = key.as_dict(private=False)
return token, {"keys": [public_key]}
@pytest.mark.asyncio
async def test_complete_token_flow_rejects_missing_state(hass: HomeAssistant):
"""Flow state must exist; missing state should fail closed."""
@@ -447,6 +519,62 @@ async def test_parse_id_token_rejects_invalid_registered_claims(hass: HomeAssist
assert parsed is None
@pytest.mark.asyncio
@pytest.mark.parametrize("algorithm", ALL_ID_TOKEN_SIGNING_ALGORITHMS)
async def test_parse_id_token_validates_real_signed_tokens_and_decode_inputs(
hass: HomeAssistant, algorithm: str
):
"""Use real signatures and verify token/key/algorithm passed into joserfc."""
secret = "top-secret-value"
client_kwargs = {"id_token_signing_alg": algorithm}
if algorithm.startswith("HS"):
client_kwargs["client_secret"] = secret
client = make_client(hass, **client_kwargs)
client.discovery_document = {
"issuer": "https://issuer",
"jwks_uri": "https://issuer/jwks",
}
now = int(time.time())
claims = {
"sub": "subject-1",
"aud": "test-client",
"iss": "https://issuer",
"nbf": now,
"iat": now,
"exp": now + 3600,
}
token, jwks_payload = build_real_signed_token(algorithm, claims, secret)
with (
patch.object(client, "_fetch_jwks", new=AsyncMock(return_value=jwks_payload)),
patch(
"custom_components.auth_oidc.tools.oidc_client.jwt.decode",
wraps=jwt.decode,
) as decode_spy,
patch(
"custom_components.auth_oidc.tools.oidc_client.jwk.import_key",
wraps=jwk.import_key,
) as import_key_spy,
):
parsed = await client._parse_id_token(token)
assert parsed == claims
decode_spy.assert_called_once()
assert decode_spy.call_args.args[0] == token
assert decode_spy.call_args.kwargs["algorithms"] == [algorithm]
import_key_spy.assert_called()
imported_key_payload = import_key_spy.call_args.args[0]
assert imported_key_payload["alg"] == algorithm
if algorithm.startswith("HS"):
assert imported_key_payload["kty"] == "oct"
else:
assert imported_key_payload["kid"] is not None
@pytest.mark.asyncio
async def test_get_authorization_url_returns_none_when_discovery_fails(
hass: HomeAssistant,

View File

@@ -2,8 +2,11 @@
import base64
import os
from urllib.parse import parse_qs, quote, unquote, urlparse, urlencode
from unittest.mock import AsyncMock, MagicMock, patch
from auth_oidc.config.const import DISCOVERY_URL, CLIENT_ID
from pytest_homeassistant_custom_component.typing import ClientSessionGenerator
import pytest
from homeassistant.core import HomeAssistant
@@ -16,14 +19,19 @@ from custom_components.auth_oidc.endpoints.injected_auth_page import (
frontend_injection,
)
WEB_CLIENT_ID = "https://example.com"
MOBILE_CLIENT_ID = "https://home-assistant.io/Android"
def create_redirect_uri(client_id: str) -> str:
"""Build a redirect URI that includes a client_id query parameter."""
return f"http://example.com/auth/authorize?client_id={client_id}"
params = {
"response_type": "code",
"redirect_uri": client_id,
"client_id": client_id,
"state": "example",
}
return f"http://example.com/auth/authorize?{urlencode(params)}"
def encode_redirect_uri(redirect_uri: str) -> str:
@@ -45,8 +53,26 @@ async def setup(
assert result
async def setup_mock_authorize_route(hass: HomeAssistant) -> None:
"""Register a mock /auth/authorize page so frontend injection can hook into it."""
await async_setup_component(hass, HTTP_DOMAIN, {})
mock_html_path = os.path.join(os.path.dirname(__file__), "mocks", "auth_page.html")
await hass.http.async_register_static_paths(
[
StaticPathConfig(
"/auth/authorize",
mock_html_path,
cache_headers=False,
)
]
)
@pytest.mark.asyncio
async def test_welcome_page_registration(hass: HomeAssistant, hass_client):
async def test_welcome_page_registration(
hass: HomeAssistant, hass_client: ClientSessionGenerator
):
"""Test that welcome page is present."""
await setup(hass)
@@ -57,7 +83,9 @@ async def test_welcome_page_registration(hass: HomeAssistant, hass_client):
@pytest.mark.asyncio
async def test_redirect_page_registration(hass: HomeAssistant, hass_client):
async def test_redirect_page_registration(
hass: HomeAssistant, hass_client: ClientSessionGenerator
):
"""Test that redirect page can be reached."""
await setup(hass)
@@ -72,7 +100,7 @@ async def test_redirect_page_registration(hass: HomeAssistant, hass_client):
@pytest.mark.asyncio
async def test_welcome_rejects_invalid_encoded_redirect_uri(
hass: HomeAssistant, hass_client
hass: HomeAssistant, hass_client: ClientSessionGenerator
):
"""Welcome should reject malformed base64 redirect_uri values."""
await setup(hass)
@@ -87,12 +115,104 @@ async def test_welcome_rejects_invalid_encoded_redirect_uri(
@pytest.mark.asyncio
async def test_welcome_sets_strict_state_cookie_flags(hass: HomeAssistant, hass_client):
@pytest.mark.parametrize(
"redirect_uri",
[
"http://example.com/auth/authorize?client_id=https://example.com",
"http://example.com/auth/authorize?redirect_uri=https://example.com",
],
)
async def test_welcome_rejects_redirect_uris_missing_required_query_params(
hass: HomeAssistant, hass_client: ClientSessionGenerator, redirect_uri: str
):
"""Welcome should reject redirect URIs that decode but are incomplete."""
await setup(hass)
client = await hass_client()
encoded = encode_redirect_uri(redirect_uri)
resp = await client.get(
f"/auth/oidc/welcome?redirect_uri={encoded}",
allow_redirects=False,
)
assert resp.status == 400
assert "Invalid redirect_uri, please restart login." in await resp.text()
@pytest.mark.asyncio
@pytest.mark.parametrize(
("client_id", "should_store_token", "is_mobile"),
[
("", True, False),
(MOBILE_CLIENT_ID, False, True),
("https://random.example", False, False),
],
)
async def test_welcome_only_adds_store_token_for_web_clients(
hass: HomeAssistant,
hass_client: ClientSessionGenerator,
client_id: str,
should_store_token: bool,
is_mobile: bool,
):
"""Welcome should only append storeToken for clients aligned with the base URL."""
await setup(hass)
captured_redirect_uri = {}
async def fake_create_state(state_redirect_uri: str, *_args):
captured_redirect_uri["value"] = state_redirect_uri
return "state-id"
with (
patch(
"custom_components.auth_oidc.provider.OpenIDAuthProvider.async_create_state",
new=AsyncMock(side_effect=fake_create_state),
),
patch(
"custom_components.auth_oidc.provider.OpenIDAuthProvider.async_generate_device_code",
new=AsyncMock(return_value="123456"),
),
):
client = await hass_client()
if client_id == "":
# If not present, set it to the root URL to
# emulate the normal website/Lovelace/dashboard
client_id = str(client.make_url("/?test=true"))
redirect_uri = create_redirect_uri(client_id)
encoded = encode_redirect_uri(redirect_uri)
resp = await client.get(
f"/auth/oidc/welcome?redirect_uri={encoded}",
allow_redirects=False,
)
assert resp.status in (200, 302)
assert "value" in captured_redirect_uri
parsed_state_redirect = urlparse(captured_redirect_uri["value"])
state_redirect_query = parse_qs(parsed_state_redirect.query)
nested_redirect_uri = unquote(state_redirect_query["redirect_uri"][0])
if should_store_token:
assert "storeToken=true" in nested_redirect_uri
else:
assert "storeToken=true" not in nested_redirect_uri
if is_mobile:
assert "https://home-assistant.io/" in nested_redirect_uri
@pytest.mark.asyncio
async def test_welcome_sets_secure_state_cookie_flags(
hass: HomeAssistant, hass_client: ClientSessionGenerator
):
"""Welcome should set secure cookie flags for the OIDC state cookie."""
await setup(hass)
client = await hass_client()
redirect_uri = create_redirect_uri(WEB_CLIENT_ID)
redirect_uri = create_redirect_uri(client.make_url("/"))
encoded = encode_redirect_uri(redirect_uri)
resp = await client.get(
@@ -105,14 +225,14 @@ async def test_welcome_sets_strict_state_cookie_flags(hass: HomeAssistant, hass_
set_cookie = resp.headers.get("Set-Cookie", "")
assert "Path=/auth/" in set_cookie
assert "SameSite=Strict" in set_cookie
assert "SameSite=Lax" in set_cookie
assert "HttpOnly" in set_cookie
assert "Max-Age=300" in set_cookie
@pytest.mark.asyncio
async def test_welcome_mobile_device_code_generation_failure(
hass: HomeAssistant, hass_client
hass: HomeAssistant, hass_client: ClientSessionGenerator
):
"""Welcome should error if device code generation fails for mobile clients."""
await setup(hass)
@@ -137,13 +257,13 @@ async def test_welcome_mobile_device_code_generation_failure(
@pytest.mark.asyncio
async def test_welcome_shows_alternative_sign_in_link_when_other_providers_exist(
hass: HomeAssistant, hass_client
hass: HomeAssistant, hass_client: ClientSessionGenerator
):
"""Welcome should render fallback auth link when other providers are present."""
await setup(hass)
client = await hass_client()
redirect_uri = create_redirect_uri(WEB_CLIENT_ID)
redirect_uri = create_redirect_uri(client.make_url("/"))
encoded = encode_redirect_uri(redirect_uri)
resp = await client.get(
f"/auth/oidc/welcome?redirect_uri={encoded}",
@@ -158,7 +278,7 @@ async def test_welcome_shows_alternative_sign_in_link_when_other_providers_exist
@pytest.mark.asyncio
async def test_welcome_desktop_auto_redirects_without_other_providers(
hass: HomeAssistant, hass_client
hass: HomeAssistant, hass_client: ClientSessionGenerator
):
"""Welcome should auto-redirect desktop clients when no other providers exist."""
@@ -167,7 +287,7 @@ async def test_welcome_desktop_auto_redirects_without_other_providers(
await setup(hass)
client = await hass_client()
redirect_uri = create_redirect_uri(WEB_CLIENT_ID)
redirect_uri = create_redirect_uri(client.make_url("/"))
encoded = encode_redirect_uri(redirect_uri)
resp = await client.get(
f"/auth/oidc/welcome?redirect_uri={encoded}",
@@ -179,7 +299,7 @@ async def test_welcome_desktop_auto_redirects_without_other_providers(
@pytest.mark.asyncio
async def test_redirect_without_cookie_goes_to_welcome(
hass: HomeAssistant, hass_client
hass: HomeAssistant, hass_client: ClientSessionGenerator
):
"""Redirect endpoint should bounce to welcome when no state cookie exists."""
await setup(hass)
@@ -192,13 +312,13 @@ async def test_redirect_without_cookie_goes_to_welcome(
@pytest.mark.asyncio
async def test_redirect_shows_error_on_oidc_runtime_error(
hass: HomeAssistant, hass_client
hass: HomeAssistant, hass_client: ClientSessionGenerator
):
"""Redirect should show a configuration error when OIDC URL generation raises."""
await setup(hass)
client = await hass_client()
redirect_uri = create_redirect_uri(WEB_CLIENT_ID)
redirect_uri = create_redirect_uri(client.make_url("/"))
encoded = encode_redirect_uri(redirect_uri)
resp_welcome = await client.get(
f"/auth/oidc/welcome?redirect_uri={encoded}",
@@ -220,13 +340,13 @@ async def test_redirect_shows_error_on_oidc_runtime_error(
@pytest.mark.asyncio
async def test_redirect_shows_error_when_auth_url_empty(
hass: HomeAssistant, hass_client
hass: HomeAssistant, hass_client: ClientSessionGenerator
):
"""Redirect should show error page if OIDC returns no authorization URL."""
await setup(hass)
client = await hass_client()
redirect_uri = create_redirect_uri(WEB_CLIENT_ID)
redirect_uri = create_redirect_uri(client.make_url("/"))
encoded = encode_redirect_uri(redirect_uri)
resp_welcome = await client.get(
f"/auth/oidc/welcome?redirect_uri={encoded}",
@@ -247,7 +367,9 @@ async def test_redirect_shows_error_when_auth_url_empty(
@pytest.mark.asyncio
async def test_callback_registration(hass: HomeAssistant, hass_client):
async def test_callback_registration(
hass: HomeAssistant, hass_client: ClientSessionGenerator
):
"""Test that callback page is reachable."""
await setup(hass)
@@ -258,12 +380,14 @@ async def test_callback_registration(hass: HomeAssistant, hass_client):
@pytest.mark.asyncio
async def test_callback_rejects_missing_code_or_state(hass: HomeAssistant, hass_client):
async def test_callback_rejects_missing_code_or_state(
hass: HomeAssistant, hass_client: ClientSessionGenerator
):
"""Callback must reject requests missing either code or state."""
await setup(hass)
client = await hass_client()
redirect_uri = create_redirect_uri(WEB_CLIENT_ID)
redirect_uri = create_redirect_uri(client.make_url("/"))
encoded = encode_redirect_uri(redirect_uri)
resp_welcome = await client.get(
f"/auth/oidc/welcome?redirect_uri={encoded}",
@@ -287,12 +411,14 @@ async def test_callback_rejects_missing_code_or_state(hass: HomeAssistant, hass_
@pytest.mark.asyncio
async def test_callback_rejects_state_mismatch(hass: HomeAssistant, hass_client):
async def test_callback_rejects_state_mismatch(
hass: HomeAssistant, hass_client: ClientSessionGenerator
):
"""Callback must reject state mismatch to protect against CSRF."""
await setup(hass)
client = await hass_client()
redirect_uri = create_redirect_uri(WEB_CLIENT_ID)
redirect_uri = create_redirect_uri(client.make_url("/"))
encoded = encode_redirect_uri(redirect_uri)
resp_welcome = await client.get(
f"/auth/oidc/welcome?redirect_uri={encoded}",
@@ -310,13 +436,13 @@ async def test_callback_rejects_state_mismatch(hass: HomeAssistant, hass_client)
@pytest.mark.asyncio
async def test_callback_rejects_when_user_details_fetch_fails(
hass: HomeAssistant, hass_client
hass: HomeAssistant, hass_client: ClientSessionGenerator
):
"""Callback should error when token exchange/userinfo retrieval fails."""
await setup(hass)
client = await hass_client()
redirect_uri = create_redirect_uri(WEB_CLIENT_ID)
redirect_uri = create_redirect_uri(client.make_url("/"))
encoded = encode_redirect_uri(redirect_uri)
resp_welcome = await client.get(
f"/auth/oidc/welcome?redirect_uri={encoded}",
@@ -340,12 +466,14 @@ async def test_callback_rejects_when_user_details_fetch_fails(
@pytest.mark.asyncio
async def test_callback_rejects_invalid_role(hass: HomeAssistant, hass_client):
async def test_callback_rejects_invalid_role(
hass: HomeAssistant, hass_client: ClientSessionGenerator
):
"""Callback should reject users marked with invalid role."""
await setup(hass)
client = await hass_client()
redirect_uri = create_redirect_uri(WEB_CLIENT_ID)
redirect_uri = create_redirect_uri(client.make_url("/"))
encoded = encode_redirect_uri(redirect_uri)
resp_welcome = await client.get(
f"/auth/oidc/welcome?redirect_uri={encoded}",
@@ -378,7 +506,10 @@ async def test_callback_rejects_invalid_role(hass: HomeAssistant, hass_client):
],
)
async def test_finish_requires_state_cookie(
hass: HomeAssistant, hass_client, method: str, data: dict | None
hass: HomeAssistant,
hass_client: ClientSessionGenerator,
method: str,
data: dict | None,
):
"""Finish endpoint should require the OIDC state cookie for both GET and POST."""
await setup(hass)
@@ -395,12 +526,14 @@ async def test_finish_requires_state_cookie(
@pytest.mark.asyncio
async def test_finish_post_rejects_invalid_state(hass: HomeAssistant, hass_client):
async def test_finish_post_rejects_invalid_state(
hass: HomeAssistant, hass_client: ClientSessionGenerator
):
"""Finish POST should error when the state cookie does not resolve to redirect_uri."""
await setup(hass)
client = await hass_client()
redirect_uri = create_redirect_uri(WEB_CLIENT_ID)
redirect_uri = create_redirect_uri(client.make_url("/"))
encoded = encode_redirect_uri(redirect_uri)
resp_welcome = await client.get(
f"/auth/oidc/welcome?redirect_uri={encoded}",
@@ -418,7 +551,9 @@ async def test_finish_post_rejects_invalid_state(hass: HomeAssistant, hass_clien
@pytest.mark.asyncio
async def test_device_sse_requires_state_cookie(hass: HomeAssistant, hass_client):
async def test_device_sse_requires_state_cookie(
hass: HomeAssistant, hass_client: ClientSessionGenerator
):
"""SSE endpoint should reject requests without state cookie."""
await setup(hass)
@@ -430,7 +565,7 @@ async def test_device_sse_requires_state_cookie(hass: HomeAssistant, hass_client
@pytest.mark.asyncio
async def test_device_sse_emits_expired_for_unknown_state(
hass: HomeAssistant, hass_client
hass: HomeAssistant, hass_client: ClientSessionGenerator
):
"""SSE should emit expired when the state can no longer be resolved."""
await setup(hass)
@@ -455,7 +590,9 @@ async def test_device_sse_emits_expired_for_unknown_state(
@pytest.mark.asyncio
async def test_device_sse_emits_timeout(hass: HomeAssistant, hass_client):
async def test_device_sse_emits_timeout(
hass: HomeAssistant, hass_client: ClientSessionGenerator
):
"""SSE should emit timeout if the polling window is exceeded."""
await setup(hass)
@@ -493,7 +630,7 @@ async def test_device_sse_emits_timeout(hass: HomeAssistant, hass_client):
@pytest.mark.asyncio
async def test_device_sse_handles_runtime_error_and_returns_cleanly(
hass: HomeAssistant, hass_client
hass: HomeAssistant, hass_client: ClientSessionGenerator
):
"""SSE should swallow runtime errors from stream loop and finish response."""
await setup(hass)
@@ -523,7 +660,7 @@ async def test_device_sse_handles_runtime_error_and_returns_cleanly(
@pytest.mark.asyncio
async def test_device_sse_ignores_write_eof_connection_reset(
hass: HomeAssistant, hass_client
hass: HomeAssistant, hass_client: ClientSessionGenerator
):
"""SSE should ignore ConnectionResetError while closing the stream."""
await setup(hass)
@@ -553,29 +690,20 @@ async def test_device_sse_ignores_write_eof_connection_reset(
# Test the frontend injection
@pytest.mark.asyncio
async def test_frontend_injection(hass: HomeAssistant, hass_client):
async def test_frontend_injection(
hass: HomeAssistant, hass_client: ClientSessionGenerator
):
"""Test that frontend injection works."""
# Because there is no frontend in the test setup,
# we'll have to fake /auth/authorize for the changes to register
await async_setup_component(hass, HTTP_DOMAIN, {})
mock_html_path = os.path.join(os.path.dirname(__file__), "mocks", "auth_page.html")
await hass.http.async_register_static_paths(
[
StaticPathConfig(
"/auth/authorize",
mock_html_path,
cache_headers=False,
)
]
)
# we'll have to fake /auth/authorize for the changes to register.
await setup_mock_authorize_route(hass)
await setup(hass)
client = await hass_client()
resp = await client.get("/auth/authorize", allow_redirects=False)
assert resp.status == 200
assert resp.status == 200 # 200 because there is no redirect_uri
text = await resp.text()
assert "<script src='/auth/oidc/static/injection.js" in text
@@ -606,7 +734,7 @@ async def test_frontend_injection_logs_and_returns_when_route_handler_is_unexpec
return iter([FakeRoute()])
with patch.object(hass.http.app.router, "resources", return_value=[FakeResource()]):
await frontend_injection(hass)
await frontend_injection(hass, force_https=False)
assert "Unexpected route handler type" in caplog.text
assert (
@@ -625,6 +753,61 @@ async def test_injected_auth_page_inject_logs_errors(hass: HomeAssistant, caplog
"custom_components.auth_oidc.endpoints.injected_auth_page.frontend_injection",
side_effect=RuntimeError("boom"),
):
await OIDCInjectedAuthPage.inject(hass)
await OIDCInjectedAuthPage.inject(hass, force_https=False)
assert "Failed to inject OIDC auth page: boom" in caplog.text
@pytest.mark.asyncio
async def test_injected_auth_page_redirects_to_welcome_when_not_skipped(
hass: HomeAssistant, hass_client: ClientSessionGenerator
):
"""Injected auth page should redirect into OIDC when skip flags are absent."""
await setup_mock_authorize_route(hass)
await setup(hass)
client = await hass_client()
encoded_redirect_uri = quote(create_redirect_uri(client.make_url("/")), safe="")
resp = await client.get(
f"/auth/authorize?redirect_uri={encoded_redirect_uri}",
allow_redirects=False,
)
assert resp.status == 302
location = resp.headers["Location"]
parsed_location = urlparse(location)
assert parsed_location.path == "/auth/oidc/welcome"
query = parse_qs(parsed_location.query)
assert "redirect_uri" in query
original_url = base64.b64decode(unquote(query["redirect_uri"][0]), validate=True)
original_url = original_url.decode("utf-8")
assert "/auth/authorize?redirect_uri=" in original_url
@pytest.mark.asyncio
@pytest.mark.parametrize(
"request_target",
[
"/auth/authorize?skip_oidc_redirect=true",
"/auth/authorize?redirect_uri=http%3A%2F%2Fexample.com%2Fauth%2Fauthorize%3Fskip_oidc_redirect%3Dtrue",
],
)
async def test_injected_auth_page_returns_original_html_when_skipped(
hass: HomeAssistant,
hass_client,
request_target: str,
):
"""Injected auth page should render HTML when redirect suppression is requested."""
await setup_mock_authorize_route(hass)
await setup(hass)
client = await hass_client()
response = await client.get(request_target, allow_redirects=False)
assert response.status == 200
assert "<script src='/auth/oidc/static/injection.js" in await response.text()

2
uv.lock generated
View File

@@ -958,7 +958,6 @@ version = "1.0.0"
source = { editable = "." }
dependencies = [
{ name = "aiofiles" },
{ name = "bcrypt" },
{ name = "jinja2" },
{ name = "joserfc" },
]
@@ -977,7 +976,6 @@ dev = [
[package.metadata]
requires-dist = [
{ name = "aiofiles", specifier = "~=25.1" },
{ name = "bcrypt", specifier = "~=5.0" },
{ name = "jinja2", specifier = "~=3.1" },
{ name = "joserfc", specifier = "~=1.6.0" },
]