Files
TJWaterServerBinary/app/api/v1/endpoints/timeseries/composite.py
T

269 lines
10 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
from fastapi import APIRouter, Depends, HTTPException, Query
from datetime import datetime
from psycopg import AsyncConnection
from app.infra.db.timescaledb.composite_queries import CompositeQueries
from .dependencies import get_timescale_connection, get_postgres_connection
router = APIRouter()
@router.get("/composite/scada-simulation", summary="获取SCADA关联的模拟数据",
tags=["复合查询"])
async def get_scada_associated_simulation_data(
start_time: datetime = Query(..., description="查询开始时间"),
end_time: datetime = Query(..., description="查询结束时间"),
device_ids: str = Query(..., description="SCADA设备ID列表,逗号分隔"),
scheme_type: str = Query(None, description="方案类型,若为空则查询实时数据"),
scheme_name: str = Query(None, description="方案名称,若为空则查询实时数据"),
timescale_conn: AsyncConnection = Depends(get_timescale_connection),
postgres_conn: AsyncConnection = Depends(get_postgres_connection),
):
"""
获取SCADA关联的link/node模拟值
根据传入的SCADA device_ids,找到关联的link/node
并根据对应的type,查询对应的模拟数据。支持查询实时或方案数据。
Args:
start_time: 查询开始时间
end_time: 查询结束时间
device_ids: SCADA设备ID列表,用逗号分隔
scheme_type: 方案类型,若为空则查询实时数据
scheme_name: 方案名称,若为空则查询实时数据
timescale_conn: TimescaleDB连接
postgres_conn: PostgreSQL连接
Returns:
SCADA关联的模拟数据
Raises:
HTTPException: 当查询参数无效时返回400错误,未找到数据时返回404错误
"""
try:
device_ids_list = (
[id.strip() for id in device_ids.split(",") if id.strip()]
if device_ids
else []
)
if scheme_type and scheme_name:
result = await CompositeQueries.get_scada_associated_scheme_simulation_data(
timescale_conn,
postgres_conn,
device_ids_list,
start_time,
end_time,
scheme_type,
scheme_name,
)
else:
result = (
await CompositeQueries.get_scada_associated_realtime_simulation_data(
timescale_conn,
postgres_conn,
device_ids_list,
start_time,
end_time,
)
)
if result is None:
raise HTTPException(status_code=404, detail="No simulation data found")
return result
except ValueError as e:
raise HTTPException(status_code=400, detail=str(e))
@router.get("/composite/element-simulation", summary="获取管网元素的模拟数据",
tags=["复合查询"])
async def get_feature_simulation_data(
start_time: datetime = Query(..., description="查询开始时间"),
end_time: datetime = Query(..., description="查询结束时间"),
feature_infos: str = Query(
..., description="特征信息,格式: id1:type1,id2:type2type为pipe(管道)或junction(节点)"
),
scheme_type: str = Query(None, description="方案类型,若为空则查询实时数据"),
scheme_name: str = Query(None, description="方案名称,若为空则查询实时数据"),
timescale_conn: AsyncConnection = Depends(get_timescale_connection),
):
"""
获取link/node模拟值
根据传入的featureInfos,找到关联的link/node
并根据对应的type,查询对应的模拟数据。支持查询实时或方案数据。
Args:
start_time: 查询开始时间
end_time: 查询结束时间
feature_infos: 格式为 "element_id1:type1,element_id2:type2"
例如: "P1:pipe,J1:junction"
scheme_type: 方案类型,若为空则查询实时数据
scheme_name: 方案名称,若为空则查询实时数据
timescale_conn: TimescaleDB连接
Returns:
管网元素的模拟数据
Raises:
HTTPException: 当feature_infos为空返回400错误,未找到数据返回404错误,其他错误返回400错误
"""
try:
feature_infos_list = []
if feature_infos:
for item in feature_infos.split(","):
item = item.strip()
if ":" in item:
element_id, element_type = item.split(":", 1)
feature_infos_list.append(
(element_id.strip(), element_type.strip())
)
if not feature_infos_list:
raise HTTPException(status_code=400, detail="feature_infos cannot be empty")
if scheme_type and scheme_name:
result = await CompositeQueries.get_scheme_simulation_data(
timescale_conn,
feature_infos_list,
start_time,
end_time,
scheme_type,
scheme_name,
)
else:
result = await CompositeQueries.get_realtime_simulation_data(
timescale_conn,
feature_infos_list,
start_time,
end_time,
)
if result is None:
raise HTTPException(status_code=404, detail="No simulation data found")
return result
except ValueError as e:
raise HTTPException(status_code=400, detail=str(e))
@router.get("/composite/element-scada", summary="获取管网元素关联的SCADA监测数据",
tags=["复合查询"])
async def get_element_associated_scada_data(
element_id: str = Query(..., description="管网元素ID(管道或节点)"),
start_time: datetime = Query(..., description="查询开始时间"),
end_time: datetime = Query(..., description="查询结束时间"),
use_cleaned: bool = Query(False, description="是否使用清洗后的数据"),
timescale_conn: AsyncConnection = Depends(get_timescale_connection),
postgres_conn: AsyncConnection = Depends(get_postgres_connection),
):
"""
获取link/node关联的SCADA监测值
根据传入的link/node id,匹配SCADA信息,
如果存在关联的SCADA device_id,获取实际的监测数据。
Args:
element_id: 管网元素ID
start_time: 查询开始时间
end_time: 查询结束时间
use_cleaned: 是否使用清洗后的数据,默认为False使用原始数据
timescale_conn: TimescaleDB连接
postgres_conn: PostgreSQL连接
Returns:
管网元素关联的SCADA监测数据
Raises:
HTTPException: 当查询参数无效时返回400错误,未找到关联数据返回404错误
"""
try:
result = await CompositeQueries.get_element_associated_scada_data(
timescale_conn, postgres_conn, element_id, start_time, end_time, use_cleaned
)
if result is None:
raise HTTPException(
status_code=404, detail="No associated SCADA data found"
)
return result
except ValueError as e:
raise HTTPException(status_code=400, detail=str(e))
@router.post("/composite/clean-scada", summary="清洗SCADA监测数据",
tags=["复合查询"])
async def clean_scada_data(
device_ids: str = Query(..., description="设备ID列表或 'all' 表示清洗所有设备"),
start_time: datetime = Query(..., description="清洗数据的开始时间"),
end_time: datetime = Query(..., description="清洗数据的结束时间"),
timescale_conn: AsyncConnection = Depends(get_timescale_connection),
postgres_conn: AsyncConnection = Depends(get_postgres_connection),
):
"""
清洗SCADA监测数据
根据device_ids查询monitored_value,清洗后更新cleaned_value。
支持清洗指定设备或所有设备的数据。
Args:
device_ids: 设备ID列表,用逗号分隔,或 'all' 表示清洗所有设备
start_time: 清洗数据的开始时间
end_time: 清洗数据的结束时间
timescale_conn: TimescaleDB连接
postgres_conn: PostgreSQL连接
Returns:
清洗结果信息
Raises:
HTTPException: 当清洗过程出现错误时返回400错误
"""
try:
if device_ids == "all":
device_ids_list = []
else:
device_ids_list = (
[id.strip() for id in device_ids.split(",") if id.strip()]
if device_ids
else []
)
return await CompositeQueries.clean_scada_data(
timescale_conn, postgres_conn, device_ids_list, start_time, end_time
)
except ValueError as e:
raise HTTPException(status_code=400, detail=str(e))
@router.get("/composite/pipeline-health-prediction", summary="预测管道健康状况",
tags=["复合查询"])
async def predict_pipeline_health(
query_time: datetime = Query(..., description="查询时间"),
network_name: str = Query(..., description="管网名称(或数据库名称)"),
timescale_conn: AsyncConnection = Depends(get_timescale_connection),
):
"""
预测管道健康状况
根据管网名称和当前时间,查询管道信息和实时数据,
使用随机生存森林模型预测管道的生存概率。
Args:
query_time: 查询时间
network_name: 管网名称(或数据库名称)
timescale_conn: TimescaleDB连接
Returns:
预测结果列表,每个元素包含 link_id 和对应的生存函数
Raises:
HTTPException: 当模型文件不存在返回404错误,其他错误返回400或500错误
"""
try:
return await CompositeQueries.predict_pipeline_health(
timescale_conn, network_name, query_time
)
except ValueError as e:
raise HTTPException(status_code=400, detail=str(e))
except FileNotFoundError as e:
raise HTTPException(status_code=404, detail=str(e))
except Exception as e:
raise HTTPException(status_code=500, detail=f"内部服务器错误: {str(e)}")