Files
TJWaterServerBinary/app/auth/permissions.py

107 lines
2.9 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
"""
权限控制依赖项和装饰器
基于角色的访问控制RBAC
"""
from typing import Callable
from fastapi import Depends, HTTPException, status
from app.domain.models.role import UserRole
from app.domain.schemas.user import UserInDB
from app.auth.dependencies import get_current_active_user
def require_role(required_role: UserRole):
    """
    Build a FastAPI dependency that enforces a minimum role.

    The returned coroutine resolves the current active user, converts the
    user's ``role`` attribute into a ``UserRole`` and asks
    ``UserRole.has_permission`` whether it satisfies ``required_role``
    (presumably "this role or a higher one" -- confirm against ``UserRole``).

    Usage:
        @router.get("/admin-only")
        async def admin_endpoint(
            user: UserInDB = Depends(require_role(UserRole.ADMIN)),
        ):
            ...

    Args:
        required_role: The role the caller must satisfy.

    Returns:
        An async dependency function that yields the current user when the
        check passes.

    Raises:
        HTTPException: Raised by the returned dependency with status 403
            when the user's role does not satisfy ``required_role``.
    """

    async def role_checker(
        current_user: UserInDB = Depends(get_current_active_user),
    ) -> UserInDB:
        user_role = UserRole(current_user.role)

        # Happy path first: a sufficient role simply passes the user through.
        if user_role.has_permission(required_role):
            return current_user

        # Otherwise reject, reporting both the required and the actual role.
        raise HTTPException(
            status_code=status.HTTP_403_FORBIDDEN,
            detail=f"Insufficient permissions. Required role: {required_role.value}, "
                   f"Your role: {user_role.value}",
        )

    return role_checker
# Pre-built permission-check dependencies, one per role, created once at
# import time so routes can share them via Depends(...).
require_admin, require_operator, require_user = (
    require_role(UserRole.ADMIN),
    require_role(UserRole.OPERATOR),
    require_role(UserRole.USER),
)
def get_current_admin(current_user: UserInDB = Depends(require_admin)) -> UserInDB:
    """
    Return the current user once the admin role check has passed.

    The role check itself is performed by the ``require_admin`` dependency
    (``require_role(UserRole.ADMIN)``); this function only hands the
    resolved user back to the caller.

    Args:
        current_user: User injected by the ``require_admin`` dependency.

    Returns:
        The same user object that was injected.
    """
    return current_user
def get_current_operator(current_user: UserInDB = Depends(require_operator)) -> UserInDB:
    """
    Return the current user once the operator role check has passed.

    The role check itself is performed by the ``require_operator``
    dependency (``require_role(UserRole.OPERATOR)``), which is documented
    as also accepting higher roles; this function only hands the resolved
    user back to the caller.

    Args:
        current_user: User injected by the ``require_operator`` dependency.

    Returns:
        The same user object that was injected.
    """
    return current_user
def check_resource_owner(user_id: int, current_user: UserInDB) -> bool:
    """
    Tell whether the current user may access a resource owned by ``user_id``.

    Access is granted when the user's role satisfies ``UserRole.ADMIN``
    according to ``UserRole.has_permission``, or when the user's ``id``
    equals ``user_id``.

    Args:
        user_id: ID of the resource owner.
        current_user: The user making the request.

    Returns:
        True if access is allowed, otherwise the result of comparing the
        user's id with ``user_id``.
    """
    # Admin-level users bypass the ownership comparison entirely.
    is_admin = UserRole(current_user.role).has_permission(UserRole.ADMIN)

    # Everyone else must be the owner of the resource.
    return True if is_admin else current_user.id == user_id
def require_owner_or_admin(user_id: int):
    """
    Build a FastAPI dependency restricted to the resource owner or an admin.

    Note that ``user_id`` is captured when this factory is called; the
    returned dependency compares every request against that same value.

    Args:
        user_id: ID of the resource owner.

    Returns:
        An async dependency function that yields the current user when
        ``check_resource_owner`` allows access.

    Raises:
        HTTPException: Raised by the returned dependency with status 403
            when the user is neither the owner nor an admin.
    """

    async def owner_or_admin_checker(
        current_user: UserInDB = Depends(get_current_active_user),
    ) -> UserInDB:
        # Allowed callers are passed straight through.
        if check_resource_owner(user_id, current_user):
            return current_user

        raise HTTPException(
            status_code=status.HTTP_403_FORBIDDEN,
            detail="You don't have permission to access this resource",
        )

    return owner_or_admin_checker