Files

132 lines
4.6 KiB
Python

from datetime import datetime
from typing import Any
from fastapi import APIRouter, Depends, HTTPException, Query, Path, Body
from pydantic import BaseModel, Field
from app.auth.keycloak_dependencies import get_current_keycloak_username
from app.services.burst_detection import (
get_burst_detection_scheme_detail,
list_burst_detection_schemes,
run_burst_detection,
)
router = APIRouter()
class BurstDetectionRequest(BaseModel):
"""爆管检测请求模型"""
network: str = Field(..., description="管网名称(或数据库名称)")
observed_pressure_data: (
dict[str, list[Any]] | list[dict[str, Any]] | list[list[Any]] | None
) = Field(
default=None,
description=(
"压力观测数据。支持列式字典 {sensor_id: [values,...]}、"
"逐时刻对象数组 [{sensor_id: value,...}, ...]、"
"或二维数组 [[t1_s1, t1_s2], [t2_s1, t2_s2], ...]。"
),
)
points_per_day: int = Field(1440, description="每天的数据点数")
mu: int = Field(100, description="异常值检测的参数")
iforest_params: dict[str, Any] | None = Field(None, description="隔离森林算法参数")
scada_start: datetime | None = Field(None, description="SCADA数据起始时间")
scada_end: datetime | None = Field(None, description="SCADA数据结束时间")
sensor_nodes: list[str] | None = Field(None, description="传感器节点列表")
scheme_name: str | None = Field(None, description="方案名称")
data_source: str = Field("monitoring", description="数据来源:monitoring(监测)或simulation(模拟)")
simulation_scheme_name: str | None = Field(None, description="模拟方案名称")
simulation_scheme_type: str | None = Field(None, description="模拟方案类型")
@router.post(
"/detect/",
summary="执行爆管检测",
description="基于压力观测数据和其他参数执行爆管检测分析"
)
async def detect_burst(
data: BurstDetectionRequest = Body(..., description="爆管检测请求数据"),
username: str = Depends(get_current_keycloak_username),
) -> dict[str, Any]:
"""
执行爆管检测分析。
使用异常检测算法(隔离森林)识别压力时间序列中的异常,
将其作为潜在的爆管事件。
Args:
data: 包含管网名称(或数据库名称)、压力数据及相关参数的请求体
username: 当前认证用户名
Returns:
包含检测结果的字典
Raises:
HTTPException: 当处理过程中发生错误时
"""
try:
return run_burst_detection(**data.model_dump(), username=username)
except Exception as exc:
raise HTTPException(status_code=400, detail=str(exc))
@router.get(
"/schemes/",
summary="查询爆管检测方案列表",
description="获取指定网络的所有爆管检测方案"
)
async def query_burst_detection_schemes(
network: str = Query(..., description="管网名称(或数据库名称)"),
query_date: datetime | None = Query(None, description="查询日期(可选)"),
) -> list[dict[str, Any]]:
"""
获取爆管检测方案列表。
查询指定网络的所有已配置的爆管检测方案,
可按日期进行筛选。
Args:
network: 管网名称(或数据库名称)
query_date: 查询日期(可选)
Returns:
爆管检测方案列表
Raises:
HTTPException: 当查询失败时
"""
try:
return list_burst_detection_schemes(network=network, query_date=query_date)
except Exception as exc:
raise HTTPException(status_code=400, detail=str(exc))
@router.get(
"/schemes/{scheme_name}",
summary="获取爆管检测方案详情",
description="获取指定爆管检测方案的详细信息"
)
async def query_burst_detection_scheme_detail(
network: str = Query(..., description="管网名称(或数据库名称)"),
scheme_name: str = Path(..., description="爆管检测方案名称"),
) -> dict[str, Any]:
"""
获取爆管检测方案详情。
查询指定爆管检测方案的完整配置和参数信息。
Args:
network: 管网名称(或数据库名称)
scheme_name: 爆管检测方案名称
Returns:
包含方案详情的字典
Raises:
HTTPException: 当查询失败时
"""
try:
return get_burst_detection_scheme_detail(network=network, scheme_name=scheme_name)
except Exception as exc:
raise HTTPException(status_code=400, detail=str(exc))