From 6582bf887989144e73bc955fada0e2bcd91fd30d Mon Sep 17 00:00:00 2001 From: JIANG Date: Wed, 10 Dec 2025 15:28:38 +0800 Subject: [PATCH] =?UTF-8?q?=E6=9B=B4=E6=96=B0=E6=96=B9=E6=B3=95=E4=BB=A5?= =?UTF-8?q?=E6=94=AF=E6=8C=81=E5=A4=9Aid=E6=9F=A5=E8=AF=A2?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- timescaledb/composite_queries.py | 145 ++++++++++++++++--------------- timescaledb/router.py | 36 ++++++-- 2 files changed, 103 insertions(+), 78 deletions(-) diff --git a/timescaledb/composite_queries.py b/timescaledb/composite_queries.py index 51caa3e..af1fe33 100644 --- a/timescaledb/composite_queries.py +++ b/timescaledb/composite_queries.py @@ -17,133 +17,140 @@ class CompositeQueries: async def get_scada_associated_realtime_simulation_data( timescale_conn: AsyncConnection, postgres_conn: AsyncConnection, - device_id: str, + device_ids: List[str], start_time: datetime, end_time: datetime, field: str, - ) -> Optional[Any]: + ) -> List[Optional[Any]]: """ 获取 SCADA 关联的 link/node 模拟值 - 根据传入的 SCADA device_id,找到关联的 link/node, + 根据传入的 SCADA device_ids,找到关联的 link/node, 并根据对应的 type,查询对应的模拟数据 Args: timescale_conn: TimescaleDB 异步连接 postgres_conn: PostgreSQL 异步连接 - device_id: SCADA 设备ID + device_ids: SCADA 设备ID列表 start_time: 开始时间 end_time: 结束时间 field: 要查询的字段名 Returns: - 模拟数据值,如果没有找到则返回 None + 模拟数据值列表,如果没有找到则对应位置返回 None Raises: ValueError: 当 SCADA 设备未找到或字段无效时 """ + results = [] # 1. 查询所有 SCADA 信息 scada_infos = await PostgreScadaRepository.get_scadas(postgres_conn) - # 2. 根据 device_id 找到对应的 SCADA 信息 - target_scada = None - for scada in scada_infos: - if scada["id"] == device_id: - target_scada = scada - break + for device_id in device_ids: + # 2. 根据 device_id 找到对应的 SCADA 信息 + target_scada = None + for scada in scada_infos: + if scada["id"] == device_id: + target_scada = scada + break - if not target_scada: - raise ValueError(f"SCADA device {device_id} not found") + if not target_scada: + raise ValueError(f"SCADA device {device_id} not found") - # 3. 根据 type 和 associated_element_id 查询对应的模拟数据 - element_id = target_scada["associated_element_id"] - scada_type = target_scada["type"] + # 3. 根据 type 和 associated_element_id 查询对应的模拟数据 + element_id = target_scada["associated_element_id"] + scada_type = target_scada["type"] - if scada_type.lower() == "pipe_flow": - # 查询 link 模拟数据 - return await RealtimeRepository.get_link_field_by_time_range( - timescale_conn, start_time, end_time, element_id, field - ) - elif scada_type.lower() == "pressure": - # 查询 node 模拟数据 - return await RealtimeRepository.get_node_field_by_time_range( - timescale_conn, start_time, end_time, element_id, field - ) - else: - raise ValueError(f"Unknown SCADA type: {scada_type}") + if scada_type.lower() == "pipe_flow": + # 查询 link 模拟数据 + res = await RealtimeRepository.get_link_field_by_time_range( + timescale_conn, start_time, end_time, element_id, field + ) + elif scada_type.lower() == "pressure": + # 查询 node 模拟数据 + res = await RealtimeRepository.get_node_field_by_time_range( + timescale_conn, start_time, end_time, element_id, field + ) + else: + raise ValueError(f"Unknown SCADA type: {scada_type}") + results.append(res) + return results @staticmethod async def get_scada_associated_scheme_simulation_data( timescale_conn: AsyncConnection, postgres_conn: AsyncConnection, - device_id: str, + device_ids: List[str], start_time: datetime, end_time: datetime, scheme_type: str, scheme_name: str, - field: str, - ) -> Optional[Any]: + ) -> List[Optional[Any]]: """ 获取 SCADA 关联的 link/node 模拟值 - 根据传入的 SCADA device_id,找到关联的 link/node, + 根据传入的 SCADA device_ids,找到关联的 link/node, 并根据对应的 type,查询对应的模拟数据 Args: timescale_conn: TimescaleDB 异步连接 postgres_conn: PostgreSQL 异步连接 - device_id: SCADA 设备ID + device_ids: SCADA 设备ID列表 start_time: 开始时间 end_time: 结束时间 field: 要查询的字段名 Returns: - 模拟数据值,如果没有找到则返回 None + 模拟数据值列表,如果没有找到则对应位置返回 None Raises: ValueError: 当 SCADA 设备未找到或字段无效时 """ + results = [] # 1. 查询所有 SCADA 信息 scada_infos = await PostgreScadaRepository.get_scadas(postgres_conn) - # 2. 根据 device_id 找到对应的 SCADA 信息 - target_scada = None - for scada in scada_infos: - if scada["id"] == device_id: - target_scada = scada - break + for device_id in device_ids: + # 2. 根据 device_id 找到对应的 SCADA 信息 + target_scada = None + for scada in scada_infos: + if scada["id"] == device_id: + target_scada = scada + break - if not target_scada: - raise ValueError(f"SCADA device {device_id} not found") + if not target_scada: + raise ValueError(f"SCADA device {device_id} not found") - # 3. 根据 type 和 associated_element_id 查询对应的模拟数据 - element_id = target_scada["associated_element_id"] - scada_type = target_scada["type"] + # 3. 根据 type 和 associated_element_id 查询对应的模拟数据 + element_id = target_scada["associated_element_id"] + scada_type = target_scada["type"] - if scada_type.lower() == "pipe_flow": - # 查询 link 模拟数据 - return await SchemeRepository.get_link_field_by_scheme_and_time_range( - timescale_conn, - scheme_type, - scheme_name, - start_time, - end_time, - element_id, - field, - ) - elif scada_type.lower() == "pressure": - # 查询 node 模拟数据 - return await SchemeRepository.get_node_field_by_scheme_and_time_range( - timescale_conn, - scheme_type, - scheme_name, - start_time, - end_time, - element_id, - field, - ) - else: - raise ValueError(f"Unknown SCADA type: {scada_type}") + if scada_type.lower() == "pipe_flow": + # 查询 link 模拟数据 + res = await SchemeRepository.get_link_field_by_scheme_and_time_range( + timescale_conn, + scheme_type, + scheme_name, + start_time, + end_time, + element_id, + "flow", + ) + elif scada_type.lower() == "pressure": + # 查询 node 模拟数据 + res = await SchemeRepository.get_node_field_by_scheme_and_time_range( + timescale_conn, + scheme_type, + scheme_name, + start_time, + end_time, + element_id, + "pressure", + ) + else: + raise ValueError(f"Unknown SCADA type: {scada_type}") + results.append(res) + return results @staticmethod async def get_element_associated_scada_data( diff --git a/timescaledb/router.py b/timescaledb/router.py index 4f396ba..ce99170 100644 --- a/timescaledb/router.py +++ b/timescaledb/router.py @@ -361,7 +361,7 @@ async def insert_scada_data( return {"message": f"Inserted {len(data)} records"} -@router.get("/scada") +@router.get("/scada/by-ids-time-range") async def get_scada_by_ids_time_range( device_ids: List[str], start_time: datetime, @@ -373,7 +373,7 @@ async def get_scada_by_ids_time_range( ) -@router.get("/scada/field") +@router.get("/scada/by-ids-field-time-range") async def get_scada_field_by_ids_time_range( device_ids: List[str], start_time: datetime, @@ -404,7 +404,7 @@ async def update_scada_field( raise HTTPException(status_code=400, detail=str(e)) -@router.delete("/scada") +@router.delete("/scada/by-id-time-range") async def delete_scada_data( device_id: str, start_time: datetime, @@ -422,23 +422,41 @@ async def delete_scada_data( @router.get("/composite/scada-simulation") async def get_scada_associated_simulation_data( - device_id: str, + device_ids: List[str], start_time: datetime, end_time: datetime, - field: str, + scheme_type: str = Query(None, description="指定方案名称,若为空则查询实时数据"), + scheme_name: str = Query(None, description="指定方案名称,若为空则查询实时数据"), timescale_conn: AsyncConnection = Depends(get_database_connection), postgres_conn: AsyncConnection = Depends(get_postgres_connection), ): """ 获取 SCADA 关联的 link/node 模拟值 - 根据传入的 SCADA device_id,找到关联的 link/node, + 根据传入的 SCADA device_ids,找到关联的 link/node, 并根据对应的 type,查询对应的模拟数据 """ try: - result = await CompositeQueries.get_scada_associated_simulation_data( - timescale_conn, postgres_conn, device_id, start_time, end_time, field - ) + if scheme_type and scheme_name: + result = await CompositeQueries.get_scada_associated_scheme_simulation_data( + timescale_conn, + postgres_conn, + device_ids, + start_time, + end_time, + scheme_type, + scheme_name, + ) + else: + result = ( + await CompositeQueries.get_scada_associated_realtime_simulation_data( + timescale_conn, + postgres_conn, + device_ids, + start_time, + end_time, + ) + ) if result is None: raise HTTPException(status_code=404, detail="No simulation data found") return result