更新metadb引用路径

This commit is contained in:
2026-03-13 16:22:13 +08:00
parent 1673396e1a
commit 1a76c89054
9 changed files with 165 additions and 115 deletions
+29 -10
View File
@@ -3,12 +3,13 @@
仅管理员可访问
"""
from typing import List, Optional
from uuid import UUID
from datetime import datetime
from fastapi import APIRouter, Depends, Query, Path
from app.domain.schemas.audit import AuditLogResponse
from app.infra.repositories.audit_repository import AuditRepository
from app.infra.db.metadb.repositories.audit_repository import AuditRepository
from app.auth.metadata_dependencies import (
get_current_metadata_admin,
get_current_metadata_user,
@@ -18,13 +19,20 @@ from sqlalchemy.ext.asyncio import AsyncSession
router = APIRouter()
async def get_audit_repository(
session: AsyncSession = Depends(get_metadata_session),
) -> AuditRepository:
"""获取审计日志仓储"""
return AuditRepository(session)
@router.get("/logs", summary="查询审计日志", description="查询审计日志(仅管理员)", response_model=List[AuditLogResponse])
@router.get(
"/logs",
summary="查询审计日志",
description="查询审计日志(仅管理员)",
response_model=List[AuditLogResponse],
)
async def get_audit_logs(
user_id: Optional[UUID] = Query(None, description="按用户ID过滤"),
project_id: Optional[UUID] = Query(None, description="按项目ID过滤"),
@@ -39,7 +47,7 @@ async def get_audit_logs(
) -> List[AuditLogResponse]:
"""
查询审计日志
支持按用户、时间、操作类型等条件过滤,仅管理员可访问
"""
logs = await audit_repo.get_logs(
@@ -50,11 +58,16 @@ async def get_audit_logs(
start_time=start_time,
end_time=end_time,
skip=skip,
limit=limit
limit=limit,
)
return logs
@router.get("/logs/count", summary="获取审计日志总数", description="获取审计日志总数(仅管理员)")
@router.get(
"/logs/count",
summary="获取审计日志总数",
description="获取审计日志总数(仅管理员)",
)
async def get_audit_logs_count(
user_id: Optional[UUID] = Query(None, description="按用户ID过滤"),
project_id: Optional[UUID] = Query(None, description="按项目ID过滤"),
@@ -67,7 +80,7 @@ async def get_audit_logs_count(
) -> dict:
"""
获取审计日志总数
获取符合条件的审计日志的总数,仅管理员可访问
"""
count = await audit_repo.get_log_count(
@@ -76,11 +89,17 @@ async def get_audit_logs_count(
action=action,
resource_type=resource_type,
start_time=start_time,
end_time=end_time
end_time=end_time,
)
return {"count": count}
@router.get("/logs/my", summary="查询我的审计日志", description="查询当前用户的审计日志", response_model=List[AuditLogResponse])
@router.get(
"/logs/my",
summary="查询我的审计日志",
description="查询当前用户的审计日志",
response_model=List[AuditLogResponse],
)
async def get_my_audit_logs(
action: Optional[str] = Query(None, description="按操作类型过滤"),
start_time: Optional[datetime] = Query(None, description="开始时间"),
@@ -92,7 +111,7 @@ async def get_my_audit_logs(
) -> List[AuditLogResponse]:
"""
查询当前用户的审计日志
普通用户只能查看自己的操作记录
"""
logs = await audit_repo.get_logs(
@@ -101,6 +120,6 @@ async def get_my_audit_logs(
start_time=start_time,
end_time=end_time,
skip=skip,
limit=limit
limit=limit,
)
return logs
+48 -44
View File
@@ -5,7 +5,7 @@ from fastapi.security import OAuth2PasswordRequestForm
from app.core.config import settings
from app.core.security import create_access_token, create_refresh_token, verify_password
from app.domain.schemas.user import UserCreate, UserResponse, UserLogin, Token
from app.infra.repositories.user_repository import UserRepository
from app.infra.db.metadb.repositories.user_repository import UserRepository
from app.auth.dependencies import get_user_repository, get_current_active_user
from app.domain.schemas.user import UserInDB
import logging
@@ -14,53 +14,55 @@ logger = logging.getLogger(__name__)
router = APIRouter()
@router.post("/register", response_model=UserResponse, status_code=status.HTTP_201_CREATED)
@router.post(
"/register", response_model=UserResponse, status_code=status.HTTP_201_CREATED
)
async def register(
user_data: UserCreate,
user_repo: UserRepository = Depends(get_user_repository)
user_data: UserCreate, user_repo: UserRepository = Depends(get_user_repository)
) -> UserResponse:
"""
用户注册
创建新用户账号
"""
# 检查用户名和邮箱是否已存在
if await user_repo.user_exists(username=user_data.username):
raise HTTPException(
status_code=status.HTTP_400_BAD_REQUEST,
detail="Username already registered"
detail="Username already registered",
)
if await user_repo.user_exists(email=user_data.email):
raise HTTPException(
status_code=status.HTTP_400_BAD_REQUEST,
detail="Email already registered"
status_code=status.HTTP_400_BAD_REQUEST, detail="Email already registered"
)
# 创建用户
try:
user = await user_repo.create_user(user_data)
if not user:
raise HTTPException(
status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
detail="Failed to create user"
detail="Failed to create user",
)
return UserResponse.model_validate(user)
except Exception as e:
logger.error(f"Error during user registration: {e}")
raise HTTPException(
status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
detail="Registration failed"
detail="Registration failed",
)
@router.post("/login", response_model=Token)
async def login(
form_data: Annotated[OAuth2PasswordRequestForm, Depends()],
user_repo: UserRepository = Depends(get_user_repository)
user_repo: UserRepository = Depends(get_user_repository),
) -> Token:
"""
用户登录(OAuth2 标准格式)
返回 JWT Access Token 和 Refresh Token
"""
# 验证用户(支持用户名或邮箱登录)
@@ -68,119 +70,121 @@ async def login(
if not user:
# 尝试用邮箱登录
user = await user_repo.get_user_by_email(form_data.username)
if not user or not verify_password(form_data.password, user.hashed_password):
raise HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
detail="Incorrect username or password",
headers={"WWW-Authenticate": "Bearer"},
)
if not user.is_active:
raise HTTPException(
status_code=status.HTTP_403_FORBIDDEN,
detail="Inactive user account"
status_code=status.HTTP_403_FORBIDDEN, detail="Inactive user account"
)
# 生成 Token
access_token = create_access_token(subject=user.username)
refresh_token = create_refresh_token(subject=user.username)
return Token(
access_token=access_token,
refresh_token=refresh_token,
token_type="bearer",
expires_in=settings.ACCESS_TOKEN_EXPIRE_MINUTES * 60
expires_in=settings.ACCESS_TOKEN_EXPIRE_MINUTES * 60,
)
@router.post("/login/simple", response_model=Token)
async def login_simple(
username: str,
password: str,
user_repo: UserRepository = Depends(get_user_repository)
user_repo: UserRepository = Depends(get_user_repository),
) -> Token:
"""
简化版登录接口(保持向后兼容)
直接使用 username 和 password 参数
"""
# 验证用户
user = await user_repo.get_user_by_username(username)
if not user:
user = await user_repo.get_user_by_email(username)
if not user or not verify_password(password, user.hashed_password):
raise HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
detail="Incorrect username or password"
detail="Incorrect username or password",
)
if not user.is_active:
raise HTTPException(
status_code=status.HTTP_403_FORBIDDEN,
detail="Inactive user account"
status_code=status.HTTP_403_FORBIDDEN, detail="Inactive user account"
)
# 生成 Token
access_token = create_access_token(subject=user.username)
refresh_token = create_refresh_token(subject=user.username)
return Token(
access_token=access_token,
refresh_token=refresh_token,
token_type="bearer",
expires_in=settings.ACCESS_TOKEN_EXPIRE_MINUTES * 60
expires_in=settings.ACCESS_TOKEN_EXPIRE_MINUTES * 60,
)
@router.get("/me", response_model=UserResponse)
async def get_current_user_info(
current_user: UserInDB = Depends(get_current_active_user)
current_user: UserInDB = Depends(get_current_active_user),
) -> UserResponse:
"""
获取当前登录用户信息
"""
return UserResponse.model_validate(current_user)
@router.post("/refresh", response_model=Token)
async def refresh_token(
refresh_token: str,
user_repo: UserRepository = Depends(get_user_repository)
refresh_token: str, user_repo: UserRepository = Depends(get_user_repository)
) -> Token:
"""
刷新 Access Token
使用 Refresh Token 获取新的 Access Token
"""
from jose import jwt, JWTError
credentials_exception = HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
detail="Could not validate refresh token",
headers={"WWW-Authenticate": "Bearer"},
)
try:
payload = jwt.decode(refresh_token, settings.SECRET_KEY, algorithms=[settings.ALGORITHM])
payload = jwt.decode(
refresh_token, settings.SECRET_KEY, algorithms=[settings.ALGORITHM]
)
username: str = payload.get("sub")
token_type: str = payload.get("type")
if username is None or token_type != "refresh":
raise credentials_exception
except JWTError:
raise credentials_exception
# 验证用户仍然存在且激活
user = await user_repo.get_user_by_username(username)
if not user or not user.is_active:
raise credentials_exception
# 生成新的 Access Token
new_access_token = create_access_token(subject=user.username)
return Token(
access_token=new_access_token,
refresh_token=refresh_token, # 保持原 refresh token
token_type="bearer",
expires_in=settings.ACCESS_TOKEN_EXPIRE_MINUTES * 60
expires_in=settings.ACCESS_TOKEN_EXPIRE_MINUTES * 60,
)
+1 -1
View File
@@ -19,7 +19,7 @@ from app.domain.schemas.metadata import (
ProjectMetaResponse,
ProjectSummaryResponse,
)
from app.infra.repositories.metadata_repository import MetadataRepository
from app.infra.db.metadb.repositories.metadata_repository import MetadataRepository
router = APIRouter()
logger = logging.getLogger(__name__)
+76 -49
View File
@@ -3,186 +3,213 @@
演示权限控制的使用
"""
from typing import List
from fastapi import APIRouter, Depends, HTTPException, status, Path, Query
from app.domain.schemas.user import UserResponse, UserUpdate, UserCreate
from app.domain.models.role import UserRole
from app.domain.schemas.user import UserInDB
from app.infra.repositories.user_repository import UserRepository
from app.infra.db.metadb.repositories.user_repository import UserRepository
from app.auth.dependencies import get_user_repository, get_current_active_user
from app.auth.permissions import get_current_admin, require_role, check_resource_owner
router = APIRouter()
@router.get("/", summary="列出所有用户", description="获取用户列表(仅管理员)", response_model=List[UserResponse])
@router.get(
"/",
summary="列出所有用户",
description="获取用户列表(仅管理员)",
response_model=List[UserResponse],
)
async def list_users(
skip: int = Query(0, ge=0, description="跳过的用户数"),
limit: int = Query(100, ge=1, le=1000, description="返回的最大用户数"),
current_user: UserInDB = Depends(require_role(UserRole.ADMIN)),
user_repo: UserRepository = Depends(get_user_repository)
user_repo: UserRepository = Depends(get_user_repository),
) -> List[UserResponse]:
"""
获取用户列表
获取系统中所有的用户信息(需要管理员权限)
"""
users = await user_repo.get_all_users(skip=skip, limit=limit)
return [UserResponse.model_validate(user) for user in users]
@router.get("/{user_id}", summary="获取用户详情", description="获取指定用户的详细信息", response_model=UserResponse)
@router.get(
"/{user_id}",
summary="获取用户详情",
description="获取指定用户的详细信息",
response_model=UserResponse,
)
async def get_user(
user_id: int = Path(..., gt=0, description="用户ID"),
current_user: UserInDB = Depends(get_current_active_user),
user_repo: UserRepository = Depends(get_user_repository)
user_repo: UserRepository = Depends(get_user_repository),
) -> UserResponse:
"""
获取用户详情
管理员可查看所有用户,普通用户只能查看自己
"""
# 检查权限
if not check_resource_owner(user_id, current_user):
raise HTTPException(
status_code=status.HTTP_403_FORBIDDEN,
detail="You don't have permission to view this user"
detail="You don't have permission to view this user",
)
user = await user_repo.get_user_by_id(user_id)
if not user:
raise HTTPException(
status_code=status.HTTP_404_NOT_FOUND,
detail="User not found"
status_code=status.HTTP_404_NOT_FOUND, detail="User not found"
)
return UserResponse.model_validate(user)
@router.put("/{user_id}", summary="更新用户信息", description="更新指定用户的信息", response_model=UserResponse)
@router.put(
"/{user_id}",
summary="更新用户信息",
description="更新指定用户的信息",
response_model=UserResponse,
)
async def update_user(
user_id: int = Path(..., gt=0, description="用户ID"),
user_update: UserUpdate = None,
current_user: UserInDB = Depends(get_current_active_user),
user_repo: UserRepository = Depends(get_user_repository)
user_repo: UserRepository = Depends(get_user_repository),
) -> UserResponse:
"""
更新用户信息
管理员可更新所有用户,普通用户只能更新自己(且不能修改角色)
"""
# 检查用户是否存在
target_user = await user_repo.get_user_by_id(user_id)
if not target_user:
raise HTTPException(
status_code=status.HTTP_404_NOT_FOUND,
detail="User not found"
status_code=status.HTTP_404_NOT_FOUND, detail="User not found"
)
# 权限检查
is_owner = current_user.id == user_id
is_admin = UserRole(current_user.role).has_permission(UserRole.ADMIN)
if not is_owner and not is_admin:
raise HTTPException(
status_code=status.HTTP_403_FORBIDDEN,
detail="You don't have permission to update this user"
detail="You don't have permission to update this user",
)
# 非管理员不能修改角色和激活状态
if not is_admin:
if user_update.role is not None:
raise HTTPException(
status_code=status.HTTP_403_FORBIDDEN,
detail="Only admins can change user roles"
detail="Only admins can change user roles",
)
if user_update.is_active is not None:
raise HTTPException(
status_code=status.HTTP_403_FORBIDDEN,
detail="Only admins can change user active status"
detail="Only admins can change user active status",
)
# 更新用户
updated_user = await user_repo.update_user(user_id, user_update)
if not updated_user:
raise HTTPException(
status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
detail="Failed to update user"
detail="Failed to update user",
)
return UserResponse.model_validate(updated_user)
@router.delete("/{user_id}", summary="删除用户", description="删除指定用户(仅管理员)")
async def delete_user(
user_id: int = Path(..., gt=0, description="用户ID"),
current_user: UserInDB = Depends(get_current_admin),
user_repo: UserRepository = Depends(get_user_repository)
user_repo: UserRepository = Depends(get_user_repository),
) -> dict:
"""
删除用户
删除指定用户(需要管理员权限,不能删除自己)
"""
# 不能删除自己
if current_user.id == user_id:
raise HTTPException(
status_code=status.HTTP_400_BAD_REQUEST,
detail="You cannot delete your own account"
detail="You cannot delete your own account",
)
success = await user_repo.delete_user(user_id)
if not success:
raise HTTPException(
status_code=status.HTTP_404_NOT_FOUND,
detail="User not found"
status_code=status.HTTP_404_NOT_FOUND, detail="User not found"
)
return {"message": "User deleted successfully"}
@router.post("/{user_id}/activate", summary="激活用户", description="激活指定用户账户(仅管理员)", response_model=UserResponse)
@router.post(
"/{user_id}/activate",
summary="激活用户",
description="激活指定用户账户(仅管理员)",
response_model=UserResponse,
)
async def activate_user(
user_id: int = Path(..., gt=0, description="用户ID"),
current_user: UserInDB = Depends(get_current_admin),
user_repo: UserRepository = Depends(get_user_repository)
user_repo: UserRepository = Depends(get_user_repository),
) -> UserResponse:
"""
激活用户
激活指定用户的账户(需要管理员权限)
"""
user_update = UserUpdate(is_active=True)
updated_user = await user_repo.update_user(user_id, user_update)
if not updated_user:
raise HTTPException(
status_code=status.HTTP_404_NOT_FOUND,
detail="User not found"
status_code=status.HTTP_404_NOT_FOUND, detail="User not found"
)
return UserResponse.model_validate(updated_user)
@router.post("/{user_id}/deactivate", summary="停用用户", description="停用指定用户账户(仅管理员)", response_model=UserResponse)
@router.post(
"/{user_id}/deactivate",
summary="停用用户",
description="停用指定用户账户(仅管理员)",
response_model=UserResponse,
)
async def deactivate_user(
user_id: int = Path(..., gt=0, description="用户ID"),
current_user: UserInDB = Depends(get_current_admin),
user_repo: UserRepository = Depends(get_user_repository)
user_repo: UserRepository = Depends(get_user_repository),
) -> UserResponse:
"""
停用用户
停用指定用户的账户(需要管理员权限,不能停用自己)
"""
# 不能停用自己
if current_user.id == user_id:
raise HTTPException(
status_code=status.HTTP_400_BAD_REQUEST,
detail="You cannot deactivate your own account"
detail="You cannot deactivate your own account",
)
user_update = UserUpdate(is_active=False)
updated_user = await user_repo.update_user(user_id, user_update)
if not updated_user:
raise HTTPException(
status_code=status.HTTP_404_NOT_FOUND,
detail="User not found"
status_code=status.HTTP_404_NOT_FOUND, detail="User not found"
)
return UserResponse.model_validate(updated_user)
+1 -1
View File
@@ -4,7 +4,7 @@ from fastapi.security import OAuth2PasswordBearer
from jose import jwt, JWTError
from app.core.config import settings
from app.domain.schemas.user import UserInDB, TokenPayload
from app.infra.repositories.user_repository import UserRepository
from app.infra.db.metadb.repositories.user_repository import UserRepository
from app.infra.db.postgresql.database import Database
oauth2_scheme = OAuth2PasswordBearer(tokenUrl=f"{settings.API_V1_STR}/auth/login")
+1 -1
View File
@@ -9,7 +9,7 @@ from sqlalchemy.ext.asyncio import AsyncSession
from app.auth.keycloak_dependencies import get_current_keycloak_sub
from app.core.config import settings
from app.infra.db.metadb.database import get_metadata_session
from app.infra.repositories.metadata_repository import MetadataRepository
from app.infra.db.metadb.repositories.metadata_repository import MetadataRepository
logger = logging.getLogger(__name__)
+1 -1
View File
@@ -12,7 +12,7 @@ from app.auth.keycloak_dependencies import get_current_keycloak_sub
from app.core.config import settings
from app.infra.db.dynamic_manager import project_connection_manager
from app.infra.db.metadb.database import get_metadata_session
from app.infra.repositories.metadata_repository import MetadataRepository
from app.infra.db.metadb.repositories.metadata_repository import MetadataRepository
DB_ROLE_BIZ_DATA = "biz_data"
DB_ROLE_IOT_DATA = "iot_data"
+1 -1
View File
@@ -16,7 +16,7 @@ from jose import JWTError, jwt
from app.core.config import settings
from app.infra.db.metadb.database import SessionLocal
from app.infra.repositories.metadata_repository import MetadataRepository
from app.infra.db.metadb.repositories.metadata_repository import MetadataRepository
logger = logging.getLogger(__name__)
@@ -6,7 +6,7 @@ from uuid import uuid4
import pytest
from cryptography.fernet import InvalidToken
from app.infra.repositories.metadata_repository import MetadataRepository
from app.infra.db.metadb.repositories.metadata_repository import MetadataRepository
class _DummyResult:
@@ -51,11 +51,11 @@ def test_invalid_token_with_plaintext_dsn_value_raises_clear_error(monkeypatch):
repo = MetadataRepository(session)
monkeypatch.setattr(
"app.infra.repositories.metadata_repository.is_database_encryption_configured",
"app.infra.db.metadb.repositories.metadata_repository.is_database_encryption_configured",
lambda: True,
)
monkeypatch.setattr(
"app.infra.repositories.metadata_repository.get_database_encryptor",
"app.infra.db.metadb.repositories.metadata_repository.get_database_encryptor",
lambda: encryptor,
)
@@ -78,11 +78,11 @@ def test_invalid_token_with_non_dsn_value_raises_clear_error(monkeypatch):
repo = MetadataRepository(session)
monkeypatch.setattr(
"app.infra.repositories.metadata_repository.is_database_encryption_configured",
"app.infra.db.metadb.repositories.metadata_repository.is_database_encryption_configured",
lambda: True,
)
monkeypatch.setattr(
"app.infra.repositories.metadata_repository.get_database_encryptor",
"app.infra.db.metadb.repositories.metadata_repository.get_database_encryptor",
lambda: _DummyEncryptor(raise_invalid_token=True),
)
@@ -105,11 +105,11 @@ def test_encrypted_dsn_decrypts_without_migration(monkeypatch):
repo = MetadataRepository(session)
monkeypatch.setattr(
"app.infra.repositories.metadata_repository.is_database_encryption_configured",
"app.infra.db.metadb.repositories.metadata_repository.is_database_encryption_configured",
lambda: True,
)
monkeypatch.setattr(
"app.infra.repositories.metadata_repository.get_database_encryptor",
"app.infra.db.metadb.repositories.metadata_repository.get_database_encryptor",
lambda: _DummyEncryptor(decrypted="postgresql://u:p@ss@host/db"),
)