Files

134 lines
4.7 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
import os
from typing import Any
from datetime import datetime
from fastapi import APIRouter, Depends, HTTPException, Query, Path, Body
from pydantic import BaseModel, Field
from app.auth.keycloak_dependencies import get_current_keycloak_username
from app.services.leakage_identifier import (
get_leakage_identify_scheme_detail,
list_leakage_identify_schemes,
run_leakage_identification,
)
router = APIRouter()
DEFAULT_N_WORKERS = max(1, min((os.cpu_count() or 1) - 1, 4))
class LeakageIdentifyRequest(BaseModel):
"""漏损识别请求模型"""
network: str = Field(..., description="管网名称(或数据库名称)")
observed_pressure_data: str | dict[str, list[Any]] | list[dict[str, Any]] | None = Field(
None, description="观测的压力数据"
)
start_time: float = Field(0, description="起始时间(小时)")
duration: float = Field(24, description="持续时间(小时)")
timestep: float = Field(5, description="时间步长(分钟)")
q_sum: float = Field(0.2, description="总流量(m3/s")
q_sum_unit: str = Field("m3/s", description="流量单位")
output_dir: str = Field("db_inp", description="输出目录")
pop_size: int = Field(50, description="种群大小")
max_gen: int = Field(100, description="最大代数")
n_workers: int = Field(DEFAULT_N_WORKERS, description="工作线程数")
output_flow_unit: str = Field("m3/s", description="输出流量单位")
dma_count: int | None = Field(None, description="DMA区域数量")
scada_start: datetime | None = Field(None, description="SCADA数据起始时间")
scada_end: datetime | None = Field(None, description="SCADA数据结束时间")
sensor_nodes: list[str] | None = Field(None, description="传感器节点列表")
scheme_name: str | None = Field(None, description="方案名称")
@router.post(
"/identify/",
summary="执行漏损识别",
description="基于压力观测数据和遗传算法识别管网中的漏损位置和大小"
)
async def identify_leakage(
data: LeakageIdentifyRequest = Body(..., description="漏损识别请求数据"),
username: str = Depends(get_current_keycloak_username),
) -> dict[str, Any]:
"""
执行漏损识别分析。
使用遗传算法对比模型计算和实测压力数据,
识别管网中的漏损节点和漏水量。
Args:
data: 包含管网名称(或数据库名称)、压力数据及优化参数的请求体
username: 当前认证用户名
Returns:
包含识别结果的字典
Raises:
HTTPException: 当处理过程中发生错误时
"""
try:
return run_leakage_identification(**data.model_dump(), username=username)
except Exception as exc:
raise HTTPException(status_code=400, detail=str(exc))
@router.get(
"/schemes/",
summary="查询漏损识别方案列表",
description="获取指定网络的所有漏损识别方案"
)
async def query_leakage_schemes(
network: str = Query(..., description="管网名称(或数据库名称)"),
query_date: datetime | None = Query(None, description="查询日期(可选)")
) -> list[dict[str, Any]]:
"""
获取漏损识别方案列表。
查询指定网络的所有已配置的漏损识别方案,
可按日期进行筛选。
Args:
network: 管网名称(或数据库名称)
query_date: 查询日期(可选)
Returns:
漏损识别方案列表
Raises:
HTTPException: 当查询失败时
"""
try:
return list_leakage_identify_schemes(network=network, query_date=query_date)
except Exception as exc:
raise HTTPException(status_code=400, detail=str(exc))
@router.get(
"/schemes/{scheme_name}",
summary="获取漏损识别方案详情",
description="获取指定漏损识别方案的详细信息"
)
async def query_leakage_scheme_detail(
network: str = Query(..., description="管网名称(或数据库名称)"),
scheme_name: str = Path(..., description="漏损识别方案名称")
) -> dict[str, Any]:
"""
获取漏损识别方案详情。
查询指定漏损识别方案的完整配置和参数信息。
Args:
network: 管网名称(或数据库名称)
scheme_name: 漏损识别方案名称
Returns:
包含方案详情的字典
Raises:
HTTPException: 当查询失败时
"""
try:
return get_leakage_identify_scheme_detail(
network=network, scheme_name=scheme_name
)
except Exception as exc:
raise HTTPException(status_code=400, detail=str(exc))