Files
TJWaterServerBinary/SECURITY_README.md

500 lines
10 KiB
Markdown
Raw Permalink Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
# 安全功能使用指南
TJWater Server 安全体系实施完成,包含:数据加密、身份认证、权限管理、审计日志
## 📋 目录
1. [快速开始](#快速开始)
2. [数据加密](#数据加密)
3. [身份认证](#身份认证)
4. [权限管理](#权限管理)
5. [审计日志](#审计日志)
6. [数据库迁移](#数据库迁移)
7. [API 使用示例](#api-使用示例)
---
## 🚀 快速开始
### 1. 配置环境变量
复制 `.env.example``.env` 并配置:
```bash
cp .env.example .env
```
生成必要的密钥:
```bash
# 生成 JWT 密钥
openssl rand -hex 32
# 生成加密密钥
python -c "from cryptography.fernet import Fernet; print(Fernet.generate_key().decode())"
```
编辑 `.env` 文件:
```env
SECRET_KEY=your-generated-jwt-secret-key
ENCRYPTION_KEY=your-generated-encryption-key
DB_NAME=tjwater
DB_HOST=localhost
DB_PORT=5432
DB_USER=postgres
DB_PASSWORD=your-db-password
```
### 2. 执行数据库迁移
```bash
# 连接到 PostgreSQL
psql -U postgres -d tjwater
# 执行迁移脚本
\i migrations/001_create_users_table.sql
\i migrations/002_create_audit_logs_table.sql
```
或使用命令行:
```bash
psql -U postgres -d tjwater -f migrations/001_create_users_table.sql
psql -U postgres -d tjwater -f migrations/002_create_audit_logs_table.sql
```
### 3. 验证安装
默认创建了两个管理员账号:
- **用户名**: `admin` / **密码**: `admin123`
- **用户名**: `tjwater` / **密码**: `tjwater@123`
---
## 🔐 数据加密
### 使用加密器
```python
from app.core.encryption import get_encryptor
encryptor = get_encryptor()
# 加密敏感数据
encrypted_data = encryptor.encrypt("sensitive information")
# 解密
decrypted_data = encryptor.decrypt(encrypted_data)
```
### 生成新密钥
```python
from app.core.encryption import Encryptor
new_key = Encryptor.generate_key()
print(f"New encryption key: {new_key}")
```
---
## 👤 身份认证
### 用户角色
系统定义了 4 个角色(权限由低到高):
| 角色 | 权限说明 |
|------|---------|
| `VIEWER` | 仅查询权限 |
| `USER` | 读写权限 |
| `OPERATOR` | 操作员,可修改数据 |
| `ADMIN` | 管理员,完全权限 |
### API 接口
#### 用户注册
```http
POST /api/v1/auth/register
Content-Type: application/json
{
"username": "newuser",
"email": "user@example.com",
"password": "password123",
"role": "USER"
}
```
#### 用户登录OAuth2 标准)
```http
POST /api/v1/auth/login
Content-Type: application/x-www-form-urlencoded
username=admin&password=admin123
```
响应:
```json
{
"access_token": "eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9...",
"refresh_token": "eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9...",
"token_type": "bearer",
"expires_in": 1800
}
```
#### 用户登录(简化版)
```http
POST /api/v1/auth/login/simple?username=admin&password=admin123
```
#### 获取当前用户信息
```http
GET /api/v1/auth/me
Authorization: Bearer {access_token}
```
#### 刷新 Token
```http
POST /api/v1/auth/refresh
Content-Type: application/json
{
"refresh_token": "your-refresh-token"
}
```
---
## 🔑 权限管理
### 在 API 中使用权限控制
#### 方式 1: 使用预定义依赖
```python
from fastapi import APIRouter, Depends
from app.auth.permissions import get_current_admin, get_current_operator
from app.domain.schemas.user import UserInDB
router = APIRouter()
@router.post("/admin-only")
async def admin_endpoint(
current_user: UserInDB = Depends(get_current_admin)
):
"""仅管理员可访问"""
return {"message": "Admin access granted"}
@router.post("/operator-only")
async def operator_endpoint(
current_user: UserInDB = Depends(get_current_operator)
):
"""操作员及以上可访问"""
return {"message": "Operator access granted"}
```
#### 方式 2: 使用 require_role
```python
from app.auth.permissions import require_role
from app.domain.models.role import UserRole
@router.get("/viewer-access")
async def viewer_endpoint(
current_user: UserInDB = Depends(require_role(UserRole.VIEWER))
):
"""所有认证用户可访问"""
return {"data": "visible to all"}
```
#### 方式 3: 手动检查权限
```python
from app.auth.dependencies import get_current_active_user
from app.auth.permissions import check_resource_owner
@router.put("/users/{user_id}")
async def update_user(
user_id: int,
current_user: UserInDB = Depends(get_current_active_user)
):
"""检查是否是资源拥有者或管理员"""
if not check_resource_owner(user_id, current_user):
raise HTTPException(status_code=403, detail="Permission denied")
# 执行更新操作
...
```
---
## 📝 审计日志
### 自动审计
使用中间件自动记录关键操作,在 `main.py` 中添加:
```python
from app.infra.audit.middleware import AuditMiddleware
app.add_middleware(AuditMiddleware)
```
自动记录:
- 所有 POST/PUT/DELETE 请求
- 登录/登出事件
- 关键资源访问
### 手动记录审计日志
```python
from app.core.audit import log_audit_event, AuditAction
await log_audit_event(
action=AuditAction.UPDATE,
user_id=current_user.id,
username=current_user.username,
resource_type="project",
resource_id="123",
ip_address=request.client.host,
request_data={"field": "value"},
response_status=200
)
```
### 查询审计日志
#### 获取所有审计日志(仅管理员)
```http
GET /api/v1/audit/logs?skip=0&limit=100
Authorization: Bearer {admin_token}
```
#### 按条件过滤
```http
GET /api/v1/audit/logs?user_id=1&action=LOGIN&start_time=2024-01-01T00:00:00
Authorization: Bearer {admin_token}
```
#### 获取我的操作记录
```http
GET /api/v1/audit/logs/my
Authorization: Bearer {access_token}
```
#### 获取日志总数
```http
GET /api/v1/audit/logs/count?action=LOGIN
Authorization: Bearer {admin_token}
```
---
## 💾 数据库迁移
### 用户表结构
```sql
CREATE TABLE users (
id SERIAL PRIMARY KEY,
username VARCHAR(50) UNIQUE NOT NULL,
email VARCHAR(100) UNIQUE NOT NULL,
hashed_password VARCHAR(255) NOT NULL,
role VARCHAR(20) DEFAULT 'USER' NOT NULL,
is_active BOOLEAN DEFAULT TRUE NOT NULL,
is_superuser BOOLEAN DEFAULT FALSE NOT NULL,
created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP NOT NULL,
updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP NOT NULL
);
```
### 审计日志表结构
```sql
CREATE TABLE audit_logs (
id SERIAL PRIMARY KEY,
user_id INTEGER REFERENCES users(id),
username VARCHAR(50),
action VARCHAR(50) NOT NULL,
resource_type VARCHAR(50),
resource_id VARCHAR(100),
ip_address VARCHAR(45),
user_agent TEXT,
request_method VARCHAR(10),
request_path TEXT,
request_data JSONB,
response_status INTEGER,
error_message TEXT,
timestamp TIMESTAMP DEFAULT CURRENT_TIMESTAMP NOT NULL
);
```
---
## 🔧 API 使用示例
### Python 客户端示例
```python
import requests
BASE_URL = "http://localhost:8000/api/v1"
# 1. 登录
response = requests.post(
f"{BASE_URL}/auth/login",
data={"username": "admin", "password": "admin123"}
)
token = response.json()["access_token"]
# 2. 设置 Authorization Header
headers = {"Authorization": f"Bearer {token}"}
# 3. 获取当前用户信息
response = requests.get(f"{BASE_URL}/auth/me", headers=headers)
print(response.json())
# 4. 创建新用户(需要管理员权限)
response = requests.post(
f"{BASE_URL}/auth/register",
headers=headers,
json={
"username": "newuser",
"email": "new@example.com",
"password": "password123",
"role": "USER"
}
)
print(response.json())
# 5. 查询审计日志(需要管理员权限)
response = requests.get(
f"{BASE_URL}/audit/logs?action=LOGIN",
headers=headers
)
print(response.json())
```
### cURL 示例
```bash
# 登录
curl -X POST "http://localhost:8000/api/v1/auth/login" \
-H "Content-Type: application/x-www-form-urlencoded" \
-d "username=admin&password=admin123"
# 使用 Token 访问受保护接口
TOKEN="your-access-token"
curl -X GET "http://localhost:8000/api/v1/auth/me" \
-H "Authorization: Bearer $TOKEN"
# 注册新用户
curl -X POST "http://localhost:8000/api/v1/auth/register" \
-H "Content-Type: application/json" \
-H "Authorization: Bearer $TOKEN" \
-d '{
"username": "testuser",
"email": "test@example.com",
"password": "password123",
"role": "USER"
}'
```
---
## 🛡️ 安全最佳实践
1. **密钥管理**
- 绝不在代码中硬编码密钥
- 定期轮换 JWT 密钥
- 使用强随机密钥
2. **密码策略**
- 最小长度 6 个字符(建议 12+
- 强制密码复杂度(可在注册时添加验证)
- 定期提醒用户更换密码
3. **Token 管理**
- Access Token 短期有效(默认 30 分钟)
- Refresh Token 长期有效(默认 7 天)
- 实施 Token 黑名单(可选)
4. **审计日志**
- 审计日志不可删除
- 定期归档旧日志
- 监控异常登录行为
5. **权限控制**
- 遵循最小权限原则
- 定期审查用户权限
- 记录所有权限变更
---
## 📚 相关文件
- **配置**: `app/core/config.py`
- **加密**: `app/core/encryption.py`
- **安全**: `app/core/security.py`
- **审计**: `app/core/audit.py`
- **认证**: `app/api/v1/endpoints/auth.py`
- **权限**: `app/auth/permissions.py`
- **用户管理**: `app/api/v1/endpoints/user_management.py`
- **审计日志**: `app/api/v1/endpoints/audit.py`
- **迁移脚本**: `migrations/`
---
## ❓ 常见问题
### Q: 忘记密码怎么办?
A: 目前需要管理员通过数据库重置。未来可添加邮件重置功能。
```sql
-- 重置密码为 "newpassword123"
UPDATE users
SET hashed_password = '$2b$12$...' -- 使用 bcrypt 生成哈希
WHERE username = 'targetuser';
```
### Q: 如何添加新角色?
A: 编辑 `app/domain/models/role.py` 中的 `UserRole` 枚举,并更新数据库约束。
### Q: 审计日志占用太多空间?
A: 建议定期归档旧日志到冷存储:
```sql
-- 归档 90 天前的日志
CREATE TABLE audit_logs_archive AS
SELECT * FROM audit_logs WHERE timestamp < NOW() - INTERVAL '90 days';
DELETE FROM audit_logs WHERE timestamp < NOW() - INTERVAL '90 days';
```
---
## 📞 技术支持
如有问题,请查看:
- 日志文件: `logs/`
- 数据库表结构: `migrations/`
- 单元测试: `tests/`