添加审计中间件排除路径、用户按用户名查询功能;完善审计资源记录
This commit is contained in:
@@ -55,6 +55,12 @@ class AuditMiddleware(BaseHTTPMiddleware):
|
||||
|
||||
# 需要审计的HTTP方法
|
||||
AUDIT_METHODS = ["POST", "PUT", "DELETE", "PATCH"]
|
||||
EXCLUDED_PATHS = {
|
||||
"/api/v1/meta/projects",
|
||||
"/meta/projects",
|
||||
"/api/v1/openproject/",
|
||||
"/openproject/",
|
||||
}
|
||||
|
||||
async def dispatch(self, request: Request, call_next: Callable) -> Response:
|
||||
# 提取开始时间
|
||||
@@ -84,6 +90,11 @@ class AuditMiddleware(BaseHTTPMiddleware):
|
||||
response = await call_next(request)
|
||||
|
||||
# 3. 决定是否审计
|
||||
if request.url.path in self.EXCLUDED_PATHS:
|
||||
process_time = time.time() - start_time
|
||||
response.headers["X-Process-Time"] = str(process_time)
|
||||
return response
|
||||
|
||||
# 检查方法
|
||||
is_audit_method = request.method in self.AUDIT_METHODS
|
||||
# 检查路径
|
||||
@@ -155,6 +166,7 @@ class AuditMiddleware(BaseHTTPMiddleware):
|
||||
token = auth_header.split(" ", 1)[1].strip()
|
||||
if not token:
|
||||
return None
|
||||
sub = None
|
||||
try:
|
||||
key = (
|
||||
settings.KEYCLOAK_PUBLIC_KEY.replace("\\n", "\n")
|
||||
@@ -166,17 +178,25 @@ class AuditMiddleware(BaseHTTPMiddleware):
|
||||
if settings.KEYCLOAK_PUBLIC_KEY
|
||||
else [settings.ALGORITHM]
|
||||
)
|
||||
payload = jwt.decode(token, key, algorithms=algorithms)
|
||||
payload = jwt.decode(
|
||||
token,
|
||||
key,
|
||||
algorithms=algorithms,
|
||||
audience=settings.KEYCLOAK_AUDIENCE or None,
|
||||
)
|
||||
sub = payload.get("sub")
|
||||
if not sub:
|
||||
return None
|
||||
keycloak_id = UUID(sub)
|
||||
except (JWTError, ValueError):
|
||||
except JWTError:
|
||||
return None
|
||||
|
||||
async with SessionLocal() as session:
|
||||
repo = MetadataRepository(session)
|
||||
user = await repo.get_user_by_keycloak_id(keycloak_id)
|
||||
try:
|
||||
keycloak_id = UUID(sub)
|
||||
user = await repo.get_user_by_keycloak_id(keycloak_id)
|
||||
except ValueError:
|
||||
user = await repo.get_user_by_username(sub)
|
||||
if user and user.is_active:
|
||||
return user.id
|
||||
return None
|
||||
@@ -218,7 +238,25 @@ class AuditMiddleware(BaseHTTPMiddleware):
|
||||
if len(path_parts) >= 4:
|
||||
resource_type = path_parts[3].rstrip("s") # 移除复数s
|
||||
|
||||
if len(path_parts) >= 5 and path_parts[4].isdigit():
|
||||
if len(path_parts) >= 5 and path_parts[4]:
|
||||
resource_id = path_parts[4]
|
||||
|
||||
# 无路径ID时,尝试从查询参数提取业务ID
|
||||
if not resource_id:
|
||||
for key in (
|
||||
"id",
|
||||
"resource_id",
|
||||
"device_id",
|
||||
"device_ids",
|
||||
"element_id",
|
||||
"user_id",
|
||||
"project_id",
|
||||
"network",
|
||||
"name",
|
||||
):
|
||||
value = request.query_params.get(key)
|
||||
if value:
|
||||
resource_id = value
|
||||
break
|
||||
|
||||
return resource_type, resource_id
|
||||
|
||||
@@ -77,6 +77,12 @@ class MetadataRepository:
|
||||
)
|
||||
return result.scalar_one_or_none()
|
||||
|
||||
async def get_user_by_username(self, username: str) -> Optional[models.User]:
|
||||
result = await self.session.execute(
|
||||
select(models.User).where(models.User.username == username)
|
||||
)
|
||||
return result.scalar_one_or_none()
|
||||
|
||||
async def get_project_by_id(self, project_id: UUID) -> Optional[models.Project]:
|
||||
result = await self.session.execute(
|
||||
select(models.Project).where(models.Project.id == project_id)
|
||||
|
||||
Reference in New Issue
Block a user