from dataclasses import dataclass, field
from typing import List, Optional
from uuid import UUID

from sqlalchemy import select
from sqlalchemy.ext.asyncio import AsyncSession

from app.core.encryption import get_encryptor
from app.infra.db.metadata import models


@dataclass(frozen=True)
class ProjectDbRouting:
    """Immutable routing record for one database attached to a project."""

    project_id: UUID
    db_role: str
    db_type: str
    # Decrypted DSN. Kept out of the generated repr so that logging or
    # printing the object never exposes connection credentials.
    dsn: str = field(repr=False)
    pool_min_size: int
    pool_max_size: int


@dataclass(frozen=True)
class ProjectGeoServerInfo:
    """Immutable GeoServer configuration for one project."""

    project_id: UUID
    gs_base_url: Optional[str]
    gs_admin_user: Optional[str]
    # Decrypted admin password (None when no encrypted value is stored).
    # Kept out of the generated repr so it cannot leak through logs.
    gs_admin_password: Optional[str] = field(repr=False)
    gs_datastore_name: str
    default_extent: Optional[dict]
    srid: int


@dataclass(frozen=True)
class ProjectSummary:
    """Immutable listing entry: project columns plus the role to report."""

    project_id: UUID
    name: str
    code: str
    description: Optional[str]
    gs_workspace: str
    status: str
    project_role: str


def _to_summary(project: models.Project, project_role: str) -> ProjectSummary:
    """Build a ``ProjectSummary`` from a project row and the given role."""
    return ProjectSummary(
        project_id=project.id,
        name=project.name,
        code=project.code,
        description=project.description,
        gs_workspace=project.gs_workspace,
        status=project.status,
        project_role=project_role,
    )


class MetadataRepository:
    """Metadata access layer (system_hub).

    Every query runs on the ``AsyncSession`` supplied at construction. The
    single-row lookups use ``scalar_one_or_none()``, which returns ``None``
    when no row matches and raises SQLAlchemy's ``MultipleResultsFound``
    when more than one row matches.
    """

    def __init__(self, session: AsyncSession):
        """Store the session used by all repository queries."""
        self.session = session

    async def get_user_by_keycloak_id(
        self, keycloak_id: UUID
    ) -> Optional[models.User]:
        """Return the user whose ``keycloak_id`` matches, or ``None``."""
        stmt = select(models.User).where(models.User.keycloak_id == keycloak_id)
        result = await self.session.execute(stmt)
        return result.scalar_one_or_none()

    async def get_project_by_id(self, project_id: UUID) -> Optional[models.Project]:
        """Return the project with the given id, or ``None``."""
        stmt = select(models.Project).where(models.Project.id == project_id)
        result = await self.session.execute(stmt)
        return result.scalar_one_or_none()

    async def get_membership_role(
        self, project_id: UUID, user_id: UUID
    ) -> Optional[str]:
        """Return the user's ``project_role`` in the project, or ``None``.

        ``None`` is returned when no membership row matches both ids.
        """
        membership = models.UserProjectMembership
        stmt = select(membership.project_role).where(
            membership.project_id == project_id,
            membership.user_id == user_id,
        )
        result = await self.session.execute(stmt)
        return result.scalar_one_or_none()

    async def get_project_db_routing(
        self, project_id: UUID, db_role: str
    ) -> Optional[ProjectDbRouting]:
        """Return routing info for the project's database with ``db_role``.

        The stored ``dsn_encrypted`` value is decrypted with the encryptor
        returned by ``get_encryptor()``. Returns ``None`` when no matching
        row exists.
        """
        database = models.ProjectDatabase
        stmt = select(database).where(
            database.project_id == project_id,
            database.db_role == db_role,
        )
        result = await self.session.execute(stmt)
        record = result.scalar_one_or_none()
        if record is None:
            return None

        return ProjectDbRouting(
            project_id=record.project_id,
            db_role=record.db_role,
            db_type=record.db_type,
            dsn=get_encryptor().decrypt(record.dsn_encrypted),
            pool_min_size=record.pool_min_size,
            pool_max_size=record.pool_max_size,
        )

    async def get_geoserver_config(
        self, project_id: UUID
    ) -> Optional[ProjectGeoServerInfo]:
        """Return the project's GeoServer configuration, or ``None``.

        The admin password is decrypted only when an encrypted value is
        stored; otherwise ``gs_admin_password`` is ``None``.
        """
        config = models.ProjectGeoServerConfig
        stmt = select(config).where(config.project_id == project_id)
        result = await self.session.execute(stmt)
        record = result.scalar_one_or_none()
        if record is None:
            return None

        encryptor = get_encryptor()
        # Truthiness check on purpose: an empty stored value is treated the
        # same as a missing one and is not passed to the encryptor.
        if record.gs_admin_password_encrypted:
            password = encryptor.decrypt(record.gs_admin_password_encrypted)
        else:
            password = None

        return ProjectGeoServerInfo(
            project_id=record.project_id,
            gs_base_url=record.gs_base_url,
            gs_admin_user=record.gs_admin_user,
            gs_admin_password=password,
            gs_datastore_name=record.gs_datastore_name,
            default_extent=record.default_extent,
            srid=record.srid,
        )

    async def list_projects_for_user(self, user_id: UUID) -> List[ProjectSummary]:
        """List projects the user is a member of, ordered by project name.

        Each summary carries the ``project_role`` from the user's
        membership row.
        """
        membership = models.UserProjectMembership
        stmt = (
            select(models.Project, membership.project_role)
            .join(membership, membership.project_id == models.Project.id)
            .where(membership.user_id == user_id)
            .order_by(models.Project.name)
        )
        result = await self.session.execute(stmt)
        return [_to_summary(project, role) for project, role in result.all()]

    async def list_all_projects(self) -> List[ProjectSummary]:
        """List every project, ordered by project name.

        No membership lookup is performed; each summary reports the fixed
        role ``"owner"``.
        NOTE(review): presumably meant for administrator callers - confirm.
        """
        stmt = select(models.Project).order_by(models.Project.name)
        result = await self.session.execute(stmt)
        return [_to_summary(project, "owner") for project in result.scalars().all()]