"""HTTP routes that expose ``CompositeQueries`` operations."""

from datetime import datetime
from typing import List, Optional, Tuple

from fastapi import APIRouter, Depends, HTTPException, Query
from psycopg import AsyncConnection

from app.infra.db.timescaledb.composite_queries import CompositeQueries

from .dependencies import get_timescale_connection, get_postgres_connection

router = APIRouter()


def _parse_device_ids(device_ids: str) -> List[str]:
    """Split a comma-separated id string into trimmed, non-empty ids.

    Returns an empty list when ``device_ids`` is empty or contains only
    separators/whitespace.
    """
    if not device_ids:
        return []
    trimmed = (part.strip() for part in device_ids.split(","))
    return [part for part in trimmed if part]


def _parse_feature_infos(feature_infos: str) -> List[Tuple[str, str]]:
    """Parse ``"id1:type1,id2:type2"`` into ``[(id1, type1), (id2, type2)]``.

    Only the first ``:`` in each entry separates id from type; both parts are
    whitespace-trimmed.
    """
    pairs: List[Tuple[str, str]] = []
    if not feature_infos:
        return pairs
    for entry in feature_infos.split(","):
        element_id, separator, element_type = entry.strip().partition(":")
        # NOTE(review): entries without a ":" are silently skipped, as in the
        # original implementation — confirm this is preferable to a 400.
        if separator:
            pairs.append((element_id.strip(), element_type.strip()))
    return pairs


@router.get("/composite/scada-simulation")
async def get_scada_associated_simulation_data(
    start_time: datetime,
    end_time: datetime,
    device_ids: str,
    scheme_type: Optional[str] = Query(
        None, description="指定方案类型,若为空则查询实时数据"
    ),
    scheme_name: Optional[str] = Query(
        None, description="指定方案名称,若为空则查询实时数据"
    ),
    timescale_conn: AsyncConnection = Depends(get_timescale_connection),
    postgres_conn: AsyncConnection = Depends(get_postgres_connection),
):
    """Get simulated values for the links/nodes associated with SCADA devices.

    ``device_ids`` is a comma-separated list of SCADA device ids. When both
    ``scheme_type`` and ``scheme_name`` are provided the scheme simulation
    query is used; otherwise the realtime simulation query is used. The
    association lookup and the per-type data query are delegated to
    ``CompositeQueries``.

    Raises:
        HTTPException: 404 when the query returns ``None``; 400 when the
            query raises ``ValueError``.
    """
    try:
        device_ids_list = _parse_device_ids(device_ids)

        # NOTE(review): supplying only one of scheme_type / scheme_name falls
        # back to the realtime query rather than being rejected.
        if scheme_type and scheme_name:
            result = await CompositeQueries.get_scada_associated_scheme_simulation_data(
                timescale_conn,
                postgres_conn,
                device_ids_list,
                start_time,
                end_time,
                scheme_type,
                scheme_name,
            )
        else:
            result = (
                await CompositeQueries.get_scada_associated_realtime_simulation_data(
                    timescale_conn,
                    postgres_conn,
                    device_ids_list,
                    start_time,
                    end_time,
                )
            )

        if result is None:
            raise HTTPException(status_code=404, detail="No simulation data found")
        return result
    except ValueError as e:
        raise HTTPException(status_code=400, detail=str(e)) from e


@router.get("/composite/element-simulation")
async def get_feature_simulation_data(
    start_time: datetime,
    end_time: datetime,
    feature_infos: str = Query(
        ..., description="特征信息,格式: id1:type1,id2:type2,type为pipe或junction"
    ),
    scheme_type: Optional[str] = Query(
        None, description="指定方案类型,若为空则查询实时数据"
    ),
    scheme_name: Optional[str] = Query(
        None, description="指定方案名称,若为空则查询实时数据"
    ),
    timescale_conn: AsyncConnection = Depends(get_timescale_connection),
):
    """Get simulated values for the given links/nodes.

    When both ``scheme_type`` and ``scheme_name`` are provided the scheme
    simulation query is used; otherwise the realtime simulation query is used.

    Args:
        feature_infos: Comma-separated ``element_id:type`` pairs, e.g.
            ``"P1:pipe,J1:junction"``.

    Raises:
        HTTPException: 400 when no valid ``id:type`` pair could be parsed or
            the query raises ``ValueError``; 404 when the query returns
            ``None``.
    """
    try:
        feature_infos_list = _parse_feature_infos(feature_infos)

        if not feature_infos_list:
            raise HTTPException(status_code=400, detail="feature_infos cannot be empty")

        if scheme_type and scheme_name:
            result = await CompositeQueries.get_scheme_simulation_data(
                timescale_conn,
                feature_infos_list,
                start_time,
                end_time,
                scheme_type,
                scheme_name,
            )
        else:
            result = await CompositeQueries.get_realtime_simulation_data(
                timescale_conn,
                feature_infos_list,
                start_time,
                end_time,
            )

        if result is None:
            raise HTTPException(status_code=404, detail="No simulation data found")
        return result
    except ValueError as e:
        raise HTTPException(status_code=400, detail=str(e)) from e


@router.get("/composite/element-scada")
async def get_element_associated_scada_data(
    element_id: str,
    start_time: datetime,
    end_time: datetime,
    use_cleaned: bool = Query(False, description="是否使用清洗后的数据"),
    timescale_conn: AsyncConnection = Depends(get_timescale_connection),
    postgres_conn: AsyncConnection = Depends(get_postgres_connection),
):
    """Get SCADA monitoring values associated with a link/node.

    Passes ``element_id``, the time range and ``use_cleaned`` to
    ``CompositeQueries.get_element_associated_scada_data``, which is
    responsible for matching the element to a SCADA device and fetching its
    data.

    Raises:
        HTTPException: 404 when the query returns ``None``; 400 when the
            query raises ``ValueError``.
    """
    try:
        result = await CompositeQueries.get_element_associated_scada_data(
            timescale_conn, postgres_conn, element_id, start_time, end_time, use_cleaned
        )
        if result is None:
            raise HTTPException(
                status_code=404, detail="No associated SCADA data found"
            )
        return result
    except ValueError as e:
        raise HTTPException(status_code=400, detail=str(e)) from e


@router.post("/composite/clean-scada")
async def clean_scada_data(
    device_ids: str,
    start_time: datetime = Query(...),
    end_time: datetime = Query(...),
    timescale_conn: AsyncConnection = Depends(get_timescale_connection),
    postgres_conn: AsyncConnection = Depends(get_postgres_connection),
):
    """Clean SCADA data for the given devices and time range.

    ``device_ids`` is either the literal ``"all"`` or a comma-separated list
    of device ids. ``"all"`` is forwarded to ``CompositeQueries`` as an empty
    list.

    Raises:
        HTTPException: 400 when the query raises ``ValueError``.
    """
    try:
        # NOTE(review): an input that parses to no ids (e.g. ",") also yields
        # an empty list and is therefore indistinguishable from "all" —
        # confirm that this is intended.
        if device_ids == "all":
            device_ids_list = []
        else:
            device_ids_list = _parse_device_ids(device_ids)

        return await CompositeQueries.clean_scada_data(
            timescale_conn, postgres_conn, device_ids_list, start_time, end_time
        )
    except ValueError as e:
        raise HTTPException(status_code=400, detail=str(e)) from e


@router.get("/composite/pipeline-health-prediction")
async def predict_pipeline_health(
    query_time: datetime = Query(..., description="查询时间"),
    network_name: str = Query(..., description="管网数据库名称"),
    timescale_conn: AsyncConnection = Depends(get_timescale_connection),
):
    """Predict pipeline health for a network at a given time.

    Delegates to ``CompositeQueries.predict_pipeline_health``. Per the
    original documentation, that call uses pipe information and realtime data
    with a random survival forest model to predict survival probability, and
    returns a list whose items contain a ``link_id`` and its survival
    function — TODO confirm against ``CompositeQueries``.

    Args:
        query_time: Time to run the prediction for.
        network_name: Name of the pipe network database.

    Raises:
        HTTPException: 400 on ``ValueError``, 404 on ``FileNotFoundError``,
            500 on any other unexpected exception.
    """
    try:
        return await CompositeQueries.predict_pipeline_health(
            timescale_conn, network_name, query_time
        )
    except HTTPException:
        # Keep deliberate HTTP errors intact instead of rewrapping them as 500.
        raise
    except ValueError as e:
        raise HTTPException(status_code=400, detail=str(e)) from e
    except FileNotFoundError as e:
        raise HTTPException(status_code=404, detail=str(e)) from e
    except Exception as e:
        # NOTE(review): the response detail echoes the exception text, which
        # may expose internal details to clients.
        raise HTTPException(status_code=500, detail=f"内部服务器错误: {str(e)}") from e