Files
TJWaterServerBinary/app/core/audit.py

155 lines
4.1 KiB
Python

"""
审计日志模块
记录系统关键操作,用于安全审计和合规追踪
"""
from typing import Optional
from datetime import datetime
import logging
logger = logging.getLogger(__name__)
class AuditAction:
"""审计操作类型常量"""
# 认证相关
LOGIN = "LOGIN"
LOGOUT = "LOGOUT"
REGISTER = "REGISTER"
PASSWORD_CHANGE = "PASSWORD_CHANGE"
# 数据操作
CREATE = "CREATE"
READ = "READ"
UPDATE = "UPDATE"
DELETE = "DELETE"
# 权限相关
PERMISSION_CHANGE = "PERMISSION_CHANGE"
ROLE_CHANGE = "ROLE_CHANGE"
# 系统操作
CONFIG_CHANGE = "CONFIG_CHANGE"
SYSTEM_START = "SYSTEM_START"
SYSTEM_STOP = "SYSTEM_STOP"
async def log_audit_event(
action: str,
user_id: Optional[int] = None,
username: Optional[str] = None,
resource_type: Optional[str] = None,
resource_id: Optional[str] = None,
ip_address: Optional[str] = None,
user_agent: Optional[str] = None,
request_method: Optional[str] = None,
request_path: Optional[str] = None,
request_data: Optional[dict] = None,
response_status: Optional[int] = None,
error_message: Optional[str] = None,
db=None, # 新增:可选的数据库实例
):
"""
记录审计日志
Args:
action: 操作类型
user_id: 用户ID
username: 用户名
resource_type: 资源类型
resource_id: 资源ID
ip_address: IP地址
user_agent: User-Agent
request_method: 请求方法
request_path: 请求路径
request_data: 请求数据(敏感字段需脱敏)
response_status: 响应状态码
error_message: 错误消息
db: 数据库实例(可选,如果不提供则尝试获取)
"""
from app.infra.repositories.audit_repository import AuditRepository
try:
# 脱敏敏感数据
if request_data:
request_data = sanitize_sensitive_data(request_data)
# 如果没有提供数据库实例,尝试从全局获取
if db is None:
try:
from app.infra.db.postgresql.database import db as default_db
# 仅当连接池已初始化时使用
if default_db.pool:
db = default_db
except ImportError:
pass
# 如果仍然没有数据库实例
if db is None:
# 在某些上下文中可能无法获取,此时静默失败
logger.warning("No database instance provided for audit logging")
return
audit_repo = AuditRepository(db)
await audit_repo.create_log(
user_id=user_id,
username=username,
action=action,
resource_type=resource_type,
resource_id=resource_id,
ip_address=ip_address,
user_agent=user_agent,
request_method=request_method,
request_path=request_path,
request_data=request_data,
response_status=response_status,
error_message=error_message,
)
logger.info(
f"Audit log created: action={action}, user={username or user_id}, "
f"resource={resource_type}:{resource_id}"
)
except Exception as e:
# 审计日志失败不应影响业务流程
logger.error(f"Failed to create audit log: {e}", exc_info=True)
def sanitize_sensitive_data(data: dict) -> dict:
"""
脱敏敏感数据
Args:
data: 原始数据
Returns:
脱敏后的数据
"""
sensitive_fields = [
"password",
"passwd",
"pwd",
"secret",
"token",
"api_key",
"apikey",
"credit_card",
"ssn",
"social_security",
]
sanitized = data.copy()
for key in sanitized:
if isinstance(sanitized[key], dict):
sanitized[key] = sanitize_sensitive_data(sanitized[key])
elif any(sensitive in key.lower() for sensitive in sensitive_fields):
sanitized[key] = "***REDACTED***"
return sanitized