107 lines
2.9 KiB
Python
107 lines
2.9 KiB
Python
"""
|
||
权限控制依赖项和装饰器
|
||
|
||
基于角色的访问控制(RBAC)
|
||
"""
|
||
from typing import Callable
|
||
from fastapi import Depends, HTTPException, status
|
||
from app.domain.models.role import UserRole
|
||
from app.domain.schemas.user import UserInDB
|
||
from app.auth.dependencies import get_current_active_user
|
||
|
||
def require_role(required_role: UserRole):
|
||
"""
|
||
要求特定角色或更高权限
|
||
|
||
用法:
|
||
@router.get("/admin-only")
|
||
async def admin_endpoint(user: UserInDB = Depends(require_role(UserRole.ADMIN))):
|
||
...
|
||
|
||
Args:
|
||
required_role: 需要的最低角色
|
||
|
||
Returns:
|
||
依赖函数
|
||
"""
|
||
async def role_checker(
|
||
current_user: UserInDB = Depends(get_current_active_user)
|
||
) -> UserInDB:
|
||
user_role = UserRole(current_user.role)
|
||
|
||
if not user_role.has_permission(required_role):
|
||
raise HTTPException(
|
||
status_code=status.HTTP_403_FORBIDDEN,
|
||
detail=f"Insufficient permissions. Required role: {required_role.value}, "
|
||
f"Your role: {user_role.value}"
|
||
)
|
||
|
||
return current_user
|
||
|
||
return role_checker
|
||
|
||
# 预定义的权限检查依赖
|
||
require_admin = require_role(UserRole.ADMIN)
|
||
require_operator = require_role(UserRole.OPERATOR)
|
||
require_user = require_role(UserRole.USER)
|
||
|
||
def get_current_admin(
|
||
current_user: UserInDB = Depends(require_admin)
|
||
) -> UserInDB:
|
||
"""
|
||
获取当前管理员用户
|
||
|
||
等同于 Depends(require_role(UserRole.ADMIN))
|
||
"""
|
||
return current_user
|
||
|
||
def get_current_operator(
|
||
current_user: UserInDB = Depends(require_operator)
|
||
) -> UserInDB:
|
||
"""
|
||
获取当前操作员用户(或更高权限)
|
||
|
||
等同于 Depends(require_role(UserRole.OPERATOR))
|
||
"""
|
||
return current_user
|
||
|
||
def check_resource_owner(user_id: int, current_user: UserInDB) -> bool:
|
||
"""
|
||
检查是否是资源拥有者或管理员
|
||
|
||
Args:
|
||
user_id: 资源拥有者ID
|
||
current_user: 当前用户
|
||
|
||
Returns:
|
||
是否有权限
|
||
"""
|
||
# 管理员可以访问所有资源
|
||
if UserRole(current_user.role).has_permission(UserRole.ADMIN):
|
||
return True
|
||
|
||
# 检查是否是资源拥有者
|
||
return current_user.id == user_id
|
||
|
||
def require_owner_or_admin(user_id: int):
|
||
"""
|
||
要求是资源拥有者或管理员
|
||
|
||
Args:
|
||
user_id: 资源拥有者ID
|
||
|
||
Returns:
|
||
依赖函数
|
||
"""
|
||
async def owner_or_admin_checker(
|
||
current_user: UserInDB = Depends(get_current_active_user)
|
||
) -> UserInDB:
|
||
if not check_resource_owner(user_id, current_user):
|
||
raise HTTPException(
|
||
status_code=status.HTTP_403_FORBIDDEN,
|
||
detail="You don't have permission to access this resource"
|
||
)
|
||
return current_user
|
||
|
||
return owner_or_admin_checker
|