from datetime import datetime from typing import Any from fastapi import APIRouter, Depends, HTTPException, Query, Path, Body from pydantic import BaseModel, Field from app.auth.keycloak_dependencies import get_current_keycloak_username from app.services.burst_detection import ( get_burst_detection_scheme_detail, list_burst_detection_schemes, run_burst_detection, ) router = APIRouter() class BurstDetectionRequest(BaseModel): """爆管检测请求模型""" network: str = Field(..., description="管网名称(或数据库名称)") observed_pressure_data: ( dict[str, list[Any]] | list[dict[str, Any]] | list[list[Any]] | None ) = Field( default=None, description=( "压力观测数据。支持列式字典 {sensor_id: [values,...]}、" "逐时刻对象数组 [{sensor_id: value,...}, ...]、" "或二维数组 [[t1_s1, t1_s2], [t2_s1, t2_s2], ...]。" ), ) points_per_day: int = Field(1440, description="每天的数据点数") mu: int = Field(100, description="异常值检测的参数") iforest_params: dict[str, Any] | None = Field(None, description="隔离森林算法参数") scada_start: datetime | None = Field(None, description="SCADA数据起始时间") scada_end: datetime | None = Field(None, description="SCADA数据结束时间") sensor_nodes: list[str] | None = Field(None, description="传感器节点列表") scheme_name: str | None = Field(None, description="方案名称") data_source: str = Field("monitoring", description="数据来源:monitoring(监测)或simulation(模拟)") simulation_scheme_name: str | None = Field(None, description="模拟方案名称") simulation_scheme_type: str | None = Field(None, description="模拟方案类型") @router.post( "/detect/", summary="执行爆管检测", description="基于压力观测数据和其他参数执行爆管检测分析" ) async def detect_burst( data: BurstDetectionRequest = Body(..., description="爆管检测请求数据"), username: str = Depends(get_current_keycloak_username), ) -> dict[str, Any]: """ 执行爆管检测分析。 使用异常检测算法(隔离森林)识别压力时间序列中的异常, 将其作为潜在的爆管事件。 Args: data: 包含管网名称(或数据库名称)、压力数据及相关参数的请求体 username: 当前认证用户名 Returns: 包含检测结果的字典 Raises: HTTPException: 当处理过程中发生错误时 """ try: return run_burst_detection(**data.model_dump(), username=username) except Exception as exc: raise HTTPException(status_code=400, detail=str(exc)) @router.get( "/schemes/", summary="查询爆管检测方案列表", description="获取指定网络的所有爆管检测方案" ) async def query_burst_detection_schemes( network: str = Query(..., description="管网名称(或数据库名称)"), query_date: datetime | None = Query(None, description="查询日期(可选)"), ) -> list[dict[str, Any]]: """ 获取爆管检测方案列表。 查询指定网络的所有已配置的爆管检测方案, 可按日期进行筛选。 Args: network: 管网名称(或数据库名称) query_date: 查询日期(可选) Returns: 爆管检测方案列表 Raises: HTTPException: 当查询失败时 """ try: return list_burst_detection_schemes(network=network, query_date=query_date) except Exception as exc: raise HTTPException(status_code=400, detail=str(exc)) @router.get( "/schemes/{scheme_name}", summary="获取爆管检测方案详情", description="获取指定爆管检测方案的详细信息" ) async def query_burst_detection_scheme_detail( network: str = Query(..., description="管网名称(或数据库名称)"), scheme_name: str = Path(..., description="爆管检测方案名称"), ) -> dict[str, Any]: """ 获取爆管检测方案详情。 查询指定爆管检测方案的完整配置和参数信息。 Args: network: 管网名称(或数据库名称) scheme_name: 爆管检测方案名称 Returns: 包含方案详情的字典 Raises: HTTPException: 当查询失败时 """ try: return get_burst_detection_scheme_detail(network=network, scheme_name=scheme_name) except Exception as exc: raise HTTPException(status_code=400, detail=str(exc))