"""FastAPI dependencies that resolve the project context and per-project DB handles."""

from dataclasses import dataclass
from typing import Any, AsyncGenerator
from uuid import UUID

from fastapi import Depends, Header, HTTPException, status
from psycopg import AsyncConnection
from sqlalchemy.ext.asyncio import AsyncSession

from app.auth.keycloak_dependencies import get_current_keycloak_sub
from app.core.config import settings
from app.infra.db.dynamic_manager import project_connection_manager
from app.infra.db.metadata.database import get_metadata_session
from app.infra.repositories.metadata_repository import MetadataRepository

# Role keys passed to the metadata repository when looking up a project's
# database routing entry.
DB_ROLE_BIZ_PG = "biz_pg"
DB_ROLE_IOT_TS = "iot_ts"

# Values that a routing entry's ``db_type`` is compared against.
DB_TYPE_POSTGRES = "postgresql"
DB_TYPE_TIMESCALE = "timescaledb"


@dataclass(frozen=True)
class ProjectContext:
    """Immutable result of resolving the caller's access to one project."""

    # Id of the project as stored on the loaded project record.
    project_id: UUID
    # Id of the user record found for the caller's Keycloak subject.
    user_id: UUID
    # Membership role returned by the metadata repository for this user/project.
    project_role: str


async def get_metadata_repository(
    session: AsyncSession = Depends(get_metadata_session),
) -> MetadataRepository:
    """Build a ``MetadataRepository`` bound to the injected metadata session."""
    return MetadataRepository(session)


async def get_project_context(
    x_project_id: str = Header(..., alias="X-Project-Id"),
    keycloak_sub: UUID = Depends(get_current_keycloak_sub),
    metadata_repo: MetadataRepository = Depends(get_metadata_repository),
) -> ProjectContext:
    """Validate the ``X-Project-Id`` header and the caller's access to that project.

    Checks run in this order: header parses as a UUID, project exists, project
    is active, user exists, user is active, user has a membership role.

    Args:
        x_project_id: Raw value of the required ``X-Project-Id`` header.
        keycloak_sub: Keycloak subject supplied by ``get_current_keycloak_sub``.
        metadata_repo: Repository used for project, user and membership lookups.

    Returns:
        A ``ProjectContext`` with the project id, user id and membership role.

    Raises:
        HTTPException: 400 if the header is not a valid UUID; 404 if the
            project is not found; 403 if the project status is not
            ``"active"``, the user is not registered, the user is inactive,
            or the user has no membership role in the project.
    """
    try:
        project_uuid = UUID(x_project_id)
    except ValueError as exc:
        raise HTTPException(
            status_code=status.HTTP_400_BAD_REQUEST, detail="Invalid project id"
        ) from exc

    project = await metadata_repo.get_project_by_id(project_uuid)
    if not project:
        raise HTTPException(
            status_code=status.HTTP_404_NOT_FOUND, detail="Project not found"
        )
    if project.status != "active":
        raise HTTPException(
            status_code=status.HTTP_403_FORBIDDEN, detail="Project is not active"
        )

    user = await metadata_repo.get_user_by_keycloak_id(keycloak_sub)
    if not user:
        raise HTTPException(
            status_code=status.HTTP_403_FORBIDDEN, detail="User not registered"
        )
    if not user.is_active:
        raise HTTPException(
            status_code=status.HTTP_403_FORBIDDEN, detail="Inactive user"
        )

    membership_role = await metadata_repo.get_membership_role(project_uuid, user.id)
    if not membership_role:
        raise HTTPException(
            status_code=status.HTTP_403_FORBIDDEN, detail="No access to project"
        )

    return ProjectContext(
        project_id=project.id,
        user_id=user.id,
        project_role=membership_role,
    )


async def _require_project_routing(
    metadata_repo: MetadataRepository,
    project_id: UUID,
    db_role: str,
    expected_db_type: str,
    label: str,
) -> Any:
    """Load a project's routing entry for ``db_role`` and check its ``db_type``.

    Args:
        metadata_repo: Repository used for the routing lookup.
        project_id: Project whose routing entry is requested.
        db_role: Role key passed to ``get_project_db_routing``.
        expected_db_type: Value the entry's ``db_type`` must equal.
        label: Backend name interpolated into the error detail
            (e.g. ``"PostgreSQL"``).

    Returns:
        The routing object returned by the repository.

    Raises:
        HTTPException: 503 if no routing entry is returned or its ``db_type``
            differs from ``expected_db_type``.
    """
    routing = await metadata_repo.get_project_db_routing(project_id, db_role)
    if not routing:
        raise HTTPException(
            status_code=status.HTTP_503_SERVICE_UNAVAILABLE,
            detail=f"Project {label} not configured",
        )
    if routing.db_type != expected_db_type:
        raise HTTPException(
            status_code=status.HTTP_503_SERVICE_UNAVAILABLE,
            detail=f"Project {label} type mismatch",
        )
    return routing


async def get_project_pg_session(
    ctx: ProjectContext = Depends(get_project_context),
    metadata_repo: MetadataRepository = Depends(get_metadata_repository),
) -> AsyncGenerator[AsyncSession, None]:
    """Yield an ``AsyncSession`` on the project's business PostgreSQL database.

    Args:
        ctx: Resolved project context for the current request.
        metadata_repo: Repository used to look up the project's DB routing.

    Yields:
        A session created from the sessionmaker that
        ``project_connection_manager.get_pg_sessionmaker`` returns.

    Raises:
        HTTPException: 503 if the routing entry is missing or its type is not
            ``DB_TYPE_POSTGRES``.
    """
    routing = await _require_project_routing(
        metadata_repo, ctx.project_id, DB_ROLE_BIZ_PG, DB_TYPE_POSTGRES, "PostgreSQL"
    )

    # ``or`` makes any falsy routing value (None or 0) fall back to the global
    # setting; both bounds share the single PROJECT_PG_POOL_SIZE default.
    pool_min_size = routing.pool_min_size or settings.PROJECT_PG_POOL_SIZE
    pool_max_size = routing.pool_max_size or settings.PROJECT_PG_POOL_SIZE

    sessionmaker = await project_connection_manager.get_pg_sessionmaker(
        ctx.project_id,
        DB_ROLE_BIZ_PG,
        routing.dsn,
        pool_min_size,
        pool_max_size,
    )
    async with sessionmaker() as session:
        yield session


async def get_project_pg_connection(
    ctx: ProjectContext = Depends(get_project_context),
    metadata_repo: MetadataRepository = Depends(get_metadata_repository),
) -> AsyncGenerator[AsyncConnection, None]:
    """Yield a raw psycopg ``AsyncConnection`` to the project's PostgreSQL database.

    Args:
        ctx: Resolved project context for the current request.
        metadata_repo: Repository used to look up the project's DB routing.

    Yields:
        A connection taken from the pool that
        ``project_connection_manager.get_pg_pool`` returns.

    Raises:
        HTTPException: 503 if the routing entry is missing or its type is not
            ``DB_TYPE_POSTGRES``.
    """
    routing = await _require_project_routing(
        metadata_repo, ctx.project_id, DB_ROLE_BIZ_PG, DB_TYPE_POSTGRES, "PostgreSQL"
    )

    # ``or`` makes any falsy routing value (None or 0) fall back to the global
    # setting; both bounds share the single PROJECT_PG_POOL_SIZE default.
    pool_min_size = routing.pool_min_size or settings.PROJECT_PG_POOL_SIZE
    pool_max_size = routing.pool_max_size or settings.PROJECT_PG_POOL_SIZE

    pool = await project_connection_manager.get_pg_pool(
        ctx.project_id,
        DB_ROLE_BIZ_PG,
        routing.dsn,
        pool_min_size,
        pool_max_size,
    )
    async with pool.connection() as conn:
        yield conn


async def get_project_timescale_connection(
    ctx: ProjectContext = Depends(get_project_context),
    metadata_repo: MetadataRepository = Depends(get_metadata_repository),
) -> AsyncGenerator[AsyncConnection, None]:
    """Yield a raw psycopg ``AsyncConnection`` to the project's TimescaleDB database.

    Args:
        ctx: Resolved project context for the current request.
        metadata_repo: Repository used to look up the project's DB routing.

    Yields:
        A connection taken from the pool that
        ``project_connection_manager.get_timescale_pool`` returns.

    Raises:
        HTTPException: 503 if the routing entry is missing or its type is not
            ``DB_TYPE_TIMESCALE``.
    """
    routing = await _require_project_routing(
        metadata_repo, ctx.project_id, DB_ROLE_IOT_TS, DB_TYPE_TIMESCALE, "TimescaleDB"
    )

    # ``or`` makes any falsy routing value (None or 0) fall back to the
    # separate global min/max settings for Timescale pools.
    pool_min_size = routing.pool_min_size or settings.PROJECT_TS_POOL_MIN_SIZE
    pool_max_size = routing.pool_max_size or settings.PROJECT_TS_POOL_MAX_SIZE

    pool = await project_connection_manager.get_timescale_pool(
        ctx.project_id,
        DB_ROLE_IOT_TS,
        routing.dsn,
        pool_min_size,
        pool_max_size,
    )
    async with pool.connection() as conn:
        yield conn