From 1a76c89054146d351e7400c25f12581478a13dfe Mon Sep 17 00:00:00 2001 From: Jiang Date: Fri, 13 Mar 2026 16:22:13 +0800 Subject: [PATCH] =?UTF-8?q?=E6=9B=B4=E6=96=B0metadb=E5=BC=95=E7=94=A8?= =?UTF-8?q?=E8=B7=AF=E5=BE=84?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- app/api/v1/endpoints/audit.py | 39 ++++-- app/api/v1/endpoints/auth.py | 92 +++++++------ app/api/v1/endpoints/meta.py | 2 +- app/api/v1/endpoints/user_management.py | 125 +++++++++++------- app/auth/dependencies.py | 2 +- app/auth/metadata_dependencies.py | 2 +- app/auth/project_dependencies.py | 2 +- app/infra/audit/middleware.py | 2 +- .../test_metadata_repository_dsn_decrypt.py | 14 +- 9 files changed, 165 insertions(+), 115 deletions(-) diff --git a/app/api/v1/endpoints/audit.py b/app/api/v1/endpoints/audit.py index 15d24a5..dd0d7d6 100644 --- a/app/api/v1/endpoints/audit.py +++ b/app/api/v1/endpoints/audit.py @@ -3,12 +3,13 @@ 仅管理员可访问 """ + from typing import List, Optional from uuid import UUID from datetime import datetime from fastapi import APIRouter, Depends, Query, Path from app.domain.schemas.audit import AuditLogResponse -from app.infra.repositories.audit_repository import AuditRepository +from app.infra.db.metadb.repositories.audit_repository import AuditRepository from app.auth.metadata_dependencies import ( get_current_metadata_admin, get_current_metadata_user, @@ -18,13 +19,20 @@ from sqlalchemy.ext.asyncio import AsyncSession router = APIRouter() + async def get_audit_repository( session: AsyncSession = Depends(get_metadata_session), ) -> AuditRepository: """获取审计日志仓储""" return AuditRepository(session) -@router.get("/logs", summary="查询审计日志", description="查询审计日志(仅管理员)", response_model=List[AuditLogResponse]) + +@router.get( + "/logs", + summary="查询审计日志", + description="查询审计日志(仅管理员)", + response_model=List[AuditLogResponse], +) async def get_audit_logs( user_id: Optional[UUID] = Query(None, description="按用户ID过滤"), project_id: Optional[UUID] = Query(None, description="按项目ID过滤"), @@ -39,7 +47,7 @@ async def get_audit_logs( ) -> List[AuditLogResponse]: """ 查询审计日志 - + 支持按用户、时间、操作类型等条件过滤,仅管理员可访问 """ logs = await audit_repo.get_logs( @@ -50,11 +58,16 @@ async def get_audit_logs( start_time=start_time, end_time=end_time, skip=skip, - limit=limit + limit=limit, ) return logs -@router.get("/logs/count", summary="获取审计日志总数", description="获取审计日志总数(仅管理员)") + +@router.get( + "/logs/count", + summary="获取审计日志总数", + description="获取审计日志总数(仅管理员)", +) async def get_audit_logs_count( user_id: Optional[UUID] = Query(None, description="按用户ID过滤"), project_id: Optional[UUID] = Query(None, description="按项目ID过滤"), @@ -67,7 +80,7 @@ async def get_audit_logs_count( ) -> dict: """ 获取审计日志总数 - + 获取符合条件的审计日志的总数,仅管理员可访问 """ count = await audit_repo.get_log_count( @@ -76,11 +89,17 @@ async def get_audit_logs_count( action=action, resource_type=resource_type, start_time=start_time, - end_time=end_time + end_time=end_time, ) return {"count": count} -@router.get("/logs/my", summary="查询我的审计日志", description="查询当前用户的审计日志", response_model=List[AuditLogResponse]) + +@router.get( + "/logs/my", + summary="查询我的审计日志", + description="查询当前用户的审计日志", + response_model=List[AuditLogResponse], +) async def get_my_audit_logs( action: Optional[str] = Query(None, description="按操作类型过滤"), start_time: Optional[datetime] = Query(None, description="开始时间"), @@ -92,7 +111,7 @@ async def get_my_audit_logs( ) -> List[AuditLogResponse]: """ 查询当前用户的审计日志 - + 普通用户只能查看自己的操作记录 """ logs = await audit_repo.get_logs( @@ -101,6 +120,6 @@ async def get_my_audit_logs( start_time=start_time, end_time=end_time, skip=skip, - limit=limit + limit=limit, ) return logs diff --git a/app/api/v1/endpoints/auth.py b/app/api/v1/endpoints/auth.py index 66b5cbd..819a094 100644 --- a/app/api/v1/endpoints/auth.py +++ b/app/api/v1/endpoints/auth.py @@ -5,7 +5,7 @@ from fastapi.security import OAuth2PasswordRequestForm from app.core.config import settings from app.core.security import create_access_token, create_refresh_token, verify_password from app.domain.schemas.user import UserCreate, UserResponse, UserLogin, Token -from app.infra.repositories.user_repository import UserRepository +from app.infra.db.metadb.repositories.user_repository import UserRepository from app.auth.dependencies import get_user_repository, get_current_active_user from app.domain.schemas.user import UserInDB import logging @@ -14,53 +14,55 @@ logger = logging.getLogger(__name__) router = APIRouter() -@router.post("/register", response_model=UserResponse, status_code=status.HTTP_201_CREATED) + +@router.post( + "/register", response_model=UserResponse, status_code=status.HTTP_201_CREATED +) async def register( - user_data: UserCreate, - user_repo: UserRepository = Depends(get_user_repository) + user_data: UserCreate, user_repo: UserRepository = Depends(get_user_repository) ) -> UserResponse: """ 用户注册 - + 创建新用户账号 """ # 检查用户名和邮箱是否已存在 if await user_repo.user_exists(username=user_data.username): raise HTTPException( status_code=status.HTTP_400_BAD_REQUEST, - detail="Username already registered" + detail="Username already registered", ) - + if await user_repo.user_exists(email=user_data.email): raise HTTPException( - status_code=status.HTTP_400_BAD_REQUEST, - detail="Email already registered" + status_code=status.HTTP_400_BAD_REQUEST, detail="Email already registered" ) - + # 创建用户 try: user = await user_repo.create_user(user_data) if not user: raise HTTPException( status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, - detail="Failed to create user" + detail="Failed to create user", ) return UserResponse.model_validate(user) except Exception as e: logger.error(f"Error during user registration: {e}") raise HTTPException( status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, - detail="Registration failed" + detail="Registration failed", ) + @router.post("/login", response_model=Token) async def login( form_data: Annotated[OAuth2PasswordRequestForm, Depends()], - user_repo: UserRepository = Depends(get_user_repository) + user_repo: UserRepository = Depends(get_user_repository), ) -> Token: """ 用户登录(OAuth2 标准格式) - + 返回 JWT Access Token 和 Refresh Token """ # 验证用户(支持用户名或邮箱登录) @@ -68,119 +70,121 @@ async def login( if not user: # 尝试用邮箱登录 user = await user_repo.get_user_by_email(form_data.username) - + if not user or not verify_password(form_data.password, user.hashed_password): raise HTTPException( status_code=status.HTTP_401_UNAUTHORIZED, detail="Incorrect username or password", headers={"WWW-Authenticate": "Bearer"}, ) - + if not user.is_active: raise HTTPException( - status_code=status.HTTP_403_FORBIDDEN, - detail="Inactive user account" + status_code=status.HTTP_403_FORBIDDEN, detail="Inactive user account" ) - + # 生成 Token access_token = create_access_token(subject=user.username) refresh_token = create_refresh_token(subject=user.username) - + return Token( access_token=access_token, refresh_token=refresh_token, token_type="bearer", - expires_in=settings.ACCESS_TOKEN_EXPIRE_MINUTES * 60 + expires_in=settings.ACCESS_TOKEN_EXPIRE_MINUTES * 60, ) + @router.post("/login/simple", response_model=Token) async def login_simple( username: str, password: str, - user_repo: UserRepository = Depends(get_user_repository) + user_repo: UserRepository = Depends(get_user_repository), ) -> Token: """ 简化版登录接口(保持向后兼容) - + 直接使用 username 和 password 参数 """ # 验证用户 user = await user_repo.get_user_by_username(username) if not user: user = await user_repo.get_user_by_email(username) - + if not user or not verify_password(password, user.hashed_password): raise HTTPException( status_code=status.HTTP_401_UNAUTHORIZED, - detail="Incorrect username or password" + detail="Incorrect username or password", ) - + if not user.is_active: raise HTTPException( - status_code=status.HTTP_403_FORBIDDEN, - detail="Inactive user account" + status_code=status.HTTP_403_FORBIDDEN, detail="Inactive user account" ) - + # 生成 Token access_token = create_access_token(subject=user.username) refresh_token = create_refresh_token(subject=user.username) - + return Token( access_token=access_token, refresh_token=refresh_token, token_type="bearer", - expires_in=settings.ACCESS_TOKEN_EXPIRE_MINUTES * 60 + expires_in=settings.ACCESS_TOKEN_EXPIRE_MINUTES * 60, ) + @router.get("/me", response_model=UserResponse) async def get_current_user_info( - current_user: UserInDB = Depends(get_current_active_user) + current_user: UserInDB = Depends(get_current_active_user), ) -> UserResponse: """ 获取当前登录用户信息 """ return UserResponse.model_validate(current_user) + @router.post("/refresh", response_model=Token) async def refresh_token( - refresh_token: str, - user_repo: UserRepository = Depends(get_user_repository) + refresh_token: str, user_repo: UserRepository = Depends(get_user_repository) ) -> Token: """ 刷新 Access Token - + 使用 Refresh Token 获取新的 Access Token """ from jose import jwt, JWTError - + credentials_exception = HTTPException( status_code=status.HTTP_401_UNAUTHORIZED, detail="Could not validate refresh token", headers={"WWW-Authenticate": "Bearer"}, ) - + try: - payload = jwt.decode(refresh_token, settings.SECRET_KEY, algorithms=[settings.ALGORITHM]) + payload = jwt.decode( + refresh_token, settings.SECRET_KEY, algorithms=[settings.ALGORITHM] + ) username: str = payload.get("sub") token_type: str = payload.get("type") - + if username is None or token_type != "refresh": raise credentials_exception - + except JWTError: raise credentials_exception - + # 验证用户仍然存在且激活 user = await user_repo.get_user_by_username(username) if not user or not user.is_active: raise credentials_exception - + # 生成新的 Access Token new_access_token = create_access_token(subject=user.username) - + return Token( access_token=new_access_token, refresh_token=refresh_token, # 保持原 refresh token token_type="bearer", - expires_in=settings.ACCESS_TOKEN_EXPIRE_MINUTES * 60 + expires_in=settings.ACCESS_TOKEN_EXPIRE_MINUTES * 60, ) diff --git a/app/api/v1/endpoints/meta.py b/app/api/v1/endpoints/meta.py index 1e46710..455189e 100644 --- a/app/api/v1/endpoints/meta.py +++ b/app/api/v1/endpoints/meta.py @@ -19,7 +19,7 @@ from app.domain.schemas.metadata import ( ProjectMetaResponse, ProjectSummaryResponse, ) -from app.infra.repositories.metadata_repository import MetadataRepository +from app.infra.db.metadb.repositories.metadata_repository import MetadataRepository router = APIRouter() logger = logging.getLogger(__name__) diff --git a/app/api/v1/endpoints/user_management.py b/app/api/v1/endpoints/user_management.py index af8b1ce..72e40f0 100644 --- a/app/api/v1/endpoints/user_management.py +++ b/app/api/v1/endpoints/user_management.py @@ -3,186 +3,213 @@ 演示权限控制的使用 """ + from typing import List from fastapi import APIRouter, Depends, HTTPException, status, Path, Query from app.domain.schemas.user import UserResponse, UserUpdate, UserCreate from app.domain.models.role import UserRole from app.domain.schemas.user import UserInDB -from app.infra.repositories.user_repository import UserRepository +from app.infra.db.metadb.repositories.user_repository import UserRepository from app.auth.dependencies import get_user_repository, get_current_active_user from app.auth.permissions import get_current_admin, require_role, check_resource_owner router = APIRouter() -@router.get("/", summary="列出所有用户", description="获取用户列表(仅管理员)", response_model=List[UserResponse]) + +@router.get( + "/", + summary="列出所有用户", + description="获取用户列表(仅管理员)", + response_model=List[UserResponse], +) async def list_users( skip: int = Query(0, ge=0, description="跳过的用户数"), limit: int = Query(100, ge=1, le=1000, description="返回的最大用户数"), current_user: UserInDB = Depends(require_role(UserRole.ADMIN)), - user_repo: UserRepository = Depends(get_user_repository) + user_repo: UserRepository = Depends(get_user_repository), ) -> List[UserResponse]: """ 获取用户列表 - + 获取系统中所有的用户信息(需要管理员权限) """ users = await user_repo.get_all_users(skip=skip, limit=limit) return [UserResponse.model_validate(user) for user in users] -@router.get("/{user_id}", summary="获取用户详情", description="获取指定用户的详细信息", response_model=UserResponse) + +@router.get( + "/{user_id}", + summary="获取用户详情", + description="获取指定用户的详细信息", + response_model=UserResponse, +) async def get_user( user_id: int = Path(..., gt=0, description="用户ID"), current_user: UserInDB = Depends(get_current_active_user), - user_repo: UserRepository = Depends(get_user_repository) + user_repo: UserRepository = Depends(get_user_repository), ) -> UserResponse: """ 获取用户详情 - + 管理员可查看所有用户,普通用户只能查看自己 """ # 检查权限 if not check_resource_owner(user_id, current_user): raise HTTPException( status_code=status.HTTP_403_FORBIDDEN, - detail="You don't have permission to view this user" + detail="You don't have permission to view this user", ) - + user = await user_repo.get_user_by_id(user_id) if not user: raise HTTPException( - status_code=status.HTTP_404_NOT_FOUND, - detail="User not found" + status_code=status.HTTP_404_NOT_FOUND, detail="User not found" ) - + return UserResponse.model_validate(user) -@router.put("/{user_id}", summary="更新用户信息", description="更新指定用户的信息", response_model=UserResponse) + +@router.put( + "/{user_id}", + summary="更新用户信息", + description="更新指定用户的信息", + response_model=UserResponse, +) async def update_user( user_id: int = Path(..., gt=0, description="用户ID"), user_update: UserUpdate = None, current_user: UserInDB = Depends(get_current_active_user), - user_repo: UserRepository = Depends(get_user_repository) + user_repo: UserRepository = Depends(get_user_repository), ) -> UserResponse: """ 更新用户信息 - + 管理员可更新所有用户,普通用户只能更新自己(且不能修改角色) """ # 检查用户是否存在 target_user = await user_repo.get_user_by_id(user_id) if not target_user: raise HTTPException( - status_code=status.HTTP_404_NOT_FOUND, - detail="User not found" + status_code=status.HTTP_404_NOT_FOUND, detail="User not found" ) - + # 权限检查 is_owner = current_user.id == user_id is_admin = UserRole(current_user.role).has_permission(UserRole.ADMIN) - + if not is_owner and not is_admin: raise HTTPException( status_code=status.HTTP_403_FORBIDDEN, - detail="You don't have permission to update this user" + detail="You don't have permission to update this user", ) - + # 非管理员不能修改角色和激活状态 if not is_admin: if user_update.role is not None: raise HTTPException( status_code=status.HTTP_403_FORBIDDEN, - detail="Only admins can change user roles" + detail="Only admins can change user roles", ) if user_update.is_active is not None: raise HTTPException( status_code=status.HTTP_403_FORBIDDEN, - detail="Only admins can change user active status" + detail="Only admins can change user active status", ) - + # 更新用户 updated_user = await user_repo.update_user(user_id, user_update) if not updated_user: raise HTTPException( status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, - detail="Failed to update user" + detail="Failed to update user", ) - + return UserResponse.model_validate(updated_user) + @router.delete("/{user_id}", summary="删除用户", description="删除指定用户(仅管理员)") async def delete_user( user_id: int = Path(..., gt=0, description="用户ID"), current_user: UserInDB = Depends(get_current_admin), - user_repo: UserRepository = Depends(get_user_repository) + user_repo: UserRepository = Depends(get_user_repository), ) -> dict: """ 删除用户 - + 删除指定用户(需要管理员权限,不能删除自己) """ # 不能删除自己 if current_user.id == user_id: raise HTTPException( status_code=status.HTTP_400_BAD_REQUEST, - detail="You cannot delete your own account" + detail="You cannot delete your own account", ) - + success = await user_repo.delete_user(user_id) if not success: raise HTTPException( - status_code=status.HTTP_404_NOT_FOUND, - detail="User not found" + status_code=status.HTTP_404_NOT_FOUND, detail="User not found" ) - + return {"message": "User deleted successfully"} -@router.post("/{user_id}/activate", summary="激活用户", description="激活指定用户账户(仅管理员)", response_model=UserResponse) + +@router.post( + "/{user_id}/activate", + summary="激活用户", + description="激活指定用户账户(仅管理员)", + response_model=UserResponse, +) async def activate_user( user_id: int = Path(..., gt=0, description="用户ID"), current_user: UserInDB = Depends(get_current_admin), - user_repo: UserRepository = Depends(get_user_repository) + user_repo: UserRepository = Depends(get_user_repository), ) -> UserResponse: """ 激活用户 - + 激活指定用户的账户(需要管理员权限) """ user_update = UserUpdate(is_active=True) updated_user = await user_repo.update_user(user_id, user_update) - + if not updated_user: raise HTTPException( - status_code=status.HTTP_404_NOT_FOUND, - detail="User not found" + status_code=status.HTTP_404_NOT_FOUND, detail="User not found" ) - + return UserResponse.model_validate(updated_user) -@router.post("/{user_id}/deactivate", summary="停用用户", description="停用指定用户账户(仅管理员)", response_model=UserResponse) + +@router.post( + "/{user_id}/deactivate", + summary="停用用户", + description="停用指定用户账户(仅管理员)", + response_model=UserResponse, +) async def deactivate_user( user_id: int = Path(..., gt=0, description="用户ID"), current_user: UserInDB = Depends(get_current_admin), - user_repo: UserRepository = Depends(get_user_repository) + user_repo: UserRepository = Depends(get_user_repository), ) -> UserResponse: """ 停用用户 - + 停用指定用户的账户(需要管理员权限,不能停用自己) """ # 不能停用自己 if current_user.id == user_id: raise HTTPException( status_code=status.HTTP_400_BAD_REQUEST, - detail="You cannot deactivate your own account" + detail="You cannot deactivate your own account", ) - + user_update = UserUpdate(is_active=False) updated_user = await user_repo.update_user(user_id, user_update) - + if not updated_user: raise HTTPException( - status_code=status.HTTP_404_NOT_FOUND, - detail="User not found" + status_code=status.HTTP_404_NOT_FOUND, detail="User not found" ) - + return UserResponse.model_validate(updated_user) diff --git a/app/auth/dependencies.py b/app/auth/dependencies.py index 8b7bff3..3524f0a 100644 --- a/app/auth/dependencies.py +++ b/app/auth/dependencies.py @@ -4,7 +4,7 @@ from fastapi.security import OAuth2PasswordBearer from jose import jwt, JWTError from app.core.config import settings from app.domain.schemas.user import UserInDB, TokenPayload -from app.infra.repositories.user_repository import UserRepository +from app.infra.db.metadb.repositories.user_repository import UserRepository from app.infra.db.postgresql.database import Database oauth2_scheme = OAuth2PasswordBearer(tokenUrl=f"{settings.API_V1_STR}/auth/login") diff --git a/app/auth/metadata_dependencies.py b/app/auth/metadata_dependencies.py index 50e22fa..8424429 100644 --- a/app/auth/metadata_dependencies.py +++ b/app/auth/metadata_dependencies.py @@ -9,7 +9,7 @@ from sqlalchemy.ext.asyncio import AsyncSession from app.auth.keycloak_dependencies import get_current_keycloak_sub from app.core.config import settings from app.infra.db.metadb.database import get_metadata_session -from app.infra.repositories.metadata_repository import MetadataRepository +from app.infra.db.metadb.repositories.metadata_repository import MetadataRepository logger = logging.getLogger(__name__) diff --git a/app/auth/project_dependencies.py b/app/auth/project_dependencies.py index fd65703..6513c93 100644 --- a/app/auth/project_dependencies.py +++ b/app/auth/project_dependencies.py @@ -12,7 +12,7 @@ from app.auth.keycloak_dependencies import get_current_keycloak_sub from app.core.config import settings from app.infra.db.dynamic_manager import project_connection_manager from app.infra.db.metadb.database import get_metadata_session -from app.infra.repositories.metadata_repository import MetadataRepository +from app.infra.db.metadb.repositories.metadata_repository import MetadataRepository DB_ROLE_BIZ_DATA = "biz_data" DB_ROLE_IOT_DATA = "iot_data" diff --git a/app/infra/audit/middleware.py b/app/infra/audit/middleware.py index b04b2b2..eb94fe8 100644 --- a/app/infra/audit/middleware.py +++ b/app/infra/audit/middleware.py @@ -16,7 +16,7 @@ from jose import JWTError, jwt from app.core.config import settings from app.infra.db.metadb.database import SessionLocal -from app.infra.repositories.metadata_repository import MetadataRepository +from app.infra.db.metadb.repositories.metadata_repository import MetadataRepository logger = logging.getLogger(__name__) diff --git a/tests/unit/test_metadata_repository_dsn_decrypt.py b/tests/unit/test_metadata_repository_dsn_decrypt.py index 02796e1..e1a74f8 100644 --- a/tests/unit/test_metadata_repository_dsn_decrypt.py +++ b/tests/unit/test_metadata_repository_dsn_decrypt.py @@ -6,7 +6,7 @@ from uuid import uuid4 import pytest from cryptography.fernet import InvalidToken -from app.infra.repositories.metadata_repository import MetadataRepository +from app.infra.db.metadb.repositories.metadata_repository import MetadataRepository class _DummyResult: @@ -51,11 +51,11 @@ def test_invalid_token_with_plaintext_dsn_value_raises_clear_error(monkeypatch): repo = MetadataRepository(session) monkeypatch.setattr( - "app.infra.repositories.metadata_repository.is_database_encryption_configured", + "app.infra.db.metadb.repositories.metadata_repository.is_database_encryption_configured", lambda: True, ) monkeypatch.setattr( - "app.infra.repositories.metadata_repository.get_database_encryptor", + "app.infra.db.metadb.repositories.metadata_repository.get_database_encryptor", lambda: encryptor, ) @@ -78,11 +78,11 @@ def test_invalid_token_with_non_dsn_value_raises_clear_error(monkeypatch): repo = MetadataRepository(session) monkeypatch.setattr( - "app.infra.repositories.metadata_repository.is_database_encryption_configured", + "app.infra.db.metadb.repositories.metadata_repository.is_database_encryption_configured", lambda: True, ) monkeypatch.setattr( - "app.infra.repositories.metadata_repository.get_database_encryptor", + "app.infra.db.metadb.repositories.metadata_repository.get_database_encryptor", lambda: _DummyEncryptor(raise_invalid_token=True), ) @@ -105,11 +105,11 @@ def test_encrypted_dsn_decrypts_without_migration(monkeypatch): repo = MetadataRepository(session) monkeypatch.setattr( - "app.infra.repositories.metadata_repository.is_database_encryption_configured", + "app.infra.db.metadb.repositories.metadata_repository.is_database_encryption_configured", lambda: True, ) monkeypatch.setattr( - "app.infra.repositories.metadata_repository.get_database_encryptor", + "app.infra.db.metadb.repositories.metadata_repository.get_database_encryptor", lambda: _DummyEncryptor(decrypted="postgresql://u:p@ss@host/db"), )