Files
TJWaterServerBinary/SECURITY_README.md

10 KiB
Raw Blame History

安全功能使用指南

TJWater Server 安全体系实施完成,包含:数据加密、身份认证、权限管理、审计日志

📋 目录

  1. 快速开始
  2. 数据加密
  3. 身份认证
  4. 权限管理
  5. 审计日志
  6. 数据库迁移
  7. API 使用示例

🚀 快速开始

1. 配置环境变量

复制 .env.example.env 并配置:

cp .env.example .env

生成必要的密钥:

# 生成 JWT 密钥
openssl rand -hex 32

# 生成加密密钥
python -c "from cryptography.fernet import Fernet; print(Fernet.generate_key().decode())"

编辑 .env 文件:

SECRET_KEY=your-generated-jwt-secret-key
ENCRYPTION_KEY=your-generated-encryption-key
DB_NAME=tjwater
DB_HOST=localhost
DB_PORT=5432
DB_USER=postgres
DB_PASSWORD=your-db-password

2. 执行数据库迁移

# 连接到 PostgreSQL
psql -U postgres -d tjwater

# 执行迁移脚本
\i migrations/001_create_users_table.sql
\i migrations/002_create_audit_logs_table.sql

或使用命令行:

psql -U postgres -d tjwater -f migrations/001_create_users_table.sql
psql -U postgres -d tjwater -f migrations/002_create_audit_logs_table.sql

3. 验证安装

默认创建了两个管理员账号:

  • 用户名: admin / 密码: admin123
  • 用户名: tjwater / 密码: tjwater@123

🔐 数据加密

使用加密器

from app.core.encryption import get_encryptor

encryptor = get_encryptor()

# 加密敏感数据
encrypted_data = encryptor.encrypt("sensitive information")

# 解密
decrypted_data = encryptor.decrypt(encrypted_data)

生成新密钥

from app.core.encryption import Encryptor

new_key = Encryptor.generate_key()
print(f"New encryption key: {new_key}")

👤 身份认证

用户角色

系统定义了 4 个角色(权限由低到高):

角色 权限说明
VIEWER 仅查询权限
USER 读写权限
OPERATOR 操作员,可修改数据
ADMIN 管理员,完全权限

API 接口

用户注册

POST /api/v1/auth/register
Content-Type: application/json

{
  "username": "newuser",
  "email": "user@example.com",
  "password": "password123",
  "role": "USER"
}

用户登录OAuth2 标准)

POST /api/v1/auth/login
Content-Type: application/x-www-form-urlencoded

username=admin&password=admin123

响应:

{
  "access_token": "eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9...",
  "refresh_token": "eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9...",
  "token_type": "bearer",
  "expires_in": 1800
}

用户登录(简化版)

POST /api/v1/auth/login/simple?username=admin&password=admin123

获取当前用户信息

GET /api/v1/auth/me
Authorization: Bearer {access_token}

刷新 Token

POST /api/v1/auth/refresh
Content-Type: application/json

{
  "refresh_token": "your-refresh-token"
}

🔑 权限管理

在 API 中使用权限控制

方式 1: 使用预定义依赖

from fastapi import APIRouter, Depends
from app.auth.permissions import get_current_admin, get_current_operator
from app.domain.schemas.user import UserInDB

router = APIRouter()

@router.post("/admin-only")
async def admin_endpoint(
    current_user: UserInDB = Depends(get_current_admin)
):
    """仅管理员可访问"""
    return {"message": "Admin access granted"}

@router.post("/operator-only")
async def operator_endpoint(
    current_user: UserInDB = Depends(get_current_operator)
):
    """操作员及以上可访问"""
    return {"message": "Operator access granted"}

方式 2: 使用 require_role

from app.auth.permissions import require_role
from app.domain.models.role import UserRole

@router.get("/viewer-access")
async def viewer_endpoint(
    current_user: UserInDB = Depends(require_role(UserRole.VIEWER))
):
    """所有认证用户可访问"""
    return {"data": "visible to all"}

方式 3: 手动检查权限

from app.auth.dependencies import get_current_active_user
from app.auth.permissions import check_resource_owner

@router.put("/users/{user_id}")
async def update_user(
    user_id: int,
    current_user: UserInDB = Depends(get_current_active_user)
):
    """检查是否是资源拥有者或管理员"""
    if not check_resource_owner(user_id, current_user):
        raise HTTPException(status_code=403, detail="Permission denied")
    
    # 执行更新操作
    ...

📝 审计日志

自动审计

使用中间件自动记录关键操作,在 main.py 中添加:

from app.infra.audit.middleware import AuditMiddleware

app.add_middleware(AuditMiddleware)

自动记录:

  • 所有 POST/PUT/DELETE 请求
  • 登录/登出事件
  • 关键资源访问

手动记录审计日志

from app.core.audit import log_audit_event, AuditAction

await log_audit_event(
    action=AuditAction.UPDATE,
    user_id=current_user.id,
    username=current_user.username,
    resource_type="project",
    resource_id="123",
    ip_address=request.client.host,
    request_data={"field": "value"},
    response_status=200
)

查询审计日志

获取所有审计日志(仅管理员)

GET /api/v1/audit/logs?skip=0&limit=100
Authorization: Bearer {admin_token}

按条件过滤

GET /api/v1/audit/logs?user_id=1&action=LOGIN&start_time=2024-01-01T00:00:00
Authorization: Bearer {admin_token}

获取我的操作记录

GET /api/v1/audit/logs/my
Authorization: Bearer {access_token}

获取日志总数

GET /api/v1/audit/logs/count?action=LOGIN
Authorization: Bearer {admin_token}

💾 数据库迁移

用户表结构

CREATE TABLE users (
    id SERIAL PRIMARY KEY,
    username VARCHAR(50) UNIQUE NOT NULL,
    email VARCHAR(100) UNIQUE NOT NULL,
    hashed_password VARCHAR(255) NOT NULL,
    role VARCHAR(20) DEFAULT 'USER' NOT NULL,
    is_active BOOLEAN DEFAULT TRUE NOT NULL,
    is_superuser BOOLEAN DEFAULT FALSE NOT NULL,
    created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP NOT NULL,
    updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP NOT NULL
);

审计日志表结构

CREATE TABLE audit_logs (
    id SERIAL PRIMARY KEY,
    user_id INTEGER REFERENCES users(id),
    username VARCHAR(50),
    action VARCHAR(50) NOT NULL,
    resource_type VARCHAR(50),
    resource_id VARCHAR(100),
    ip_address VARCHAR(45),
    user_agent TEXT,
    request_method VARCHAR(10),
    request_path TEXT,
    request_data JSONB,
    response_status INTEGER,
    error_message TEXT,
    timestamp TIMESTAMP DEFAULT CURRENT_TIMESTAMP NOT NULL
);

🔧 API 使用示例

Python 客户端示例

import requests

BASE_URL = "http://localhost:8000/api/v1"

# 1. 登录
response = requests.post(
    f"{BASE_URL}/auth/login",
    data={"username": "admin", "password": "admin123"}
)
token = response.json()["access_token"]

# 2. 设置 Authorization Header
headers = {"Authorization": f"Bearer {token}"}

# 3. 获取当前用户信息
response = requests.get(f"{BASE_URL}/auth/me", headers=headers)
print(response.json())

# 4. 创建新用户(需要管理员权限)
response = requests.post(
    f"{BASE_URL}/auth/register",
    headers=headers,
    json={
        "username": "newuser",
        "email": "new@example.com",
        "password": "password123",
        "role": "USER"
    }
)
print(response.json())

# 5. 查询审计日志(需要管理员权限)
response = requests.get(
    f"{BASE_URL}/audit/logs?action=LOGIN",
    headers=headers
)
print(response.json())

cURL 示例

# 登录
curl -X POST "http://localhost:8000/api/v1/auth/login" \
  -H "Content-Type: application/x-www-form-urlencoded" \
  -d "username=admin&password=admin123"

# 使用 Token 访问受保护接口
TOKEN="your-access-token"
curl -X GET "http://localhost:8000/api/v1/auth/me" \
  -H "Authorization: Bearer $TOKEN"

# 注册新用户
curl -X POST "http://localhost:8000/api/v1/auth/register" \
  -H "Content-Type: application/json" \
  -H "Authorization: Bearer $TOKEN" \
  -d '{
    "username": "testuser",
    "email": "test@example.com",
    "password": "password123",
    "role": "USER"
  }'

🛡️ 安全最佳实践

  1. 密钥管理

    • 绝不在代码中硬编码密钥
    • 定期轮换 JWT 密钥
    • 使用强随机密钥
  2. 密码策略

    • 最小长度 6 个字符(建议 12+
    • 强制密码复杂度(可在注册时添加验证)
    • 定期提醒用户更换密码
  3. Token 管理

    • Access Token 短期有效(默认 30 分钟)
    • Refresh Token 长期有效(默认 7 天)
    • 实施 Token 黑名单(可选)
  4. 审计日志

    • 审计日志不可删除
    • 定期归档旧日志
    • 监控异常登录行为
  5. 权限控制

    • 遵循最小权限原则
    • 定期审查用户权限
    • 记录所有权限变更

📚 相关文件

  • 配置: app/core/config.py
  • 加密: app/core/encryption.py
  • 安全: app/core/security.py
  • 审计: app/core/audit.py
  • 认证: app/api/v1/endpoints/auth.py
  • 权限: app/auth/permissions.py
  • 用户管理: app/api/v1/endpoints/user_management.py
  • 审计日志: app/api/v1/endpoints/audit.py
  • 迁移脚本: migrations/

常见问题

Q: 忘记密码怎么办?

A: 目前需要管理员通过数据库重置。未来可添加邮件重置功能。

-- 重置密码为 "newpassword123"
UPDATE users 
SET hashed_password = '$2b$12$...' -- 使用 bcrypt 生成哈希
WHERE username = 'targetuser';

Q: 如何添加新角色?

A: 编辑 app/domain/models/role.py 中的 UserRole 枚举,并更新数据库约束。

Q: 审计日志占用太多空间?

A: 建议定期归档旧日志到冷存储:

-- 归档 90 天前的日志
CREATE TABLE audit_logs_archive AS
SELECT * FROM audit_logs WHERE timestamp < NOW() - INTERVAL '90 days';

DELETE FROM audit_logs WHERE timestamp < NOW() - INTERVAL '90 days';

📞 技术支持

如有问题,请查看:

  • 日志文件: logs/
  • 数据库表结构: migrations/
  • 单元测试: tests/