8 Commits

Author SHA1 Message Date
Christiaan Goossens
7cc960e4db Bump to rc3 (#249) 2026-04-15 12:08:36 +02:00
Christiaan Goossens
07c1e3a4c4 Fix regression of storeToken parameter (#248)
* Try a different method to set ?storeToken

* Formatting

* Only insert storeToken on web client & fix tests
2026-04-15 12:07:19 +02:00
Christiaan Goossens
0ca300c385 Add tests for other signing methods (#246)
* Add tests for other signing methods #151

* Add doc for list source
2026-04-14 15:29:06 +02:00
Christiaan Goossens
a9483e2038 Change build script to align with HACS (#245)
* Change build script to align with HACS

* Fix path typo
2026-04-14 14:42:13 +02:00
Christiaan Goossens
6f1d2bcb3f Switch to creating releases by tag (#244) 2026-04-14 14:26:02 +02:00
Christiaan Goossens
67f58a39aa Better tag matching (#243)
* Better tag matching

* Split PR and release flows

* Undo PR archiving
2026-04-14 14:17:09 +02:00
Christiaan Goossens
ddb2952e64 Release with autogenerated zip files (#242)
* Try autobuilding

* Typo fix

* Entire components dir

* Directly upload zip
2026-04-14 13:55:09 +02:00
Christiaan Goossens
baf3ac6b5a Fixes for known bugs in v1.0.0-rc1 (#241)
* Fix #238 for same-site cookies

* Redirect in Python + bump to rc2
2026-04-14 09:43:58 +02:00
20 changed files with 602 additions and 166 deletions

21
.github/workflows/build-pr.yaml vendored Normal file
View File

@@ -0,0 +1,21 @@
name: Build pull request artifact
on:
pull_request:
jobs:
build:
runs-on: ubuntu-latest
steps:
- uses: actions/checkout@v6
- name: Build
run: scripts/build
- name: Upload Artifact
uses: actions/upload-artifact@v7
with:
path: ./hass-oidc-auth.zip
archive: false

27
.github/workflows/release.yaml vendored Normal file
View File

@@ -0,0 +1,27 @@
name: Build and create draft release
on:
push:
tags:
- v*.*.*
jobs:
release:
runs-on: ubuntu-latest
permissions:
contents: write
steps:
- uses: actions/checkout@v6
- name: Build
run: scripts/build
- name: Create or update draft release with ZIP
uses: softprops/action-gh-release@v3
with:
draft: true
fail_on_unmatched_files: true
generate_release_notes: true
files: ./hass-oidc-auth.zip

3
.gitignore vendored
View File

@@ -111,5 +111,6 @@ dmypy.json
.pytest_logs.log .pytest_logs.log
# Build NPM
node_modules node_modules
custom_components/auth_oidc/static/style.css

View File

@@ -157,6 +157,6 @@ async def _setup_oidc_provider(hass: HomeAssistant, my_config: dict, display_nam
_LOGGER.info("Registered OIDC views") _LOGGER.info("Registered OIDC views")
# Inject OIDC code into the frontend for /auth/authorize for automatic redirect # Inject OIDC code into the frontend for /auth/authorize for automatic redirect
await OIDCInjectedAuthPage.inject(hass) await OIDCInjectedAuthPage.inject(hass, force_https)
return True return True

View File

@@ -8,9 +8,7 @@ from typing import Any, Dict
DEFAULT_TITLE = "OpenID Connect (SSO)" DEFAULT_TITLE = "OpenID Connect (SSO)"
DOMAIN = "auth_oidc" DOMAIN = "auth_oidc"
REPO_ROOT_URL = ( REPO_ROOT_URL = "https://github.com/christiaangoossens/hass-oidc-auth/tree/v1.0.0-rc3"
"https://github.com/christiaangoossens/hass-oidc-auth/tree/v1.0.0-rc1"
)
## === ## ===
## Config keys ## Config keys

View File

@@ -61,11 +61,9 @@ class OIDCFinishView(HomeAssistantView):
if "?" in redirect_uri: if "?" in redirect_uri:
separator = "&" separator = "&"
# Redirect to this new URL for login # Redirect to this new URL for login, make sure to skip OIDC to prevent loops
new_url = ( redirect_uri = f"{redirect_uri}{separator}skip_oidc_redirect=true"
redirect_uri + separator + "storeToken=true&skip_oidc_redirect=true" raise web.HTTPFound(location=redirect_uri)
)
raise web.HTTPFound(location=new_url)
# Check if we can link this device # Check if we can link this device
linked = await self.oidc_provider.async_link_state_to_code( linked = await self.oidc_provider.async_link_state_to_code(

View File

@@ -1,12 +1,18 @@
"""Injected authorization page, replacing the original""" """Injected authorization page, replacing the original"""
import base64
import logging import logging
from functools import partial from functools import partial
from homeassistant.components.http import HomeAssistantView, StaticPathConfig from urllib.parse import quote, unquote
from homeassistant.core import HomeAssistant
from aiohttp import web from aiohttp import web
from aiofiles import open as async_open from aiofiles import open as async_open
from homeassistant.components.http import HomeAssistantView, StaticPathConfig
from homeassistant.core import HomeAssistant
from .welcome import PATH as WELCOME_PATH
from ..tools.helpers import get_url
PATH = "/auth/authorize" PATH = "/auth/authorize"
_LOGGER = logging.getLogger(__name__) _LOGGER = logging.getLogger(__name__)
@@ -18,7 +24,7 @@ async def read_file(path: str) -> str:
return await f.read() return await f.read()
async def frontend_injection(hass: HomeAssistant) -> None: async def frontend_injection(hass: HomeAssistant, force_https: bool) -> None:
"""Inject new frontend code into /auth/authorize.""" """Inject new frontend code into /auth/authorize."""
router = hass.http.app.router router = hass.http.app.router
frontend_path = None frontend_path = None
@@ -61,7 +67,7 @@ async def frontend_injection(hass: HomeAssistant) -> None:
frontend_code = await read_file(frontend_path) frontend_code = await read_file(frontend_path)
# Inject JS and register that route # Inject JS and register that route
injection_js = "<script src='/auth/oidc/static/injection.js?v=4'></script>" injection_js = "<script src='/auth/oidc/static/injection.js?v=6'></script>"
frontend_code = frontend_code.replace("</body>", f"{injection_js}</body>") frontend_code = frontend_code.replace("</body>", f"{injection_js}</body>")
await hass.http.async_register_static_paths( await hass.http.async_register_static_paths(
@@ -80,7 +86,7 @@ async def frontend_injection(hass: HomeAssistant) -> None:
) )
# If everything is succesful, register a fake view that just returns the modified HTML # If everything is succesful, register a fake view that just returns the modified HTML
hass.http.register_view(OIDCInjectedAuthPage(frontend_code)) hass.http.register_view(OIDCInjectedAuthPage(frontend_code, force_https))
_LOGGER.info("Performed OIDC frontend injection") _LOGGER.info("Performed OIDC frontend injection")
@@ -91,18 +97,49 @@ class OIDCInjectedAuthPage(HomeAssistantView):
url = PATH url = PATH
name = "auth:oidc:authorize_page" name = "auth:oidc:authorize_page"
def __init__(self, html: str) -> None: def __init__(self, html: str, force_https: bool) -> None:
"""Initialize the injected auth page.""" """Initialize the injected auth page."""
self.html = html self.html = html
self.force_https = force_https
@staticmethod @staticmethod
async def inject(hass: HomeAssistant) -> None: async def inject(hass: HomeAssistant, force_https: bool) -> None:
"""Inject the OIDC auth page into the frontend.""" """Inject the OIDC auth page into the frontend."""
try: try:
await frontend_injection(hass) await frontend_injection(hass, force_https)
except Exception as e: # pylint: disable=broad-except except Exception as e: # pylint: disable=broad-except
_LOGGER.error("Failed to inject OIDC auth page: %s", e) _LOGGER.error("Failed to inject OIDC auth page: %s", e)
async def get(self, _) -> web.Response: @staticmethod
"""Return the screen""" def _should_do_oidc_redirect(req: web.Request) -> bool:
"""Check if we should redirect to the OIDC flow."""
# Set when we return from finish
if req.query.get("skip_oidc_redirect") == "true":
return False
# Set whenever you directly do /?skip_oidc_redirect=true,
# for example when you click the "other" button on the welcome screen
redirect_uri = req.query.get("redirect_uri")
if not redirect_uri:
return False
# Handle both encoded and plain redirect_uri values.
decoded_redirect_uri = unquote(redirect_uri)
return "skip_oidc_redirect=true" not in decoded_redirect_uri
def _get_welcome_redirect_location(self, req: web.Request) -> str:
"""Build the welcome URL for the injected auth page redirect."""
encoded_current_url = quote(
base64.b64encode(str(req.url).encode("utf-8")).decode("ascii")
)
return get_url(
f"{WELCOME_PATH}?redirect_uri={encoded_current_url}",
self.force_https,
)
async def get(self, req: web.Request) -> web.Response:
"""Return the original page or redirect into the OIDC flow."""
if self._should_do_oidc_redirect(req):
raise web.HTTPFound(location=self._get_welcome_redirect_location(req))
return web.Response(text=self.html, content_type="text/html") return web.Response(text=self.html, content_type="text/html")

View File

@@ -1,8 +1,9 @@
"""Welcome route to show the user the OIDC login button and give instructions.""" """Welcome route to show the user the OIDC login button and give instructions."""
from ast import List
import base64 import base64
import binascii import binascii
from urllib.parse import urlparse, parse_qs, unquote from urllib.parse import urlparse, parse_qs, unquote, urlencode
from aiohttp import web from aiohttp import web
from homeassistant.components.http import HomeAssistantView from homeassistant.components.http import HomeAssistantView
from ..tools.helpers import error_response, get_url, template_response from ..tools.helpers import error_response, get_url, template_response
@@ -30,13 +31,46 @@ class OIDCWelcomeView(HomeAssistantView):
self.force_https = force_https self.force_https = force_https
self.has_other_auth_providers = has_other_auth_providers self.has_other_auth_providers = has_other_auth_providers
def determine_if_mobile(self, redirect_uri: str) -> bool: async def _process_url(self, redirect_uri: str) -> List[str, bool]:
"""Determine if the client is a mobile client based on the redirect_uri.""" """Processes the redirect URI to determine if we need setTokens and if this is mobile."""
oauth2_url = urlparse(redirect_uri) # decodeURIComponent(btoa(...)) -> unquote first, then base64 decode
client_id = parse_qs(oauth2_url.query).get("client_id") redirect_uri = base64.b64decode(unquote(redirect_uri), validate=True).decode(
"utf-8"
)
# If the client_id starts with https://home-assistant.io/ we assume it's a mobile client oauth2_url = urlparse(redirect_uri)
return bool(client_id and client_id[0].startswith("https://home-assistant.io/")) oauth2_query = parse_qs(oauth2_url.query)
client_id = oauth2_query.get("client_id")[0]
original_redirect_uri = oauth2_query.get("redirect_uri")[0]
# If the client_id starts with https://home-assistant.io/
# we assume it's a mobile client
# Android = https://home-assistant.io/Android,
# iOS = https://home-assistant.io/iOS
is_mobile = client_id.startswith("https://home-assistant.io/")
# Check if we appear to be signing in to the web version,
# for which we want to store tokens.
# We don't want to set storeTokens on sign-in to Google for instance
base_url = get_url("/", self.force_https)
is_web_client = original_redirect_uri.startswith(base_url)
if is_web_client:
# Adjust the original_redirect_uri to include the storeTokens parameter
separator = "?"
if "?" in original_redirect_uri:
separator = "&"
original_redirect_uri = f"{original_redirect_uri}{separator}storeToken=true"
oauth2_query.update({"redirect_uri": original_redirect_uri})
# Create new redirect_uri with the updated query parameters
new_oauth2_url = oauth2_url._replace(
query=urlencode(oauth2_query, doseq=True)
)
redirect_uri = new_oauth2_url.geturl()
return redirect_uri, is_mobile
async def get(self, req: web.Request) -> web.Response: async def get(self, req: web.Request) -> web.Response:
"""Receive response.""" """Receive response."""
@@ -44,23 +78,26 @@ class OIDCWelcomeView(HomeAssistantView):
# Get the query parameter with the redirect_uri # Get the query parameter with the redirect_uri
redirect_uri = req.query.get("redirect_uri") redirect_uri = req.query.get("redirect_uri")
# If set, determine if this is a mobile client based on the redirect_uri, # Do some processing on the redirect_uri to correct it
# otherwise assume it's not mobile # and determine if this is a mobile client.
if redirect_uri: if redirect_uri:
try: try:
# decodeURIComponent(btoa(...)) -> unquote first, then base64 decode redirect_uri, is_mobile = await self._process_url(redirect_uri)
redirect_uri = base64.b64decode( except (
unquote(redirect_uri), validate=True binascii.Error,
).decode("utf-8") UnicodeDecodeError,
is_mobile = self.determine_if_mobile(redirect_uri) ValueError,
except (binascii.Error, UnicodeDecodeError, ValueError): KeyError,
TypeError,
):
return await error_response( return await error_response(
"Invalid redirect_uri, please restart login." "Invalid redirect_uri, please restart login."
) )
else: else:
# Backwards compatibility with older versions that directly go to /auth/oidc/welcome # Backwards compatibility with older versions that directly go to /auth/oidc/welcome
# If not set, redirect back to the main page and assume that this is a web client # If not set, redirect back to the main page and assume that this is a web client
redirect_uri = get_url("/", self.force_https) redirect_uri = get_url("/?storeToken=true", self.force_https)
is_mobile = False is_mobile = False
# Create OIDC state with the redirect_uri so we can use it later in the flow # Create OIDC state with the redirect_uri so we can use it later in the flow

View File

@@ -16,8 +16,7 @@
"requirements": [ "requirements": [
"aiofiles", "aiofiles",
"jinja2", "jinja2",
"bcrypt",
"joserfc" "joserfc"
], ],
"version": "1.0.0-rc1" "version": "1.0.0-rc3"
} }

View File

@@ -6,7 +6,6 @@ import logging
from typing import Dict, Optional from typing import Dict, Optional
import asyncio import asyncio
import bcrypt
from homeassistant.auth import EVENT_USER_ADDED from homeassistant.auth import EVENT_USER_ADDED
from homeassistant.auth.providers import ( from homeassistant.auth.providers import (
AUTH_PROVIDERS, AUTH_PROVIDERS,
@@ -236,7 +235,7 @@ class OpenIDAuthProvider(AuthProvider):
# Keep cookie lifetime aligned with state lifetime in storage (5 minutes). # Keep cookie lifetime aligned with state lifetime in storage (5 minutes).
"set-cookie": f"{COOKIE_NAME}=" "set-cookie": f"{COOKIE_NAME}="
+ state_id + state_id
+ "; Path=/auth/; SameSite=Strict; HttpOnly; Max-Age=300" + "; Path=/auth/; SameSite=Lax; HttpOnly; Max-Age=300"
+ secure_flag, + secure_flag,
} }
@@ -367,14 +366,6 @@ class OpenIdLoginFlow(LoginFlow):
"""Handler for the login flow.""" """Handler for the login flow."""
async def _finalize_user(self, state_id: str) -> AuthFlowResult: async def _finalize_user(self, state_id: str) -> AuthFlowResult:
# Verify a dummy hash to make it last a bit longer
# as security measure (limits the amount of attempts you have in 5 min)
# Similar to what the HomeAssistant auth provider does
dummy = b"$2b$12$CiuFGszHx9eNHxPuQcwBWez4CwDTOcLTX5CbOpV6gef2nYuXkY7BO"
bcrypt.checkpw(b"foo", dummy)
# Actually look up the auth provider after,
# this doesn't take a lot of time (regardless of it's in there or not)
sub = await self._auth_provider.async_get_subject(state_id) sub = await self._auth_provider.async_get_subject(state_id)
if sub: if sub:
return await self.async_finish( return await self.async_finish(
@@ -396,11 +387,10 @@ class OpenIdLoginFlow(LoginFlow):
state_cookie = req.cookies.get(COOKIE_NAME) state_cookie = req.cookies.get(COOKIE_NAME)
if state_cookie: if state_cookie:
_LOGGER.debug("State cookie found on login: %s", state_cookie)
try: try:
return await self._finalize_user(state_cookie) return await self._finalize_user(state_cookie)
except InvalidAuthError: except InvalidAuthError:
pass return self.async_abort(reason="oidc_cookie_invalid")
# If no cookie is found, abort. # If no cookie is found, abort.
# User should either be redirected or start manually on the welcome # User should either be redirected or start manually on the welcome

View File

@@ -1,58 +1,19 @@
/** /**
* OIDC Frontend Redirect injection script * hass-oidc-auth - UX script to automatically select the Home Assistant auth provider when the "Login aborted" message is shown.
* This script is injected because the 'hass-oidc-auth' custom component is active.
*/ */
function attempt_oidc_redirect() { let authFlowElement = null
// Get URL parameters
const urlParams = new URLSearchParams(window.location.search);
// Check if we have skip_oidc_redirect directly here function update() {
if (urlParams.get('skip_oidc_redirect') === 'true') { // Find ha-auth-flow
// No console log because this is intended behavior authFlowElement = document.querySelector('ha-auth-flow');
return;
}
const originalUrl = urlParams.get('redirect_uri');
if (!originalUrl) {
console.warn('[OIDC] No OAuth2 redirect_uri parameter found in the URL. Frontend redirect cancelled.');
return;
}
try {
// Parse the redirect URI
const redirectUrl = new URL(originalUrl);
// Check if redirect URI has a query parameter to stop OIDC injection
if (redirectUrl.searchParams.get('skip_oidc_redirect') === 'true') {
// No console log because this is intended behavior
return;
}
} catch (error) {
console.error('[OIDC] Invalid redirect_uri parameter:', error);
}
window.stop(); // Stop loading the current page before redirecting
// Redirect to the OIDC auth URL
const base64encodeUrl = btoa(window.location.href);
const oidcAuthUrl = '/auth/oidc/welcome?redirect_uri=' + encodeURIComponent(base64encodeUrl);
window.location.href = oidcAuthUrl;
}
function click_alternative_provider_instead() {
setTimeout(() => {
// Find ha-auth-flow
const authFlowElement = document.querySelector('ha-auth-flow');
if (!authFlowElement) { if (!authFlowElement) {
console.warn("[OIDC] ha-auth-flow element not found. Not automatically selecting HA provider.");
return; return;
} }
// Check if the text "Login aborted" is present on the page // Check if the text "Login aborted" is present on the page
if (!authFlowElement.innerText.includes('Login aborted')) { if (!authFlowElement.innerText.includes('Login aborted')) {
console.warn("[OIDC] 'Login aborted' text not found. Not automatically selecting HA provider.");
return; return;
} }
@@ -60,7 +21,6 @@ function click_alternative_provider_instead() {
const authProviderElement = document.querySelector('ha-pick-auth-provider'); const authProviderElement = document.querySelector('ha-pick-auth-provider');
if (!authProviderElement) { if (!authProviderElement) {
console.warn("[OIDC] ha-pick-auth-provider not found. Not automatically selecting HA provider.");
return; return;
} }
@@ -72,11 +32,30 @@ function click_alternative_provider_instead() {
} }
firstListItem.click(); firstListItem.click();
}, 500);
} }
// Run OIDC injection upon load // Hide the content until ready
(() => { let ready = false
attempt_oidc_redirect(); document.querySelector(".content").style.display = "none"
click_alternative_provider_instead();
})(); const observer = new MutationObserver((mutationsList, observer) => {
update();
if (!ready) {
ready = Boolean(authFlowElement)
if (ready) {
document.querySelector(".content").style.display = ""
}
}
})
observer.observe(document.body, { childList: true, subtree: true })
setTimeout(() => {
if (!ready) {
console.warn("[hass-oidc-auth]: Document was not ready after 300ms seconds, showing content anyway.")
}
// Force display the content
document.querySelector(".content").style.display = "";
}, 300)

File diff suppressed because one or more lines are too long

View File

@@ -1,6 +1,8 @@
{ {
"name": "OpenID Connect", "name": "OpenID Connect/SSO Authentication",
"hide_default_branch": true, "hide_default_branch": true,
"render_readme": true, "render_readme": true,
"homeassistant": "2025.11" "homeassistant": "2025.11",
"zip_release": true,
"filename": "hass-oidc-auth.zip"
} }

View File

@@ -9,7 +9,6 @@ license = "MIT"
dependencies = [ dependencies = [
"aiofiles~=25.1", "aiofiles~=25.1",
"jinja2~=3.1", "jinja2~=3.1",
"bcrypt~=5.0",
"joserfc~=1.6.0", "joserfc~=1.6.0",
] ]
readme = "README.md" readme = "README.md"

10
scripts/build Executable file
View File

@@ -0,0 +1,10 @@
#! /bin/bash
# Build the plugin CSS
npm install --frozen-lockfile
npm run css
# Create zip from the custom_components/auth_oidc directory
# HACS wants only the contents of this dir in a zip
cd custom_components/auth_oidc/
zip -r ../../hass-oidc-auth.zip ./*

View File

@@ -93,7 +93,7 @@ async def test_provider_cookie_header_sets_secure_when_requested(hass: HomeAssis
provider = hass.auth.get_auth_providers(DOMAIN)[0] provider = hass.auth.get_auth_providers(DOMAIN)[0]
cookie_header = provider.get_cookie_header("state-id", secure=True)["set-cookie"] cookie_header = provider.get_cookie_header("state-id", secure=True)["set-cookie"]
assert "SameSite=Strict" in cookie_header assert "SameSite=Lax" in cookie_header
assert "HttpOnly" in cookie_header assert "HttpOnly" in cookie_header
assert "Secure" in cookie_header assert "Secure" in cookie_header
@@ -342,5 +342,36 @@ async def test_login_with_invalid_cookie_aborts(hass: HomeAssistant):
result = await flow.async_step_init({}) result = await flow.async_step_init({})
assert result["type"] == FlowResultType.ABORT
assert result["reason"] == "oidc_cookie_invalid"
@pytest.mark.asyncio
async def test_login_with_no_cookie_aborts(hass: HomeAssistant):
"""Missing cookie should fail closed."""
await setup(
hass,
{
CLIENT_ID: "dummy",
DISCOVERY_URL: MockOIDCServer.get_discovery_url(),
FEATURES: {
FEATURES_AUTOMATIC_PERSON_CREATION: False,
FEATURES_AUTOMATIC_USER_LINKING: False,
},
},
True,
)
provider = hass.auth.get_auth_providers(DOMAIN)[0]
flow = await provider.async_login_flow({})
fake_request = SimpleNamespace(cookies={}, remote="127.0.0.1")
with patch(
"custom_components.auth_oidc.provider.http.current_request"
) as current_request:
current_request.get.return_value = fake_request
result = await flow.async_step_init({})
assert result["type"] == FlowResultType.ABORT assert result["type"] == FlowResultType.ABORT
assert result["reason"] == "no_oidc_cookie_found" assert result["reason"] == "no_oidc_cookie_found"

View File

@@ -103,10 +103,10 @@ async def verify_back_redirect(client, expected_redirect_uri: str):
"""Verify that POST to finish without body redirects back to the original redirect_uri.""" """Verify that POST to finish without body redirects back to the original redirect_uri."""
resp_finish_post = await client.post("/auth/oidc/finish", allow_redirects=False) resp_finish_post = await client.post("/auth/oidc/finish", allow_redirects=False)
assert resp_finish_post.status == 302 assert resp_finish_post.status == 302
assert (
resp_finish_post.headers["Location"] location = resp_finish_post.headers["Location"]
== unquote(expected_redirect_uri) + "&storeToken=true&skip_oidc_redirect=true" assert location.startswith(unquote(expected_redirect_uri))
) assert "skip_oidc_redirect=true" in location
async def listen_for_sse_events( async def listen_for_sse_events(

View File

@@ -23,6 +23,25 @@ from custom_components.auth_oidc.tools.oidc_client import (
http_raise_for_status, http_raise_for_status,
) )
# List from https://jose.authlib.org/en/guide/algorithms/#json-web-signature
ALL_ID_TOKEN_SIGNING_ALGORITHMS = (
"HS256",
"HS384",
"HS512",
"RS256",
"RS384",
"RS512",
"ES256",
"ES384",
"ES512",
"PS256",
"PS384",
"PS512",
"ES256K",
"Ed25519",
"Ed448",
)
def make_client(hass: HomeAssistant, **kwargs) -> OIDCClient: def make_client(hass: HomeAssistant, **kwargs) -> OIDCClient:
"""Build an OIDC client with explicit defaults for unit testing.""" """Build an OIDC client with explicit defaults for unit testing."""
@@ -67,6 +86,59 @@ def make_signed_hs256_jwt(secret: str, claims: dict) -> str:
return jwt.encode({"alg": "HS256"}, claims, jwk_obj) return jwt.encode({"alg": "HS256"}, claims, jwk_obj)
def build_real_signed_token(
algorithm: str, claims: dict, secret: str
) -> tuple[str, dict]:
"""Build a real signed token and matching JWKS payload for a given algorithm."""
if algorithm.startswith("HS"):
signing_key = jwk.import_key(
{
"kty": "oct",
"k": base64.urlsafe_b64encode(secret.encode()).decode().rstrip("="),
"alg": algorithm,
}
)
token = jwt.encode(
{"alg": algorithm}, claims, signing_key, algorithms=[algorithm]
)
return token, {"keys": []}
if algorithm in ("RS256", "RS384", "RS512", "PS256", "PS384", "PS512"):
key = jwk.generate_key(
"RSA", 2048, {"alg": algorithm, "use": "sig"}, private=True, auto_kid=True
)
elif algorithm in ("ES256", "ES384", "ES512", "ES256K"):
curve = {
"ES256": "P-256",
"ES384": "P-384",
"ES512": "P-521",
"ES256K": "secp256k1",
}[algorithm]
key = jwk.generate_key(
"EC", curve, {"alg": algorithm, "use": "sig"}, private=True, auto_kid=True
)
elif algorithm in ("Ed25519", "Ed448"):
key = jwk.generate_key(
"OKP",
algorithm,
{"alg": algorithm, "use": "sig"},
private=True,
auto_kid=True,
)
else:
raise ValueError(f"Unsupported test algorithm: {algorithm}")
kid = key.kid
token = jwt.encode(
{"alg": algorithm, "kid": kid},
claims,
key,
algorithms=[algorithm],
)
public_key = key.as_dict(private=False)
return token, {"keys": [public_key]}
@pytest.mark.asyncio @pytest.mark.asyncio
async def test_complete_token_flow_rejects_missing_state(hass: HomeAssistant): async def test_complete_token_flow_rejects_missing_state(hass: HomeAssistant):
"""Flow state must exist; missing state should fail closed.""" """Flow state must exist; missing state should fail closed."""
@@ -447,6 +519,62 @@ async def test_parse_id_token_rejects_invalid_registered_claims(hass: HomeAssist
assert parsed is None assert parsed is None
@pytest.mark.asyncio
@pytest.mark.parametrize("algorithm", ALL_ID_TOKEN_SIGNING_ALGORITHMS)
async def test_parse_id_token_validates_real_signed_tokens_and_decode_inputs(
hass: HomeAssistant, algorithm: str
):
"""Use real signatures and verify token/key/algorithm passed into joserfc."""
secret = "top-secret-value"
client_kwargs = {"id_token_signing_alg": algorithm}
if algorithm.startswith("HS"):
client_kwargs["client_secret"] = secret
client = make_client(hass, **client_kwargs)
client.discovery_document = {
"issuer": "https://issuer",
"jwks_uri": "https://issuer/jwks",
}
now = int(time.time())
claims = {
"sub": "subject-1",
"aud": "test-client",
"iss": "https://issuer",
"nbf": now,
"iat": now,
"exp": now + 3600,
}
token, jwks_payload = build_real_signed_token(algorithm, claims, secret)
with (
patch.object(client, "_fetch_jwks", new=AsyncMock(return_value=jwks_payload)),
patch(
"custom_components.auth_oidc.tools.oidc_client.jwt.decode",
wraps=jwt.decode,
) as decode_spy,
patch(
"custom_components.auth_oidc.tools.oidc_client.jwk.import_key",
wraps=jwk.import_key,
) as import_key_spy,
):
parsed = await client._parse_id_token(token)
assert parsed == claims
decode_spy.assert_called_once()
assert decode_spy.call_args.args[0] == token
assert decode_spy.call_args.kwargs["algorithms"] == [algorithm]
import_key_spy.assert_called()
imported_key_payload = import_key_spy.call_args.args[0]
assert imported_key_payload["alg"] == algorithm
if algorithm.startswith("HS"):
assert imported_key_payload["kty"] == "oct"
else:
assert imported_key_payload["kid"] is not None
@pytest.mark.asyncio @pytest.mark.asyncio
async def test_get_authorization_url_returns_none_when_discovery_fails( async def test_get_authorization_url_returns_none_when_discovery_fails(
hass: HomeAssistant, hass: HomeAssistant,

View File

@@ -2,8 +2,11 @@
import base64 import base64
import os import os
from urllib.parse import parse_qs, quote, unquote, urlparse, urlencode
from unittest.mock import AsyncMock, MagicMock, patch from unittest.mock import AsyncMock, MagicMock, patch
from auth_oidc.config.const import DISCOVERY_URL, CLIENT_ID from auth_oidc.config.const import DISCOVERY_URL, CLIENT_ID
from pytest_homeassistant_custom_component.typing import ClientSessionGenerator
import pytest import pytest
from homeassistant.core import HomeAssistant from homeassistant.core import HomeAssistant
@@ -16,14 +19,19 @@ from custom_components.auth_oidc.endpoints.injected_auth_page import (
frontend_injection, frontend_injection,
) )
WEB_CLIENT_ID = "https://example.com"
MOBILE_CLIENT_ID = "https://home-assistant.io/Android" MOBILE_CLIENT_ID = "https://home-assistant.io/Android"
def create_redirect_uri(client_id: str) -> str: def create_redirect_uri(client_id: str) -> str:
"""Build a redirect URI that includes a client_id query parameter.""" """Build a redirect URI that includes a client_id query parameter."""
return f"http://example.com/auth/authorize?client_id={client_id}" params = {
"response_type": "code",
"redirect_uri": client_id,
"client_id": client_id,
"state": "example",
}
return f"http://example.com/auth/authorize?{urlencode(params)}"
def encode_redirect_uri(redirect_uri: str) -> str: def encode_redirect_uri(redirect_uri: str) -> str:
@@ -45,8 +53,26 @@ async def setup(
assert result assert result
async def setup_mock_authorize_route(hass: HomeAssistant) -> None:
"""Register a mock /auth/authorize page so frontend injection can hook into it."""
await async_setup_component(hass, HTTP_DOMAIN, {})
mock_html_path = os.path.join(os.path.dirname(__file__), "mocks", "auth_page.html")
await hass.http.async_register_static_paths(
[
StaticPathConfig(
"/auth/authorize",
mock_html_path,
cache_headers=False,
)
]
)
@pytest.mark.asyncio @pytest.mark.asyncio
async def test_welcome_page_registration(hass: HomeAssistant, hass_client): async def test_welcome_page_registration(
hass: HomeAssistant, hass_client: ClientSessionGenerator
):
"""Test that welcome page is present.""" """Test that welcome page is present."""
await setup(hass) await setup(hass)
@@ -57,7 +83,9 @@ async def test_welcome_page_registration(hass: HomeAssistant, hass_client):
@pytest.mark.asyncio @pytest.mark.asyncio
async def test_redirect_page_registration(hass: HomeAssistant, hass_client): async def test_redirect_page_registration(
hass: HomeAssistant, hass_client: ClientSessionGenerator
):
"""Test that redirect page can be reached.""" """Test that redirect page can be reached."""
await setup(hass) await setup(hass)
@@ -72,7 +100,7 @@ async def test_redirect_page_registration(hass: HomeAssistant, hass_client):
@pytest.mark.asyncio @pytest.mark.asyncio
async def test_welcome_rejects_invalid_encoded_redirect_uri( async def test_welcome_rejects_invalid_encoded_redirect_uri(
hass: HomeAssistant, hass_client hass: HomeAssistant, hass_client: ClientSessionGenerator
): ):
"""Welcome should reject malformed base64 redirect_uri values.""" """Welcome should reject malformed base64 redirect_uri values."""
await setup(hass) await setup(hass)
@@ -87,12 +115,104 @@ async def test_welcome_rejects_invalid_encoded_redirect_uri(
@pytest.mark.asyncio @pytest.mark.asyncio
async def test_welcome_sets_strict_state_cookie_flags(hass: HomeAssistant, hass_client): @pytest.mark.parametrize(
"redirect_uri",
[
"http://example.com/auth/authorize?client_id=https://example.com",
"http://example.com/auth/authorize?redirect_uri=https://example.com",
],
)
async def test_welcome_rejects_redirect_uris_missing_required_query_params(
hass: HomeAssistant, hass_client: ClientSessionGenerator, redirect_uri: str
):
"""Welcome should reject redirect URIs that decode but are incomplete."""
await setup(hass)
client = await hass_client()
encoded = encode_redirect_uri(redirect_uri)
resp = await client.get(
f"/auth/oidc/welcome?redirect_uri={encoded}",
allow_redirects=False,
)
assert resp.status == 400
assert "Invalid redirect_uri, please restart login." in await resp.text()
@pytest.mark.asyncio
@pytest.mark.parametrize(
("client_id", "should_store_token", "is_mobile"),
[
("", True, False),
(MOBILE_CLIENT_ID, False, True),
("https://random.example", False, False),
],
)
async def test_welcome_only_adds_store_token_for_web_clients(
hass: HomeAssistant,
hass_client: ClientSessionGenerator,
client_id: str,
should_store_token: bool,
is_mobile: bool,
):
"""Welcome should only append storeToken for clients aligned with the base URL."""
await setup(hass)
captured_redirect_uri = {}
async def fake_create_state(state_redirect_uri: str, *_args):
captured_redirect_uri["value"] = state_redirect_uri
return "state-id"
with (
patch(
"custom_components.auth_oidc.provider.OpenIDAuthProvider.async_create_state",
new=AsyncMock(side_effect=fake_create_state),
),
patch(
"custom_components.auth_oidc.provider.OpenIDAuthProvider.async_generate_device_code",
new=AsyncMock(return_value="123456"),
),
):
client = await hass_client()
if client_id == "":
# If not present, set it to the root URL to
# emulate the normal website/Lovelace/dashboard
client_id = str(client.make_url("/?test=true"))
redirect_uri = create_redirect_uri(client_id)
encoded = encode_redirect_uri(redirect_uri)
resp = await client.get(
f"/auth/oidc/welcome?redirect_uri={encoded}",
allow_redirects=False,
)
assert resp.status in (200, 302)
assert "value" in captured_redirect_uri
parsed_state_redirect = urlparse(captured_redirect_uri["value"])
state_redirect_query = parse_qs(parsed_state_redirect.query)
nested_redirect_uri = unquote(state_redirect_query["redirect_uri"][0])
if should_store_token:
assert "storeToken=true" in nested_redirect_uri
else:
assert "storeToken=true" not in nested_redirect_uri
if is_mobile:
assert "https://home-assistant.io/" in nested_redirect_uri
@pytest.mark.asyncio
async def test_welcome_sets_secure_state_cookie_flags(
hass: HomeAssistant, hass_client: ClientSessionGenerator
):
"""Welcome should set secure cookie flags for the OIDC state cookie.""" """Welcome should set secure cookie flags for the OIDC state cookie."""
await setup(hass) await setup(hass)
client = await hass_client() client = await hass_client()
redirect_uri = create_redirect_uri(WEB_CLIENT_ID) redirect_uri = create_redirect_uri(client.make_url("/"))
encoded = encode_redirect_uri(redirect_uri) encoded = encode_redirect_uri(redirect_uri)
resp = await client.get( resp = await client.get(
@@ -105,14 +225,14 @@ async def test_welcome_sets_strict_state_cookie_flags(hass: HomeAssistant, hass_
set_cookie = resp.headers.get("Set-Cookie", "") set_cookie = resp.headers.get("Set-Cookie", "")
assert "Path=/auth/" in set_cookie assert "Path=/auth/" in set_cookie
assert "SameSite=Strict" in set_cookie assert "SameSite=Lax" in set_cookie
assert "HttpOnly" in set_cookie assert "HttpOnly" in set_cookie
assert "Max-Age=300" in set_cookie assert "Max-Age=300" in set_cookie
@pytest.mark.asyncio @pytest.mark.asyncio
async def test_welcome_mobile_device_code_generation_failure( async def test_welcome_mobile_device_code_generation_failure(
hass: HomeAssistant, hass_client hass: HomeAssistant, hass_client: ClientSessionGenerator
): ):
"""Welcome should error if device code generation fails for mobile clients.""" """Welcome should error if device code generation fails for mobile clients."""
await setup(hass) await setup(hass)
@@ -137,13 +257,13 @@ async def test_welcome_mobile_device_code_generation_failure(
@pytest.mark.asyncio @pytest.mark.asyncio
async def test_welcome_shows_alternative_sign_in_link_when_other_providers_exist( async def test_welcome_shows_alternative_sign_in_link_when_other_providers_exist(
hass: HomeAssistant, hass_client hass: HomeAssistant, hass_client: ClientSessionGenerator
): ):
"""Welcome should render fallback auth link when other providers are present.""" """Welcome should render fallback auth link when other providers are present."""
await setup(hass) await setup(hass)
client = await hass_client() client = await hass_client()
redirect_uri = create_redirect_uri(WEB_CLIENT_ID) redirect_uri = create_redirect_uri(client.make_url("/"))
encoded = encode_redirect_uri(redirect_uri) encoded = encode_redirect_uri(redirect_uri)
resp = await client.get( resp = await client.get(
f"/auth/oidc/welcome?redirect_uri={encoded}", f"/auth/oidc/welcome?redirect_uri={encoded}",
@@ -158,7 +278,7 @@ async def test_welcome_shows_alternative_sign_in_link_when_other_providers_exist
@pytest.mark.asyncio @pytest.mark.asyncio
async def test_welcome_desktop_auto_redirects_without_other_providers( async def test_welcome_desktop_auto_redirects_without_other_providers(
hass: HomeAssistant, hass_client hass: HomeAssistant, hass_client: ClientSessionGenerator
): ):
"""Welcome should auto-redirect desktop clients when no other providers exist.""" """Welcome should auto-redirect desktop clients when no other providers exist."""
@@ -167,7 +287,7 @@ async def test_welcome_desktop_auto_redirects_without_other_providers(
await setup(hass) await setup(hass)
client = await hass_client() client = await hass_client()
redirect_uri = create_redirect_uri(WEB_CLIENT_ID) redirect_uri = create_redirect_uri(client.make_url("/"))
encoded = encode_redirect_uri(redirect_uri) encoded = encode_redirect_uri(redirect_uri)
resp = await client.get( resp = await client.get(
f"/auth/oidc/welcome?redirect_uri={encoded}", f"/auth/oidc/welcome?redirect_uri={encoded}",
@@ -179,7 +299,7 @@ async def test_welcome_desktop_auto_redirects_without_other_providers(
@pytest.mark.asyncio @pytest.mark.asyncio
async def test_redirect_without_cookie_goes_to_welcome( async def test_redirect_without_cookie_goes_to_welcome(
hass: HomeAssistant, hass_client hass: HomeAssistant, hass_client: ClientSessionGenerator
): ):
"""Redirect endpoint should bounce to welcome when no state cookie exists.""" """Redirect endpoint should bounce to welcome when no state cookie exists."""
await setup(hass) await setup(hass)
@@ -192,13 +312,13 @@ async def test_redirect_without_cookie_goes_to_welcome(
@pytest.mark.asyncio @pytest.mark.asyncio
async def test_redirect_shows_error_on_oidc_runtime_error( async def test_redirect_shows_error_on_oidc_runtime_error(
hass: HomeAssistant, hass_client hass: HomeAssistant, hass_client: ClientSessionGenerator
): ):
"""Redirect should show a configuration error when OIDC URL generation raises.""" """Redirect should show a configuration error when OIDC URL generation raises."""
await setup(hass) await setup(hass)
client = await hass_client() client = await hass_client()
redirect_uri = create_redirect_uri(WEB_CLIENT_ID) redirect_uri = create_redirect_uri(client.make_url("/"))
encoded = encode_redirect_uri(redirect_uri) encoded = encode_redirect_uri(redirect_uri)
resp_welcome = await client.get( resp_welcome = await client.get(
f"/auth/oidc/welcome?redirect_uri={encoded}", f"/auth/oidc/welcome?redirect_uri={encoded}",
@@ -220,13 +340,13 @@ async def test_redirect_shows_error_on_oidc_runtime_error(
@pytest.mark.asyncio @pytest.mark.asyncio
async def test_redirect_shows_error_when_auth_url_empty( async def test_redirect_shows_error_when_auth_url_empty(
hass: HomeAssistant, hass_client hass: HomeAssistant, hass_client: ClientSessionGenerator
): ):
"""Redirect should show error page if OIDC returns no authorization URL.""" """Redirect should show error page if OIDC returns no authorization URL."""
await setup(hass) await setup(hass)
client = await hass_client() client = await hass_client()
redirect_uri = create_redirect_uri(WEB_CLIENT_ID) redirect_uri = create_redirect_uri(client.make_url("/"))
encoded = encode_redirect_uri(redirect_uri) encoded = encode_redirect_uri(redirect_uri)
resp_welcome = await client.get( resp_welcome = await client.get(
f"/auth/oidc/welcome?redirect_uri={encoded}", f"/auth/oidc/welcome?redirect_uri={encoded}",
@@ -247,7 +367,9 @@ async def test_redirect_shows_error_when_auth_url_empty(
@pytest.mark.asyncio @pytest.mark.asyncio
async def test_callback_registration(hass: HomeAssistant, hass_client): async def test_callback_registration(
hass: HomeAssistant, hass_client: ClientSessionGenerator
):
"""Test that callback page is reachable.""" """Test that callback page is reachable."""
await setup(hass) await setup(hass)
@@ -258,12 +380,14 @@ async def test_callback_registration(hass: HomeAssistant, hass_client):
@pytest.mark.asyncio @pytest.mark.asyncio
async def test_callback_rejects_missing_code_or_state(hass: HomeAssistant, hass_client): async def test_callback_rejects_missing_code_or_state(
hass: HomeAssistant, hass_client: ClientSessionGenerator
):
"""Callback must reject requests missing either code or state.""" """Callback must reject requests missing either code or state."""
await setup(hass) await setup(hass)
client = await hass_client() client = await hass_client()
redirect_uri = create_redirect_uri(WEB_CLIENT_ID) redirect_uri = create_redirect_uri(client.make_url("/"))
encoded = encode_redirect_uri(redirect_uri) encoded = encode_redirect_uri(redirect_uri)
resp_welcome = await client.get( resp_welcome = await client.get(
f"/auth/oidc/welcome?redirect_uri={encoded}", f"/auth/oidc/welcome?redirect_uri={encoded}",
@@ -287,12 +411,14 @@ async def test_callback_rejects_missing_code_or_state(hass: HomeAssistant, hass_
@pytest.mark.asyncio @pytest.mark.asyncio
async def test_callback_rejects_state_mismatch(hass: HomeAssistant, hass_client): async def test_callback_rejects_state_mismatch(
hass: HomeAssistant, hass_client: ClientSessionGenerator
):
"""Callback must reject state mismatch to protect against CSRF.""" """Callback must reject state mismatch to protect against CSRF."""
await setup(hass) await setup(hass)
client = await hass_client() client = await hass_client()
redirect_uri = create_redirect_uri(WEB_CLIENT_ID) redirect_uri = create_redirect_uri(client.make_url("/"))
encoded = encode_redirect_uri(redirect_uri) encoded = encode_redirect_uri(redirect_uri)
resp_welcome = await client.get( resp_welcome = await client.get(
f"/auth/oidc/welcome?redirect_uri={encoded}", f"/auth/oidc/welcome?redirect_uri={encoded}",
@@ -310,13 +436,13 @@ async def test_callback_rejects_state_mismatch(hass: HomeAssistant, hass_client)
@pytest.mark.asyncio @pytest.mark.asyncio
async def test_callback_rejects_when_user_details_fetch_fails( async def test_callback_rejects_when_user_details_fetch_fails(
hass: HomeAssistant, hass_client hass: HomeAssistant, hass_client: ClientSessionGenerator
): ):
"""Callback should error when token exchange/userinfo retrieval fails.""" """Callback should error when token exchange/userinfo retrieval fails."""
await setup(hass) await setup(hass)
client = await hass_client() client = await hass_client()
redirect_uri = create_redirect_uri(WEB_CLIENT_ID) redirect_uri = create_redirect_uri(client.make_url("/"))
encoded = encode_redirect_uri(redirect_uri) encoded = encode_redirect_uri(redirect_uri)
resp_welcome = await client.get( resp_welcome = await client.get(
f"/auth/oidc/welcome?redirect_uri={encoded}", f"/auth/oidc/welcome?redirect_uri={encoded}",
@@ -340,12 +466,14 @@ async def test_callback_rejects_when_user_details_fetch_fails(
@pytest.mark.asyncio @pytest.mark.asyncio
async def test_callback_rejects_invalid_role(hass: HomeAssistant, hass_client): async def test_callback_rejects_invalid_role(
hass: HomeAssistant, hass_client: ClientSessionGenerator
):
"""Callback should reject users marked with invalid role.""" """Callback should reject users marked with invalid role."""
await setup(hass) await setup(hass)
client = await hass_client() client = await hass_client()
redirect_uri = create_redirect_uri(WEB_CLIENT_ID) redirect_uri = create_redirect_uri(client.make_url("/"))
encoded = encode_redirect_uri(redirect_uri) encoded = encode_redirect_uri(redirect_uri)
resp_welcome = await client.get( resp_welcome = await client.get(
f"/auth/oidc/welcome?redirect_uri={encoded}", f"/auth/oidc/welcome?redirect_uri={encoded}",
@@ -378,7 +506,10 @@ async def test_callback_rejects_invalid_role(hass: HomeAssistant, hass_client):
], ],
) )
async def test_finish_requires_state_cookie( async def test_finish_requires_state_cookie(
hass: HomeAssistant, hass_client, method: str, data: dict | None hass: HomeAssistant,
hass_client: ClientSessionGenerator,
method: str,
data: dict | None,
): ):
"""Finish endpoint should require the OIDC state cookie for both GET and POST.""" """Finish endpoint should require the OIDC state cookie for both GET and POST."""
await setup(hass) await setup(hass)
@@ -395,12 +526,14 @@ async def test_finish_requires_state_cookie(
@pytest.mark.asyncio @pytest.mark.asyncio
async def test_finish_post_rejects_invalid_state(hass: HomeAssistant, hass_client): async def test_finish_post_rejects_invalid_state(
hass: HomeAssistant, hass_client: ClientSessionGenerator
):
"""Finish POST should error when the state cookie does not resolve to redirect_uri.""" """Finish POST should error when the state cookie does not resolve to redirect_uri."""
await setup(hass) await setup(hass)
client = await hass_client() client = await hass_client()
redirect_uri = create_redirect_uri(WEB_CLIENT_ID) redirect_uri = create_redirect_uri(client.make_url("/"))
encoded = encode_redirect_uri(redirect_uri) encoded = encode_redirect_uri(redirect_uri)
resp_welcome = await client.get( resp_welcome = await client.get(
f"/auth/oidc/welcome?redirect_uri={encoded}", f"/auth/oidc/welcome?redirect_uri={encoded}",
@@ -418,7 +551,9 @@ async def test_finish_post_rejects_invalid_state(hass: HomeAssistant, hass_clien
@pytest.mark.asyncio @pytest.mark.asyncio
async def test_device_sse_requires_state_cookie(hass: HomeAssistant, hass_client): async def test_device_sse_requires_state_cookie(
hass: HomeAssistant, hass_client: ClientSessionGenerator
):
"""SSE endpoint should reject requests without state cookie.""" """SSE endpoint should reject requests without state cookie."""
await setup(hass) await setup(hass)
@@ -430,7 +565,7 @@ async def test_device_sse_requires_state_cookie(hass: HomeAssistant, hass_client
@pytest.mark.asyncio @pytest.mark.asyncio
async def test_device_sse_emits_expired_for_unknown_state( async def test_device_sse_emits_expired_for_unknown_state(
hass: HomeAssistant, hass_client hass: HomeAssistant, hass_client: ClientSessionGenerator
): ):
"""SSE should emit expired when the state can no longer be resolved.""" """SSE should emit expired when the state can no longer be resolved."""
await setup(hass) await setup(hass)
@@ -455,7 +590,9 @@ async def test_device_sse_emits_expired_for_unknown_state(
@pytest.mark.asyncio @pytest.mark.asyncio
async def test_device_sse_emits_timeout(hass: HomeAssistant, hass_client): async def test_device_sse_emits_timeout(
hass: HomeAssistant, hass_client: ClientSessionGenerator
):
"""SSE should emit timeout if the polling window is exceeded.""" """SSE should emit timeout if the polling window is exceeded."""
await setup(hass) await setup(hass)
@@ -493,7 +630,7 @@ async def test_device_sse_emits_timeout(hass: HomeAssistant, hass_client):
@pytest.mark.asyncio @pytest.mark.asyncio
async def test_device_sse_handles_runtime_error_and_returns_cleanly( async def test_device_sse_handles_runtime_error_and_returns_cleanly(
hass: HomeAssistant, hass_client hass: HomeAssistant, hass_client: ClientSessionGenerator
): ):
"""SSE should swallow runtime errors from stream loop and finish response.""" """SSE should swallow runtime errors from stream loop and finish response."""
await setup(hass) await setup(hass)
@@ -523,7 +660,7 @@ async def test_device_sse_handles_runtime_error_and_returns_cleanly(
@pytest.mark.asyncio @pytest.mark.asyncio
async def test_device_sse_ignores_write_eof_connection_reset( async def test_device_sse_ignores_write_eof_connection_reset(
hass: HomeAssistant, hass_client hass: HomeAssistant, hass_client: ClientSessionGenerator
): ):
"""SSE should ignore ConnectionResetError while closing the stream.""" """SSE should ignore ConnectionResetError while closing the stream."""
await setup(hass) await setup(hass)
@@ -553,29 +690,20 @@ async def test_device_sse_ignores_write_eof_connection_reset(
# Test the frontend injection # Test the frontend injection
@pytest.mark.asyncio @pytest.mark.asyncio
async def test_frontend_injection(hass: HomeAssistant, hass_client): async def test_frontend_injection(
hass: HomeAssistant, hass_client: ClientSessionGenerator
):
"""Test that frontend injection works.""" """Test that frontend injection works."""
# Because there is no frontend in the test setup, # Because there is no frontend in the test setup,
# we'll have to fake /auth/authorize for the changes to register # we'll have to fake /auth/authorize for the changes to register.
await async_setup_component(hass, HTTP_DOMAIN, {}) await setup_mock_authorize_route(hass)
mock_html_path = os.path.join(os.path.dirname(__file__), "mocks", "auth_page.html")
await hass.http.async_register_static_paths(
[
StaticPathConfig(
"/auth/authorize",
mock_html_path,
cache_headers=False,
)
]
)
await setup(hass) await setup(hass)
client = await hass_client() client = await hass_client()
resp = await client.get("/auth/authorize", allow_redirects=False) resp = await client.get("/auth/authorize", allow_redirects=False)
assert resp.status == 200 assert resp.status == 200 # 200 because there is no redirect_uri
text = await resp.text() text = await resp.text()
assert "<script src='/auth/oidc/static/injection.js" in text assert "<script src='/auth/oidc/static/injection.js" in text
@@ -606,7 +734,7 @@ async def test_frontend_injection_logs_and_returns_when_route_handler_is_unexpec
return iter([FakeRoute()]) return iter([FakeRoute()])
with patch.object(hass.http.app.router, "resources", return_value=[FakeResource()]): with patch.object(hass.http.app.router, "resources", return_value=[FakeResource()]):
await frontend_injection(hass) await frontend_injection(hass, force_https=False)
assert "Unexpected route handler type" in caplog.text assert "Unexpected route handler type" in caplog.text
assert ( assert (
@@ -625,6 +753,61 @@ async def test_injected_auth_page_inject_logs_errors(hass: HomeAssistant, caplog
"custom_components.auth_oidc.endpoints.injected_auth_page.frontend_injection", "custom_components.auth_oidc.endpoints.injected_auth_page.frontend_injection",
side_effect=RuntimeError("boom"), side_effect=RuntimeError("boom"),
): ):
await OIDCInjectedAuthPage.inject(hass) await OIDCInjectedAuthPage.inject(hass, force_https=False)
assert "Failed to inject OIDC auth page: boom" in caplog.text assert "Failed to inject OIDC auth page: boom" in caplog.text
@pytest.mark.asyncio
async def test_injected_auth_page_redirects_to_welcome_when_not_skipped(
hass: HomeAssistant, hass_client: ClientSessionGenerator
):
"""Injected auth page should redirect into OIDC when skip flags are absent."""
await setup_mock_authorize_route(hass)
await setup(hass)
client = await hass_client()
encoded_redirect_uri = quote(create_redirect_uri(client.make_url("/")), safe="")
resp = await client.get(
f"/auth/authorize?redirect_uri={encoded_redirect_uri}",
allow_redirects=False,
)
assert resp.status == 302
location = resp.headers["Location"]
parsed_location = urlparse(location)
assert parsed_location.path == "/auth/oidc/welcome"
query = parse_qs(parsed_location.query)
assert "redirect_uri" in query
original_url = base64.b64decode(unquote(query["redirect_uri"][0]), validate=True)
original_url = original_url.decode("utf-8")
assert "/auth/authorize?redirect_uri=" in original_url
@pytest.mark.asyncio
@pytest.mark.parametrize(
"request_target",
[
"/auth/authorize?skip_oidc_redirect=true",
"/auth/authorize?redirect_uri=http%3A%2F%2Fexample.com%2Fauth%2Fauthorize%3Fskip_oidc_redirect%3Dtrue",
],
)
async def test_injected_auth_page_returns_original_html_when_skipped(
hass: HomeAssistant,
hass_client,
request_target: str,
):
"""Injected auth page should render HTML when redirect suppression is requested."""
await setup_mock_authorize_route(hass)
await setup(hass)
client = await hass_client()
response = await client.get(request_target, allow_redirects=False)
assert response.status == 200
assert "<script src='/auth/oidc/static/injection.js" in await response.text()

2
uv.lock generated
View File

@@ -958,7 +958,6 @@ version = "1.0.0"
source = { editable = "." } source = { editable = "." }
dependencies = [ dependencies = [
{ name = "aiofiles" }, { name = "aiofiles" },
{ name = "bcrypt" },
{ name = "jinja2" }, { name = "jinja2" },
{ name = "joserfc" }, { name = "joserfc" },
] ]
@@ -977,7 +976,6 @@ dev = [
[package.metadata] [package.metadata]
requires-dist = [ requires-dist = [
{ name = "aiofiles", specifier = "~=25.1" }, { name = "aiofiles", specifier = "~=25.1" },
{ name = "bcrypt", specifier = "~=5.0" },
{ name = "jinja2", specifier = "~=3.1" }, { name = "jinja2", specifier = "~=3.1" },
{ name = "joserfc", specifier = "~=1.6.0" }, { name = "joserfc", specifier = "~=1.6.0" },
] ]