From 1fad2fde2c6ff6e7f19484e62d1e4a6302a38058 Mon Sep 17 00:00:00 2001 From: JIANG Date: Wed, 17 Dec 2025 11:12:47 +0800 Subject: [PATCH] =?UTF-8?q?=E6=96=B0=E5=A2=9Efeatures=E6=9F=A5=E8=AF=A2sim?= =?UTF-8?q?ulation=E7=9A=84=E6=96=B9=E6=B3=95=E5=92=8C=E8=B7=AF=E7=94=B1?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- simulation.py | 2 +- timescaledb/composite_queries.py | 113 +++++++++++++++++++++++++++++-- timescaledb/router.py | 60 ++++++++++++++++ 3 files changed, 170 insertions(+), 5 deletions(-) diff --git a/simulation.py b/simulation.py index f72edc8..61ab738 100644 --- a/simulation.py +++ b/simulation.py @@ -517,7 +517,7 @@ def get_realtime_region_patterns( name: str, source_outflow_region_id: dict, realtime_region_pipe_flow_and_demand_id: dict, -) -> (dict, dict): +) -> Tuple[dict, dict]: """ 根据每个 region,从 scada_info 表中查询 api_query_id 对应的 associated_pattern。 将结果分别存储到 source_outflow_region_patterns 和 realtime_region_pipe_flow_and_demand_patterns 两个字典中。 diff --git a/timescaledb/composite_queries.py b/timescaledb/composite_queries.py index 813b556..baf4c35 100644 --- a/timescaledb/composite_queries.py +++ b/timescaledb/composite_queries.py @@ -1,4 +1,4 @@ -from typing import List, Optional, Any, Dict +from typing import List, Optional, Any, Dict, Tuple from datetime import datetime from psycopg import AsyncConnection import pandas as pd @@ -36,7 +36,6 @@ class CompositeQueries: device_ids: SCADA 设备ID列表 start_time: 开始时间 end_time: 结束时间 - field: 要查询的字段名 Returns: 模拟数据字典,以 device_id 为键,值为数据列表,每个数据包含 time, value 和 scada_id @@ -92,7 +91,7 @@ class CompositeQueries: scheme_name: str, ) -> Dict[str, List[Dict[str, Any]]]: """ - 获取 SCADA 关联的 link/node 模拟值 + 获取 SCADA 关联的 link/node scheme 模拟值 根据传入的 SCADA device_ids,找到关联的 link/node, 并根据对应的 type,查询对应的模拟数据 @@ -103,7 +102,6 @@ class CompositeQueries: device_ids: SCADA 设备ID列表 start_time: 开始时间 end_time: 结束时间 - field: 要查询的字段名 Returns: 模拟数据字典,以 device_id 为键,值为数据列表,每个数据包含 time, value 和 scada_id @@ -160,6 +158,113 @@ class CompositeQueries: result[device_id] = res return result + @staticmethod + async def get_realtime_simulation_data( + timescale_conn: AsyncConnection, + featureInfos: List[Tuple[str, str]], + start_time: datetime, + end_time: datetime, + ) -> Dict[str, List[Dict[str, Any]]]: + """ + 获取 link/node 模拟值 + + 根据传入的 featureInfos,找到关联的 link/node, + 并根据对应的 type,查询对应的模拟数据 + + Args: + timescale_conn: TimescaleDB 异步连接 + featureInfos: 传入的 feature 信息列表,包含 (element_id, type) + start_time: 开始时间 + end_time: 结束时间 + + Returns: + 模拟数据字典,以 feature_id 为键,值为数据列表,每个数据包含 time, value 和 feature_id + + Raises: + ValueError: 当 SCADA 设备未找到或字段无效时 + """ + result = {} + for feature_id, type in featureInfos: + + if type.lower() == "pipe": + # 查询 link 模拟数据 + res = await RealtimeRepository.get_link_field_by_time_range( + timescale_conn, start_time, end_time, feature_id, "flow" + ) + elif type.lower() == "junction": + # 查询 node 模拟数据 + res = await RealtimeRepository.get_node_field_by_time_range( + timescale_conn, start_time, end_time, feature_id, "pressure" + ) + else: + raise ValueError(f"Unknown type: {type}") + # 添加 scada_id 到每个数据项 + for item in res: + item["feature_id"] = feature_id + result[feature_id] = res + return result + + @staticmethod + async def get_scheme_simulation_data( + timescale_conn: AsyncConnection, + featureInfos: List[Tuple[str, str]], + start_time: datetime, + end_time: datetime, + scheme_type: str, + scheme_name: str, + ) -> Dict[str, List[Dict[str, Any]]]: + """ + 获取 link/node scheme 模拟值 + + 根据传入的 featureInfos,找到关联的 link/node, + 并根据对应的 type,查询对应的模拟数据 + + Args: + timescale_conn: TimescaleDB 异步连接 + featureInfos: 传入的 feature 信息列表,包含 (element_id, type) + start_time: 开始时间 + end_time: 结束时间 + scheme_type: 工况类型 + scheme_name: 工况名称 + + Returns: + 模拟数据字典,以 feature_id 为键,值为数据列表,每个数据包含 time, value 和 feature_id + + Raises: + ValueError: 当类型无效时 + """ + result = {} + for feature_id, type in featureInfos: + if type.lower() == "pipe": + # 查询 link 模拟数据 + res = await SchemeRepository.get_link_field_by_scheme_and_time_range( + timescale_conn, + scheme_type, + scheme_name, + start_time, + end_time, + feature_id, + "flow", + ) + elif type.lower() == "junction": + # 查询 node 模拟数据 + res = await SchemeRepository.get_node_field_by_scheme_and_time_range( + timescale_conn, + scheme_type, + scheme_name, + start_time, + end_time, + feature_id, + "pressure", + ) + else: + raise ValueError(f"Unknown type: {type}") + # 添加 feature_id 到每个数据项 + for item in res: + item["feature_id"] = feature_id + result[feature_id] = res + return result + @staticmethod async def get_element_associated_scada_data( timescale_conn: AsyncConnection, diff --git a/timescaledb/router.py b/timescaledb/router.py index 40ad3be..4bab5c4 100644 --- a/timescaledb/router.py +++ b/timescaledb/router.py @@ -479,6 +479,66 @@ async def get_scada_associated_simulation_data( raise HTTPException(status_code=400, detail=str(e)) +@router.get("/composite/element-simulation") +async def get_feature_simulation_data( + start_time: datetime, + end_time: datetime, + feature_infos: str = Query( + ..., description="特征信息,格式: id1:type1,id2:type2,type为pipe或junction" + ), + scheme_type: str = Query(None, description="指定方案类型,若为空则查询实时数据"), + scheme_name: str = Query(None, description="指定方案名称,若为空则查询实时数据"), + timescale_conn: AsyncConnection = Depends(get_database_connection), +): + """ + 获取 link/node 模拟值 + + 根据传入的 featureInfos,找到关联的 link/node, + 并根据对应的 type,查询对应的模拟数据 + + Args: + feature_infos: 格式为 "element_id1:type1,element_id2:type2" + 例如: "P1:pipe,J1:junction" + """ + try: + # 解析 feature_infos 为 List[Tuple[str, str]] + feature_infos_list = [] + if feature_infos: + for item in feature_infos.split(","): + item = item.strip() + if ":" in item: + element_id, element_type = item.split(":", 1) + feature_infos_list.append( + (element_id.strip(), element_type.strip()) + ) + + if not feature_infos_list: + raise HTTPException(status_code=400, detail="feature_infos cannot be empty") + + if scheme_type and scheme_name: + result = await CompositeQueries.get_scheme_simulation_data( + timescale_conn, + feature_infos_list, + start_time, + end_time, + scheme_type, + scheme_name, + ) + else: + result = await CompositeQueries.get_realtime_simulation_data( + timescale_conn, + feature_infos_list, + start_time, + end_time, + ) + + if result is None: + raise HTTPException(status_code=404, detail="No simulation data found") + return result + except ValueError as e: + raise HTTPException(status_code=400, detail=str(e)) + + @router.get("/composite/element-scada") async def get_element_associated_scada_data( element_id: str,