From 1d55bf49920855d4462d0b48853af8d7a20f3eaf Mon Sep 17 00:00:00 2001 From: JIANG Date: Tue, 9 Dec 2025 15:10:10 +0800 Subject: [PATCH] =?UTF-8?q?=E6=96=B0=E5=A2=9E=E5=86=85=E9=83=A8=E6=9F=A5?= =?UTF-8?q?=E8=AF=A2=E6=96=B9=E6=B3=95=EF=BC=8C=E6=9B=BF=E6=8D=A2simulatio?= =?UTF-8?q?n=E4=B8=ADscada=E8=AE=BE=E5=A4=87=E6=9F=A5=E8=AF=A2=E6=96=B9?= =?UTF-8?q?=E6=B3=95?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- requirements.txt | Bin 2764 -> 3050 bytes simulation.py | 44 +++++++++++++++---------------- timescaledb/internal_queries.py | 45 ++++++++++++++++++++++++++++++++ timescaledb/router.py | 16 ++++++------ timescaledb/schemas/scada.py | 39 +++++++++++++++++++-------- 5 files changed, 102 insertions(+), 42 deletions(-) diff --git a/requirements.txt b/requirements.txt index 27c2bd7914008ee8a2808be6ca61124c11c660db..cd35c031be9899b1687761da78db3115c3b41f10 100644 GIT binary patch delta 296 zcmX>j`bvDm8LoO|1_g!!hDwH9hE#?UhD?THAm0Iq(}3(8pqLedJ{Y=ySxG>#6d)f} zM?8ZegE@m4gAs!XgDDUi0%=Pi--N*csJ5ITpP`5$8)&A~gya1R49P%#9?+CTph#zZ z^wAD<2k0==Pw)*CtH0R&Zt9E~s2VbXT8n`WO$0h52WWsU&^TljxtZ49MRFKqHHQEL$KnV9*0&BOnO?AwW9j delta 7 OcmaDQenxb|87=@0`2%SH diff --git a/simulation.py b/simulation.py index 70d3adc..374e322 100644 --- a/simulation.py +++ b/simulation.py @@ -774,8 +774,8 @@ def run_simulation( if globals.reservoirs_id: # reservoirs_id = {'ZBBDJSCP000002': '2497', 'R00003': '2571'} # 1.获取reservoir的SCADA数据,形式如{'2497': '3.1231', '2571': '2.7387'} - reservoir_SCADA_data_dict = influxdb_api.query_SCADA_data_by_device_ID_and_time( - query_ids_list=list(globals.reservoirs_id.values()), + reservoir_SCADA_data_dict = TimescaleInternalStorage.query_scada_by_ids_time( + device_ids=list(globals.reservoirs_id.values()), query_time=modify_pattern_start_time, ) # 2.构建出新字典,形式如{'ZBBDJSCP000002': '3.1231', 'R00003': '2.7387'} @@ -798,8 +798,8 @@ def run_simulation( set_pattern(name_c, cs) if globals.tanks_id: # 修改tank初始液位 - tank_SCADA_data_dict = influxdb_api.query_SCADA_data_by_device_ID_and_time( - query_ids_list=list(globals.tanks_id.values()), + tank_SCADA_data_dict = TimescaleInternalStorage.query_scada_by_ids_time( + device_ids=list(globals.tanks_id.values()), query_time=modify_pattern_start_time, ) tank_dict = { @@ -814,11 +814,9 @@ def run_simulation( set_tank(name_c, cs) if globals.fixed_pumps_id: # 修改工频泵的pattern - fixed_pump_SCADA_data_dict = ( - influxdb_api.query_SCADA_data_by_device_ID_and_time( - query_ids_list=list(globals.fixed_pumps_id.values()), - query_time=modify_pattern_start_time, - ) + fixed_pump_SCADA_data_dict = TimescaleInternalStorage.query_scada_by_ids_time( + device_ids=list(globals.fixed_pumps_id.values()), + query_time=modify_pattern_start_time, ) # print(fixed_pump_SCADA_data_dict) fixed_pump_dict = { @@ -840,8 +838,8 @@ def run_simulation( if globals.variable_pumps_id: # 修改变频泵的pattern variable_pump_SCADA_data_dict = ( - influxdb_api.query_SCADA_data_by_device_ID_and_time( - query_ids_list=list(globals.variable_pumps_id.values()), + TimescaleInternalStorage.query_scada_by_ids_time( + device_ids=list(globals.variable_pumps_id.values()), query_time=modify_pattern_start_time, ) ) @@ -860,8 +858,8 @@ def run_simulation( set_pattern(name_c, cs) if globals.demand_id: # 基于实时数据,修改大用户节点的pattern - demand_SCADA_data_dict = influxdb_api.query_SCADA_data_by_device_ID_and_time( - query_ids_list=list(globals.demand_id.values()), + demand_SCADA_data_dict = TimescaleInternalStorage.query_scada_by_ids_time( + device_ids=list(globals.demand_id.values()), query_time=modify_pattern_start_time, ) demand_dict = { @@ -887,8 +885,8 @@ def run_simulation( if globals.source_outflow_pattern_id: # 基于实时的出厂流量计数据,修改出厂流量计绑定的pattern source_outflow_SCADA_data_dict = ( - influxdb_api.query_SCADA_data_by_device_ID_and_time( - query_ids_list=list(globals.source_outflow_pattern_id.values()), + TimescaleInternalStorage.query_scada_by_ids_time( + device_ids=list(globals.source_outflow_pattern_id.values()), query_time=modify_pattern_start_time, ) ) @@ -925,8 +923,8 @@ def run_simulation( if globals.realtime_pipe_flow_pattern_id: # 基于实时的pipe_flow类数据,修改pipe_flow类绑定的pattern realtime_pipe_flow_SCADA_data_dict = ( - influxdb_api.query_SCADA_data_by_device_ID_and_time( - query_ids_list=list(globals.realtime_pipe_flow_pattern_id.values()), + TimescaleInternalStorage.query_scada_by_ids_time( + device_ids=list(globals.realtime_pipe_flow_pattern_id.values()), query_time=modify_pattern_start_time, ) ) @@ -962,8 +960,8 @@ def run_simulation( query_api_id = globals.realtime_pipe_flow_pattern_id.get(pipe_flow_region) temp_realtime_pipe_flow_pattern_id[pipe_flow_region] = query_api_id temp_realtime_pipe_flow_SCADA_data_dict = ( - influxdb_api.query_SCADA_data_by_device_ID_and_time( - query_ids_list=list(temp_realtime_pipe_flow_pattern_id.values()), + TimescaleInternalStorage.query_scada_by_ids_time( + device_ids=list(temp_realtime_pipe_flow_pattern_id.values()), query_time=modify_pattern_start_time, ) ) @@ -1020,14 +1018,14 @@ def run_simulation( globals.non_realtime_region_patterns.get(region, []) ) region_source_outflow_data_dict = ( - influxdb_api.query_SCADA_data_by_device_ID_and_time( - query_ids_list=temp_source_outflow_region_id, + TimescaleInternalStorage.query_scada_by_ids_time( + device_ids=temp_source_outflow_region_id, query_time=modify_pattern_start_time, ) ) region_realtime_region_pipe_flow_and_demand_data_dict = ( - influxdb_api.query_SCADA_data_by_device_ID_and_time( - query_ids_list=temp_realtime_region_pipe_flow_and_demand_id, + TimescaleInternalStorage.query_scada_by_ids_time( + device_ids=temp_realtime_region_pipe_flow_and_demand_id, query_time=modify_pattern_start_time, ) ) diff --git a/timescaledb/internal_queries.py b/timescaledb/internal_queries.py index 5d17d6d..9bbc724 100644 --- a/timescaledb/internal_queries.py +++ b/timescaledb/internal_queries.py @@ -4,6 +4,8 @@ from fastapi.logger import logger from timescaledb.schemas.scheme import SchemeRepository from timescaledb.schemas.realtime import RealtimeRepository import timescaledb.timescaledb_info as timescaledb_info +from datetime import datetime, timedelta +from timescaledb.schemas.scada import ScadaRepository import psycopg import time @@ -75,3 +77,46 @@ class InternalStorage: time.sleep(1) # 重试前等待 else: raise # 达到最大重试次数后抛出异常 + + @staticmethod + def query_scada_by_ids_time( + device_ids: List[str], + query_time: str, + db_name: str = None, + max_retries: int = 3, + ) -> dict: + """查询指定时间点的 SCADA 数据""" + + # 解析时间,假设是北京时间 + beijing_time = datetime.fromisoformat(query_time) + start_time = beijing_time - timedelta(seconds=1) + end_time = beijing_time + timedelta(seconds=1) + + for attempt in range(max_retries): + try: + conn_string = ( + timescaledb_info.get_pgconn_string(db_name=db_name) + if db_name + else timescaledb_info.get_pgconn_string() + ) + with psycopg.Connection.connect(conn_string) as conn: + rows = ScadaRepository.get_scada_by_ids_time_range_sync( + conn, device_ids, start_time, end_time + ) + # 处理结果,返回每个 device_id 的第一个值 + result = {} + for device_id in device_ids: + device_rows = [ + row for row in rows if row["device_id"] == device_id + ] + if device_rows: + result[device_id] = device_rows[0]["monitored_value"] + else: + result[device_id] = None + return result + except Exception as e: + logger.error(f"查询尝试 {attempt + 1} 失败: {e}") + if attempt < max_retries - 1: + time.sleep(1) + else: + raise diff --git a/timescaledb/router.py b/timescaledb/router.py index 845f4b6..ad99abc 100644 --- a/timescaledb/router.py +++ b/timescaledb/router.py @@ -358,20 +358,20 @@ async def insert_scada_data( @router.get("/scada") -async def get_scada_by_id_time_range( - device_id: str, +async def get_scada_by_ids_time_range( + device_ids: List[str], start_time: datetime, end_time: datetime, conn: AsyncConnection = Depends(get_database_connection), ): - return await ScadaRepository.get_scada_by_id_time_range( - conn, device_id, start_time, end_time + return await ScadaRepository.get_scada_by_ids_time_range( + conn, device_ids, start_time, end_time ) -@router.get("/scada/{device_id}/field") -async def get_scada_field_by_id_time_range( - device_id: str, +@router.get("/scada/field") +async def get_scada_field_by_ids_time_range( + device_ids: List[str], start_time: datetime, end_time: datetime, field: str, @@ -379,7 +379,7 @@ async def get_scada_field_by_id_time_range( ): try: return await ScadaRepository.get_scada_field_by_id_time_range( - conn, device_id, start_time, end_time, field + conn, device_ids, start_time, end_time, field ) except ValueError as e: raise HTTPException(status_code=400, detail=str(e)) diff --git a/timescaledb/schemas/scada.py b/timescaledb/schemas/scada.py index 03271f4..6ce7a8a 100644 --- a/timescaledb/schemas/scada.py +++ b/timescaledb/schemas/scada.py @@ -1,6 +1,6 @@ from typing import List, Any from datetime import datetime -from psycopg import AsyncConnection, sql +from psycopg import AsyncConnection, Connection, sql class ScadaRepository: @@ -25,36 +25,53 @@ class ScadaRepository: ) @staticmethod - async def get_scada_by_id_time_range( - conn: AsyncConnection, device_id: str, start_time: datetime, end_time: datetime + async def get_scada_by_ids_time_range( + conn: AsyncConnection, + device_ids: List[str], + start_time: datetime, + end_time: datetime, ) -> List[dict]: async with conn.cursor() as cur: await cur.execute( - "SELECT * FROM scada.scada_data WHERE device_id = %s AND time >= %s AND time <= %s", - (device_id, start_time, end_time), + "SELECT * FROM scada.scada_data WHERE device_id = ANY(%s) AND time >= %s AND time <= %s", + (device_ids, start_time, end_time), ) return await cur.fetchall() + @staticmethod + def get_scada_by_ids_time_range_sync( + conn: Connection, + device_ids: List[str], + start_time: datetime, + end_time: datetime, + ) -> List[dict]: + with conn.cursor() as cur: + cur.execute( + "SELECT * FROM scada.scada_data WHERE device_id = ANY(%s) AND time >= %s AND time <= %s", + (device_ids, start_time, end_time), + ) + return cur.fetchall() + @staticmethod async def get_scada_field_by_id_time_range( conn: AsyncConnection, - device_id: str, + device_ids: List[str], start_time: datetime, end_time: datetime, field: str, - ) -> Any: + ) -> List[dict]: valid_fields = {"monitored_value", "cleaned_value"} if field not in valid_fields: raise ValueError(f"Invalid field: {field}") query = sql.SQL( - "SELECT {} FROM scada.scada_data WHERE time >= %s AND time <= %s AND device_id = %s" + "SELECT device_id, {} FROM scada.scada_data WHERE time >= %s AND time <= %s AND device_id = ANY(%s)" ).format(sql.Identifier(field)) async with conn.cursor() as cur: - await cur.execute(query, (start_time, end_time, device_id)) - row = await cur.fetchone() - return row[field] if row else None + await cur.execute(query, (start_time, end_time, device_ids)) + rows = await cur.fetchall() + return [{"device_id": row["device_id"], field: row[field]} for row in rows] @staticmethod async def update_scada_field(