初步实现数据加密、权限管理、日志审计等功能
This commit is contained in:
106
app/auth/permissions.py
Normal file
106
app/auth/permissions.py
Normal file
@@ -0,0 +1,106 @@
|
||||
"""
|
||||
权限控制依赖项和装饰器
|
||||
|
||||
基于角色的访问控制(RBAC)
|
||||
"""
|
||||
from typing import Callable
|
||||
from fastapi import Depends, HTTPException, status
|
||||
from app.domain.models.role import UserRole
|
||||
from app.domain.schemas.user import UserInDB
|
||||
from app.auth.dependencies import get_current_active_user
|
||||
|
||||
def require_role(required_role: UserRole):
|
||||
"""
|
||||
要求特定角色或更高权限
|
||||
|
||||
用法:
|
||||
@router.get("/admin-only")
|
||||
async def admin_endpoint(user: UserInDB = Depends(require_role(UserRole.ADMIN))):
|
||||
...
|
||||
|
||||
Args:
|
||||
required_role: 需要的最低角色
|
||||
|
||||
Returns:
|
||||
依赖函数
|
||||
"""
|
||||
async def role_checker(
|
||||
current_user: UserInDB = Depends(get_current_active_user)
|
||||
) -> UserInDB:
|
||||
user_role = UserRole(current_user.role)
|
||||
|
||||
if not user_role.has_permission(required_role):
|
||||
raise HTTPException(
|
||||
status_code=status.HTTP_403_FORBIDDEN,
|
||||
detail=f"Insufficient permissions. Required role: {required_role.value}, "
|
||||
f"Your role: {user_role.value}"
|
||||
)
|
||||
|
||||
return current_user
|
||||
|
||||
return role_checker
|
||||
|
||||
# 预定义的权限检查依赖
|
||||
require_admin = require_role(UserRole.ADMIN)
|
||||
require_operator = require_role(UserRole.OPERATOR)
|
||||
require_user = require_role(UserRole.USER)
|
||||
|
||||
def get_current_admin(
|
||||
current_user: UserInDB = Depends(require_admin)
|
||||
) -> UserInDB:
|
||||
"""
|
||||
获取当前管理员用户
|
||||
|
||||
等同于 Depends(require_role(UserRole.ADMIN))
|
||||
"""
|
||||
return current_user
|
||||
|
||||
def get_current_operator(
|
||||
current_user: UserInDB = Depends(require_operator)
|
||||
) -> UserInDB:
|
||||
"""
|
||||
获取当前操作员用户(或更高权限)
|
||||
|
||||
等同于 Depends(require_role(UserRole.OPERATOR))
|
||||
"""
|
||||
return current_user
|
||||
|
||||
def check_resource_owner(user_id: int, current_user: UserInDB) -> bool:
|
||||
"""
|
||||
检查是否是资源拥有者或管理员
|
||||
|
||||
Args:
|
||||
user_id: 资源拥有者ID
|
||||
current_user: 当前用户
|
||||
|
||||
Returns:
|
||||
是否有权限
|
||||
"""
|
||||
# 管理员可以访问所有资源
|
||||
if UserRole(current_user.role).has_permission(UserRole.ADMIN):
|
||||
return True
|
||||
|
||||
# 检查是否是资源拥有者
|
||||
return current_user.id == user_id
|
||||
|
||||
def require_owner_or_admin(user_id: int):
|
||||
"""
|
||||
要求是资源拥有者或管理员
|
||||
|
||||
Args:
|
||||
user_id: 资源拥有者ID
|
||||
|
||||
Returns:
|
||||
依赖函数
|
||||
"""
|
||||
async def owner_or_admin_checker(
|
||||
current_user: UserInDB = Depends(get_current_active_user)
|
||||
) -> UserInDB:
|
||||
if not check_resource_owner(user_id, current_user):
|
||||
raise HTTPException(
|
||||
status_code=status.HTTP_403_FORBIDDEN,
|
||||
detail="You don't have permission to access this resource"
|
||||
)
|
||||
return current_user
|
||||
|
||||
return owner_or_admin_checker
|
||||
Reference in New Issue
Block a user