Files
TJWaterServerBinary/app/auth/keycloak_dependencies.py

64 lines
1.8 KiB
Python

# import logging
from uuid import UUID
from fastapi import Depends, HTTPException, status
from fastapi.security import OAuth2PasswordBearer
from jose import JWTError, jwt
from app.core.config import settings
oauth2_optional = OAuth2PasswordBearer(
tokenUrl=f"{settings.API_V1_STR}/auth/login", auto_error=False
)
# logger = logging.getLogger(__name__)
async def get_current_keycloak_sub(
token: str | None = Depends(oauth2_optional),
) -> UUID:
if not token:
raise HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
detail="Not authenticated",
headers={"WWW-Authenticate": "Bearer"},
)
if settings.KEYCLOAK_PUBLIC_KEY:
key = settings.KEYCLOAK_PUBLIC_KEY.replace("\\n", "\n")
algorithms = [settings.KEYCLOAK_ALGORITHM]
else:
key = settings.SECRET_KEY
algorithms = [settings.ALGORITHM]
try:
payload = jwt.decode(
token,
key,
algorithms=algorithms,
audience=settings.KEYCLOAK_AUDIENCE or None,
)
except JWTError as exc:
# logger.warning("Keycloak token validation failed: %s", exc)
raise HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
detail="Invalid token",
headers={"WWW-Authenticate": "Bearer"},
) from exc
sub = payload.get("sub")
if not sub:
raise HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
detail="Missing subject claim",
headers={"WWW-Authenticate": "Bearer"},
)
try:
return UUID(sub)
except ValueError as exc:
raise HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
detail="Invalid subject claim",
headers={"WWW-Authenticate": "Bearer"},
) from exc