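# Source: TJWaterServerBinary/app/api/v1/endpoints/leakage.py
# (Repository file-viewer header as captured: "Files", "T", "31 lines",
# "834 B", "Python" -- page chrome, not part of the module.)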

from typing import Any
from fastapi import APIRouter, HTTPException
from pydantic import BaseModel
from app.services.leakage_identifier import run_leakage_identification
# Router on which this module's endpoints are registered via its decorators.
router = APIRouter()
class LeakageIdentifyRequest(BaseModel):
    """Request body for the leakage-identification endpoint.

    The endpoint in this module unpacks every field of this model into a
    same-named keyword argument of ``run_leakage_identification``. The exact
    meaning and units of each value are defined by that service rather than
    by this model.
    """

    # --- Required inputs -------------------------------------------------
    # NOTE(review): presumably a network name or model file path -- confirm
    # against the service.
    network: str
    # Accepted in three shapes: a plain string, a mapping from string keys to
    # lists, or a list of string-keyed mappings.
    # NOTE(review): how each shape is interpreted is decided by the service.
    observed_pressure_data: str | dict[str, list[Any]] | list[dict[str, Any]]

    # --- Time window (optional) ------------------------------------------
    # NOTE(review): units are not visible here (hours / minutes?) -- confirm.
    start_time: float = 0
    duration: float = 24
    timestep: float = 5

    # --- Flow quantity and its unit (optional) ---------------------------
    q_sum: float = 0.2
    q_sum_unit: str = "m3/s"

    # --- Output settings (optional) --------------------------------------
    # NOTE(review): client-supplied directory name; confirm the service
    # restricts where it is allowed to write.
    output_dir: str = "Results"
    output_flow_unit: str = "m3/s"

    # --- Search settings (optional) --------------------------------------
    # NOTE(review): presumably population size and generation limit of an
    # evolutionary search -- confirm against the service.
    pop_size: int = 50
    max_gen: int = 100
@router.post("/identify/")
async def identify_leakage(data: LeakageIdentifyRequest) -> dict[str, Any]:
    """Run leakage identification for the submitted request.

    The validated request fields are passed as keyword arguments to
    ``run_leakage_identification`` and its result is returned unchanged.

    Args:
        data: Parsed and validated request body.

    Returns:
        Whatever ``run_leakage_identification`` returns.

    Raises:
        HTTPException: With status 400 and the error text as ``detail`` when
            the service raises any other exception. An ``HTTPException``
            raised by the service itself is propagated untouched so its own
            status code and detail are preserved.
    """
    # Pydantic v2 deprecates ``.dict()`` in favour of ``.model_dump()``;
    # fall back to ``.dict()`` so the endpoint keeps working on Pydantic v1.
    dump = getattr(data, "model_dump", None)
    payload = dump() if dump is not None else data.dict()

    try:
        # NOTE(review): the service is called without ``await`` inside an
        # ``async def`` handler, so it runs on the event loop. If it is
        # long-running, consider offloading it -- after confirming that the
        # service is safe to run concurrently.
        return run_leakage_identification(**payload)
    except HTTPException:
        # Do not rewrap a deliberate HTTP error as a generic 400.
        raise
    except Exception as exc:
        # Keep the original exception as the cause for server-side tracebacks.
        raise HTTPException(status_code=400, detail=str(exc)) from exc