新增爆管位置检测模块及相关API接口

This commit is contained in:
2026-03-06 15:27:59 +08:00
parent 63d3458fb4
commit b83b895e2b
11 changed files with 3084 additions and 0 deletions
+33
View File
@@ -0,0 +1,33 @@
from typing import Any
from fastapi import APIRouter, Depends, HTTPException
from pydantic import BaseModel
from app.auth.keycloak_dependencies import get_current_keycloak_username
from app.services.burst_location import run_burst_location_by_network
router = APIRouter()
class BurstLocationRequest(BaseModel):
network: str
pressure_scada_ids: list[str]
burst_pressure: dict[str, float] | list[dict[str, Any]]
normal_pressure: dict[str, float] | list[dict[str, Any]]
burst_leakage: float
flow_scada_ids: list[str] | None = None
burst_flow: dict[str, float] | list[dict[str, Any]] | None = None
normal_flow: dict[str, float] | list[dict[str, Any]] | None = None
min_dpressure: float = 2.0
basic_pressure: float = 10.0
@router.post("/locate/")
async def locate_burst(
data: BurstLocationRequest,
_username: str = Depends(get_current_keycloak_username),
) -> dict[str, Any]:
try:
return run_burst_location_by_network(**data.model_dump())
except (TypeError, ValueError) as exc:
raise HTTPException(status_code=400, detail=str(exc))