214 lines
7.2 KiB
Python
214 lines
7.2 KiB
Python
from dataclasses import dataclass
|
|
from typing import AsyncGenerator
|
|
from uuid import UUID
|
|
|
|
import logging
|
|
from fastapi import Depends, Header, HTTPException, status
|
|
from psycopg import AsyncConnection
|
|
from sqlalchemy.exc import SQLAlchemyError
|
|
from sqlalchemy.ext.asyncio import AsyncSession
|
|
|
|
from app.auth.keycloak_dependencies import get_current_keycloak_sub
|
|
from app.core.config import settings
|
|
from app.infra.db.dynamic_manager import project_connection_manager
|
|
from app.infra.db.metadata.database import get_metadata_session
|
|
from app.infra.repositories.metadata_repository import MetadataRepository
|
|
|
|
# Module-level logger, named after this module.
logger = logging.getLogger(__name__)

# Role keys passed to ``MetadataRepository.get_project_db_routing`` and to the
# project connection manager to select which per-project database is wanted.
DB_ROLE_BIZ_DATA = "biz_data"
DB_ROLE_IOT_DATA = "iot_data"

# Expected values of ``routing.db_type``; the dependencies below reject a
# routing entry whose type does not match the kind of connection requested.
DB_TYPE_POSTGRES = "postgresql"
DB_TYPE_TIMESCALE = "timescaledb"
|
|
|
|
|
|
@dataclass(frozen=True)
class ProjectContext:
    """Immutable result of resolving a request's project and caller.

    Built by ``get_project_context`` once the project, the user and the
    user's membership in that project have all been looked up successfully.
    """

    # Identifier of the resolved project.
    project_id: UUID
    # Identifier of the user record matched to the caller.
    user_id: UUID
    # Membership role returned by the metadata repository for this user/project.
    project_role: str
|
|
|
|
|
|
async def get_metadata_repository(
    session: AsyncSession = Depends(get_metadata_session),
) -> MetadataRepository:
    """FastAPI dependency that wraps the metadata session in a repository.

    Args:
        session: Session injected from ``get_metadata_session``.

    Returns:
        A ``MetadataRepository`` constructed around ``session``.
    """
    repository = MetadataRepository(session)
    return repository
|
|
|
|
|
|
async def get_project_context(
    x_project_id: str = Header(..., alias="X-Project-Id"),
    keycloak_sub: UUID = Depends(get_current_keycloak_sub),
    metadata_repo: MetadataRepository = Depends(get_metadata_repository),
) -> ProjectContext:
    """Resolve the project named by ``X-Project-Id`` and authorize the caller.

    Args:
        x_project_id: Raw value of the ``X-Project-Id`` request header.
        keycloak_sub: Keycloak subject injected by ``get_current_keycloak_sub``.
        metadata_repo: Repository used for project, user and membership lookups.

    Returns:
        A ``ProjectContext`` carrying the project id, the user id and the
        membership role returned by the repository.

    Raises:
        HTTPException: 400 if the header is not a valid UUID; 404 if the
            project does not exist; 403 if the project is not ``"active"``,
            the user is unknown or inactive, or the user has no membership
            role in the project; 503 if a metadata lookup raises
            ``SQLAlchemyError``.
    """
    try:
        project_uuid = UUID(x_project_id)
    except ValueError as exc:
        raise HTTPException(
            status_code=status.HTTP_400_BAD_REQUEST, detail="Invalid project id"
        ) from exc

    try:
        project = await metadata_repo.get_project_by_id(project_uuid)
        if not project:
            raise HTTPException(
                status_code=status.HTTP_404_NOT_FOUND, detail="Project not found"
            )
        if project.status != "active":
            raise HTTPException(
                status_code=status.HTTP_403_FORBIDDEN, detail="Project is not active"
            )

        user = await metadata_repo.get_user_by_keycloak_id(keycloak_sub)
        if not user:
            raise HTTPException(
                status_code=status.HTTP_403_FORBIDDEN, detail="User not registered"
            )
        if not user.is_active:
            raise HTTPException(
                status_code=status.HTTP_403_FORBIDDEN, detail="Inactive user"
            )

        membership_role = await metadata_repo.get_membership_role(project_uuid, user.id)
        if not membership_role:
            raise HTTPException(
                status_code=status.HTTP_403_FORBIDDEN, detail="No access to project"
            )
    except SQLAlchemyError as exc:
        # Keep the full traceback in the server log only. The exception text
        # can contain SQL statements, bound parameters or driver messages, so
        # it must not be echoed back to the client in the response body.
        logger.exception("Metadata DB error while resolving project context")
        raise HTTPException(
            status_code=status.HTTP_503_SERVICE_UNAVAILABLE,
            detail="Metadata database error",
        ) from exc

    return ProjectContext(
        project_id=project.id,
        user_id=user.id,
        project_role=membership_role,
    )
|
|
|
|
|
|
async def get_project_pg_session(
    ctx: ProjectContext = Depends(get_project_context),
    metadata_repo: MetadataRepository = Depends(get_metadata_repository),
) -> AsyncGenerator[AsyncSession, None]:
    """Yield a session on the project's ``biz_data`` PostgreSQL database.

    Looks up the routing entry for ``DB_ROLE_BIZ_DATA``, checks that it is of
    type ``DB_TYPE_POSTGRES``, obtains a sessionmaker from
    ``project_connection_manager`` and yields one session from it. The session
    is opened with ``async with``, so its context manager exits once the
    dependency is finalized.

    Args:
        ctx: Resolved project context; only ``project_id`` is read here.
        metadata_repo: Repository used to fetch the routing entry.

    Yields:
        A session produced by the project's sessionmaker.

    Raises:
        HTTPException: 503 if the routing lookup raises ``ValueError``, no
            routing entry exists, or the entry's ``db_type`` is not
            ``DB_TYPE_POSTGRES``.
    """
    try:
        routing = await metadata_repo.get_project_db_routing(
            ctx.project_id, DB_ROLE_BIZ_DATA
        )
    except ValueError as exc:
        # The error text may quote the offending DSN, so it is logged
        # server-side only and never placed in the HTTP response.
        logger.exception("Invalid project PostgreSQL routing DSN configuration")
        raise HTTPException(
            status_code=status.HTTP_503_SERVICE_UNAVAILABLE,
            detail="Project PostgreSQL routing DSN is invalid",
        ) from exc

    if not routing:
        raise HTTPException(
            status_code=status.HTTP_503_SERVICE_UNAVAILABLE,
            detail="Project PostgreSQL not configured",
        )
    if routing.db_type != DB_TYPE_POSTGRES:
        raise HTTPException(
            status_code=status.HTTP_503_SERVICE_UNAVAILABLE,
            detail="Project PostgreSQL type mismatch",
        )

    # Falsy per-project sizes (None or 0) fall back to the global setting;
    # the same setting is used for both the minimum and the maximum.
    pool_min_size = routing.pool_min_size or settings.PROJECT_PG_POOL_SIZE
    pool_max_size = routing.pool_max_size or settings.PROJECT_PG_POOL_SIZE

    sessionmaker = await project_connection_manager.get_pg_sessionmaker(
        ctx.project_id,
        DB_ROLE_BIZ_DATA,
        routing.dsn,
        pool_min_size,
        pool_max_size,
    )
    async with sessionmaker() as session:
        yield session
|
|
|
|
|
|
async def get_project_pg_connection(
    ctx: ProjectContext = Depends(get_project_context),
    metadata_repo: MetadataRepository = Depends(get_metadata_repository),
) -> AsyncGenerator[AsyncConnection, None]:
    """Yield a pooled connection to the project's ``biz_data`` PostgreSQL database.

    Looks up the routing entry for ``DB_ROLE_BIZ_DATA``, checks that it is of
    type ``DB_TYPE_POSTGRES``, obtains a pool from
    ``project_connection_manager.get_pg_pool`` and yields one connection from
    it. The connection is acquired with ``async with pool.connection()``, so
    that context manager exits once the dependency is finalized.

    Args:
        ctx: Resolved project context; only ``project_id`` is read here.
        metadata_repo: Repository used to fetch the routing entry.

    Yields:
        A connection obtained from the project's pool.

    Raises:
        HTTPException: 503 if the routing lookup raises ``ValueError``, no
            routing entry exists, or the entry's ``db_type`` is not
            ``DB_TYPE_POSTGRES``.
    """
    try:
        routing = await metadata_repo.get_project_db_routing(
            ctx.project_id, DB_ROLE_BIZ_DATA
        )
    except ValueError as exc:
        # The error text may quote the offending DSN, so it is logged
        # server-side only and never placed in the HTTP response.
        logger.exception("Invalid project PostgreSQL routing DSN configuration")
        raise HTTPException(
            status_code=status.HTTP_503_SERVICE_UNAVAILABLE,
            detail="Project PostgreSQL routing DSN is invalid",
        ) from exc

    if not routing:
        raise HTTPException(
            status_code=status.HTTP_503_SERVICE_UNAVAILABLE,
            detail="Project PostgreSQL not configured",
        )
    if routing.db_type != DB_TYPE_POSTGRES:
        raise HTTPException(
            status_code=status.HTTP_503_SERVICE_UNAVAILABLE,
            detail="Project PostgreSQL type mismatch",
        )

    # Falsy per-project sizes (None or 0) fall back to the global setting;
    # the same setting is used for both the minimum and the maximum.
    pool_min_size = routing.pool_min_size or settings.PROJECT_PG_POOL_SIZE
    pool_max_size = routing.pool_max_size or settings.PROJECT_PG_POOL_SIZE

    pool = await project_connection_manager.get_pg_pool(
        ctx.project_id,
        DB_ROLE_BIZ_DATA,
        routing.dsn,
        pool_min_size,
        pool_max_size,
    )
    async with pool.connection() as conn:
        yield conn
|
|
|
|
|
|
async def get_project_timescale_connection(
    ctx: ProjectContext = Depends(get_project_context),
    metadata_repo: MetadataRepository = Depends(get_metadata_repository),
) -> AsyncGenerator[AsyncConnection, None]:
    """Yield a pooled connection to the project's ``iot_data`` TimescaleDB database.

    Looks up the routing entry for ``DB_ROLE_IOT_DATA``, checks that it is of
    type ``DB_TYPE_TIMESCALE``, obtains a pool from
    ``project_connection_manager.get_timescale_pool`` and yields one
    connection from it. The connection is acquired with
    ``async with pool.connection()``, so that context manager exits once the
    dependency is finalized.

    Args:
        ctx: Resolved project context; only ``project_id`` is read here.
        metadata_repo: Repository used to fetch the routing entry.

    Yields:
        A connection obtained from the project's TimescaleDB pool.

    Raises:
        HTTPException: 503 if the routing lookup raises ``ValueError``, no
            routing entry exists, or the entry's ``db_type`` is not
            ``DB_TYPE_TIMESCALE``.
    """
    try:
        routing = await metadata_repo.get_project_db_routing(
            ctx.project_id, DB_ROLE_IOT_DATA
        )
    except ValueError as exc:
        # The error text may quote the offending DSN, so it is logged
        # server-side only and never placed in the HTTP response.
        logger.exception("Invalid project TimescaleDB routing DSN configuration")
        raise HTTPException(
            status_code=status.HTTP_503_SERVICE_UNAVAILABLE,
            detail="Project TimescaleDB routing DSN is invalid",
        ) from exc

    if not routing:
        raise HTTPException(
            status_code=status.HTTP_503_SERVICE_UNAVAILABLE,
            detail="Project TimescaleDB not configured",
        )
    if routing.db_type != DB_TYPE_TIMESCALE:
        raise HTTPException(
            status_code=status.HTTP_503_SERVICE_UNAVAILABLE,
            detail="Project TimescaleDB type mismatch",
        )

    # Falsy per-project sizes (None or 0) fall back to the separate global
    # minimum and maximum settings for TimescaleDB pools.
    pool_min_size = routing.pool_min_size or settings.PROJECT_TS_POOL_MIN_SIZE
    pool_max_size = routing.pool_max_size or settings.PROJECT_TS_POOL_MAX_SIZE

    pool = await project_connection_manager.get_timescale_pool(
        ctx.project_id,
        DB_ROLE_IOT_DATA,
        routing.dsn,
        pool_min_size,
        pool_max_size,
    )
    async with pool.connection() as conn:
        yield conn
|