Files

128 lines
3.8 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
from typing import Any, List, Dict
from fastapi import APIRouter, Query, Path
from app.services.tjnetwork import (
get_pipe_risk_probability_now,
get_pipe_risk_probability,
get_pipes_risk_probability,
get_network_pipe_risk_probability_now,
get_pipe_risk_probability_geometries,
)
router = APIRouter()
@router.get(
"/getpiperiskprobabilitynow/",
summary="获取管道当前风险概率",
description="获取指定管道当前时刻的风险概率值"
)
async def fastapi_get_pipe_risk_probability_now(
network: str = Query(..., description="管网名称(或数据库名称)"),
pipe_id: str = Query(..., description="管道ID")
) -> dict[str, Any]:
"""
获取管道当前风险概率。
查询指定管道在当前时刻的风险概率值。
Args:
network: 管网名称(或数据库名称)
pipe_id: 管道ID
Returns:
包含风险概率信息的字典
"""
return get_pipe_risk_probability_now(network, pipe_id)
@router.get(
"/getpiperiskprobability/",
summary="获取管道风险概率历史",
description="获取指定管道的风险概率历史数据"
)
async def fastapi_get_pipe_risk_probability(
network: str = Query(..., description="管网名称(或数据库名称)"),
pipe_id: str = Query(..., description="管道ID")
) -> dict[str, Any]:
"""
获取管道风险概率历史。
查询指定管道的历史风险概率数据。
Args:
network: 管网名称(或数据库名称)
pipe_id: 管道ID
Returns:
包含风险概率历史的字典
"""
return get_pipe_risk_probability(network, pipe_id)
@router.get(
"/getpipesriskprobability/",
summary="批量获取多条管道风险概率",
description="批量获取多条管道的风险概率值"
)
async def fastapi_get_pipes_risk_probability(
network: str = Query(..., description="管网名称(或数据库名称)"),
pipe_ids: str = Query(..., description="逗号分隔的管道ID列表")
) -> list[dict[str, Any]]:
"""
批量获取多条管道风险概率。
查询多条指定管道的风险概率值。
Args:
network: 管网名称(或数据库名称)
pipe_ids: 逗号分隔的管道ID列表(例如:pipe1,pipe2,pipe3
Returns:
包含多条管道风险概率的列表
"""
pipeids = pipe_ids.split(",")
return get_pipes_risk_probability(network, pipeids)
@router.get(
"/getnetworkpiperiskprobabilitynow/",
summary="获取整个网络的管道风险概率",
description="获取指定网络中所有管道的当前风险概率值"
)
async def fastapi_get_network_pipe_risk_probability_now(
network: str = Query(..., description="管网名称(或数据库名称)"),
) -> list[dict[str, Any]]:
"""
获取整个网络的管道风险概率。
查询指定网络中所有管道在当前时刻的风险概率值。
Args:
network: 管网名称(或数据库名称)
Returns:
包含网络内所有管道风险概率的列表
"""
return get_network_pipe_risk_probability_now(network)
@router.get(
"/getpiperiskprobabilitygeometries/",
summary="获取管道风险几何信息",
description="获取指定网络中管道的风险相关几何数据"
)
async def fastapi_get_pipe_risk_probability_geometries(
network: str = Query(..., description="管网名称(或数据库名称)")
) -> dict[str, Any]:
"""
获取管道风险几何信息。
查询指定网络中管道的地理和风险相关的几何数据。
Args:
network: 管网名称(或数据库名称)
Returns:
包含几何信息和风险数据的字典
"""
return get_pipe_risk_probability_geometries(network)