130 lines
4.9 KiB
Python
130 lines
4.9 KiB
Python
from typing import Any
|
||
from datetime import datetime
|
||
|
||
from typing import Literal
|
||
|
||
from fastapi import APIRouter, Depends, HTTPException, Query, Path, Body
|
||
from pydantic import BaseModel, Field
|
||
|
||
from app.auth.keycloak_dependencies import get_current_keycloak_username
|
||
from app.services.burst_location import (
|
||
get_burst_location_scheme_detail,
|
||
list_burst_location_schemes,
|
||
run_burst_location_by_network,
|
||
)
|
||
|
||
router = APIRouter()
|
||
|
||
|
||
class BurstLocationRequest(BaseModel):
|
||
"""爆管定位请求模型"""
|
||
network: str = Field(..., description="管网名称(或数据库名称)")
|
||
data_source: Literal["monitoring", "simulation"] = Field("monitoring", description="数据来源:monitoring(监测)或simulation(模拟)")
|
||
pressure_scada_ids: list[str] | None = Field(None, description="压力SCADA传感器ID列表")
|
||
burst_pressure: dict[str, float] | list[dict[str, Any]] | None = Field(None, description="爆管时的压力数据")
|
||
normal_pressure: dict[str, float] | list[dict[str, Any]] | None = Field(None, description="正常时的压力数据")
|
||
burst_leakage: float = Field(..., description="爆管时的漏水量")
|
||
flow_scada_ids: list[str] | None = Field(None, description="流量SCADA传感器ID列表")
|
||
burst_flow: dict[str, float] | list[dict[str, Any]] | None = Field(None, description="爆管时的流量数据")
|
||
normal_flow: dict[str, float] | list[dict[str, Any]] | None = Field(None, description="正常时的流量数据")
|
||
min_dpressure: float = Field(2.0, description="最小压力差(bar)")
|
||
basic_pressure: float = Field(10.0, description="基准压力(bar)")
|
||
scada_burst_start: datetime | None = Field(None, description="SCADA爆管开始时间")
|
||
scada_burst_end: datetime | None = Field(None, description="SCADA爆管结束时间")
|
||
use_scada_flow: bool = Field(False, description="是否使用SCADA流量数据")
|
||
scheme_name: str | None = Field(None, description="方案名称")
|
||
simulation_scheme_name: str | None = Field(None, description="模拟方案名称")
|
||
simulation_scheme_type: str | None = Field(None, description="模拟方案类型")
|
||
|
||
|
||
@router.post(
|
||
"/locate/",
|
||
summary="执行爆管定位",
|
||
description="基于压力和流量数据定位管网中的爆管位置"
|
||
)
|
||
async def locate_burst(
|
||
data: BurstLocationRequest = Body(..., description="爆管定位请求数据"),
|
||
username: str = Depends(get_current_keycloak_username),
|
||
) -> dict[str, Any]:
|
||
"""
|
||
执行爆管定位分析。
|
||
|
||
使用压力和流量SCADA数据,通过对比爆管和正常状态下的数据差异,
|
||
定位管网中的爆管位置。
|
||
|
||
Args:
|
||
data: 包含管网名称(或数据库名称)、压力、流量数据及相关参数的请求体
|
||
username: 当前认证用户名
|
||
|
||
Returns:
|
||
包含定位结果的字典
|
||
|
||
Raises:
|
||
HTTPException: 当数据类型或值不正确时
|
||
"""
|
||
try:
|
||
return run_burst_location_by_network(**data.model_dump(), username=username)
|
||
except (TypeError, ValueError) as exc:
|
||
raise HTTPException(status_code=400, detail=str(exc))
|
||
|
||
|
||
@router.get(
|
||
"/schemes/",
|
||
summary="查询爆管定位方案列表",
|
||
description="获取指定网络的所有爆管定位方案"
|
||
)
|
||
async def query_burst_schemes(
|
||
network: str = Query(..., description="管网名称(或数据库名称)"),
|
||
query_date: datetime | None = Query(None, description="查询日期(可选)")
|
||
) -> list[dict[str, Any]]:
|
||
"""
|
||
获取爆管定位方案列表。
|
||
|
||
查询指定网络的所有已配置的爆管定位方案,
|
||
可按日期进行筛选。
|
||
|
||
Args:
|
||
network: 管网名称(或数据库名称)
|
||
query_date: 查询日期(可选)
|
||
|
||
Returns:
|
||
爆管定位方案列表
|
||
|
||
Raises:
|
||
HTTPException: 当查询失败时
|
||
"""
|
||
try:
|
||
return list_burst_location_schemes(network=network, query_date=query_date)
|
||
except Exception as exc:
|
||
raise HTTPException(status_code=400, detail=str(exc))
|
||
|
||
|
||
@router.get(
|
||
"/schemes/{scheme_name}",
|
||
summary="获取爆管定位方案详情",
|
||
description="获取指定爆管定位方案的详细信息"
|
||
)
|
||
async def query_burst_scheme_detail(
|
||
network: str = Query(..., description="管网名称(或数据库名称)"),
|
||
scheme_name: str = Path(..., description="爆管定位方案名称")
|
||
) -> dict[str, Any]:
|
||
"""
|
||
获取爆管定位方案详情。
|
||
|
||
查询指定爆管定位方案的完整配置和参数信息。
|
||
|
||
Args:
|
||
network: 管网名称(或数据库名称)
|
||
scheme_name: 爆管定位方案名称
|
||
|
||
Returns:
|
||
包含方案详情的字典
|
||
|
||
Raises:
|
||
HTTPException: 当查询失败时
|
||
"""
|
||
try:
|
||
return get_burst_location_scheme_detail(network=network, scheme_name=scheme_name)
|
||
except Exception as exc:
|
||
raise HTTPException(status_code=400, detail=str(exc))
|