diff --git a/postgresql/database.py b/postgresql/database.py index fae2904..d6e2b67 100644 --- a/postgresql/database.py +++ b/postgresql/database.py @@ -3,7 +3,7 @@ from contextlib import asynccontextmanager from typing import AsyncGenerator, Dict, Optional import psycopg_pool from psycopg.rows import dict_row -import postgresql_info +import api.postgresql_info as postgresql_info # Configure logging logger = logging.getLogger(__name__) @@ -27,7 +27,9 @@ class Database: open=False, # Don't open immediately, wait for startup kwargs={"row_factory": dict_row}, # Return rows as dictionaries ) - logger.info(f"PostgreSQL connection pool initialized for database: {target_db_name or 'default'}") + logger.info( + f"PostgreSQL connection pool initialized for database: {target_db_name or 'default'}" + ) except Exception as e: logger.error(f"Failed to initialize postgresql connection pool: {e}") raise @@ -58,15 +60,17 @@ db = Database() # 缓存不同数据库的实例 - 避免重复创建连接池 _database_instances: Dict[str, Database] = {} + def create_database_instance(db_name): """Create a new Database instance for a specific database.""" return Database(db_name=db_name) + async def get_database_instance(db_name: Optional[str] = None) -> Database: """Get or create a database instance for the specified database name.""" if not db_name: return db # 返回默认数据库实例 - + if db_name not in _database_instances: # 创建新的数据库实例 instance = create_database_instance(db_name) @@ -74,14 +78,16 @@ async def get_database_instance(db_name: Optional[str] = None) -> Database: await instance.open() _database_instances[db_name] = instance logger.info(f"Created new database instance for: {db_name}") - + return _database_instances[db_name] + async def get_db_connection(): """Dependency for FastAPI to get a database connection.""" async with db.get_connection() as conn: yield conn + async def get_database_connection(db_name: Optional[str] = None): """ FastAPI dependency to get database connection with optional database name. @@ -92,13 +98,14 @@ async def get_database_connection(db_name: Optional[str] = None): async with instance.get_connection() as conn: yield conn + async def cleanup_database_instances(): """Clean up all database instances (call this on application shutdown).""" for db_name, instance in _database_instances.items(): await instance.close() logger.info(f"Closed database instance for: {db_name}") _database_instances.clear() - + # 关闭默认数据库 await db.close() logger.info("All database instances cleaned up.") diff --git a/postgresql/router.py b/postgresql/router.py index 1c6ed97..1b36ba7 100644 --- a/postgresql/router.py +++ b/postgresql/router.py @@ -1,10 +1,9 @@ from fastapi import APIRouter, Depends, HTTPException, Query -from typing import List, Optional -from datetime import datetime +from typing import Optional from psycopg import AsyncConnection from .database import get_database_instance -from .scada_info import query_pg_scada_info +from .scada_info import ScadaRepository router = APIRouter(prefix="/postgresql", tags=["postgresql"]) @@ -26,15 +25,13 @@ async def get_scada_info_with_connection( conn: AsyncConnection = Depends(get_database_connection), ): """ - 使用连接池查询SCADA信息 + 使用连接池查询所有SCADA信息 """ try: - # 使用连接查询SCADA信息 - async with conn.cursor() as cur: - await cur.execute("SELECT * FROM scada_info") - scada_data = await cur.fetchall() + # 使用ScadaRepository查询SCADA信息 + scada_data = await ScadaRepository.get_scadas_info(conn) return {"success": True, "data": scada_data, "count": len(scada_data)} except Exception as e: raise HTTPException( status_code=500, detail=f"查询SCADA信息时发生错误: {str(e)}" - ) \ No newline at end of file + ) diff --git a/simulation.py b/simulation.py index 089b20f..2c14d4a 100644 --- a/simulation.py +++ b/simulation.py @@ -1,8 +1,9 @@ import numpy as np from tjnetwork import * from api.s36_wda_cal import * + # from get_real_status import * -from datetime import datetime,timedelta +from datetime import datetime, timedelta from math import modf import os import json @@ -21,7 +22,9 @@ import uuid import project_info from api.postgresql_info import get_pgconn_string -logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s') +logging.basicConfig( + level=logging.INFO, format="%(asctime)s - %(levelname)s - %(message)s" +) def query_corresponding_element_id_and_query_id(name: str) -> None: @@ -36,28 +39,30 @@ def query_corresponding_element_id_and_query_id(name: str) -> None: with psycopg.connect(conn_string) as conn: with conn.cursor() as cur: # 查询 transmission_mode 为 'realtime' 的记录 - cur.execute(""" + cur.execute( + """ SELECT type, associated_element_id, api_query_id FROM scada_info WHERE transmission_mode = 'realtime'; - """) + """ + ) records = cur.fetchall() # 遍历查询结果,并根据 type 将数据存储到相应的字典中 for record in records: type_, associated_element_id, api_query_id = record - if type_ == 'reservoir_liquid_level': + if type_ == "reservoir_liquid_level": globals.reservoirs_id[associated_element_id] = api_query_id - elif type_ == 'tank_liquid_level': + elif type_ == "tank_liquid_level": globals.tanks_id[associated_element_id] = api_query_id - elif type_ == 'fixed_pump': + elif type_ == "fixed_pump": globals.fixed_pumps_id[associated_element_id] = api_query_id - elif type_ == 'variable_pump': + elif type_ == "variable_pump": globals.variable_pumps_id[associated_element_id] = api_query_id - elif type_ == 'pressure': + elif type_ == "pressure": globals.pressure_id[associated_element_id] = api_query_id - elif type_ == 'demand': + elif type_ == "demand": globals.demand_id[associated_element_id] = api_query_id - elif type_ == 'quality': + elif type_ == "quality": globals.quality_id[associated_element_id] = api_query_id else: # 如果遇到未定义的类型,可以选择记录日志或忽略 @@ -79,20 +84,26 @@ def query_corresponding_pattern_id_and_query_id(name: str) -> None: with psycopg.connect(conn_string) as conn: with conn.cursor() as cur: # 查询 transmission_mode 为 'realtime' 且 type 为 'source_outflow' 或 'pipe_flow' 的记录 - cur.execute(""" + cur.execute( + """ SELECT type, associated_pattern, api_query_id FROM scada_info WHERE transmission_mode = 'realtime' AND type IN ('source_outflow', 'pipe_flow'); - """) + """ + ) records = cur.fetchall() # 遍历查询结果,并根据 type 将数据存储到相应的字典中 for record in records: type_, associated_pattern, api_query_id = record - if type_ == 'source_outflow': - globals.source_outflow_pattern_id[associated_pattern] = api_query_id - elif type_ == 'pipe_flow': - globals.realtime_pipe_flow_pattern_id[associated_pattern] = api_query_id + if type_ == "source_outflow": + globals.source_outflow_pattern_id[associated_pattern] = ( + api_query_id + ) + elif type_ == "pipe_flow": + globals.realtime_pipe_flow_pattern_id[associated_pattern] = ( + api_query_id + ) except psycopg.Error as e: print(f"数据库连接或查询出错: {e}") @@ -114,21 +125,32 @@ def query_non_realtime_region(name: str) -> dict: with psycopg.connect(conn_string) as conn: with conn.cursor() as cur: # 执行查询,筛选出 transmission_mode 为 'non_realtime' 且 type 为 'pipe_flow' 的记录 - cur.execute(""" + cur.execute( + """ SELECT * FROM scada_info WHERE transmission_mode = 'non_realtime' AND type = 'pipe_flow'; - """) + """ + ) records = cur.fetchall() col_names = [desc.name for desc in cur.description] # 找出所有以 'associated_source_outflow_id' 开头的列 - source_outflow_cols = [col for col in col_names if col.startswith('associated_source_outflow_id')] - logging.info(f"Identified source_outflow columns: {source_outflow_cols}") + source_outflow_cols = [ + col + for col in col_names + if col.startswith("associated_source_outflow_id") + ] + logging.info( + f"Identified source_outflow columns: {source_outflow_cols}" + ) for record in records: # 提取所有以 'associated_source_outflow_id' 开头的列的值,排除 None - values = [record[col_names.index(col)] for col in source_outflow_cols if - record[col_names.index(col)] is not None] + values = [ + record[col_names.index(col)] + for col in source_outflow_cols + if record[col_names.index(col)] is not None + ] # 如果该记录有相关的值,则将其作为一个 region if values: # 将值排序以确保相同的组合顺序一致(如果顺序不重要) @@ -155,7 +177,11 @@ def query_non_realtime_region(name: str) -> dict: # 2025/01/18 -def query_non_realtime_region_patterns(name: str, source_outflow_region: dict, column_prefix: str = 'associated_source_outflow_id') -> dict: +def query_non_realtime_region_patterns( + name: str, + source_outflow_region: dict, + column_prefix: str = "associated_source_outflow_id", +) -> dict: """ 根据 source_outflow_region,对 scada_info 表中 transmission_mode 为 'non_realtime'的记录进行分组, 将匹配的记录的 associated_pattern 存入 non_realtime_region_patterns 字典中,同时把用 realtime pipe_flow修正的 non_realtime demand 去掉 @@ -164,33 +190,48 @@ def query_non_realtime_region_patterns(name: str, source_outflow_region: dict, c :param column_prefix: 需要提取的列的前缀 :return: 包含区域与对应 associated_pattern 的字典 """ - globals.non_realtime_region_patterns = {region: [] for region in globals.source_outflow_region.keys()} - region_tuple_to_key = {frozenset(ids): region for region, ids in globals.source_outflow_region.items()} + globals.non_realtime_region_patterns = { + region: [] for region in globals.source_outflow_region.keys() + } + region_tuple_to_key = { + frozenset(ids): region for region, ids in globals.source_outflow_region.items() + } conn_string = get_pgconn_string(db_name=name) try: with psycopg.connect(conn_string) as conn: with conn.cursor() as cur: # 执行查询,筛选出 transmission_mode 为 'non_realtime' - cur.execute(""" + cur.execute( + """ SELECT * FROM scada_info WHERE transmission_mode = 'non_realtime' - """) + """ + ) records = cur.fetchall() col_names = [desc.name for desc in cur.description] # 找出所有以指定前缀开头的列 - source_outflow_cols = [col for col in col_names if col.startswith(column_prefix)] - logging.info(f"Identified source_outflow columns: {source_outflow_cols}") + source_outflow_cols = [ + col for col in col_names if col.startswith(column_prefix) + ] + logging.info( + f"Identified source_outflow columns: {source_outflow_cols}" + ) # 确保 'associated_pattern' 列存在 - if 'associated_pattern' not in col_names: - logging.error("'associated_pattern' column not found in scada_info table.") + if "associated_pattern" not in col_names: + logging.error( + "'associated_pattern' column not found in scada_info table." + ) return globals.non_realtime_region_patterns # 获取 'associated_pattern' 列的索引 - pattern_idx = col_names.index('associated_pattern') + pattern_idx = col_names.index("associated_pattern") for record in records: # 提取所有以 'associated_source_outflow_id' 开头的列的值,排除 None - values = [record[col_names.index(col)] for col in source_outflow_cols if - record[col_names.index(col)] is not None] + values = [ + record[col_names.index(col)] + for col in source_outflow_cols + if record[col_names.index(col)] is not None + ] if values: # 将值转换为 frozenset 以便与 region_tuple_to_key 进行匹配 region_frozenset = frozenset(values) @@ -200,22 +241,34 @@ def query_non_realtime_region_patterns(name: str, source_outflow_region: dict, c # 获取 'associated_pattern' 的值 associated_pattern = record[pattern_idx] if associated_pattern is not None: - globals.non_realtime_region_patterns[region_key].append(associated_pattern) + globals.non_realtime_region_patterns[region_key].append( + associated_pattern + ) logging.info("生成 regions_patterns 成功。") except psycopg.Error as e: logging.error(f"数据库连接或查询出错: {e}") except Exception as ex: logging.error(f"处理数据时出错: {ex}") # 获取pipe_flow_region_patterns中的所有区域 - exclude_regions = set(region for regions in globals.pipe_flow_region_patterns.values() for region in regions) + exclude_regions = set( + region + for regions in globals.pipe_flow_region_patterns.values() + for region in regions + ) # 从non_realtime_region_patterns中去除这些区域 for region_key, regions in globals.non_realtime_region_patterns.items(): - globals.non_realtime_region_patterns[region_key] = [region for region in regions if region not in exclude_regions] + globals.non_realtime_region_patterns[region_key] = [ + region for region in regions if region not in exclude_regions + ] return globals.non_realtime_region_patterns # 2025/01/18 -def query_realtime_region_pipe_flow_and_demand_id(name: str, source_outflow_region: dict, column_prefix: str = 'associated_source_outflow_id') -> dict: +def query_realtime_region_pipe_flow_and_demand_id( + name: str, + source_outflow_region: dict, + column_prefix: str = "associated_source_outflow_id", +) -> dict: """ 根据 source_outflow_region,对 scada_info 表中 transmission_mode 为 'realtime', 且 type 为 'pipe_flow' 或 ‘demand’ 的记录进行分组,将匹配的记录的 api_query_id 存入 realtime_region_pipe_flow_and_demand_id 字典中。 @@ -224,35 +277,50 @@ def query_realtime_region_pipe_flow_and_demand_id(name: str, source_outflow_regi :param column_prefix: 需要提取的列的前缀 :return: 包含区域与对应 api_query_id 的字典 """ - globals.realtime_region_pipe_flow_and_demand_id = {region: [] for region in globals.source_outflow_region.keys()} + globals.realtime_region_pipe_flow_and_demand_id = { + region: [] for region in globals.source_outflow_region.keys() + } # 创建一个映射,从 frozenset(ids) 到 region_key - region_tuple_to_key = {frozenset(ids): region for region, ids in globals.source_outflow_region.items()} + region_tuple_to_key = { + frozenset(ids): region for region, ids in globals.source_outflow_region.items() + } conn_string = get_pgconn_string(db_name=name) try: with psycopg.connect(conn_string) as conn: with conn.cursor() as cur: # 执行查询,筛选出 transmission_mode 为 'realtime' 且 type 为 'pipe_flow' 或 'demand' 的记录 - cur.execute(""" + cur.execute( + """ SELECT * FROM scada_info WHERE transmission_mode = 'realtime' AND type IN ('pipe_flow', 'demand'); - """) + """ + ) records = cur.fetchall() col_names = [desc.name for desc in cur.description] # 找出所有以指定前缀开头的列 - source_outflow_cols = [col for col in col_names if col.startswith(column_prefix)] - logging.info(f"Identified source_outflow columns: {source_outflow_cols}") + source_outflow_cols = [ + col for col in col_names if col.startswith(column_prefix) + ] + logging.info( + f"Identified source_outflow columns: {source_outflow_cols}" + ) # 确保 'api_query_id' 列存在 - if 'api_query_id' not in col_names: - logging.error("'api_query_id' column not found in scada_info table.") + if "api_query_id" not in col_names: + logging.error( + "'api_query_id' column not found in scada_info table." + ) return globals.realtime_region_pipe_flow_and_demand_id # 获取 'api_query_id' 列的索引 - api_query_id_idx = col_names.index('api_query_id') + api_query_id_idx = col_names.index("api_query_id") for record in records: # 提取所有以 'associated_source_outflow_id' 开头的列的值,排除 None - values = [record[col_names.index(col)] for col in source_outflow_cols if - record[col_names.index(col)] is not None] + values = [ + record[col_names.index(col)] + for col in source_outflow_cols + if record[col_names.index(col)] is not None + ] if values: # 将值转换为 frozenset 以便与 region_tuple_to_key 进行匹配 region_frozenset = frozenset(values) @@ -262,7 +330,9 @@ def query_realtime_region_pipe_flow_and_demand_id(name: str, source_outflow_regi # 获取 'api_query_id' 的值 api_query_id = record[api_query_id_idx] if api_query_id is not None: - globals.realtime_region_pipe_flow_and_demand_id[region_key].append(api_query_id) + globals.realtime_region_pipe_flow_and_demand_id[ + region_key + ].append(api_query_id) logging.info("生成 realtime_region_pipe_flow_and_demand_id 成功。") except psycopg.Error as e: logging.error(f"数据库连接或查询出错: {e}") @@ -272,7 +342,9 @@ def query_realtime_region_pipe_flow_and_demand_id(name: str, source_outflow_regi # 2025/01/17 -def query_pipe_flow_region_patterns(name: str, column_prefix: str = 'associated_pipe_flow_id') -> dict: +def query_pipe_flow_region_patterns( + name: str, column_prefix: str = "associated_pipe_flow_id" +) -> dict: """ 查询 scada_info 表中 type 为 'demand' 且 transmission_mode 为 'non_realtime' 的记录, 记录该记录的 associated_pattern。 @@ -289,36 +361,48 @@ def query_pipe_flow_region_patterns(name: str, column_prefix: str = 'associated_ with psycopg.connect(conn_string) as conn: with conn.cursor() as cur: # 查询 type 为 'demand' 且 transmission_mode 为 'non_realtime' 的记录 - cur.execute(""" + cur.execute( + """ SELECT associated_pattern, associated_pipe_flow_id FROM scada_info WHERE type = 'demand' AND transmission_mode = 'non_realtime'; - """) + """ + ) records = cur.fetchall() col_names = [desc.name for desc in cur.description] # 获取列索引 - pattern_idx = col_names.index('associated_pattern') - pipe_flow_id_idx = col_names.index('associated_pipe_flow_id') + pattern_idx = col_names.index("associated_pattern") + pipe_flow_id_idx = col_names.index("associated_pipe_flow_id") for record in records: associated_pattern = record[pattern_idx] associated_pipe_flow_id = record[pipe_flow_id_idx] if associated_pipe_flow_id: # 根据 associated_pipe_flow_id 查询对应的记录 - cur.execute(""" + cur.execute( + """ SELECT associated_pattern, transmission_mode FROM scada_info WHERE associated_element_id = %s; - """, (associated_pipe_flow_id,)) + """, + (associated_pipe_flow_id,), + ) pipe_flow_record = cur.fetchone() if pipe_flow_record: pipe_flow_associated_pattern = pipe_flow_record[0] transmission_mode = pipe_flow_record[1] - if transmission_mode == 'realtime': + if transmission_mode == "realtime": # 将 associated_pattern 记录到字典中 - if pipe_flow_associated_pattern not in globals.pipe_flow_region_patterns: - globals.pipe_flow_region_patterns[pipe_flow_associated_pattern] = [] - globals.pipe_flow_region_patterns[pipe_flow_associated_pattern].append(associated_pattern) + if ( + pipe_flow_associated_pattern + not in globals.pipe_flow_region_patterns + ): + globals.pipe_flow_region_patterns[ + pipe_flow_associated_pattern + ] = [] + globals.pipe_flow_region_patterns[ + pipe_flow_associated_pattern + ].append(associated_pattern) logging.info("生成 pipe_flow_region_patterns 成功。") except psycopg.Error as e: logging.error(f"数据库连接或查询出错: {e}") @@ -352,8 +436,8 @@ def query_SCADA_ID_corresponding_info(name: str, SCADA_ID: str) -> dict: if result: # 将结果转换为字典 associated_info = { - 'associated_element_id': result[0], - 'API_query_id': result[1] + "associated_element_id": result[0], + "API_query_id": result[1], } return associated_info else: @@ -365,8 +449,11 @@ def query_SCADA_ID_corresponding_info(name: str, SCADA_ID: str) -> dict: # 2025/01/11 -def get_source_outflow_region_id(name: str, source_outflow_region: dict, - column_prefix: str = 'associated_source_outflow_id') -> dict: +def get_source_outflow_region_id( + name: str, + source_outflow_region: dict, + column_prefix: str = "associated_source_outflow_id", +) -> dict: """ 基于 source_outflow_region,将其中的 associated_source_outflow_id 替换为对应的 api_query_id, 生成新的字典 source_outflow_region_id。 @@ -376,13 +463,17 @@ def get_source_outflow_region_id(name: str, source_outflow_region: dict, :param column_prefix: 需要提取的列的前缀 :return: 包含区域与对应 api_query_id 的字典 """ - globals.source_outflow_region_id = {region: [] for region in globals.source_outflow_region.keys()} + globals.source_outflow_region_id = { + region: [] for region in globals.source_outflow_region.keys() + } # 提取所有唯一的 associated_source_outflow_id all_ids = set() for ids in globals.source_outflow_region.values(): all_ids.update(ids) if not all_ids: - logging.warning("No associated_source_outflow_id found in source_outflow_region.") + logging.warning( + "No associated_source_outflow_id found in source_outflow_region." + ) return globals.source_outflow_region_id conn_string = get_pgconn_string(db_name=name) try: @@ -410,7 +501,9 @@ def get_source_outflow_region_id(name: str, source_outflow_region: dict, if api_id: globals.source_outflow_region_id[region].append(api_id) else: - logging.warning(f"No api_query_id found for associated_source_outflow_id: {id_}") + logging.warning( + f"No api_query_id found for associated_source_outflow_id: {id_}" + ) except psycopg.Error as e: logging.error(f"数据库连接或查询出错: {e}") except Exception as ex: @@ -419,7 +512,11 @@ def get_source_outflow_region_id(name: str, source_outflow_region: dict, # 2025/01/18 -def get_realtime_region_patterns(name: str, source_outflow_region_id: dict, realtime_region_pipe_flow_and_demand_id: dict) -> (dict, dict): +def get_realtime_region_patterns( + name: str, + source_outflow_region_id: dict, + realtime_region_pipe_flow_and_demand_id: dict, +) -> (dict, dict): """ 根据每个 region,从 scada_info 表中查询 api_query_id 对应的 associated_pattern。 将结果分别存储到 source_outflow_region_patterns 和 realtime_region_pipe_flow_and_demand_patterns 两个字典中。 @@ -429,9 +526,12 @@ def get_realtime_region_patterns(name: str, source_outflow_region_id: dict, real :return: source_outflow_region_patterns 和 realtime_region_pipe_flow_and_demand_patterns 两个字典 """ # 初始化返回的字典 - globals.source_outflow_region_patterns = {region: [] for region in globals.source_outflow_region_id.keys()} - globals.realtime_region_pipe_flow_and_demand_patterns = {region: [] for region in - globals.realtime_region_pipe_flow_and_demand_id.keys()} + globals.source_outflow_region_patterns = { + region: [] for region in globals.source_outflow_region_id.keys() + } + globals.realtime_region_pipe_flow_and_demand_patterns = { + region: [] for region in globals.realtime_region_pipe_flow_and_demand_id.keys() + } conn_string = get_pgconn_string(db_name=name) try: with psycopg.connect(conn_string) as conn: @@ -441,35 +541,56 @@ def get_realtime_region_patterns(name: str, source_outflow_region_id: dict, real # 获取 source_outflow_region_id 的 api_query_id 并查询 associated_pattern source_outflow_api_ids = globals.source_outflow_region_id[region] if source_outflow_api_ids: - api_query_ids_str = ", ".join([f"'{api_id}'" for api_id in source_outflow_api_ids]) - cur.execute(f""" + api_query_ids_str = ", ".join( + [f"'{api_id}'" for api_id in source_outflow_api_ids] + ) + cur.execute( + f""" SELECT api_query_id, associated_pattern FROM scada_info WHERE api_query_id IN ({api_query_ids_str}); - """) + """ + ) results = cur.fetchall() globals.source_outflow_region_patterns[region] = [ - associated_pattern for _, associated_pattern in results if associated_pattern + associated_pattern + for _, associated_pattern in results + if associated_pattern ] # 获取 realtime_region_pipe_flow_and_demand_id 的 api_query_id 并查询 associated_pattern - realtime_api_ids = globals.realtime_region_pipe_flow_and_demand_id[region] + realtime_api_ids = globals.realtime_region_pipe_flow_and_demand_id[ + region + ] if realtime_api_ids: - api_query_ids_str = ", ".join([f"'{api_id}'" for api_id in realtime_api_ids]) - cur.execute(f""" + api_query_ids_str = ", ".join( + [f"'{api_id}'" for api_id in realtime_api_ids] + ) + cur.execute( + f""" SELECT api_query_id, associated_pattern FROM scada_info WHERE api_query_id IN ({api_query_ids_str}); - """) + """ + ) results = cur.fetchall() - globals.realtime_region_pipe_flow_and_demand_patterns[region] = [ - associated_pattern for _, associated_pattern in results if associated_pattern + globals.realtime_region_pipe_flow_and_demand_patterns[ + region + ] = [ + associated_pattern + for _, associated_pattern in results + if associated_pattern ] - logging.info("生成 source_outflow_region_patterns 和 realtime_region_pipe_flow_and_demand_patterns 成功。") + logging.info( + "生成 source_outflow_region_patterns 和 realtime_region_pipe_flow_and_demand_patterns 成功。" + ) except psycopg.Error as e: logging.error(f"数据库连接或查询出错: {e}") except Exception as ex: logging.error(f"处理数据时出错: {ex}") - return globals.source_outflow_region_patterns, globals.realtime_region_pipe_flow_and_demand_patterns + return ( + globals.source_outflow_region_patterns, + globals.realtime_region_pipe_flow_and_demand_patterns, + ) def get_pattern_index(cur_datetime: str) -> int: @@ -498,23 +619,23 @@ def get_pattern_index_str(current_time: str) -> str: minN_str = minN_str.zfill(2) hrN_str = str(int(hrN)) hrN_str = hrN_str.zfill(2) - str_i = '{}:{}:00'.format(hrN_str, minN_str) + str_i = "{}:{}:00".format(hrN_str, minN_str) return str_i -def from_seconds_to_clock (secs: int)->str: +def from_seconds_to_clock(secs: int) -> str: """ 从秒格式化为“HH:MM:00”字符串 :param secs: int,秒 :return: str, 以“HH:MM:00”格式返回 """ - hrs=int(secs/3600) - minutes=int((secs-hrs*3600)/60) - seconds=(secs-hrs*3600-minutes*60) - hrs_str=str(hrs).zfill(2) - minutes_str=str(minutes).zfill(2) - seconds_str=str(seconds).zfill(2) - str_clock='{}:{}:{}'.format(hrs_str,minutes_str,seconds_str) + hrs = int(secs / 3600) + minutes = int((secs - hrs * 3600) / 60) + seconds = secs - hrs * 3600 - minutes * 60 + hrs_str = str(hrs).zfill(2) + minutes_str = str(minutes).zfill(2) + seconds_str = str(seconds).zfill(2) + str_clock = "{}:{}:{}".format(hrs_str, minutes_str, seconds_str) return str_clock @@ -524,8 +645,8 @@ def convert_time_format(original_time: str) -> str: :param original_time: str, “2024-04-13T08:00:00+08:00"格式的时间 :return: str,“2024-04-13 08:00:00”格式的时间 """ - new_time = original_time.replace('T', ' ') - new_time = new_time.replace('+08:00', '') + new_time = original_time.replace("T", " ") + new_time = new_time.replace("+08:00", "") return new_time @@ -533,20 +654,32 @@ def get_history_pattern_info(project_name, pattern_name): """读取选定pattern的保存的历史pattern信息flow和factor""" flow_list = [] factor_list = [] - patterns_info = read_all(project_name, - f"select * from history_patterns_flows where id = '{pattern_name}' order by _order") + patterns_info = read_all( + project_name, + f"select * from history_patterns_flows where id = '{pattern_name}' order by _order", + ) for item in patterns_info: - flow_list.append(float(item['flow'])) - factor_list.append(float(item['factor'])) + flow_list.append(float(item["flow"])) + factor_list.append(float(item["factor"])) return flow_list, factor_list # 2025/01/11 -def run_simulation(name: str, simulation_type: str, modify_pattern_start_time: str, modify_total_duration: int = 0, - modify_reservoir_head_pattern: dict[str, list] = None, modify_tank_initial_level: dict[str, float] = None, - modify_junction_base_demand: dict[str, float] = None, modify_junction_damand_pattern: dict[str, list] = None, - modify_fixed_pump_pattern: dict[str, list] = None, modify_variable_pump_pattern: dict[str, list] = None, - modify_valve_opening: dict[str, float] = None, scheme_Type: str = None, scheme_Name: str = None) -> None: +def run_simulation( + name: str, + simulation_type: str, + modify_pattern_start_time: str, + modify_total_duration: int = 0, + modify_reservoir_head_pattern: dict[str, list] = None, + modify_tank_initial_level: dict[str, float] = None, + modify_junction_base_demand: dict[str, float] = None, + modify_junction_damand_pattern: dict[str, list] = None, + modify_fixed_pump_pattern: dict[str, list] = None, + modify_variable_pump_pattern: dict[str, list] = None, + modify_valve_opening: dict[str, float] = None, + scheme_Type: str = None, + scheme_Name: str = None, +) -> None: """ 传入需要修改的参数,改变数据库中对应位置的值,然后计算,返回结果 :param name: 模型名称,数据库中对应的名字 @@ -567,16 +700,19 @@ def run_simulation(name: str, simulation_type: str, modify_pattern_start_time: s # 记录开始时间 time_cost_start = time.perf_counter() - print('name', name) - print('simulation_type', simulation_type) - print('modify_pattern_start_time', modify_pattern_start_time) - print('modify_total_duration', modify_total_duration) - print('modify_reservoir_head_pattern', modify_reservoir_head_pattern) - print('modify_tank_initial_level', modify_tank_initial_level) - print('modify_junction_base_demand', modify_junction_base_demand) + print("name", name) + print("simulation_type", simulation_type) + print("modify_pattern_start_time", modify_pattern_start_time) + print("modify_total_duration", modify_total_duration) + print("modify_reservoir_head_pattern", modify_reservoir_head_pattern) + print("modify_tank_initial_level", modify_tank_initial_level) + print("modify_junction_base_demand", modify_junction_base_demand) - - print('{} -- Hydraulic simulation started.'.format(datetime.now(pytz.timezone('Asia/Shanghai')).strftime('%Y-%m-%d %H:%M:%S'))) + print( + "{} -- Hydraulic simulation started.".format( + datetime.now(pytz.timezone("Asia/Shanghai")).strftime("%Y-%m-%d %H:%M:%S") + ) + ) # 判断是实时模拟还是多步长模拟 # if simulation_type.upper() == 'REALTIME': # 实时模拟(修改原数据库) @@ -598,11 +734,13 @@ def run_simulation(name: str, simulation_type: str, modify_pattern_start_time: s print(dic_time) # 获取水力模拟步长,如’0:15:00‘ - globals.hydraulic_timestep = dic_time['HYDRAULIC TIMESTEP'] + globals.hydraulic_timestep = dic_time["HYDRAULIC TIMESTEP"] # 将时间字符串转换为 timedelta 对象 - time_obj = datetime.strptime(globals.hydraulic_timestep, '%H:%M:%S') + time_obj = datetime.strptime(globals.hydraulic_timestep, "%H:%M:%S") # 转换为分钟浮点数 - globals.PATTERN_TIME_STEP = float(time_obj.hour * 60 + time_obj.minute + time_obj.second / 60) + globals.PATTERN_TIME_STEP = float( + time_obj.hour * 60 + time_obj.minute + time_obj.second / 60 + ) # 对输入的时间参数进行处理 pattern_start_time = convert_time_format(modify_pattern_start_time) # 获取模拟开始时间是对应pattern的第几个数 @@ -620,12 +758,14 @@ def run_simulation(name: str, simulation_type: str, modify_pattern_start_time: s # cs.append(pump_pattern) # set_pattern(name_c, cs) # 修改模拟开始的时间 - str_pattern_start = get_pattern_index_str(convert_time_format(modify_pattern_start_time)) + str_pattern_start = get_pattern_index_str( + convert_time_format(modify_pattern_start_time) + ) dic_time = get_time(name_c) - dic_time['PATTERN START'] = str_pattern_start - dic_time['DURATION'] = from_seconds_to_clock(modify_total_duration) - if simulation_type.upper() == 'REALTIME': - dic_time['DURATION'] = 0 + dic_time["PATTERN START"] = str_pattern_start + dic_time["DURATION"] = from_seconds_to_clock(modify_total_duration) + if simulation_type.upper() == "REALTIME": + dic_time["DURATION"] = 0 cs = ChangeSet() cs.operations.append(dic_time) set_time(name_c, cs) @@ -634,70 +774,110 @@ def run_simulation(name: str, simulation_type: str, modify_pattern_start_time: s # reservoirs_id = {'ZBBDJSCP000002': '2497', 'R00003': '2571'} # 1.获取reservoir的SCADA数据,形式如{'2497': '3.1231', '2571': '2.7387'} reservoir_SCADA_data_dict = influxdb_api.query_SCADA_data_by_device_ID_and_time( - query_ids_list=list(globals.reservoirs_id.values()), query_time=modify_pattern_start_time) + query_ids_list=list(globals.reservoirs_id.values()), + query_time=modify_pattern_start_time, + ) # 2.构建出新字典,形式如{'ZBBDJSCP000002': '3.1231', 'R00003': '2.7387'} - reservoir_dict = {key: reservoir_SCADA_data_dict[value] for key, value in globals.reservoirs_id.items()} + reservoir_dict = { + key: reservoir_SCADA_data_dict[value] + for key, value in globals.reservoirs_id.items() + } # 3.修改reservoir液位模式 for reservoir_name, value in reservoir_dict.items(): if value and float(value) != 0: # 先根据reservoir获取对应的pattern,再对pattern进行修改 - reservoir_pattern = get_pattern(name_c, get_reservoir(name_c, reservoir_name)['pattern']) - reservoir_pattern['factors'][modify_index] = float(value) + globals.RESERVOIR_BASIC_HEIGHT + reservoir_pattern = get_pattern( + name_c, get_reservoir(name_c, reservoir_name)["pattern"] + ) + reservoir_pattern["factors"][modify_index] = ( + float(value) + globals.RESERVOIR_BASIC_HEIGHT + ) cs = ChangeSet() cs.append(reservoir_pattern) set_pattern(name_c, cs) if globals.tanks_id: # 修改tank初始液位 tank_SCADA_data_dict = influxdb_api.query_SCADA_data_by_device_ID_and_time( - query_ids_list=list(globals.tanks_id.values()), query_time=modify_pattern_start_time) - tank_dict = {key: tank_SCADA_data_dict[value] for key, value in globals.tanks_id.items()} + query_ids_list=list(globals.tanks_id.values()), + query_time=modify_pattern_start_time, + ) + tank_dict = { + key: tank_SCADA_data_dict[value] for key, value in globals.tanks_id.items() + } for tank_name, value in tank_dict.items(): if value and float(value) != 0: tank = get_tank(name_c, tank_name) - tank['init_level'] = float(value) + tank["init_level"] = float(value) cs = ChangeSet() cs.append(tank) set_tank(name_c, cs) if globals.fixed_pumps_id: # 修改工频泵的pattern - fixed_pump_SCADA_data_dict = influxdb_api.query_SCADA_data_by_device_ID_and_time( - query_ids_list=list(globals.fixed_pumps_id.values()), query_time=modify_pattern_start_time) + fixed_pump_SCADA_data_dict = ( + influxdb_api.query_SCADA_data_by_device_ID_and_time( + query_ids_list=list(globals.fixed_pumps_id.values()), + query_time=modify_pattern_start_time, + ) + ) # print(fixed_pump_SCADA_data_dict) - fixed_pump_dict = {key: fixed_pump_SCADA_data_dict[value] for key, value in globals.fixed_pumps_id.items()} + fixed_pump_dict = { + key: fixed_pump_SCADA_data_dict[value] + for key, value in globals.fixed_pumps_id.items() + } # print(fixed_pump_dict) for fixed_pump_name, value in fixed_pump_dict.items(): if value: - pump_pattern = get_pattern(name_c, get_pump(name_c, fixed_pump_name)['pattern']) + pump_pattern = get_pattern( + name_c, get_pump(name_c, fixed_pump_name)["pattern"] + ) # print(pump_pattern) - pump_pattern['factors'][modify_index] = float(value) + pump_pattern["factors"][modify_index] = float(value) # print(pump_pattern['factors'][modify_index]) cs = ChangeSet() cs.append(pump_pattern) set_pattern(name_c, cs) if globals.variable_pumps_id: # 修改变频泵的pattern - variable_pump_SCADA_data_dict = influxdb_api.query_SCADA_data_by_device_ID_and_time( - query_ids_list=list(globals.variable_pumps_id.values()), query_time=modify_pattern_start_time) - variable_pump_dict = {key: variable_pump_SCADA_data_dict[value] for key, value in globals.variable_pumps_id.items()} + variable_pump_SCADA_data_dict = ( + influxdb_api.query_SCADA_data_by_device_ID_and_time( + query_ids_list=list(globals.variable_pumps_id.values()), + query_time=modify_pattern_start_time, + ) + ) + variable_pump_dict = { + key: variable_pump_SCADA_data_dict[value] + for key, value in globals.variable_pumps_id.items() + } for variable_pump_name, value in variable_pump_dict.items(): if value: - pump_pattern = get_pattern(name_c, get_pump(name_c, variable_pump_name)['pattern']) - pump_pattern['factors'][modify_index] = float(value) / 50 + pump_pattern = get_pattern( + name_c, get_pump(name_c, variable_pump_name)["pattern"] + ) + pump_pattern["factors"][modify_index] = float(value) / 50 cs = ChangeSet() cs.append(pump_pattern) set_pattern(name_c, cs) if globals.demand_id: # 基于实时数据,修改大用户节点的pattern demand_SCADA_data_dict = influxdb_api.query_SCADA_data_by_device_ID_and_time( - query_ids_list=list(globals.demand_id.values()), query_time=modify_pattern_start_time) - demand_dict = {key: demand_SCADA_data_dict[value] for key, value in globals.demand_id.items()} + query_ids_list=list(globals.demand_id.values()), + query_time=modify_pattern_start_time, + ) + demand_dict = { + key: demand_SCADA_data_dict[value] + for key, value in globals.demand_id.items() + } for demand_name, value in demand_dict.items(): if value: - demand_pattern = get_pattern(name_c, get_demand(name_c, demand_name)['pattern']) - if get_option(name_c)['UNITS'] == 'LPS': - demand_pattern['factors'][modify_index] = float(value) / 3.6 # 默认SCADA数据获取的是流量单位是m3/h, 转换为 L/s - elif get_option(name_c)['UNITS'] == 'CMH': - demand_pattern['factors'][modify_index] = float(value) + demand_pattern = get_pattern( + name_c, get_demand(name_c, demand_name)["pattern"] + ) + if get_option(name_c)["UNITS"] == "LPS": + demand_pattern["factors"][modify_index] = ( + float(value) / 3.6 + ) # 默认SCADA数据获取的是流量单位是m3/h, 转换为 L/s + elif get_option(name_c)["UNITS"] == "CMH": + demand_pattern["factors"][modify_index] = float(value) cs = ChangeSet() cs.append(demand_pattern) set_pattern(name_c, cs) @@ -705,16 +885,27 @@ def run_simulation(name: str, simulation_type: str, modify_pattern_start_time: s ############################# if globals.source_outflow_pattern_id: # 基于实时的出厂流量计数据,修改出厂流量计绑定的pattern - source_outflow_SCADA_data_dict = influxdb_api.query_SCADA_data_by_device_ID_and_time( - query_ids_list=list(globals.source_outflow_pattern_id.values()), query_time=modify_pattern_start_time) + source_outflow_SCADA_data_dict = ( + influxdb_api.query_SCADA_data_by_device_ID_and_time( + query_ids_list=list(globals.source_outflow_pattern_id.values()), + query_time=modify_pattern_start_time, + ) + ) # print(source_outflow_SCADA_data_dict) - source_outflow_dict = {key: source_outflow_SCADA_data_dict[value] for key, value in globals.source_outflow_pattern_id.items()} + source_outflow_dict = { + key: source_outflow_SCADA_data_dict[value] + for key, value in globals.source_outflow_pattern_id.items() + } # print(source_outflow_dict) for pattern_name in source_outflow_dict.keys(): # print(pattern_name) - history_source_outflow_flow_list, history_source_outflow_factor_list = get_history_pattern_info(name_c, pattern_name) + history_source_outflow_flow_list, history_source_outflow_factor_list = ( + get_history_pattern_info(name_c, pattern_name) + ) history_source_outflow_flow = history_source_outflow_flow_list[modify_index] - history_source_outflow_factor = history_source_outflow_factor_list[modify_index] + history_source_outflow_factor = history_source_outflow_factor_list[ + modify_index + ] # print(source_outflow_dict[pattern_name]) # print(history_source_outflow_flow) # print(history_source_outflow_factor) @@ -723,25 +914,38 @@ def run_simulation(name: str, simulation_type: str, modify_pattern_start_time: s multiply_factor = realtime_source_outflow / history_source_outflow_flow # print(multiply_factor) pattern = get_pattern(name_c, pattern_name) - pattern['factors'][modify_index] = multiply_factor * history_source_outflow_factor + pattern["factors"][modify_index] = ( + multiply_factor * history_source_outflow_factor + ) # print(pattern['factors'][modify_index]) cs = ChangeSet() cs.append(pattern) set_pattern(name_c, cs) if globals.realtime_pipe_flow_pattern_id: # 基于实时的pipe_flow类数据,修改pipe_flow类绑定的pattern - realtime_pipe_flow_SCADA_data_dict = influxdb_api.query_SCADA_data_by_device_ID_and_time( - query_ids_list=list(globals.realtime_pipe_flow_pattern_id.values()), query_time=modify_pattern_start_time) - realtime_pipe_flow_dict = {key: realtime_pipe_flow_SCADA_data_dict[value] for key, value in globals.realtime_pipe_flow_pattern_id.items()} + realtime_pipe_flow_SCADA_data_dict = ( + influxdb_api.query_SCADA_data_by_device_ID_and_time( + query_ids_list=list(globals.realtime_pipe_flow_pattern_id.values()), + query_time=modify_pattern_start_time, + ) + ) + realtime_pipe_flow_dict = { + key: realtime_pipe_flow_SCADA_data_dict[value] + for key, value in globals.realtime_pipe_flow_pattern_id.items() + } for pattern_name in realtime_pipe_flow_dict.keys(): - history_pipe_flow_flow_list, history_pipe_flow_factor_list = get_history_pattern_info(name_c, pattern_name) + history_pipe_flow_flow_list, history_pipe_flow_factor_list = ( + get_history_pattern_info(name_c, pattern_name) + ) history_pipe_flow_flow = history_pipe_flow_flow_list[modify_index] history_pipe_flow_factor = history_pipe_flow_factor_list[modify_index] if realtime_pipe_flow_dict[pattern_name]: realtime_pipe_flow = float(realtime_pipe_flow_dict[pattern_name]) multiply_factor = realtime_pipe_flow / history_pipe_flow_flow pattern = get_pattern(name_c, pattern_name) - pattern['factors'][modify_index] = multiply_factor * history_pipe_flow_factor + pattern["factors"][modify_index] = ( + multiply_factor * history_pipe_flow_factor + ) cs = ChangeSet() cs.append(pattern) set_pattern(name_c, cs) @@ -749,62 +953,145 @@ def run_simulation(name: str, simulation_type: str, modify_pattern_start_time: s # 基于实时的pipe_flow类数据,修改pipe_flow分区流量计范围内的non_realtime的demand绑定的pattern temp_realtime_pipe_flow_pattern_id = {} # 遍历 pipe_flow_region_patterns 字典的 key - for pipe_flow_region, demand_patterns in globals.pipe_flow_region_patterns.items(): + for ( + pipe_flow_region, + demand_patterns, + ) in globals.pipe_flow_region_patterns.items(): # 获取对应的实时值 query_api_id = globals.realtime_pipe_flow_pattern_id.get(pipe_flow_region) temp_realtime_pipe_flow_pattern_id[pipe_flow_region] = query_api_id - temp_realtime_pipe_flow_SCADA_data_dict = influxdb_api.query_SCADA_data_by_device_ID_and_time( - query_ids_list=list(temp_realtime_pipe_flow_pattern_id.values()), query_time=modify_pattern_start_time) - temp_realtime_pipe_flow_dict = {key: temp_realtime_pipe_flow_SCADA_data_dict[value] for key, value in temp_realtime_pipe_flow_pattern_id.items()} + temp_realtime_pipe_flow_SCADA_data_dict = ( + influxdb_api.query_SCADA_data_by_device_ID_and_time( + query_ids_list=list(temp_realtime_pipe_flow_pattern_id.values()), + query_time=modify_pattern_start_time, + ) + ) + temp_realtime_pipe_flow_dict = { + key: temp_realtime_pipe_flow_SCADA_data_dict[value] + for key, value in temp_realtime_pipe_flow_pattern_id.items() + } for pattern_name in temp_realtime_pipe_flow_dict.keys(): - temp_history_pipe_flow_flow_list, temp_history_pipe_flow_factor_list = get_history_pattern_info(name_c, pattern_name) + temp_history_pipe_flow_flow_list, temp_history_pipe_flow_factor_list = ( + get_history_pattern_info(name_c, pattern_name) + ) temp_history_pipe_flow_flow = temp_history_pipe_flow_flow_list[modify_index] if temp_realtime_pipe_flow_dict[pattern_name]: - temp_realtime_pipe_flow = float(temp_realtime_pipe_flow_dict[pattern_name]) - temp_multiply_factor = temp_realtime_pipe_flow / temp_history_pipe_flow_flow - temp_non_realtime_demand_pattern_list = globals.pipe_flow_region_patterns[pattern_name] + temp_realtime_pipe_flow = float( + temp_realtime_pipe_flow_dict[pattern_name] + ) + temp_multiply_factor = ( + temp_realtime_pipe_flow / temp_history_pipe_flow_flow + ) + temp_non_realtime_demand_pattern_list = ( + globals.pipe_flow_region_patterns[pattern_name] + ) for demand_pattern_name in temp_non_realtime_demand_pattern_list: - history_non_realtime_demand_flow_list, history_non_realtime_demand_factor_list = get_history_pattern_info(name_c, demand_pattern_name) - history_non_realtime_demand_factor = history_non_realtime_demand_factor_list[modify_index] + ( + history_non_realtime_demand_flow_list, + history_non_realtime_demand_factor_list, + ) = get_history_pattern_info(name_c, demand_pattern_name) + history_non_realtime_demand_factor = ( + history_non_realtime_demand_factor_list[modify_index] + ) pattern = get_pattern(name_c, demand_pattern_name) - pattern['factors'][modify_index] = temp_multiply_factor * history_non_realtime_demand_factor + pattern["factors"][modify_index] = ( + temp_multiply_factor * history_non_realtime_demand_factor + ) cs = ChangeSet() cs.append(pattern) set_pattern(name_c, cs) if globals.source_outflow_region: # 根据associated_source_outflow_id进行分区,各分区用(出厂的流量计 - 实时的pipe_flow和demand)进行数据更新 for region in globals.source_outflow_region.keys(): - temp_source_outflow_region_id = globals.source_outflow_region_id.get(region, []) - temp_realtime_region_pipe_flow_and_demand_id = globals.realtime_region_pipe_flow_and_demand_id.get(region, []) - temp_source_outflow_region_patterns = globals.source_outflow_region_patterns.get(region, []) - temp_realtime_region_pipe_flow_and_demand_patterns = globals.realtime_region_pipe_flow_and_demand_patterns.get(region, []) - temp_non_realtime_region_patterns = globals.non_realtime_region_patterns.get(region, []) - region_source_outflow_data_dict = influxdb_api.query_SCADA_data_by_device_ID_and_time( - query_ids_list=temp_source_outflow_region_id, query_time=modify_pattern_start_time) - region_realtime_region_pipe_flow_and_demand_data_dict = influxdb_api.query_SCADA_data_by_device_ID_and_time( - query_ids_list=temp_realtime_region_pipe_flow_and_demand_id, query_time=modify_pattern_start_time) + temp_source_outflow_region_id = globals.source_outflow_region_id.get( + region, [] + ) + temp_realtime_region_pipe_flow_and_demand_id = ( + globals.realtime_region_pipe_flow_and_demand_id.get(region, []) + ) + temp_source_outflow_region_patterns = ( + globals.source_outflow_region_patterns.get(region, []) + ) + temp_realtime_region_pipe_flow_and_demand_patterns = ( + globals.realtime_region_pipe_flow_and_demand_patterns.get(region, []) + ) + temp_non_realtime_region_patterns = ( + globals.non_realtime_region_patterns.get(region, []) + ) + region_source_outflow_data_dict = ( + influxdb_api.query_SCADA_data_by_device_ID_and_time( + query_ids_list=temp_source_outflow_region_id, + query_time=modify_pattern_start_time, + ) + ) + region_realtime_region_pipe_flow_and_demand_data_dict = ( + influxdb_api.query_SCADA_data_by_device_ID_and_time( + query_ids_list=temp_realtime_region_pipe_flow_and_demand_id, + query_time=modify_pattern_start_time, + ) + ) # 2025/02/12 确保 region_source_outflow_data_dict 和 # region_realtime_region_pipe_flow_and_demand_data_dict中的每个值都不是 None 且不为 0 - region_source_outflow_valid_values = [float(value) for value in region_source_outflow_data_dict.values() if value not in [None, 0]] - valid_values = [float(value) for value in region_realtime_region_pipe_flow_and_demand_data_dict.values() if value not in [None, 0]] + region_source_outflow_valid_values = [ + float(value) + for value in region_source_outflow_data_dict.values() + if value not in [None, 0] + ] + valid_values = [ + float(value) + for value in region_realtime_region_pipe_flow_and_demand_data_dict.values() + if value not in [None, 0] + ] # 如果都非空,则执行 sum 操作 if region_source_outflow_valid_values and valid_values: region_total_source_outflow = sum(region_source_outflow_valid_values) history_region_total_source_outflow = 0 for source_outflow_pattern_name in temp_source_outflow_region_patterns: - temp_history_source_outflow_flow_list, temp_history_source_outflow_factor_list = get_history_pattern_info(name_c, source_outflow_pattern_name) - history_region_total_source_outflow += temp_history_source_outflow_flow_list[modify_index] + ( + temp_history_source_outflow_flow_list, + temp_history_source_outflow_factor_list, + ) = get_history_pattern_info(name_c, source_outflow_pattern_name) + history_region_total_source_outflow += ( + temp_history_source_outflow_flow_list[modify_index] + ) region_total_realtime_region_pipe_flow_and_demand = sum(valid_values) history_region_total_realtime_region_pipe_flow_and_demand = 0 - for pipe_flow_and_demand_pattern_name in temp_realtime_region_pipe_flow_and_demand_patterns: - temp_history_pipe_flow_and_demand_flow_list, temp_history_pipe_flow_and_demand_factor_list = get_history_pattern_info(name_c, pipe_flow_and_demand_pattern_name) - history_region_total_realtime_region_pipe_flow_and_demand += temp_history_pipe_flow_and_demand_flow_list[modify_index] - temp_multiply_factor = (region_total_source_outflow - region_total_realtime_region_pipe_flow_and_demand) / (history_region_total_source_outflow - history_region_total_realtime_region_pipe_flow_and_demand) - for non_realtime_region_pattern_name in temp_non_realtime_region_patterns: - history_non_realtime_region_pattern_flow_list, history_non_realtime_region_pattern_factor_list = get_history_pattern_info(name_c, non_realtime_region_pattern_name) - history_non_realtime_region_pattern_factor = history_non_realtime_region_pattern_factor_list[modify_index] + for ( + pipe_flow_and_demand_pattern_name + ) in temp_realtime_region_pipe_flow_and_demand_patterns: + ( + temp_history_pipe_flow_and_demand_flow_list, + temp_history_pipe_flow_and_demand_factor_list, + ) = get_history_pattern_info( + name_c, pipe_flow_and_demand_pattern_name + ) + history_region_total_realtime_region_pipe_flow_and_demand += ( + temp_history_pipe_flow_and_demand_flow_list[modify_index] + ) + temp_multiply_factor = ( + region_total_source_outflow + - region_total_realtime_region_pipe_flow_and_demand + ) / ( + history_region_total_source_outflow + - history_region_total_realtime_region_pipe_flow_and_demand + ) + for ( + non_realtime_region_pattern_name + ) in temp_non_realtime_region_patterns: + ( + history_non_realtime_region_pattern_flow_list, + history_non_realtime_region_pattern_factor_list, + ) = get_history_pattern_info( + name_c, non_realtime_region_pattern_name + ) + history_non_realtime_region_pattern_factor = ( + history_non_realtime_region_pattern_factor_list[modify_index] + ) pattern = get_pattern(name_c, non_realtime_region_pattern_name) - pattern['factors'][modify_index] = temp_multiply_factor * history_non_realtime_region_pattern_factor + pattern["factors"][modify_index] = ( + temp_multiply_factor + * history_non_realtime_region_pattern_factor + ) cs = ChangeSet() cs.append(pattern) set_pattern(name_c, cs) @@ -816,20 +1103,27 @@ def run_simulation(name: str, simulation_type: str, modify_pattern_start_time: s # 如果modify_reservoir_head_pattern[reservoir_name][0]不是NaN,则条件成立,代码块会执行 if not np.isnan(modify_reservoir_head_pattern[reservoir_name][0]): # 给 list 中的所有元素加上 RESERVOIR_BASIC_HEIGHT - modified_values = [value + globals.RESERVOIR_BASIC_HEIGHT for value in - modify_reservoir_head_pattern[reservoir_name]] - reservoir_pattern = get_pattern(name_c, get_reservoir(name_c, reservoir_name)['pattern']) - reservoir_pattern['factors'][modify_index - :modify_index + len(modified_values)] = modified_values + modified_values = [ + value + globals.RESERVOIR_BASIC_HEIGHT + for value in modify_reservoir_head_pattern[reservoir_name] + ] + reservoir_pattern = get_pattern( + name_c, get_reservoir(name_c, reservoir_name)["pattern"] + ) + reservoir_pattern["factors"][ + modify_index : modify_index + len(modified_values) + ] = modified_values cs = ChangeSet() cs.append(reservoir_pattern) set_pattern(name_c, cs) # 修改调节池(tank)初始液位 if modify_tank_initial_level: for tank_name in modify_tank_initial_level.keys(): - if (not np.isnan(modify_tank_initial_level[tank_name])) and (modify_tank_initial_level[tank_name] != 0): + if (not np.isnan(modify_tank_initial_level[tank_name])) and ( + modify_tank_initial_level[tank_name] != 0 + ): tank = get_tank(name_c, tank_name) - tank['init_level'] = modify_tank_initial_level[tank_name] + tank["init_level"] = modify_tank_initial_level[tank_name] cs = ChangeSet() cs.append(tank) set_tank(name_c, cs) @@ -838,7 +1132,7 @@ def run_simulation(name: str, simulation_type: str, modify_pattern_start_time: s for junction_name in modify_junction_base_demand.keys(): if not np.isnan(modify_junction_base_demand[junction_name]): junction = get_demand(name_c, junction_name) - junction['demand'] = modify_junction_base_demand[junction_name] + junction["demand"] = modify_junction_base_demand[junction_name] cs = ChangeSet() cs.append(junction) set_demand(name_c, cs) @@ -847,9 +1141,10 @@ def run_simulation(name: str, simulation_type: str, modify_pattern_start_time: s for pattern_name in modify_junction_damand_pattern.keys(): if not np.isnan(modify_junction_damand_pattern[pattern_name][0]): junction_pattern = get_pattern(name_c, pattern_name) - junction_pattern['factors'][modify_index - :modify_index + len(modify_junction_damand_pattern[pattern_name])] \ - = modify_junction_damand_pattern[pattern_name] + junction_pattern["factors"][ + modify_index : modify_index + + len(modify_junction_damand_pattern[pattern_name]) + ] = modify_junction_damand_pattern[pattern_name] cs = ChangeSet() cs.append(junction_pattern) set_pattern(name_c, cs) @@ -857,10 +1152,12 @@ def run_simulation(name: str, simulation_type: str, modify_pattern_start_time: s if modify_fixed_pump_pattern: for pump_name in modify_fixed_pump_pattern.keys(): if not np.isnan(modify_fixed_pump_pattern[pump_name][0]): - pump_pattern = get_pattern(name_c, get_pump(name_c, pattern_name)['pattern']) - pump_pattern['factors'][modify_index - :modify_index + len(modify_fixed_pump_pattern)] \ - = modify_fixed_pump_pattern[pump_name] + pump_pattern = get_pattern( + name_c, get_pump(name_c, pattern_name)["pattern"] + ) + pump_pattern["factors"][ + modify_index : modify_index + len(modify_fixed_pump_pattern) + ] = modify_fixed_pump_pattern[pump_name] cs = ChangeSet() cs.append(pump_pattern) set_pattern(name_c, cs) @@ -869,10 +1166,15 @@ def run_simulation(name: str, simulation_type: str, modify_pattern_start_time: s for pump_name in modify_variable_pump_pattern.keys(): if not np.isnan(modify_variable_pump_pattern[pump_name][0]): # 给 list 中的所有元素除以 50Hz - modified_values = [value / 50 for value in modify_variable_pump_pattern[pump_name]] - pump_pattern = get_pattern(name_c, get_pump(name_c, pattern_name)['pattern']) - pump_pattern['factors'][modify_index - :modify_index + len(modified_values)] = modified_values + modified_values = [ + value / 50 for value in modify_variable_pump_pattern[pump_name] + ] + pump_pattern = get_pattern( + name_c, get_pump(name_c, pattern_name)["pattern"] + ) + pump_pattern["factors"][ + modify_index : modify_index + len(modified_values) + ] = modified_values cs = ChangeSet() cs.append(pump_pattern) set_pattern(name_c, cs) @@ -882,49 +1184,65 @@ def run_simulation(name: str, simulation_type: str, modify_pattern_start_time: s if not np.isnan(modify_valve_opening[valve_name]): valve_status = get_status(name_c, valve_name) if modify_valve_opening[valve_name] == 0: - valve_status['status'] = 'CLOSED' - valve_status['setting'] = 0 + valve_status["status"] = "CLOSED" + valve_status["setting"] = 0 if modify_valve_opening[valve_name] < 1: - valve_status['status'] = 'OPEN' - valve_status['setting'] = 0.1036 * pow(modify_valve_opening[valve_name], -3.105) + valve_status["status"] = "OPEN" + valve_status["setting"] = 0.1036 * pow( + modify_valve_opening[valve_name], -3.105 + ) if modify_valve_opening[valve_name] == 1: - valve_status['status'] = 'OPEN' - valve_status['setting'] = 0 + valve_status["status"] = "OPEN" + valve_status["setting"] = 0 cs = ChangeSet() cs.append(valve_status) set_status(name_c, cs) - # 运行并返回结果 + # 运行并返回结果 result = run_project(name_c) time_cost_end = time.perf_counter() - print('{} -- Hydraulic simulation finished, cost time: {:.2f} s.'.format( datetime.now(pytz.timezone('Asia/Shanghai')).strftime('%Y-%m-%d %H:%M:%S'), time_cost_end - time_cost_start)) + print( + "{} -- Hydraulic simulation finished, cost time: {:.2f} s.".format( + datetime.now(pytz.timezone("Asia/Shanghai")).strftime("%Y-%m-%d %H:%M:%S"), + time_cost_end - time_cost_start, + ) + ) # DingZQ 下面这几句一定要这样,不然读取不了 time.sleep(5) # wait 5 seconds # TODO: 2025/03/24 # DingZQ 这个名字要用随机数来处理 - tmp_file = f'./temp/simulation_{uuid.uuid4()}.result.out' - shutil.copy(f'./temp/{name_c}.db.opt', tmp_file) + tmp_file = f"./temp/simulation_{uuid.uuid4()}.result.out" + shutil.copy(f"./temp/{name_c}.db.opt", tmp_file) - output = Output(tmp_file) node_result = output.node_results() link_result = output.link_results() - + # link_flow = [] # for link in link_result: # link_flow.append(link['result'][-1]['flow']) # print(link_flow) - num_periods_result = output.times()['num_periods'] + num_periods_result = output.times()["num_periods"] print("simulation_type", simulation_type) print("before store result") # print(num_periods_result) # print(node_result) # 存储 - if simulation_type.upper() == 'REALTIME': - influxdb_api.store_realtime_simulation_result_to_influxdb(node_result, link_result, modify_pattern_start_time) - elif simulation_type.upper() == 'EXTENDED': - influxdb_api.store_scheme_simulation_result_to_influxdb(node_result, link_result, modify_pattern_start_time, num_periods_result, scheme_Type, scheme_Name) - influxdb_api.fill_scheme_simulation_result_to_SCADA(scheme_Type=scheme_Type, scheme_Name=scheme_Name) + if simulation_type.upper() == "REALTIME": + influxdb_api.store_realtime_simulation_result_to_influxdb( + node_result, link_result, modify_pattern_start_time + ) + elif simulation_type.upper() == "EXTENDED": + influxdb_api.store_scheme_simulation_result_to_influxdb( + node_result, + link_result, + modify_pattern_start_time, + num_periods_result, + scheme_Type, + scheme_Name, + ) + # 暂不需要再次存储 SCADA 模拟信息 + # influxdb_api.fill_scheme_simulation_result_to_SCADA(scheme_Type=scheme_Type, scheme_Name=scheme_Name) print("after store result") @@ -938,12 +1256,27 @@ if __name__ == "__main__": query_corresponding_pattern_id_and_query_id(project_info.name) region_result = query_non_realtime_region(project_info.name) - globals.source_outflow_region_id = get_source_outflow_region_id(project_info.name, region_result) - globals.realtime_region_pipe_flow_and_demand_id = query_realtime_region_pipe_flow_and_demand_id(project_info.name, region_result) - globals.pipe_flow_region_patterns = query_pipe_flow_region_patterns(project_info.name) + globals.source_outflow_region_id = get_source_outflow_region_id( + project_info.name, region_result + ) + globals.realtime_region_pipe_flow_and_demand_id = ( + query_realtime_region_pipe_flow_and_demand_id(project_info.name, region_result) + ) + globals.pipe_flow_region_patterns = query_pipe_flow_region_patterns( + project_info.name + ) - globals.non_realtime_region_patterns = query_non_realtime_region_patterns(project_info.name, region_result) - globals.source_outflow_region_patterns, globals.realtime_region_pipe_flow_and_demand_patterns = get_realtime_region_patterns(project_info.name, globals.source_outflow_region_id, globals.realtime_region_pipe_flow_and_demand_id) + globals.non_realtime_region_patterns = query_non_realtime_region_patterns( + project_info.name, region_result + ) + ( + globals.source_outflow_region_patterns, + globals.realtime_region_pipe_flow_and_demand_patterns, + ) = get_realtime_region_patterns( + project_info.name, + globals.source_outflow_region_id, + globals.realtime_region_pipe_flow_and_demand_id, + ) # 基础日期和时间(日期部分保持不变) base_date = datetime(2025, 5, 4) @@ -963,7 +1296,7 @@ if __name__ == "__main__": run_simulation( name=project_info.name, simulation_type="realtime", - modify_pattern_start_time=iso_time + modify_pattern_start_time=iso_time, ) # 打印字典内容以验证 @@ -994,16 +1327,3 @@ if __name__ == "__main__": # 查询示例1:query_SCADA_ID_corresponding_info # result = query_SCADA_ID_corresponding_info(name='bb', SCADA_ID='P10755') # print(result) - - - - - - - - - - - - - diff --git a/timescaledb/router.py b/timescaledb/router.py index 7427b75..ac2fe9c 100644 --- a/timescaledb/router.py +++ b/timescaledb/router.py @@ -227,7 +227,7 @@ async def delete_scheme_nodes( async def insert_scada_data( data: List[dict], conn: AsyncConnection = Depends(get_database_connection) ): - await ScadaRepository.insert_batch(conn, data) + await ScadaRepository.insert_scada_batch(conn, data) return {"message": f"Inserted {len(data)} records"} @@ -238,18 +238,23 @@ async def get_scada_data( end_time: datetime, conn: AsyncConnection = Depends(get_database_connection), ): - return await ScadaRepository.get_data_by_time(conn, device_id, start_time, end_time) + return await ScadaRepository.get_scada_by_id_time_range( + conn, device_id, start_time, end_time + ) @router.get("/scada/{device_id}/field") async def get_scada_field( device_id: str, - time: datetime, + start_time: datetime, + end_time: datetime, field: str, conn: AsyncConnection = Depends(get_database_connection), ): try: - return await ScadaRepository.get_field(conn, time, device_id, field) + return await ScadaRepository.get_scada_field_by_id_time_range( + conn, device_id, start_time, end_time, field + ) except ValueError as e: raise HTTPException(status_code=400, detail=str(e)) @@ -263,7 +268,7 @@ async def update_scada_field( conn: AsyncConnection = Depends(get_database_connection), ): try: - await ScadaRepository.update_field(conn, time, device_id, field, value) + await ScadaRepository.update_scada_field(conn, time, device_id, field, value) return {"message": "Updated successfully"} except ValueError as e: raise HTTPException(status_code=400, detail=str(e)) @@ -276,5 +281,5 @@ async def delete_scada_data( end_time: datetime, conn: AsyncConnection = Depends(get_database_connection), ): - await ScadaRepository.delete_data_by_time(conn, device_id, start_time, end_time) + await ScadaRepository.delete_scada_by_id_time(conn, device_id, start_time, end_time) return {"message": "Deleted successfully"} diff --git a/timescaledb/schemas/realtime.py b/timescaledb/schemas/realtime.py index d42c3f1..dc8997a 100644 --- a/timescaledb/schemas/realtime.py +++ b/timescaledb/schemas/realtime.py @@ -1,5 +1,5 @@ -from typing import List, Any, Optional -from datetime import datetime +from typing import List, Any, Dict +from datetime import datetime, timedelta from psycopg import AsyncConnection, sql @@ -266,3 +266,150 @@ class RealtimeRepository: "DELETE FROM realtime.node_simulation WHERE time >= %s AND time <= %s", (start_time, end_time), ) + + # --- 复合查询 --- + + @staticmethod + async def store_realtime_simulation_result( + conn: AsyncConnection, + node_result_list: List[Dict[str, any]], + link_result_list: List[Dict[str, any]], + result_start_time: str, + ): + """ + Store realtime simulation results to TimescaleDB. + + Args: + conn: Database connection + node_result_list: List of node simulation results + link_result_list: List of link simulation results + result_start_time: Start time for the results (ISO format string) + """ + # Convert result_start_time string to datetime if needed + if isinstance(result_start_time, str): + simulation_time = datetime.fromisoformat( + result_start_time.replace("Z", "+00:00") + ) + else: + simulation_time = result_start_time + + # Prepare node data for batch insert + node_data = [] + for node_result in node_result_list: + node_data.append( + { + "time": simulation_time, + "id": node_result.get("id"), + "actual_demand": node_result.get("actual_demand"), + "total_head": node_result.get("total_head"), + "pressure": node_result.get("pressure"), + "quality": node_result.get("quality"), + } + ) + + # Prepare link data for batch insert + link_data = [] + for link_result in link_result_list: + link_data.append( + { + "time": simulation_time, + "id": link_result.get("id"), + "flow": link_result.get("flow"), + "friction": link_result.get("friction"), + "headloss": link_result.get("headloss"), + "quality": link_result.get("quality"), + "reaction": link_result.get("reaction"), + "setting": link_result.get("setting"), + "status": link_result.get("status"), + "velocity": link_result.get("velocity"), + } + ) + + # Insert data using batch methods + if node_data: + await RealtimeRepository.insert_nodes_batch(conn, node_data) + + if link_data: + await RealtimeRepository.insert_links_batch(conn, link_data) + + @staticmethod + async def query_all_record_by_time_property( + conn: AsyncConnection, + query_time: str, + type: str, + property: str, + ) -> list: + """ + Query all records by time and property from TimescaleDB. + + Args: + conn: Database connection + query_time: Time to query (ISO format string) + type: Type of data ("node" or "link") + property: Property/field to query + + Returns: + List of records matching the criteria + """ + # Convert query_time string to datetime + if isinstance(query_time, str): + target_time = datetime.fromisoformat(query_time.replace("Z", "+00:00")) + else: + target_time = query_time + + # Create time range: query_time ± 1 second + start_time = target_time - timedelta(seconds=1) + end_time = target_time + timedelta(seconds=1) + + # Query based on type + if type.lower() == "node": + return await RealtimeRepository.get_nodes_field_by_time_range( + conn, start_time, end_time, property + ) + elif type.lower() == "link": + return await RealtimeRepository.get_links_field_by_time_range( + conn, start_time, end_time, property + ) + else: + raise ValueError(f"Invalid type: {type}. Must be 'node' or 'link'") + + @staticmethod + async def query_simulation_result_by_ID_time( + conn: AsyncConnection, + ID: str, + type: str, + query_time: str, + ) -> list[dict]: + """ + Query simulation results by ID and time from TimescaleDB. + + Args: + conn: Database connection + ID: The ID of the node or link + type: Type of data ("node" or "link") + query_time: Time to query (ISO format string) + + Returns: + List of records matching the criteria + """ + # Convert query_time string to datetime + if isinstance(query_time, str): + target_time = datetime.fromisoformat(query_time.replace("Z", "+00:00")) + else: + target_time = query_time + + # Create time range: query_time ± 1 second + start_time = target_time - timedelta(seconds=1) + end_time = target_time + timedelta(seconds=1) + + # Query based on type + if type.lower() == "node": + return await RealtimeRepository.get_node_by_time_range( + conn, start_time, end_time, ID + ) + elif type.lower() == "link": + return await RealtimeRepository.get_link_by_time_range( + conn, start_time, end_time, ID + ) + else: + raise ValueError(f"Invalid type: {type}. Must be 'node' or 'link'") diff --git a/timescaledb/schemas/scada.py b/timescaledb/schemas/scada.py index 8e9396e..03271f4 100644 --- a/timescaledb/schemas/scada.py +++ b/timescaledb/schemas/scada.py @@ -25,7 +25,7 @@ class ScadaRepository: ) @staticmethod - async def get_scada_by_id_time( + async def get_scada_by_id_time_range( conn: AsyncConnection, device_id: str, start_time: datetime, end_time: datetime ) -> List[dict]: async with conn.cursor() as cur: @@ -36,19 +36,23 @@ class ScadaRepository: return await cur.fetchall() @staticmethod - async def get_scada_field_by_id_time( - conn: AsyncConnection, time: datetime, device_id: str, field: str + async def get_scada_field_by_id_time_range( + conn: AsyncConnection, + device_id: str, + start_time: datetime, + end_time: datetime, + field: str, ) -> Any: valid_fields = {"monitored_value", "cleaned_value"} if field not in valid_fields: raise ValueError(f"Invalid field: {field}") query = sql.SQL( - "SELECT {} FROM scada.scada_data WHERE time = %s AND device_id = %s" + "SELECT {} FROM scada.scada_data WHERE time >= %s AND time <= %s AND device_id = %s" ).format(sql.Identifier(field)) async with conn.cursor() as cur: - await cur.execute(query, (time, device_id)) + await cur.execute(query, (start_time, end_time, device_id)) row = await cur.fetchone() return row[field] if row else None @@ -68,7 +72,7 @@ class ScadaRepository: await cur.execute(query, (value, time, device_id)) @staticmethod - async def delete_scada_by_id_time( + async def delete_scada_by_id_time_range( conn: AsyncConnection, device_id: str, start_time: datetime, end_time: datetime ): async with conn.cursor() as cur: diff --git a/timescaledb/schemas/scheme.py b/timescaledb/schemas/scheme.py index f1fd5a6..7d2021d 100644 --- a/timescaledb/schemas/scheme.py +++ b/timescaledb/schemas/scheme.py @@ -1,5 +1,5 @@ -from typing import List, Any, Optional -from datetime import datetime +from typing import List, Any, Dict +from datetime import datetime, timedelta from psycopg import AsyncConnection, sql @@ -286,3 +286,158 @@ class SchemeRepository: "DELETE FROM scheme.node_simulation WHERE scheme = %s AND time >= %s AND time <= %s", (scheme, start_time, end_time), ) + + # --- 复合查询 --- + + @staticmethod + async def store_scheme_simulation_result( + conn: AsyncConnection, + scheme: str, + node_result_list: List[Dict[str, any]], + link_result_list: List[Dict[str, any]], + result_start_time: str, + ): + """ + Store scheme simulation results to TimescaleDB. + + Args: + conn: Database connection + scheme: Scheme name + node_result_list: List of node simulation results + link_result_list: List of link simulation results + result_start_time: Start time for the results (ISO format string) + """ + # Convert result_start_time string to datetime if needed + if isinstance(result_start_time, str): + simulation_time = datetime.fromisoformat( + result_start_time.replace("Z", "+00:00") + ) + else: + simulation_time = result_start_time + + # Prepare node data for batch insert + node_data = [] + for node_result in node_result_list: + node_data.append( + { + "time": simulation_time, + "scheme": scheme, + "id": node_result.get("id"), + "actual_demand": node_result.get("actual_demand"), + "total_head": node_result.get("total_head"), + "pressure": node_result.get("pressure"), + "quality": node_result.get("quality"), + } + ) + + # Prepare link data for batch insert + link_data = [] + for link_result in link_result_list: + link_data.append( + { + "time": simulation_time, + "scheme": scheme, + "id": link_result.get("id"), + "flow": link_result.get("flow"), + "friction": link_result.get("friction"), + "headloss": link_result.get("headloss"), + "quality": link_result.get("quality"), + "reaction": link_result.get("reaction"), + "setting": link_result.get("setting"), + "status": link_result.get("status"), + "velocity": link_result.get("velocity"), + } + ) + + # Insert data using batch methods + if node_data: + await SchemeRepository.insert_nodes_batch(conn, node_data) + + if link_data: + await SchemeRepository.insert_links_batch(conn, link_data) + + @staticmethod + async def query_all_record_by_scheme_time_property( + conn: AsyncConnection, + scheme: str, + query_time: str, + type: str, + property: str, + ) -> list: + """ + Query all records by scheme, time and property from TimescaleDB. + + Args: + conn: Database connection + scheme: Scheme name + query_time: Time to query (ISO format string) + type: Type of data ("node" or "link") + property: Property/field to query + + Returns: + List of records matching the criteria + """ + # Convert query_time string to datetime + if isinstance(query_time, str): + target_time = datetime.fromisoformat(query_time.replace("Z", "+00:00")) + else: + target_time = query_time + + # Create time range: query_time ± 1 second + start_time = target_time - timedelta(seconds=1) + end_time = target_time + timedelta(seconds=1) + + # Query based on type + if type.lower() == "node": + return await SchemeRepository.get_nodes_field_by_scheme_and_time_range( + conn, scheme, start_time, end_time, property + ) + elif type.lower() == "link": + return await SchemeRepository.get_links_field_by_scheme_and_time_range( + conn, scheme, start_time, end_time, property + ) + else: + raise ValueError(f"Invalid type: {type}. Must be 'node' or 'link'") + + @staticmethod + async def query_scheme_simulation_result_by_ID_time( + conn: AsyncConnection, + scheme: str, + ID: str, + type: str, + query_time: str, + ) -> list[dict]: + """ + Query scheme simulation results by ID and time from TimescaleDB. + + Args: + conn: Database connection + scheme: Scheme name + ID: The ID of the node or link + type: Type of data ("node" or "link") + query_time: Time to query (ISO format string) + + Returns: + List of records matching the criteria + """ + # Convert query_time string to datetime + if isinstance(query_time, str): + target_time = datetime.fromisoformat(query_time.replace("Z", "+00:00")) + else: + target_time = query_time + + # Create time range: query_time ± 1 second + start_time = target_time - timedelta(seconds=1) + end_time = target_time + timedelta(seconds=1) + + # Query based on type + if type.lower() == "node": + return await SchemeRepository.get_node_by_scheme_and_time_range( + conn, scheme, start_time, end_time, ID + ) + elif type.lower() == "link": + return await SchemeRepository.get_link_by_scheme_and_time_range( + conn, scheme, start_time, end_time, ID + ) + else: + raise ValueError(f"Invalid type: {type}. Must be 'node' or 'link'")