from dataclasses import dataclass from typing import AsyncGenerator from uuid import UUID import logging from fastapi import Depends, Header, HTTPException, status from psycopg import AsyncConnection from sqlalchemy.exc import SQLAlchemyError from sqlalchemy.ext.asyncio import AsyncSession from app.auth.keycloak_dependencies import get_current_keycloak_sub from app.core.config import settings from app.infra.db.dynamic_manager import project_connection_manager from app.infra.db.metadata.database import get_metadata_session from app.infra.repositories.metadata_repository import MetadataRepository DB_ROLE_BIZ_DATA = "biz_data" DB_ROLE_IOT_DATA = "iot_data" DB_TYPE_POSTGRES = "postgresql" DB_TYPE_TIMESCALE = "timescaledb" logger = logging.getLogger(__name__) @dataclass(frozen=True) class ProjectContext: project_id: UUID user_id: UUID project_role: str async def get_metadata_repository( session: AsyncSession = Depends(get_metadata_session), ) -> MetadataRepository: return MetadataRepository(session) async def get_project_context( x_project_id: str = Header(..., alias="X-Project-Id"), keycloak_sub: UUID = Depends(get_current_keycloak_sub), metadata_repo: MetadataRepository = Depends(get_metadata_repository), ) -> ProjectContext: try: project_uuid = UUID(x_project_id) except ValueError as exc: raise HTTPException( status_code=status.HTTP_400_BAD_REQUEST, detail="Invalid project id" ) from exc try: project = await metadata_repo.get_project_by_id(project_uuid) if not project: raise HTTPException( status_code=status.HTTP_404_NOT_FOUND, detail="Project not found" ) if project.status != "active": raise HTTPException( status_code=status.HTTP_403_FORBIDDEN, detail="Project is not active" ) user = await metadata_repo.get_user_by_keycloak_id(keycloak_sub) if not user: raise HTTPException( status_code=status.HTTP_403_FORBIDDEN, detail="User not registered" ) if not user.is_active: raise HTTPException( status_code=status.HTTP_403_FORBIDDEN, detail="Inactive user" ) membership_role = await metadata_repo.get_membership_role( project_uuid, user.id ) if not membership_role: raise HTTPException( status_code=status.HTTP_403_FORBIDDEN, detail="No access to project" ) except SQLAlchemyError as exc: logger.error( "Metadata DB error while resolving project context", exc_info=True, ) raise HTTPException( status_code=status.HTTP_503_SERVICE_UNAVAILABLE, detail=f"Metadata database error: {exc}", ) from exc return ProjectContext( project_id=project.id, user_id=user.id, project_role=membership_role, ) async def get_project_pg_session( ctx: ProjectContext = Depends(get_project_context), metadata_repo: MetadataRepository = Depends(get_metadata_repository), ) -> AsyncGenerator[AsyncSession, None]: try: routing = await metadata_repo.get_project_db_routing( ctx.project_id, DB_ROLE_BIZ_DATA ) except ValueError as exc: logger.error( "Missing ENCRYPTION_KEY while resolving project PostgreSQL routing", exc_info=True, ) raise HTTPException( status_code=status.HTTP_503_SERVICE_UNAVAILABLE, detail="ENCRYPTION_KEY is not configured for project PostgreSQL access", ) from exc if not routing: raise HTTPException( status_code=status.HTTP_503_SERVICE_UNAVAILABLE, detail="Project PostgreSQL not configured", ) if routing.db_type != DB_TYPE_POSTGRES: raise HTTPException( status_code=status.HTTP_503_SERVICE_UNAVAILABLE, detail="Project PostgreSQL type mismatch", ) pool_min_size = routing.pool_min_size or settings.PROJECT_PG_POOL_SIZE pool_max_size = routing.pool_max_size or settings.PROJECT_PG_POOL_SIZE sessionmaker = await project_connection_manager.get_pg_sessionmaker( ctx.project_id, DB_ROLE_BIZ_DATA, routing.dsn, pool_min_size, pool_max_size, ) async with sessionmaker() as session: yield session async def get_project_pg_connection( ctx: ProjectContext = Depends(get_project_context), metadata_repo: MetadataRepository = Depends(get_metadata_repository), ) -> AsyncGenerator[AsyncConnection, None]: try: routing = await metadata_repo.get_project_db_routing( ctx.project_id, DB_ROLE_BIZ_DATA ) except ValueError as exc: logger.error( "Missing ENCRYPTION_KEY while resolving project PostgreSQL routing", exc_info=True, ) raise HTTPException( status_code=status.HTTP_503_SERVICE_UNAVAILABLE, detail="ENCRYPTION_KEY is not configured for project PostgreSQL access", ) from exc if not routing: raise HTTPException( status_code=status.HTTP_503_SERVICE_UNAVAILABLE, detail="Project PostgreSQL not configured", ) if routing.db_type != DB_TYPE_POSTGRES: raise HTTPException( status_code=status.HTTP_503_SERVICE_UNAVAILABLE, detail="Project PostgreSQL type mismatch", ) pool_min_size = routing.pool_min_size or settings.PROJECT_PG_POOL_SIZE pool_max_size = routing.pool_max_size or settings.PROJECT_PG_POOL_SIZE pool = await project_connection_manager.get_pg_pool( ctx.project_id, DB_ROLE_BIZ_DATA, routing.dsn, pool_min_size, pool_max_size, ) async with pool.connection() as conn: yield conn async def get_project_timescale_connection( ctx: ProjectContext = Depends(get_project_context), metadata_repo: MetadataRepository = Depends(get_metadata_repository), ) -> AsyncGenerator[AsyncConnection, None]: try: routing = await metadata_repo.get_project_db_routing( ctx.project_id, DB_ROLE_IOT_DATA ) except ValueError as exc: logger.error( "Missing ENCRYPTION_KEY while resolving project TimescaleDB routing", exc_info=True, ) raise HTTPException( status_code=status.HTTP_503_SERVICE_UNAVAILABLE, detail="ENCRYPTION_KEY is not configured for project TimescaleDB access", ) from exc if not routing: raise HTTPException( status_code=status.HTTP_503_SERVICE_UNAVAILABLE, detail="Project TimescaleDB not configured", ) if routing.db_type != DB_TYPE_TIMESCALE: raise HTTPException( status_code=status.HTTP_503_SERVICE_UNAVAILABLE, detail="Project TimescaleDB type mismatch", ) pool_min_size = routing.pool_min_size or settings.PROJECT_TS_POOL_MIN_SIZE pool_max_size = routing.pool_max_size or settings.PROJECT_TS_POOL_MAX_SIZE pool = await project_connection_manager.get_timescale_pool( ctx.project_id, DB_ROLE_IOT_DATA, routing.dsn, pool_min_size, pool_max_size, ) async with pool.connection() as conn: yield conn