From 4fbdea435bacfbbaf0584c053d8f6130ecf5f9f3 Mon Sep 17 00:00:00 2001 From: JIANG Date: Fri, 5 Dec 2025 18:27:58 +0800 Subject: [PATCH] =?UTF-8?q?=E6=96=B0=E5=A2=9E=20scheme=20=E8=A1=A8?= =?UTF-8?q?=E4=B8=8B=E7=9A=84=E5=AD=97=E6=AE=B5=20scheme=5Ftype=20scheme?= =?UTF-8?q?=5Fname?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- simulation.py | 42 ++++-- timescaledb/composite_queries.py | 76 ++++++++++- timescaledb/internal_queries.py | 45 +++++++ timescaledb/router.py | 55 +++++--- timescaledb/schemas/scheme.py | 217 +++++++++++++++++++------------ 5 files changed, 324 insertions(+), 111 deletions(-) create mode 100644 timescaledb/internal_queries.py diff --git a/simulation.py b/simulation.py index 2c14d4a..017d75a 100644 --- a/simulation.py +++ b/simulation.py @@ -21,6 +21,8 @@ import globals import uuid import project_info from api.postgresql_info import get_pgconn_string +import asyncio +from timescaledb.internal_queries import InternalStorage as TimescaleInternalStorage logging.basicConfig( level=logging.INFO, format="%(asctime)s - %(levelname)s - %(message)s" @@ -1229,20 +1231,40 @@ def run_simulation( # print(node_result) # 存储 if simulation_type.upper() == "REALTIME": - influxdb_api.store_realtime_simulation_result_to_influxdb( - node_result, link_result, modify_pattern_start_time + asyncio.run( + TimescaleInternalStorage.store_realtime_simulation( + node_result, link_result, modify_pattern_start_time + ) ) elif simulation_type.upper() == "EXTENDED": - influxdb_api.store_scheme_simulation_result_to_influxdb( - node_result, - link_result, - modify_pattern_start_time, - num_periods_result, - scheme_Type, - scheme_Name, + asyncio.run( + TimescaleInternalStorage.store_scheme_simulation( + scheme_Type, + scheme_Name, + node_result, + link_result, + modify_pattern_start_time, + num_periods_result, + ) ) # 暂不需要再次存储 SCADA 模拟信息 - # influxdb_api.fill_scheme_simulation_result_to_SCADA(scheme_Type=scheme_Type, scheme_Name=scheme_Name) + # TimescaleInternalStorage.fill_scheme_simulation_result_to_SCADA(scheme_Type=scheme_Type, scheme_Name=scheme_Name) + + # if simulation_type.upper() == "REALTIME": + # influxdb_api.store_realtime_simulation_result_to_influxdb( + # node_result, link_result, modify_pattern_start_time + # ) + # elif simulation_type.upper() == "EXTENDED": + # influxdb_api.store_scheme_simulation_result_to_influxdb( + # node_result, + # link_result, + # modify_pattern_start_time, + # num_periods_result, + # scheme_Type, + # scheme_Name, + # ) + # 暂不需要再次存储 SCADA 模拟信息 + # influxdb_api.fill_scheme_simulation_result_to_SCADA(scheme_Type=scheme_Type, scheme_Name=scheme_Name) print("after store result") diff --git a/timescaledb/composite_queries.py b/timescaledb/composite_queries.py index d6c9ff7..51caa3e 100644 --- a/timescaledb/composite_queries.py +++ b/timescaledb/composite_queries.py @@ -4,6 +4,7 @@ from psycopg import AsyncConnection from postgresql.scada_info import ScadaRepository as PostgreScadaRepository from timescaledb.schemas.realtime import RealtimeRepository +from timescaledb.schemas.scheme import SchemeRepository from timescaledb.schemas.scada import ScadaRepository @@ -13,7 +14,7 @@ class CompositeQueries: """ @staticmethod - async def get_scada_associated_simulation_data( + async def get_scada_associated_realtime_simulation_data( timescale_conn: AsyncConnection, postgres_conn: AsyncConnection, device_id: str, @@ -71,6 +72,79 @@ class CompositeQueries: else: raise ValueError(f"Unknown SCADA type: {scada_type}") + @staticmethod + async def get_scada_associated_scheme_simulation_data( + timescale_conn: AsyncConnection, + postgres_conn: AsyncConnection, + device_id: str, + start_time: datetime, + end_time: datetime, + scheme_type: str, + scheme_name: str, + field: str, + ) -> Optional[Any]: + """ + 获取 SCADA 关联的 link/node 模拟值 + + 根据传入的 SCADA device_id,找到关联的 link/node, + 并根据对应的 type,查询对应的模拟数据 + + Args: + timescale_conn: TimescaleDB 异步连接 + postgres_conn: PostgreSQL 异步连接 + device_id: SCADA 设备ID + start_time: 开始时间 + end_time: 结束时间 + field: 要查询的字段名 + + Returns: + 模拟数据值,如果没有找到则返回 None + + Raises: + ValueError: 当 SCADA 设备未找到或字段无效时 + """ + # 1. 查询所有 SCADA 信息 + scada_infos = await PostgreScadaRepository.get_scadas(postgres_conn) + + # 2. 根据 device_id 找到对应的 SCADA 信息 + target_scada = None + for scada in scada_infos: + if scada["id"] == device_id: + target_scada = scada + break + + if not target_scada: + raise ValueError(f"SCADA device {device_id} not found") + + # 3. 根据 type 和 associated_element_id 查询对应的模拟数据 + element_id = target_scada["associated_element_id"] + scada_type = target_scada["type"] + + if scada_type.lower() == "pipe_flow": + # 查询 link 模拟数据 + return await SchemeRepository.get_link_field_by_scheme_and_time_range( + timescale_conn, + scheme_type, + scheme_name, + start_time, + end_time, + element_id, + field, + ) + elif scada_type.lower() == "pressure": + # 查询 node 模拟数据 + return await SchemeRepository.get_node_field_by_scheme_and_time_range( + timescale_conn, + scheme_type, + scheme_name, + start_time, + end_time, + element_id, + field, + ) + else: + raise ValueError(f"Unknown SCADA type: {scada_type}") + @staticmethod async def get_element_associated_scada_data( timescale_conn: AsyncConnection, diff --git a/timescaledb/internal_queries.py b/timescaledb/internal_queries.py new file mode 100644 index 0000000..5a89c96 --- /dev/null +++ b/timescaledb/internal_queries.py @@ -0,0 +1,45 @@ +from typing import List +from timescaledb.schemas.scheme import SchemeRepository +from timescaledb.schemas.realtime import RealtimeRepository +from timescaledb.database import get_database_instance + +# 内部使用存储类 + + +class InternalStorage: + @staticmethod + async def store_realtime_simulation( + node_result_list: List[dict], + link_result_list: List[dict], + result_start_time: str, + db_name: str = None, + ): + """存储实时模拟结果""" + instance = await get_database_instance(db_name) + async with instance.get_connection() as conn: + await RealtimeRepository.store_realtime_simulation_result( + conn, node_result_list, link_result_list, result_start_time + ) + + @staticmethod + async def store_scheme_simulation( + scheme_type: str, + scheme_name: str, + node_result_list: List[dict], + link_result_list: List[dict], + result_start_time: str, + num_periods: int = 1, + db_name: str = None, + ): + """存储方案模拟结果""" + instance = await get_database_instance(db_name) + async with instance.get_connection() as conn: + await SchemeRepository.store_scheme_simulation_result( + conn, + scheme_type, + scheme_name, + node_result_list, + link_result_list, + result_start_time, + num_periods, + ) diff --git a/timescaledb/router.py b/timescaledb/router.py index fe2ab72..1fb889f 100644 --- a/timescaledb/router.py +++ b/timescaledb/router.py @@ -168,19 +168,21 @@ async def insert_scheme_links( @router.get("/scheme/links") async def get_scheme_links( - scheme: str, + scheme_type: str, + scheme_name: str, start_time: datetime, end_time: datetime, conn: AsyncConnection = Depends(get_database_connection), ): return await SchemeRepository.get_links_by_scheme_and_time_range( - conn, scheme, start_time, end_time + conn, scheme_type, scheme_name, start_time, end_time ) @router.get("/scheme/links/{link_id}/field") async def get_scheme_link_field( - scheme: str, + scheme_type: str, + scheme_name: str, link_id: str, start_time: datetime, end_time: datetime, @@ -189,7 +191,7 @@ async def get_scheme_link_field( ): try: return await SchemeRepository.get_link_field_by_scheme_and_time_range( - conn, scheme, start_time, end_time, link_id, field + conn, scheme_type, scheme_name, start_time, end_time, link_id, field ) except ValueError as e: raise HTTPException(status_code=400, detail=str(e)) @@ -197,7 +199,8 @@ async def get_scheme_link_field( @router.patch("/scheme/links/{link_id}/field") async def update_scheme_link_field( - scheme: str, + scheme_type: str, + scheme_name: str, link_id: str, time: datetime, field: str, @@ -206,7 +209,7 @@ async def update_scheme_link_field( ): try: await SchemeRepository.update_link_field( - conn, time, scheme, link_id, field, value + conn, time, scheme_type, scheme_name, link_id, field, value ) return {"message": "Updated successfully"} except ValueError as e: @@ -215,13 +218,14 @@ async def update_scheme_link_field( @router.delete("/scheme/links") async def delete_scheme_links( - scheme: str, + scheme_type: str, + scheme_name: str, start_time: datetime, end_time: datetime, conn: AsyncConnection = Depends(get_database_connection), ): await SchemeRepository.delete_links_by_scheme_and_time_range( - conn, scheme, start_time, end_time + conn, scheme_type, scheme_name, start_time, end_time ) return {"message": "Deleted successfully"} @@ -236,7 +240,8 @@ async def insert_scheme_nodes( @router.get("/scheme/nodes/{node_id}/field") async def get_scheme_node_field( - scheme: str, + scheme_type: str, + scheme_name: str, node_id: str, start_time: datetime, end_time: datetime, @@ -245,7 +250,7 @@ async def get_scheme_node_field( ): try: return await SchemeRepository.get_node_field_by_scheme_and_time_range( - conn, scheme, start_time, end_time, node_id, field + conn, scheme_type, scheme_name, start_time, end_time, node_id, field ) except ValueError as e: raise HTTPException(status_code=400, detail=str(e)) @@ -253,7 +258,8 @@ async def get_scheme_node_field( @router.patch("/scheme/nodes/{node_id}/field") async def update_scheme_node_field( - scheme: str, + scheme_type: str, + scheme_name: str, node_id: str, time: datetime, field: str, @@ -262,7 +268,7 @@ async def update_scheme_node_field( ): try: await SchemeRepository.update_node_field( - conn, time, scheme, node_id, field, value + conn, time, scheme_type, scheme_name, node_id, field, value ) return {"message": "Updated successfully"} except ValueError as e: @@ -271,20 +277,22 @@ async def update_scheme_node_field( @router.delete("/scheme/nodes") async def delete_scheme_nodes( - scheme: str, + scheme_type: str, + scheme_name: str, start_time: datetime, end_time: datetime, conn: AsyncConnection = Depends(get_database_connection), ): await SchemeRepository.delete_nodes_by_scheme_and_time_range( - conn, scheme, start_time, end_time + conn, scheme_type, scheme_name, start_time, end_time ) return {"message": "Deleted successfully"} @router.post("/scheme/simulation/store", status_code=201) async def store_scheme_simulation_result( - scheme: str, + scheme_type: str, + scheme_name: str, node_result_list: List[dict], link_result_list: List[dict], result_start_time: str, @@ -292,14 +300,20 @@ async def store_scheme_simulation_result( ): """Store scheme simulation results to TimescaleDB""" await SchemeRepository.store_scheme_simulation_result( - conn, scheme, node_result_list, link_result_list, result_start_time + conn, + scheme_type, + scheme_name, + node_result_list, + link_result_list, + result_start_time, ) return {"message": "Scheme simulation results stored successfully"} @router.get("/scheme/query/by-scheme-time-property") async def query_scheme_records_by_scheme_time_property( - scheme: str, + scheme_type: str, + scheme_name: str, query_time: str, type: str, property: str, @@ -308,7 +322,7 @@ async def query_scheme_records_by_scheme_time_property( """Query all scheme records by scheme, time and property""" try: return await SchemeRepository.query_all_record_by_scheme_time_property( - conn, scheme, query_time, type, property + conn, scheme_type, scheme_name, query_time, type, property ) except ValueError as e: raise HTTPException(status_code=400, detail=str(e)) @@ -316,7 +330,8 @@ async def query_scheme_records_by_scheme_time_property( @router.get("/scheme/query/by-id-time") async def query_scheme_simulation_by_id_time( - scheme: str, + scheme_type: str, + scheme_name: str, ID: str, type: str, query_time: str, @@ -325,7 +340,7 @@ async def query_scheme_simulation_by_id_time( """Query scheme simulation results by ID and time""" try: return await SchemeRepository.query_scheme_simulation_result_by_ID_time( - conn, scheme, ID, type, query_time + conn, scheme_type, scheme_name, ID, type, query_time ) except ValueError as e: raise HTTPException(status_code=400, detail=str(e)) diff --git a/timescaledb/schemas/scheme.py b/timescaledb/schemas/scheme.py index 6f8e293..81e6aaf 100644 --- a/timescaledb/schemas/scheme.py +++ b/timescaledb/schemas/scheme.py @@ -1,6 +1,7 @@ from typing import List, Any, Dict from datetime import datetime, timedelta, timezone from psycopg import AsyncConnection, sql +import globals # 定义UTC+8时区 UTC_8 = timezone(timedelta(hours=8)) @@ -18,13 +19,14 @@ class SchemeRepository: async with conn.cursor() as cur: async with cur.copy( - "COPY scheme.link_simulation (time, scheme, id, flow, friction, headloss, quality, reaction, setting, status, velocity) FROM STDIN" + "COPY scheme.link_simulation (time, scheme_type, scheme_name, id, flow, friction, headloss, quality, reaction, setting, status, velocity) FROM STDIN" ) as copy: for item in data: await copy.write_row( ( item["time"], - item["scheme"], + item["scheme_type"], + item["scheme_name"], item["id"], item.get("flow"), item.get("friction"), @@ -40,33 +42,39 @@ class SchemeRepository: @staticmethod async def get_link_by_scheme_and_time_range( conn: AsyncConnection, - scheme: str, + scheme_type: str, + scheme_name: str, start_time: datetime, end_time: datetime, link_id: str, ) -> List[dict]: async with conn.cursor() as cur: await cur.execute( - "SELECT * FROM scheme.link_simulation WHERE scheme = %s AND time >= %s AND time <= %s AND id = %s", - (scheme, start_time, end_time, link_id), + "SELECT * FROM scheme.link_simulation WHERE scheme_type = %s AND scheme_name = %s AND time >= %s AND time <= %s AND id = %s", + (scheme_type, scheme_name, start_time, end_time, link_id), ) return await cur.fetchall() @staticmethod async def get_links_by_scheme_and_time_range( - conn: AsyncConnection, scheme: str, start_time: datetime, end_time: datetime + conn: AsyncConnection, + scheme_type: str, + scheme_name: str, + start_time: datetime, + end_time: datetime, ) -> List[dict]: async with conn.cursor() as cur: await cur.execute( - "SELECT * FROM scheme.link_simulation WHERE scheme = %s AND time >= %s AND time <= %s", - (scheme, start_time, end_time), + "SELECT * FROM scheme.link_simulation WHERE scheme_type = %s AND scheme_name = %s AND time >= %s AND time <= %s", + (scheme_type, scheme_name, start_time, end_time), ) return await cur.fetchall() @staticmethod async def get_link_field_by_scheme_and_time_range( conn: AsyncConnection, - scheme: str, + scheme_type: str, + scheme_name: str, start_time: datetime, end_time: datetime, link_id: str, @@ -87,18 +95,21 @@ class SchemeRepository: raise ValueError(f"Invalid field: {field}") query = sql.SQL( - "SELECT {} FROM scheme.link_simulation WHERE scheme = %s AND time >= %s AND time <= %s AND id = %s" + "SELECT {} FROM scheme.link_simulation WHERE scheme_type = %s AND scheme_name = %s AND time >= %s AND time <= %s AND id = %s" ).format(sql.Identifier(field)) async with conn.cursor() as cur: - await cur.execute(query, (scheme, start_time, end_time, link_id)) + await cur.execute( + query, (scheme_type, scheme_name, start_time, end_time, link_id) + ) row = await cur.fetchone() return row[field] if row else None @staticmethod async def get_links_field_by_scheme_and_time_range( conn: AsyncConnection, - scheme: str, + scheme_type: str, + scheme_name: str, start_time: datetime, end_time: datetime, field: str, @@ -118,11 +129,11 @@ class SchemeRepository: raise ValueError(f"Invalid field: {field}") query = sql.SQL( - "SELECT {} FROM scheme.link_simulation WHERE scheme = %s AND time >= %s AND time <= %s" + "SELECT {} FROM scheme.link_simulation WHERE scheme_type = %s AND scheme_name = %s AND time >= %s AND time <= %s" ).format(sql.Identifier(field)) async with conn.cursor() as cur: - await cur.execute(query, (scheme, start_time, end_time)) + await cur.execute(query, (scheme_type, scheme_name, start_time, end_time)) row = await cur.fetchone() return row[field] if row else None @@ -130,7 +141,8 @@ class SchemeRepository: async def update_link_field( conn: AsyncConnection, time: datetime, - scheme: str, + scheme_type: str, + scheme_name: str, link_id: str, field: str, value: Any, @@ -149,20 +161,24 @@ class SchemeRepository: raise ValueError(f"Invalid field: {field}") query = sql.SQL( - "UPDATE scheme.link_simulation SET {} = %s WHERE time = %s AND scheme = %s AND id = %s" + "UPDATE scheme.link_simulation SET {} = %s WHERE time = %s AND scheme_type = %s AND scheme_name = %s AND id = %s" ).format(sql.Identifier(field)) async with conn.cursor() as cur: - await cur.execute(query, (value, time, scheme, link_id)) + await cur.execute(query, (value, time, scheme_type, scheme_name, link_id)) @staticmethod async def delete_links_by_scheme_and_time_range( - conn: AsyncConnection, scheme: str, start_time: datetime, end_time: datetime + conn: AsyncConnection, + scheme_type: str, + scheme_name: str, + start_time: datetime, + end_time: datetime, ): async with conn.cursor() as cur: await cur.execute( - "DELETE FROM scheme.link_simulation WHERE scheme = %s AND time >= %s AND time <= %s", - (scheme, start_time, end_time), + "DELETE FROM scheme.link_simulation WHERE scheme_type = %s AND scheme_name = %s AND time >= %s AND time <= %s", + (scheme_type, scheme_name, start_time, end_time), ) # --- Node Simulation --- @@ -174,13 +190,14 @@ class SchemeRepository: async with conn.cursor() as cur: async with cur.copy( - "COPY scheme.node_simulation (time, scheme, id, actual_demand, total_head, pressure, quality) FROM STDIN" + "COPY scheme.node_simulation (time, scheme_type, scheme_name, id, actual_demand, total_head, pressure, quality) FROM STDIN" ) as copy: for item in data: await copy.write_row( ( item["time"], - item["scheme"], + item["scheme_type"], + item["scheme_name"], item["id"], item.get("actual_demand"), item.get("total_head"), @@ -192,33 +209,39 @@ class SchemeRepository: @staticmethod async def get_node_by_scheme_and_time_range( conn: AsyncConnection, - scheme: str, + scheme_type: str, + scheme_name: str, start_time: datetime, end_time: datetime, node_id: str, ) -> List[dict]: async with conn.cursor() as cur: await cur.execute( - "SELECT * FROM scheme.node_simulation WHERE scheme = %s AND time >= %s AND time <= %s AND id = %s", - (scheme, start_time, end_time, node_id), + "SELECT * FROM scheme.node_simulation WHERE scheme_type = %s AND scheme_name = %s AND time >= %s AND time <= %s AND id = %s", + (scheme_type, scheme_name, start_time, end_time, node_id), ) return await cur.fetchall() @staticmethod async def get_nodes_by_scheme_and_time_range( - conn: AsyncConnection, scheme: str, start_time: datetime, end_time: datetime + conn: AsyncConnection, + scheme_type: str, + scheme_name: str, + start_time: datetime, + end_time: datetime, ) -> List[dict]: async with conn.cursor() as cur: await cur.execute( - "SELECT * FROM scheme.node_simulation WHERE scheme = %s AND time >= %s AND time <= %s", - (scheme, start_time, end_time), + "SELECT * FROM scheme.node_simulation WHERE scheme_type = %s AND scheme_name = %s AND time >= %s AND time <= %s", + (scheme_type, scheme_name, start_time, end_time), ) return await cur.fetchall() @staticmethod async def get_node_field_by_scheme_and_time_range( conn: AsyncConnection, - scheme: str, + scheme_type: str, + scheme_name: str, start_time: datetime, end_time: datetime, node_id: str, @@ -230,18 +253,21 @@ class SchemeRepository: raise ValueError(f"Invalid field: {field}") query = sql.SQL( - "SELECT {} FROM scheme.node_simulation WHERE scheme = %s AND time >= %s AND time <= %s AND id = %s" + "SELECT {} FROM scheme.node_simulation WHERE scheme_type = %s AND scheme_name = %s AND time >= %s AND time <= %s AND id = %s" ).format(sql.Identifier(field)) async with conn.cursor() as cur: - await cur.execute(query, (scheme, start_time, end_time, node_id)) + await cur.execute( + query, (scheme_type, scheme_name, start_time, end_time, node_id) + ) row = await cur.fetchone() return row[field] if row else None @staticmethod async def get_nodes_field_by_scheme_and_time_range( conn: AsyncConnection, - scheme: str, + scheme_type: str, + scheme_name: str, start_time: datetime, end_time: datetime, field: str, @@ -252,11 +278,11 @@ class SchemeRepository: raise ValueError(f"Invalid field: {field}") query = sql.SQL( - "SELECT {} FROM scheme.node_simulation WHERE scheme = %s AND time >= %s AND time <= %s" + "SELECT {} FROM scheme.node_simulation WHERE scheme_type = %s AND scheme_name = %s AND time >= %s AND time <= %s" ).format(sql.Identifier(field)) async with conn.cursor() as cur: - await cur.execute(query, (scheme, start_time, end_time)) + await cur.execute(query, (scheme_type, scheme_name, start_time, end_time)) row = await cur.fetchone() return row[field] if row else None @@ -264,7 +290,8 @@ class SchemeRepository: async def update_node_field( conn: AsyncConnection, time: datetime, - scheme: str, + scheme_type: str, + scheme_name: str, node_id: str, field: str, value: Any, @@ -274,20 +301,24 @@ class SchemeRepository: raise ValueError(f"Invalid field: {field}") query = sql.SQL( - "UPDATE scheme.node_simulation SET {} = %s WHERE time = %s AND scheme = %s AND id = %s" + "UPDATE scheme.node_simulation SET {} = %s WHERE time = %s AND scheme_type = %s AND scheme_name = %s AND id = %s" ).format(sql.Identifier(field)) async with conn.cursor() as cur: - await cur.execute(query, (value, time, scheme, node_id)) + await cur.execute(query, (value, time, scheme_type, scheme_name, node_id)) @staticmethod async def delete_nodes_by_scheme_and_time_range( - conn: AsyncConnection, scheme: str, start_time: datetime, end_time: datetime + conn: AsyncConnection, + scheme_type: str, + scheme_name: str, + start_time: datetime, + end_time: datetime, ): async with conn.cursor() as cur: await cur.execute( - "DELETE FROM scheme.node_simulation WHERE scheme = %s AND time >= %s AND time <= %s", - (scheme, start_time, end_time), + "DELETE FROM scheme.node_simulation WHERE scheme_type = %s AND scheme_name = %s AND time >= %s AND time <= %s", + (scheme_type, scheme_name, start_time, end_time), ) # --- 复合查询 --- @@ -295,17 +326,20 @@ class SchemeRepository: @staticmethod async def store_scheme_simulation_result( conn: AsyncConnection, - scheme: str, + scheme_type: str, + scheme_name: str, node_result_list: List[Dict[str, any]], link_result_list: List[Dict[str, any]], result_start_time: str, + num_periods: int = 1, ): """ Store scheme simulation results to TimescaleDB. Args: conn: Database connection - scheme: Scheme name + scheme_type: Scheme type + scheme_name: Scheme name node_result_list: List of node simulation results link_result_list: List of link simulation results result_start_time: Start time for the results (ISO format string) @@ -313,9 +347,11 @@ class SchemeRepository: # Convert result_start_time string to datetime if needed if isinstance(result_start_time, str): # 如果是ISO格式字符串,解析并转换为UTC+8 - if result_start_time.endswith('Z'): + if result_start_time.endswith("Z"): # UTC时间,转换为UTC+8 - utc_time = datetime.fromisoformat(result_start_time.replace("Z", "+00:00")) + utc_time = datetime.fromisoformat( + result_start_time.replace("Z", "+00:00") + ) simulation_time = utc_time.astimezone(UTC_8) else: # 假设已经是UTC+8时间 @@ -327,39 +363,56 @@ class SchemeRepository: if simulation_time.tzinfo is None: simulation_time = simulation_time.replace(tzinfo=UTC_8) + timestep_parts = globals.hydraulic_timestep.split(":") + timestep = timedelta( + hours=int(timestep_parts[0]), + minutes=int(timestep_parts[1]), + seconds=int(timestep_parts[2]), + ) + # Prepare node data for batch insert node_data = [] for node_result in node_result_list: - node_data.append( - { - "time": simulation_time, - "scheme": scheme, - "id": node_result.get("id"), - "actual_demand": node_result.get("actual_demand"), - "total_head": node_result.get("total_head"), - "pressure": node_result.get("pressure"), - "quality": node_result.get("quality"), - } - ) + node_id = node_result.get("node") + for period_index in range(num_periods): + current_time = simulation_time + (timestep * period_index) + data = node_result.get("result", [])[period_index] + node_data.append( + { + "time": current_time, + "scheme_type": scheme_type, + "scheme_name": scheme_name, + "id": node_id, + "actual_demand": data.get("demand"), + "total_head": data.get("head"), + "pressure": data.get("pressure"), + "quality": data.get("quality"), + } + ) # Prepare link data for batch insert link_data = [] for link_result in link_result_list: - link_data.append( - { - "time": simulation_time, - "scheme": scheme, - "id": link_result.get("id"), - "flow": link_result.get("flow"), - "friction": link_result.get("friction"), - "headloss": link_result.get("headloss"), - "quality": link_result.get("quality"), - "reaction": link_result.get("reaction"), - "setting": link_result.get("setting"), - "status": link_result.get("status"), - "velocity": link_result.get("velocity"), - } - ) + link_id = link_result.get("link") + for period_index in range(num_periods): + current_time = simulation_time + (timestep * period_index) + data = link_result.get("result", [])[period_index] + link_data.append( + { + "time": current_time, + "scheme_type": scheme_type, + "scheme_name": scheme_name, + "id": link_id, + "flow": data.get("flow"), + "friction": data.get("friction"), + "headloss": data.get("headloss"), + "quality": data.get("quality"), + "reaction": data.get("reaction"), + "setting": data.get("setting"), + "status": data.get("status"), + "velocity": data.get("velocity"), + } + ) # Insert data using batch methods if node_data: @@ -371,7 +424,8 @@ class SchemeRepository: @staticmethod async def query_all_record_by_scheme_time_property( conn: AsyncConnection, - scheme: str, + scheme_type: str, + scheme_name: str, query_time: str, type: str, property: str, @@ -381,7 +435,8 @@ class SchemeRepository: Args: conn: Database connection - scheme: Scheme name + scheme_type: Scheme type + scheme_name: Scheme name query_time: Time to query (ISO format string) type: Type of data ("node" or "link") property: Property/field to query @@ -391,7 +446,7 @@ class SchemeRepository: """ # Convert query_time string to datetime if isinstance(query_time, str): - if query_time.endswith('Z'): + if query_time.endswith("Z"): # UTC时间,转换为UTC+8 utc_time = datetime.fromisoformat(query_time.replace("Z", "+00:00")) target_time = utc_time.astimezone(UTC_8) @@ -412,11 +467,11 @@ class SchemeRepository: # Query based on type if type.lower() == "node": return await SchemeRepository.get_nodes_field_by_scheme_and_time_range( - conn, scheme, start_time, end_time, property + conn, scheme_type, scheme_name, start_time, end_time, property ) elif type.lower() == "link": return await SchemeRepository.get_links_field_by_scheme_and_time_range( - conn, scheme, start_time, end_time, property + conn, scheme_type, scheme_name, start_time, end_time, property ) else: raise ValueError(f"Invalid type: {type}. Must be 'node' or 'link'") @@ -424,7 +479,8 @@ class SchemeRepository: @staticmethod async def query_scheme_simulation_result_by_ID_time( conn: AsyncConnection, - scheme: str, + scheme_type: str, + scheme_name: str, ID: str, type: str, query_time: str, @@ -434,7 +490,8 @@ class SchemeRepository: Args: conn: Database connection - scheme: Scheme name + scheme_type: Scheme type + scheme_name: Scheme name ID: The ID of the node or link type: Type of data ("node" or "link") query_time: Time to query (ISO format string) @@ -444,7 +501,7 @@ class SchemeRepository: """ # Convert query_time string to datetime if isinstance(query_time, str): - if query_time.endswith('Z'): + if query_time.endswith("Z"): # UTC时间,转换为UTC+8 utc_time = datetime.fromisoformat(query_time.replace("Z", "+00:00")) target_time = utc_time.astimezone(UTC_8) @@ -465,11 +522,11 @@ class SchemeRepository: # Query based on type if type.lower() == "node": return await SchemeRepository.get_node_by_scheme_and_time_range( - conn, scheme, start_time, end_time, ID + conn, scheme_type, scheme_name, start_time, end_time, ID ) elif type.lower() == "link": return await SchemeRepository.get_link_by_scheme_and_time_range( - conn, scheme, start_time, end_time, ID + conn, scheme_type, scheme_name, start_time, end_time, ID ) else: raise ValueError(f"Invalid type: {type}. Must be 'node' or 'link'")