diff --git a/globals.py b/globals.py index 322ab31..d71c63d 100644 --- a/globals.py +++ b/globals.py @@ -2,7 +2,6 @@ # reservoir basic height RESERVOIR_BASIC_HEIGHT = float(250.35) PATTERN_TIME_STEP = None # 浮点数 - # 实时数据类:element_id和api_query_id对应 reservoirs_id = {} tanks_id = {} @@ -11,22 +10,18 @@ variable_pumps_id = {} pressure_id = {} demand_id = {} quality_id = {} - # 实时数据类:pattern_id和api_query_id对应 source_outflow_pattern_id = {} realtime_pipe_flow_pattern_id = {} pipe_flow_region_patterns = {} # 根据realtime的pipe_flow,对non_realtime的demand进行分区 - # 分区查询 source_outflow_region = {} # 以绑定的管段作为value source_outflow_region_id = {} # 以api_query_id作为value source_outflow_region_patterns = {} # 以associated_pattern作为value # 非实时数据的pattern non_realtime_region_patterns = {} # 基于source_outflow_region进行区分 - realtime_region_pipe_flow_and_demand_id = {} # 基于source_outflow_region搜索该分区中的实时pipe_flow和demand的api_query_id,后续用region的流量 - 实时流量计的流量 realtime_region_pipe_flow_and_demand_patterns = {} # 基于source_outflow_region搜索该分区中的实时pipe_flow和demand的associated_pattern,后续用region的流量 - 实时流量计的流量 - # --------------------------------------------------------- # influxdb_api.py中的全局变量 # 全局变量,用于存储不同类型的realtime api_query_id @@ -39,11 +34,9 @@ pipe_flow_realtime_ids = [] pressure_realtime_ids = [] demand_realtime_ids = [] quality_realtime_ids = [] - # transmission_frequency的最大值 transmission_frequency = None hydraulic_timestep = None # 时间字符串 - reservoir_liquid_level_non_realtime_ids = [] tank_liquid_level_non_realtime_ids = [] fixed_pump_non_realtime_ids = [] @@ -54,3 +47,9 @@ pressure_non_realtime_ids = [] demand_non_realtime_ids = [] quality_non_realtime_ids = [] +# api_query_id和associated_element_id对应,不包含液位和泵 +scheme_source_outflow_ids = {} +scheme_pipe_flow_ids = {} +scheme_pressure_ids = {} +scheme_demand_ids = {} +scheme_quality_ids = {} diff --git a/influxdb_api.py b/influxdb_api.py index 3096413..c29db17 100644 --- a/influxdb_api.py +++ b/influxdb_api.py @@ -1,4 +1,4 @@ -from influxdb_client 
import InfluxDBClient, BucketsApi, WriteApi, OrganizationsApi, Point, QueryApi +from influxdb_client import InfluxDBClient, BucketsApi, WriteApi, OrganizationsApi, Point, QueryApi, WriteOptions from typing import List, Dict from datetime import datetime, timedelta, timezone from influxdb_client.client.write_api import SYNCHRONOUS, ASYNCHRONOUS @@ -15,51 +15,14 @@ import globals import csv import pandas as pd import openpyxl -import influxdb_info -import time_api +import pytz # influxdb数据库连接信息 -url = influxdb_info.url -token = influxdb_info.token -org_name = influxdb_info.org +url = "http://localhost:8086" # 替换为你的InfluxDB实例地址 +token = "MhJDl7odKW-y6wNXXUhUMRJ9oPzOvEe52E4NYD5GXtAAMV7BoHMFdet6HqUOt4DjZ-syKjwGao_k0ZIcgrGAPA==" # 替换为你的InfluxDB Token +org_name = "beibei" # 替换为你的Organization名称 client = InfluxDBClient(url=url, token=token, org=org_name) -# # 所有实时更新数据的SCADA设备的ID -# flow_device_ids = ['2498', '3854', '3853'] -# pressure_device_ids = ['2510', '2514'] -# reservoir_liquid_level_ids = ['2497', '2571'] -# tank_liquid_level_ids = ['4780', '9774'] -# pump_device_ids = ['2747', '2776', '2730', '2787', '2500', '2502', '2504'] -# -# # 用于更改数据的SCADA设的ID -# change_data_device_ids = ['2498', '3854', '3853', '2497', '2571', '4780', '9774', -# '2747', '2776', '2730', '2787', '2500', '2502', '2504'] - -# # 全局变量,用于存储不同类型的realtime api_query_id -# reservoir_liquid_level_realtime_ids = [] -# tank_liquid_level_realtime_ids = [] -# fixed_pump_realtime_ids = [] -# variable_pump_realtime_ids = [] -# source_outflow_realtime_ids = [] -# pipe_flow_realtime_ids = [] -# pressure_realtime_ids = [] -# demand_realtime_ids = [] -# quality_realtime_ids = [] -# -# # transmission_frequency的最大值 -# transmission_frequency = None -# hydraulic_timestep = None -# -# reservoir_liquid_level_non_realtime_ids = [] -# tank_liquid_level_non_realtime_ids = [] -# fixed_pump_non_realtime_ids = [] -# variable_pump_non_realtime_ids = [] -# source_outflow_non_realtime_ids = [] -# pipe_flow_non_realtime_ids = [] 
-# pressure_non_realtime_ids = [] -# demand_non_realtime_ids = [] -# quality_non_realtime_ids = [] - def query_pg_scada_info_realtime(name: str) -> None: """ @@ -198,9 +161,7 @@ def query_pg_scada_info_non_realtime(name: str) -> None: # print("Demand Non-Realtime IDs:", globals.demand_non_realtime_ids) # print("Quality Non-Realtime IDs:", globals.quality_non_realtime_ids) # print("Maximum Transmission Frequency:", globals.transmission_frequency) - print("Hydraulic Timestep:", globals.hydraulic_timestep) - - + # print("Hydraulic Timestep:", globals.hydraulic_timestep) except Exception as e: print(f"查询时发生错误:{e}") @@ -257,6 +218,8 @@ def create_and_initialize_buckets(client: InfluxDBClient, org_name: str) -> None {"name": "realtime_simulation_result", "retention_rules": []}, {"name": "scheme_simulation_result", "retention_rules": []} ] + # 创建一个临时存储点数据的列表 + points_to_write = [] # 创建 Buckets 并初始化数据 for bucket in buckets: # 创建 Bucket @@ -274,10 +237,11 @@ def create_and_initialize_buckets(client: InfluxDBClient, org_name: str) -> None .tag("device_ID", None) \ .field("monitored_value", 0.0) \ .field("datacleaning_value", 0.0) \ - .field("simulation_value", 0.0) \ + .field("simulation_value", None) \ .time("2024-11-21T00:00:00Z") - write_api.write(bucket="SCADA_data", org=org_name, record=point) - print("Initialized SCADA_data with default structure.") + points_to_write.append(point) + # write_api.write(bucket="SCADA_data", org=org_name, record=point) + # print("Initialized SCADA_data with default structure.") elif bucket["name"] == "realtime_simulation_result": # realtime_simulation_result link_point = Point("link") \ .tag("date", None) \ @@ -292,6 +256,7 @@ def create_and_initialize_buckets(client: InfluxDBClient, org_name: str) -> None .field("reaction", 0.0) \ .field("friction", 0.0) \ .time("2024-11-21T00:00:00Z") + points_to_write.append(link_point) node_point = Point("node") \ .tag("date", None) \ .tag("ID", None) \ @@ -302,9 +267,10 @@ def 
create_and_initialize_buckets(client: InfluxDBClient, org_name: str) -> None .field("totalExternalOutflow", 0.0) \ .field("quality", 0.0) \ .time("2024-11-21T00:00:00Z") - write_api.write(bucket="realtime_simulation_result", org=org_name, record=link_point) - write_api.write(bucket="realtime_simulation_result", org=org_name, record=node_point) - print("Initialized realtime_simulation_result with default structure.") + points_to_write.append(node_point) + # write_api.write(bucket="realtime_simulation_result", org=org_name, record=link_point) + # write_api.write(bucket="realtime_simulation_result", org=org_name, record=node_point) + # print("Initialized realtime_simulation_result with default structure.") elif bucket["name"] == "scheme_simulation_result": link_point = Point("link") \ .tag("date", None) \ @@ -319,6 +285,7 @@ def create_and_initialize_buckets(client: InfluxDBClient, org_name: str) -> None .field("setting", 0.0) \ .field("quality", 0.0) \ .time("2024-11-21T00:00:00Z") + points_to_write.append(link_point) node_point = Point("node") \ .tag("date", None) \ .tag("ID", None) \ @@ -331,9 +298,26 @@ def create_and_initialize_buckets(client: InfluxDBClient, org_name: str) -> None .field("totalExternalOutflow", 0.0) \ .field("quality", 0.0) \ .time("2024-11-21T00:00:00Z") - write_api.write(bucket="scheme_simulation_result", org=org_name, record=link_point) - write_api.write(bucket="scheme_simulation_result", org=org_name, record=node_point) - print("Initialized scheme_simulation_result with default structure.") + points_to_write.append(node_point) + SCADA_point = Point("SCADA") \ + .tag("date", None) \ + .tag("description", None) \ + .tag("device_ID", None) \ + .tag("scheme_Type", None) \ + .tag("scheme_Name", None) \ + .field("monitored_value", 0.0) \ + .field("datacleaning_value", 0.0) \ + .field("scheme_simulation_value", None) \ + .time("2024-11-21T00:00:00Z") + points_to_write.append(SCADA_point) + # write_api.write(bucket="scheme_simulation_result", 
org=org_name, record=link_point) + # write_api.write(bucket="scheme_simulation_result", org=org_name, record=node_point) + # write_api.write(bucket="scheme_simulation_result", org=org_name, record=SCADA_point) + # print("Initialized scheme_simulation_result with default structure.") + # 批量写入数据 + if points_to_write: + write_api.write(bucket=bucket, org=org_name, record=points_to_write) + write_api.flush() # 刷新缓存一次 print("All buckets created and initialized successfully.") @@ -346,12 +330,18 @@ def store_realtime_SCADA_data_to_influxdb(get_real_value_time: str, bucket: str :return: """ if client.ping(): - print("{} -- Successfully connected to InfluxDB.".format( - datetime.now().strftime('%Y-%m-%d %H:%M:%S'))) + pass else: print("{} -- Failed to connect to InfluxDB.".format( datetime.now().strftime('%Y-%m-%d %H:%M:%S'))) - write_api = client.write_api(write_options=ASYNCHRONOUS) + write_options = WriteOptions( + jitter_interval=200, # 添加抖动以避免同时写入 + max_retry_delay=30000 # 最大重试延迟(毫秒) + ) + write_api = client.write_api(write_options=write_options) + # 创建一个临时存储点数据的列表 + points_to_write = [] + try_count = 0 reservoir_liquid_level_realtime_data_list = [] tank_liquid_level_realtime_data_list = [] @@ -366,8 +356,10 @@ def store_realtime_SCADA_data_to_influxdb(get_real_value_time: str, bucket: str try: try_count += 1 if globals.reservoir_liquid_level_realtime_ids: + # print(globals.reservoir_liquid_level_realtime_ids) reservoir_liquid_level_realtime_data_list = get_realValue.get_realValue( ids=','.join(globals.reservoir_liquid_level_realtime_ids)) + # print(reservoir_liquid_level_realtime_data_list) if globals.tank_liquid_level_realtime_ids: tank_liquid_level_realtime_data_list = get_realValue.get_realValue(ids=','.join(globals.tank_liquid_level_realtime_ids)) if globals.fixed_pump_realtime_ids: @@ -399,7 +391,7 @@ def store_realtime_SCADA_data_to_influxdb(get_real_value_time: str, bucket: str get_real_value_time_utc = get_real_value_time_dt.astimezone(timezone.utc) # 
计算时间差(绝对值) time_difference = abs((data_time - get_real_value_time_dt).total_seconds()) - # 判断时间差是否超过1分钟 + # 判断时间差是否超过1分钟 if time_difference > 60: # 超过1分钟 monitored_value = None else: # 小于等于3分钟 monitored_value = data['monitored_value'] @@ -415,8 +407,9 @@ def store_realtime_SCADA_data_to_influxdb(get_real_value_time: str, bucket: str .field("simulation_value", None) .time(get_real_value_time_utc) ) - write_api.write(bucket=bucket, org=org_name, record=point) - write_api.flush() + points_to_write.append(point) + # write_api.write(bucket=bucket, org=org_name, record=point) + # write_api.flush() if tank_liquid_level_realtime_data_list: for data in tank_liquid_level_realtime_data_list: # 将 data['time'] 和 get_realValue_time 转换为 datetime 对象 @@ -427,7 +420,7 @@ def store_realtime_SCADA_data_to_influxdb(get_real_value_time: str, bucket: str # 计算时间差(绝对值) time_difference = abs((data_time - get_real_value_time_dt).total_seconds()) # 判断时间差是否超过1分钟 - if time_difference > 60: # 超过1分钟 + if time_difference > 60: # 超过1分钟 monitored_value = None else: # 小于等于3分钟 monitored_value = data['monitored_value'] @@ -442,8 +435,9 @@ def store_realtime_SCADA_data_to_influxdb(get_real_value_time: str, bucket: str .field("simulation_value", None) .time(get_real_value_time_utc) ) - write_api.write(bucket=bucket, org=org_name, record=point) - write_api.flush() + points_to_write.append(point) + # write_api.write(bucket=bucket, org=org_name, record=point) + # write_api.flush() if fixed_pump_realtime_data_list: for data in fixed_pump_realtime_data_list: # 将 data['time'] 和 get_realValue_time 转换为 datetime 对象 @@ -469,8 +463,9 @@ def store_realtime_SCADA_data_to_influxdb(get_real_value_time: str, bucket: str .field("simulation_value", None) .time(get_real_value_time_utc) ) - write_api.write(bucket=bucket, org=org_name, record=point) - write_api.flush() + points_to_write.append(point) + # write_api.write(bucket=bucket, org=org_name, record=point) + # write_api.flush() if variable_pump_realtime_data_list: for data in variable_pump_realtime_data_list:
# 将 data['time'] 和 get_realValue_time 转换为 datetime 对象 @@ -481,7 +476,7 @@ def store_realtime_SCADA_data_to_influxdb(get_real_value_time: str, bucket: str # 计算时间差(绝对值) time_difference = abs((data_time - get_real_value_time_dt).total_seconds()) # 判断时间差是否超过1分钟 - if time_difference > 60: # 超过1分钟 + if time_difference > 60: # 超过1分钟 monitored_value = None else: # 小于等于3分钟 monitored_value = data['monitored_value'] @@ -496,8 +491,9 @@ def store_realtime_SCADA_data_to_influxdb(get_real_value_time: str, bucket: str .field("simulation_value", None) .time(get_real_value_time_utc) ) - write_api.write(bucket=bucket, org=org_name, record=point) - write_api.flush() + points_to_write.append(point) + # write_api.write(bucket=bucket, org=org_name, record=point) + # write_api.flush() if source_outflow_realtime_data_list: for data in source_outflow_realtime_data_list: # 将 data['time'] 和 get_realValue_time 转换为 datetime 对象 @@ -523,8 +519,9 @@ def store_realtime_SCADA_data_to_influxdb(get_real_value_time: str, bucket: str .field("simulation_value", None) .time(get_real_value_time_utc) ) - write_api.write(bucket=bucket, org=org_name, record=point) - write_api.flush() + points_to_write.append(point) + # write_api.write(bucket=bucket, org=org_name, record=point) + # write_api.flush() if pipe_flow_realtime_data_list: for data in pipe_flow_realtime_data_list: # 将 data['time'] 和 get_realValue_time 转换为 datetime 对象 @@ -550,8 +547,9 @@ def store_realtime_SCADA_data_to_influxdb(get_real_value_time: str, bucket: str .field("simulation_value", None) .time(get_real_value_time_utc) ) - write_api.write(bucket=bucket, org=org_name, record=point) - write_api.flush() + points_to_write.append(point) + # write_api.write(bucket=bucket, org=org_name, record=point) + # write_api.flush() if pressure_realtime_data_list: for data in pressure_realtime_data_list: # 将 data['time'] 和 get_realValue_time 转换为 datetime 对象 @@ -577,8 +575,9 @@ def store_realtime_SCADA_data_to_influxdb(get_real_value_time: str, bucket: str 
.field("simulation_value", None) .time(get_real_value_time_utc) ) - write_api.write(bucket=bucket, org=org_name, record=point) - write_api.flush() + points_to_write.append(point) + # write_api.write(bucket=bucket, org=org_name, record=point) + # write_api.flush() if demand_realtime_data_list: for data in demand_realtime_data_list: # 将 data['time'] 和 get_realValue_time 转换为 datetime 对象 @@ -604,8 +603,9 @@ def store_realtime_SCADA_data_to_influxdb(get_real_value_time: str, bucket: str .field("simulation_value", None) .time(get_real_value_time_utc) ) - write_api.write(bucket=bucket, org=org_name, record=point) - write_api.flush() + points_to_write.append(point) + # write_api.write(bucket=bucket, org=org_name, record=point) + # write_api.flush() if quality_realtime_data_list: for data in quality_realtime_data_list: # 将 data['time'] 和 get_realValue_time 转换为 datetime 对象 @@ -631,8 +631,13 @@ def store_realtime_SCADA_data_to_influxdb(get_real_value_time: str, bucket: str .field("simulation_value", None) .time(get_real_value_time_utc) ) - write_api.write(bucket=bucket, org=org_name, record=point) - write_api.flush() + points_to_write.append(point) + # write_api.write(bucket=bucket, org=org_name, record=point) + # write_api.flush() + # 批量写入数据 + if points_to_write: + write_api.write(bucket=bucket, org=org_name, record=points_to_write) + write_api.flush() # 刷新缓存一次 def convert_time_format(original_time: str) -> str: @@ -646,14 +651,6 @@ def convert_time_format(original_time: str) -> str: return new_time -# 筛选符合条件的数据 -def is_timestep_multiple(data_time, timestep): - # 获取时间点距离当天0点的时间差 - midnight = data_time.replace(hour=0, minute=0, second=0, microsecond=0) - delta_since_midnight = data_time - midnight - # 检查时间差是否为时间步长的整数倍 - return delta_since_midnight.total_seconds() % timestep.total_seconds() == 0 - # 2025/01/10 def store_non_realtime_SCADA_data_to_influxdb(get_history_data_end_time: str, bucket: str = "SCADA_data", client: InfluxDBClient = client) -> None: """ @@ -664,12 
+661,18 @@ def store_non_realtime_SCADA_data_to_influxdb(get_history_data_end_time: str, bu :return: """ if client.ping(): - print("{} -- Successfully connected to InfluxDB.".format( - datetime.now().strftime('%Y-%m-%d %H:%M:%S'))) + pass else: print("{} -- Failed to connect to InfluxDB.".format( datetime.now().strftime('%Y-%m-%d %H:%M:%S'))) - write_api = client.write_api(write_options=SYNCHRONOUS) + write_options = WriteOptions( + jitter_interval=200, # 添加抖动以避免同时写入 + max_retry_delay=30000 # 最大重试延迟(毫秒) + ) + write_api = client.write_api(write_options=write_options) + # 创建一个临时存储点数据的列表 + points_to_write = [] + # 将end_date字符串转换为datetime对象 end_date_dt = datetime.strptime(convert_time_format(get_history_data_end_time), '%Y-%m-%d %H:%M:%S') end_date = end_date_dt.strftime('%Y-%m-%d %H:%M:%S') @@ -764,7 +767,8 @@ def store_non_realtime_SCADA_data_to_influxdb(get_history_data_end_time: str, bu .field("simulation_value", None) .time(data['time']) ) - write_api.write(bucket=bucket, org=org_name, record=point) + points_to_write.append(point) + # write_api.write(bucket=bucket, org=org_name, record=point) if tank_liquid_level_non_realtime_data_list: for data in tank_liquid_level_non_realtime_data_list: # 创建Point对象 @@ -778,7 +782,8 @@ def store_non_realtime_SCADA_data_to_influxdb(get_history_data_end_time: str, bu .field("simulation_value", None) .time(data['time']) ) - write_api.write(bucket=bucket, org=org_name, record=point) + points_to_write.append(point) + # write_api.write(bucket=bucket, org=org_name, record=point) if fixed_pump_non_realtime_data_list: for data in fixed_pump_non_realtime_data_list: # 创建Point对象 @@ -792,7 +797,8 @@ def store_non_realtime_SCADA_data_to_influxdb(get_history_data_end_time: str, bu .field("simulation_value", None) .time(data['time']) ) - write_api.write(bucket=bucket, org=org_name, record=point) + points_to_write.append(point) + # write_api.write(bucket=bucket, org=org_name, record=point) if variable_pump_non_realtime_data_list: for data in 
variable_pump_non_realtime_data_list: # 创建Point对象 @@ -806,7 +812,8 @@ def store_non_realtime_SCADA_data_to_influxdb(get_history_data_end_time: str, bu .field("simulation_value", None) .time(data['time']) ) - write_api.write(bucket=bucket, org=org_name, record=point) + points_to_write.append(point) + # write_api.write(bucket=bucket, org=org_name, record=point) if source_outflow_non_realtime_data_list: for data in source_outflow_non_realtime_data_list: # 创建Point对象 @@ -820,7 +827,8 @@ def store_non_realtime_SCADA_data_to_influxdb(get_history_data_end_time: str, bu .field("simulation_value", None) .time(data['time']) ) - write_api.write(bucket=bucket, org=org_name, record=point) + points_to_write.append(point) + # write_api.write(bucket=bucket, org=org_name, record=point) if pipe_flow_non_realtime_data_list: for data in pipe_flow_non_realtime_data_list: # 创建Point对象 @@ -834,7 +842,8 @@ def store_non_realtime_SCADA_data_to_influxdb(get_history_data_end_time: str, bu .field("simulation_value", None) .time(data['time']) ) - write_api.write(bucket=bucket, org=org_name, record=point) + points_to_write.append(point) + # write_api.write(bucket=bucket, org=org_name, record=point) if pressure_non_realtime_data_list: for data in pressure_non_realtime_data_list: # 创建Point对象 @@ -848,7 +857,8 @@ def store_non_realtime_SCADA_data_to_influxdb(get_history_data_end_time: str, bu .field("simulation_value", None) .time(data['time']) ) - write_api.write(bucket=bucket, org=org_name, record=point) + points_to_write.append(point) + # write_api.write(bucket=bucket, org=org_name, record=point) if demand_non_realtime_data_list: for data in demand_non_realtime_data_list: # 创建Point对象 @@ -862,7 +872,8 @@ def store_non_realtime_SCADA_data_to_influxdb(get_history_data_end_time: str, bu .field("simulation_value", None) .time(data['time']) ) - write_api.write(bucket=bucket, org=org_name, record=point) + points_to_write.append(point) + # write_api.write(bucket=bucket, org=org_name, record=point) if 
quality_non_realtime_data_list: for data in quality_non_realtime_data_list: # 创建Point对象 @@ -876,7 +887,12 @@ def store_non_realtime_SCADA_data_to_influxdb(get_history_data_end_time: str, bu .field("simulation_value", None) .time(data['time']) ) - write_api.write(bucket=bucket, org=org_name, record=point) + points_to_write.append(point) + # write_api.write(bucket=bucket, org=org_name, record=point) + # 批量写入数据 + if points_to_write: + write_api.write(bucket=bucket, org=org_name, record=points_to_write) + write_api.flush() # 刷新缓存一次 # 2025/03/01 @@ -891,12 +907,18 @@ def download_history_data_manually(begin_time: str, end_time: str, bucket: str = :return: """ if client.ping(): - print("{} -- Successfully connected to InfluxDB.".format( - datetime.now().strftime('%Y-%m-%d %H:%M:%S'))) + pass else: print("{} -- Failed to connect to InfluxDB.".format( datetime.now().strftime('%Y-%m-%d %H:%M:%S'))) - write_api = client.write_api(write_options=SYNCHRONOUS) + write_options = WriteOptions( + jitter_interval=200, # 添加抖动以避免同时写入 + max_retry_delay=30000 # 最大重试延迟(毫秒) + ) + write_api = client.write_api(write_options=write_options) + # 创建一个临时存储点数据的列表 + points_to_write = [] + begin_date = convert_time_format(begin_time) end_date = convert_time_format(end_time) @@ -1042,7 +1064,8 @@ def download_history_data_manually(begin_time: str, end_time: str, bucket: str = .field("simulation_value", None) .time(data['time']) ) - write_api.write(bucket=bucket, org=org_name, record=point) + points_to_write.append(point) + # write_api.write(bucket=bucket, org=org_name, record=point) if tank_liquid_level_realtime_data_list: for data in tank_liquid_level_realtime_data_list: # 创建Point对象 @@ -1056,7 +1079,8 @@ def download_history_data_manually(begin_time: str, end_time: str, bucket: str = .field("simulation_value", None) .time(data['time']) ) - write_api.write(bucket=bucket, org=org_name, record=point) + points_to_write.append(point) + # write_api.write(bucket=bucket, org=org_name, record=point) if 
fixed_pump_realtime_data_list: for data in fixed_pump_realtime_data_list: # 创建Point对象 @@ -1070,7 +1094,8 @@ def download_history_data_manually(begin_time: str, end_time: str, bucket: str = .field("simulation_value", None) .time(data['time']) ) - write_api.write(bucket=bucket, org=org_name, record=point) + points_to_write.append(point) + # write_api.write(bucket=bucket, org=org_name, record=point) if variable_pump_realtime_data_list: for data in variable_pump_realtime_data_list: # 创建Point对象 @@ -1084,7 +1109,8 @@ def download_history_data_manually(begin_time: str, end_time: str, bucket: str = .field("simulation_value", None) .time(data['time']) ) - write_api.write(bucket=bucket, org=org_name, record=point) + points_to_write.append(point) + # write_api.write(bucket=bucket, org=org_name, record=point) if source_outflow_realtime_data_list: for data in source_outflow_realtime_data_list: # 创建Point对象 @@ -1098,7 +1124,8 @@ def download_history_data_manually(begin_time: str, end_time: str, bucket: str = .field("simulation_value", None) .time(data['time']) ) - write_api.write(bucket=bucket, org=org_name, record=point) + points_to_write.append(point) + # write_api.write(bucket=bucket, org=org_name, record=point) if pipe_flow_realtime_data_list: for data in pipe_flow_realtime_data_list: # 创建Point对象 @@ -1112,7 +1139,8 @@ def download_history_data_manually(begin_time: str, end_time: str, bucket: str = .field("simulation_value", None) .time(data['time']) ) - write_api.write(bucket=bucket, org=org_name, record=point) + points_to_write.append(point) + # write_api.write(bucket=bucket, org=org_name, record=point) if pressure_realtime_data_list: for data in pressure_realtime_data_list: # 创建Point对象 @@ -1126,7 +1154,8 @@ def download_history_data_manually(begin_time: str, end_time: str, bucket: str = .field("simulation_value", None) .time(data['time']) ) - write_api.write(bucket=bucket, org=org_name, record=point) + points_to_write.append(point) + # write_api.write(bucket=bucket, 
org=org_name, record=point) if demand_realtime_data_list: for data in demand_realtime_data_list: # 创建Point对象 @@ -1140,7 +1169,8 @@ def download_history_data_manually(begin_time: str, end_time: str, bucket: str = .field("simulation_value", None) .time(data['time']) ) - write_api.write(bucket=bucket, org=org_name, record=point) + points_to_write.append(point) + # write_api.write(bucket=bucket, org=org_name, record=point) if quality_realtime_data_list: for data in quality_realtime_data_list: # 创建Point对象 @@ -1154,7 +1184,8 @@ def download_history_data_manually(begin_time: str, end_time: str, bucket: str = .field("simulation_value", None) .time(data['time']) ) - write_api.write(bucket=bucket, org=org_name, record=point) + points_to_write.append(point) + # write_api.write(bucket=bucket, org=org_name, record=point) if reservoir_liquid_level_non_realtime_data_list: for data in reservoir_liquid_level_non_realtime_data_list: # 创建Point对象 @@ -1168,7 +1199,8 @@ def download_history_data_manually(begin_time: str, end_time: str, bucket: str = .field("simulation_value", None) .time(data['time']) ) - write_api.write(bucket=bucket, org=org_name, record=point) + points_to_write.append(point) + # write_api.write(bucket=bucket, org=org_name, record=point) if tank_liquid_level_non_realtime_data_list: for data in tank_liquid_level_non_realtime_data_list: # 创建Point对象 @@ -1182,7 +1214,8 @@ def download_history_data_manually(begin_time: str, end_time: str, bucket: str = .field("simulation_value", None) .time(data['time']) ) - write_api.write(bucket=bucket, org=org_name, record=point) + points_to_write.append(point) + # write_api.write(bucket=bucket, org=org_name, record=point) if fixed_pump_non_realtime_data_list: for data in fixed_pump_non_realtime_data_list: # 创建Point对象 @@ -1196,7 +1229,8 @@ def download_history_data_manually(begin_time: str, end_time: str, bucket: str = .field("simulation_value", None) .time(data['time']) ) - write_api.write(bucket=bucket, org=org_name, record=point) + 
points_to_write.append(point) + # write_api.write(bucket=bucket, org=org_name, record=point) if variable_pump_non_realtime_data_list: for data in variable_pump_non_realtime_data_list: # 创建Point对象 @@ -1210,7 +1244,8 @@ def download_history_data_manually(begin_time: str, end_time: str, bucket: str = .field("simulation_value", None) .time(data['time']) ) - write_api.write(bucket=bucket, org=org_name, record=point) + points_to_write.append(point) + # write_api.write(bucket=bucket, org=org_name, record=point) if source_outflow_non_realtime_data_list: for data in source_outflow_non_realtime_data_list: # 创建Point对象 @@ -1224,7 +1259,8 @@ def download_history_data_manually(begin_time: str, end_time: str, bucket: str = .field("simulation_value", None) .time(data['time']) ) - write_api.write(bucket=bucket, org=org_name, record=point) + points_to_write.append(point) + # write_api.write(bucket=bucket, org=org_name, record=point) if pipe_flow_non_realtime_data_list: for data in pipe_flow_non_realtime_data_list: # 创建Point对象 @@ -1238,7 +1274,8 @@ def download_history_data_manually(begin_time: str, end_time: str, bucket: str = .field("simulation_value", None) .time(data['time']) ) - write_api.write(bucket=bucket, org=org_name, record=point) + points_to_write.append(point) + # write_api.write(bucket=bucket, org=org_name, record=point) if pressure_non_realtime_data_list: for data in pressure_non_realtime_data_list: # 创建Point对象 @@ -1252,7 +1289,8 @@ def download_history_data_manually(begin_time: str, end_time: str, bucket: str = .field("simulation_value", None) .time(data['time']) ) - write_api.write(bucket=bucket, org=org_name, record=point) + points_to_write.append(point) + # write_api.write(bucket=bucket, org=org_name, record=point) if demand_non_realtime_data_list: for data in demand_non_realtime_data_list: # 创建Point对象 @@ -1266,7 +1304,8 @@ def download_history_data_manually(begin_time: str, end_time: str, bucket: str = .field("simulation_value", None) .time(data['time']) ) - 
write_api.write(bucket=bucket, org=org_name, record=point) + points_to_write.append(point) + # write_api.write(bucket=bucket, org=org_name, record=point) if quality_non_realtime_data_list: for data in quality_non_realtime_data_list: # 创建Point对象 @@ -1280,21 +1319,25 @@ def download_history_data_manually(begin_time: str, end_time: str, bucket: str = .field("simulation_value", None) .time(data['time']) ) - write_api.write(bucket=bucket, org=org_name, record=point) + points_to_write.append(point) + # write_api.write(bucket=bucket, org=org_name, record=point) + # 批量写入数据 + if points_to_write: + write_api.write(bucket=bucket, org=org_name, record=points_to_write) + write_api.flush() # 刷新缓存一次 def query_SCADA_data_by_device_ID_and_time(query_ids_list: List[str], query_time: str, bucket: str="SCADA_data", client: InfluxDBClient=client) -> Dict[str, float]: """ 根据SCADA设备的ID和时间查询值 - :param query_ids_list: SCADA设备ID的列表, 是api_query 而不是 普通的Id + :param query_ids_list: SCADA设备ID的列表 :param query_time: 输入的北京时间,格式为 '2024-11-24T17:30:00+08:00'。 :param bucket: InfluxDB 的 bucket 名称,默认值为 "SCADA_data"。 :param client: 已初始化的 InfluxDBClient 实例。 :return: """ if client.ping(): - print("{} -- Successfully connected to InfluxDB.".format( - datetime.now().strftime('%Y-%m-%d %H:%M:%S'))) + pass else: print("{} -- Failed to connect to InfluxDB.".format( datetime.now().strftime('%Y-%m-%d %H:%M:%S'))) @@ -1302,10 +1345,8 @@ def query_SCADA_data_by_device_ID_and_time(query_ids_list: List[str], query_time # 将北京时间转换为 UTC 时间 beijing_time = datetime.fromisoformat(query_time) utc_time = beijing_time.astimezone(timezone.utc) - - # DingZQ temp change delta to 5 from 1 - utc_start_time = utc_time - timedelta(seconds=5) - utc_stop_time = utc_time + timedelta(seconds=5) + utc_start_time = utc_time - timedelta(seconds=1) + utc_stop_time = utc_time + timedelta(seconds=1) # 构建查询字典 SCADA_result_dict = {} for device_id in query_ids_list: @@ -1313,8 +1354,7 @@ def query_SCADA_data_by_device_ID_and_time(query_ids_list: 
List[str], query_time flux_query = f''' from(bucket: "{bucket}") |> range(start: {utc_start_time.isoformat()}, stop: {utc_stop_time.isoformat()}) - |> filter(fn: (r) => r["device_ID"] == "{device_id}") - |> filter(fn: (r) => r["_field"] == "monitored_value") + |> filter(fn: (r) => r["device_ID"] == "{device_id}" and r["_field"] == "monitored_value") ''' # 执行查询 try: @@ -1468,7 +1508,6 @@ def query_SCADA_data_by_device_ID_and_date(query_ids_list: List[str], query_date return query_SCADA_data_by_device_ID_and_time_range(query_ids_list, str(start_time), str(end_time), bucket, client) - # 2025/02/01 def store_realtime_simulation_result_to_influxdb(node_result_list: List[Dict[str, any]], link_result_list: List[Dict[str, any]], result_start_time: str, @@ -1482,17 +1521,20 @@ def store_realtime_simulation_result_to_influxdb(node_result_list: List[Dict[str :param client: (InfluxDBClient): 已初始化的 InfluxDBClient 实例。 :return: """ - if client.ping(): - print("{} -- store_realtime_simulation_result_to_influxdb : Successfully connected to InfluxDB.".format( - datetime.now().strftime('%Y-%m-%d %H:%M:%S'))) + pass else: - print("{} -- store_realtime_simulation_result_to_influxdb : Failed to connect to InfluxDB.".format( + print("{} -- Failed to connect to InfluxDB.".format( datetime.now().strftime('%Y-%m-%d %H:%M:%S'))) - # 开始写入数据 try: - write_api = client.write_api() + write_options = WriteOptions( + jitter_interval=200, # 添加抖动以避免同时写入 + max_retry_delay=30000 # 最大重试延迟(毫秒) + ) + write_api = client.write_api(write_options=write_options) + # 创建一个临时存储点数据的列表 + points_to_write = [] date_str = result_start_time.split('T')[0] time_beijing = datetime.strptime(result_start_time, '%Y-%m-%dT%H:%M:%S%z').isoformat() for result in node_result_list: @@ -1511,9 +1553,10 @@ def store_realtime_simulation_result_to_influxdb(node_result_list: List[Dict[str .field("totalExternalOutflow", None) \ .field("quality", data.get('quality', 0.0)) \ .time(time_beijing) + points_to_write.append(node_point) # 
写入数据到 InfluxDB,多个 field 在同一个 point 中 - write_api.write(bucket=bucket, org=org_name, record=node_point) - write_api.flush() + # write_api.write(bucket=bucket, org=org_name, record=node_point) + # write_api.flush() print(f"成功将 {len(node_result_list)} 条node数据写入 InfluxDB。") for result in link_result_list: link_id = result.get('link') @@ -1531,9 +1574,14 @@ def store_realtime_simulation_result_to_influxdb(node_result_list: List[Dict[str .field("reaction", data.get('reaction', 0.0)) \ .field("friction", data.get('friction', 0.0)) \ .time(time_beijing) - write_api.write(bucket=bucket, org=org_name, record=link_point) - write_api.flush() - print(f"成功将 {len(link_result_list)} 条link数据写入 InfluxDB。") + points_to_write.append(link_point) + # write_api.write(bucket=bucket, org=org_name, record=link_point) + # write_api.flush() + # print(f"成功将 {len(link_result_list)} 条link数据写入 InfluxDB。") + # 批量写入数据 + if points_to_write: + write_api.write(bucket=bucket, org=org_name, record=points_to_write) + write_api.flush() # 刷新缓存一次 except Exception as e: raise RuntimeError(f"数据写入 InfluxDB 时发生错误: {e}") @@ -1543,14 +1591,13 @@ def query_latest_record_by_ID(ID: str, type: str, bucket: str="realtime_simulati """ 查询指定ID的最新的一条记录 :param ID: (str): 要查询的 ID。 - :param type: (str): "node"或“link”或'scada' + :param type: (str): "node"或“link” :param bucket: (str): 数据存储的 bucket 名称。 :param client: (InfluxDBClient): 已初始化的 InfluxDB 客户端实例。 :return: dict: 最新记录的数据,如果没有找到则返回 None。 """ if client.ping(): - print("{} -- Successfully connected to InfluxDB.".format( - datetime.now().strftime('%Y-%m-%d %H:%M:%S'))) + pass else: print("{} -- Failed to connect to InfluxDB.".format( datetime.now().strftime('%Y-%m-%d %H:%M:%S'))) @@ -1559,8 +1606,7 @@ def query_latest_record_by_ID(ID: str, type: str, bucket: str="realtime_simulati flux_query = f''' from(bucket: "{bucket}") |> range(start: -7d) // 查找最近七天的记录 - |> filter(fn: (r) => r["_measurement"] == "node") - |> filter(fn: (r) => r["ID"] == "{ID}") + |> filter(fn: (r) => 
r["_measurement"] == "node" and r["ID"] == "{ID}") |> pivot( rowKey:["_time"], columnKey:["_field"], @@ -1588,8 +1634,7 @@ def query_latest_record_by_ID(ID: str, type: str, bucket: str="realtime_simulati flux_query = f''' from(bucket: "{bucket}") |> range(start: -7d) // 查找最近七天的记录 - |> filter(fn: (r) => r["_measurement"] == "link") - |> filter(fn: (r) => r["ID"] == "{ID}") + |> filter(fn: (r) => r["_measurement"] == "link" and r["ID"] == "{ID}") |> pivot( rowKey:["_time"], columnKey:["_field"], @@ -1652,8 +1697,7 @@ def query_all_record_by_time(query_time: str, bucket: str="realtime_simulation_r :return: dict: tuple: (node_records, link_records) """ if client.ping(): - print("{} -- Successfully connected to InfluxDB.".format( - datetime.now().strftime('%Y-%m-%d %H:%M:%S'))) + pass else: print("{} -- Failed to connect to InfluxDB.".format( datetime.now().strftime('%Y-%m-%d %H:%M:%S'))) @@ -1710,7 +1754,7 @@ def query_all_record_by_time(query_time: str, bucket: str="realtime_simulation_r return node_records, link_records -# 2025/03/03 WMH +# 2025/03/03 def query_all_record_by_time_property(query_time: str, type: str, property: str, bucket: str="realtime_simulation_result", client: InfluxDBClient=client) -> list: """ @@ -1723,8 +1767,7 @@ def query_all_record_by_time_property(query_time: str, type: str, property: str, :return: list(dict): result_records """ if client.ping(): - print("{} -- Successfully connected to InfluxDB.".format( - datetime.now().strftime('%Y-%m-%d %H:%M:%S'))) + pass else: print("{} -- Failed to connect to InfluxDB.".format( datetime.now().strftime('%Y-%m-%d %H:%M:%S'))) @@ -1745,13 +1788,11 @@ def query_all_record_by_time_property(query_time: str, type: str, property: str, flux_query = f''' from(bucket: "{bucket}") |> range(start: {utc_start_time.isoformat()}, stop: {utc_stop_time.isoformat()}) - |> filter(fn: (r) => r["_measurement"] == "{measurement}") - |> filter(fn: (r) => r["_field"] == "{property}") + |> filter(fn: (r) => r["_measurement"] 
== "{measurement}" and r["_field"] == "{property}") ''' # 执行查询 tables = query_api.query(flux_query) result_records = [] - # 解析查询结果 for table in tables: for record in table.records: @@ -1772,30 +1813,30 @@ def query_all_record_by_date(query_date: str, bucket: str="realtime_simulation_r :param client: 已初始化的InfluxDBClient 实例。 :return: dict: tuple: (node_records, link_records) """ + # 记录开始时间 + time_cost_start = time.perf_counter() + print('{} -- Hydraulic simulation started.'.format( + datetime.now(pytz.timezone('Asia/Shanghai')).strftime('%Y-%m-%d %H:%M:%S'))) + if client.ping(): - print("{} -- Successfully connected to InfluxDB.".format( - datetime.now().strftime('%Y-%m-%d %H:%M:%S'))) + pass else: print("{} -- Failed to connect to InfluxDB.".format( datetime.now().strftime('%Y-%m-%d %H:%M:%S'))) query_api = client.query_api() + # 将 start_date 的北京时间转换为 UTC 时间 start_time = (datetime.strptime(query_date, "%Y-%m-%d") - timedelta(days=1)).replace(hour=16, minute=0, second=0, tzinfo=timezone.utc).isoformat() - print(start_time) - stop_time = (datetime.strptime(query_date, "%Y-%m-%d")).replace(hour=16, minute=0, second=0, tzinfo=timezone.utc).isoformat() - print(stop_time) # 构建 Flux 查询语句 flux_query = f''' from(bucket: "{bucket}") - |> range(start: {start_time}, stop: {stop_time}) - |> filter(fn: (r) => r["_measurement"] == "node" or r["_measurement"] == "link") - |> filter(fn: (r) => r["date"] == "{query_date}") + |> range(start: {start_time}) + |> filter(fn: (r) => r["_measurement"] == "node" or r["_measurement"] == "link" and r["date"] == "{query_date}") |> pivot( rowKey:["_time"], columnKey:["_field"], valueColumn:"_value" ) ''' - # 执行查询 tables = query_api.query(flux_query) node_records = [] @@ -1829,9 +1870,14 @@ def query_all_record_by_date(query_date: str, bucket: str="realtime_simulation_r "reaction": record["reaction"], "friction": record["friction"] }) + time_cost_end = time.perf_counter() + print('{} -- Hydraulic simulation finished, cost time: {:.2f} s.'.format( 
+ datetime.now(pytz.timezone('Asia/Shanghai')).strftime('%Y-%m-%d %H:%M:%S'), + time_cost_end - time_cost_start)) return node_records, link_records -# 2025/02/21 WMH + +# 2025/02/21 def query_all_record_by_date_property(query_date: str, type: str, property: str, bucket: str="realtime_simulation_result", client: InfluxDBClient=client) -> list: """ @@ -1843,10 +1889,15 @@ def query_all_record_by_date_property(query_date: str, type: str, property: str, :param client: 已初始化的InfluxDBClient 实例。 :return: list(dict): result_records """ - - if client.ping(): print("{} -- Successfully connected to InfluxDB.".format( datetime.now().strftime('%Y-%m-%d %H:%M:%S'))) - else: print("{} -- Failed to connect to InfluxDB.".format( datetime.now().strftime('%Y-%m-%d %H:%M:%S'))) - + # 记录开始时间 + time_cost_start = time.perf_counter() + print('{} -- Hydraulic simulation started.'.format( + datetime.now(pytz.timezone('Asia/Shanghai')).strftime('%Y-%m-%d %H:%M:%S'))) + if client.ping(): + pass + else: + print("{} -- Failed to connect to InfluxDB.".format( + datetime.now().strftime('%Y-%m-%d %H:%M:%S'))) query_api = client.query_api() # 确定 measurement if type == "node": @@ -1855,45 +1906,30 @@ def query_all_record_by_date_property(query_date: str, type: str, property: str, measurement = "link" else: raise ValueError(f"不支持的类型: {type}") - # 将 start_date 的北京时间转换为 UTC 时间 - bg_start_time, bg_end_time = time_api.parse_beijing_date_range(query_date) - # bg_end_time = bg_start_time + timedelta(hours=2) # 服务器性能不行,暂时返回2个小时的数据 - utc_start_time = time_api.to_utc_time(bg_start_time) - utc_end_time = time_api.to_utc_time(bg_end_time) - - print(f"utc_start_time: {utc_start_time}, utc_end_time: {utc_end_time}") - - # 构建 Flux 查询语句 - print("before query influxdb") - + start_time = (datetime.strptime(query_date, "%Y-%m-%d") - timedelta(days=1)).replace(hour=16, minute=0, second=0, tzinfo=timezone.utc).isoformat() # 构建 Flux 查询语句 flux_query = f''' - from(bucket: "{bucket}") - |> range(start: 
{utc_start_time.isoformat()}, stop: {utc_end_time.isoformat()}) - |> filter(fn: (r) => - r._measurement == "{measurement}" and - r._field == "{property}" - ) - |> keep(columns: ["ID", "_time", "_value"]) + from(bucket: "{bucket}") + |> range(start: {start_time}) + |> filter(fn: (r) => r["_measurement"] == "{measurement}" and r["date"] == "{query_date}" and r["_field"] == "{property}") ''' # 执行查询 tables = query_api.query(flux_query) - - print("after query influxdb") - print(f"now time: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}") - result_records = [] # 解析查询结果 for table in tables: for record in table.records: # print(record.values) # 打印完整记录内容 result_records.append({ - "time": record["_time"], "ID": record["ID"], - property: record["_value"] + "time": record["_time"], + "value": record["_value"] }) - + time_cost_end = time.perf_counter() + print('{} -- Hydraulic simulation finished, cost time: {:.2f} s.'.format( + datetime.now(pytz.timezone('Asia/Shanghai')).strftime('%Y-%m-%d %H:%M:%S'), + time_cost_end - time_cost_start)) return result_records @@ -1911,8 +1947,7 @@ def query_curve_by_ID_property_daterange(ID: str, type: str, property: str, star :return: 查询结果的列表 """ if client.ping(): - print("{} -- Successfully connected to InfluxDB.".format( - datetime.now().strftime('%Y-%m-%d %H:%M:%S'))) + pass else: print("{} -- Failed to connect to InfluxDB.".format( datetime.now().strftime('%Y-%m-%d %H:%M:%S'))) @@ -1935,9 +1970,7 @@ def query_curve_by_ID_property_daterange(ID: str, type: str, property: str, star flux_query = f''' from(bucket: "{bucket}") |> range(start: {start_time}, stop: {stop_time}) - |> filter(fn: (r) => r["_measurement"] == "{measurement}") - |> filter(fn: (r) => r["ID"] == "{ID}") - |> filter(fn: (r) => r["_field"] == "{property}") + |> filter(fn: (r) => r["_measurement"] == "{measurement}" and r["ID"] == "{ID}" and r["_field"] == "{property}") ''' # 执行查询 tables = query_api.query(flux_query) @@ -1951,31 +1984,6 @@ def 
query_curve_by_ID_property_daterange(ID: str, type: str, property: str, star }) return results -def query_buckets(client: InfluxDBClient=client) -> list[str]: - # 获取 Buckets API 实例 - buckets_api = client.buckets_api() - - # 查询所有 Buckets - buckets = buckets_api.find_buckets().buckets - - print("All Buckets:") - buckets_list = [] - for bucket in buckets: - buckets_list.append(bucket.name) - - return buckets_list - -def query_measurements(bucket: str, client: InfluxDBClient=client) -> list[str]: - query = f''' - import "influxdata/influxdb/schema" - schema.measurements(bucket: "{bucket}") - ''' - - result = client.query_api().query(query) - - # 提取测量名称 - measurements = [row.values["_value"] for table in result for row in table.records] - return measurements # 2025/02/13 def store_scheme_simulation_result_to_influxdb(node_result_list: List[Dict[str, any]], link_result_list: List[Dict[str, any]], @@ -1994,13 +2002,18 @@ def store_scheme_simulation_result_to_influxdb(node_result_list: List[Dict[str, :return: """ if client.ping(): - print("{} -- Successfully connected to InfluxDB.".format( - datetime.now().strftime('%Y-%m-%d %H:%M:%S'))) + pass else: print("{} -- Failed to connect to InfluxDB.".format( datetime.now().strftime('%Y-%m-%d %H:%M:%S'))) try: - write_api = client.write_api() + write_options = WriteOptions( + jitter_interval=200, # 添加抖动以避免同时写入 + max_retry_delay=30000 # 最大重试延迟(毫秒) + ) + write_api = client.write_api(write_options=write_options) + # 创建一个临时存储点数据的列表 + points_to_write = [] date_str = scheme_start_time.split('T')[0] time_beijing = datetime.strptime(scheme_start_time, '%Y-%m-%dT%H:%M:%S%z') timestep_parts = globals.hydraulic_timestep.split(':') @@ -2026,9 +2039,10 @@ def store_scheme_simulation_result_to_influxdb(node_result_list: List[Dict[str, .field("totalExternalOutflow", None) \ .field("quality", data.get('quality', 0.0)) \ .time(scheme_time) + points_to_write.append(node_point) # 写入数据到 InfluxDB,多个 field 在同一个 point 中 - write_api.write(bucket=bucket, 
org=org_name, record=node_point) - write_api.flush() + # write_api.write(bucket=bucket, org=org_name, record=node_point) + # write_api.flush() for link_result in link_result_list: link_id = link_result.get('link') for period_index in range(num_periods): @@ -2049,12 +2063,169 @@ def store_scheme_simulation_result_to_influxdb(node_result_list: List[Dict[str, .field("reaction", data.get('reaction', 0.0)) \ .field("friction", data.get('friction', 0.0)) \ .time(scheme_time) - write_api.write(bucket=bucket, org=org_name, record=link_point) - write_api.flush() + points_to_write.append(link_point) + # write_api.write(bucket=bucket, org=org_name, record=link_point) + # write_api.flush() + # 批量写入数据 + if points_to_write: + write_api.write(bucket=bucket, org=org_name, record=points_to_write) + write_api.flush() # 刷新缓存一次 except Exception as e: raise RuntimeError(f"数据写入 InfluxDB 时发生错误: {e}") +# 2025/03/12 +def query_corresponding_query_id_and_element_id(name: str) -> None: + """ + 查询scada_info这张表中,api_query_id与associated_element_id的对应关系,用于下一步fill_scheme_simulation_result_to_SCADA + :param name: 数据库名称 + :return: + """ + # 连接数据库 + conn_string = f"dbname={name} host=127.0.0.1" + try: + with psycopg.connect(conn_string) as conn: + with conn.cursor() as cur: + # 查询 transmission_mode 为 'realtime' 的记录 + cur.execute(""" + SELECT type, associated_element_id, api_query_id + FROM scada_info + WHERE type IN ('source_outflow', 'pipe_flow', 'demand', 'pressure', 'quality'); + """) + records = cur.fetchall() + # 遍历查询结果,根据 type 分类存入对应的字典 + for record in records: + record_type, associated_element_id, api_query_id = record + if record_type == "source_outflow": + globals.scheme_source_outflow_ids[api_query_id] = associated_element_id + elif record_type == "pipe_flow": + globals.scheme_pipe_flow_ids[api_query_id] = associated_element_id + elif record_type == "pressure": + globals.scheme_pressure_ids[api_query_id] = associated_element_id + elif record_type == "demand": + 
globals.scheme_demand_ids[api_query_id] = associated_element_id + elif record_type == "quality": + globals.scheme_quality_ids[api_query_id] = associated_element_id + # 如果需要调试,可以打印该字典 + # print("scheme_source_outflow_ids:", globals.scheme_source_outflow_ids) + # print("scheme_pipe_flow_ids:", globals.scheme_pipe_flow_ids) + # print("scheme_pressure_ids:", globals.scheme_pressure_ids) + # print("scheme_demand_ids:", globals.scheme_demand_ids) + # print("scheme_quality_ids:", globals.scheme_quality_ids) + except psycopg.Error as e: + print(f"数据库连接或查询出错: {e}") + + +# 2025/03/11 +def fill_scheme_simulation_result_to_SCADA(scheme_Type: str = None, scheme_Name: str = None, query_date: str = None, + bucket: str = "scheme_simulation_result", client: InfluxDBClient = client): + """ + :param scheme_Type: 方案类型 + :param scheme_Name: 方案名称 + :param query_date: 查询日期,格式为 'YYYY-MM-DD' + :param bucket: InfluxDB 的 bucket 名称,默认值为 "scheme_simulation_result" + :param client: 已初始化的 InfluxDBClient 实例 + :return: + """ + if client.ping(): + pass + else: + print("{} -- Failed to connect to InfluxDB.".format( + datetime.now().strftime('%Y-%m-%d %H:%M:%S'))) + write_options = WriteOptions( + jitter_interval=200, # 添加抖动以避免同时写入 + max_retry_delay=30000 # 最大重试延迟(毫秒) + ) + write_api = client.write_api(write_options=write_options) + # 创建一个临时存储点数据的列表 + points_to_write = [] + # 查找associated_element_id的对应值 + for key, value in globals.scheme_source_outflow_ids.items(): + scheme_source_outflow_result = (query_scheme_curve_by_ID_property(scheme_Type=scheme_Type, scheme_Name=scheme_Name, + query_date=query_date, ID=value, type='link', property='flow')) + # print(f"Key: {key}, Query result: {scheme_source_outflow_result}") # 调试输出 + for data in scheme_source_outflow_result: + point = ( + Point('scheme_source_outflow') + .tag("date", query_date) + .tag("device_ID", key) + .tag("scheme_Type", scheme_Type) + .tag("scheme_Name", scheme_Name) + .field("monitored_value", data['value']) + .time(data['time']) + ) + 
points_to_write.append(point) + # write_api.write(bucket=bucket, org=org_name, record=point) + + for key, value in globals.scheme_pipe_flow_ids.items(): + scheme_pipe_flow_result = (query_scheme_curve_by_ID_property(scheme_Type=scheme_Type, scheme_Name=scheme_Name, + query_date=query_date, ID=value, type='link', property='flow')) + for data in scheme_pipe_flow_result: + point = ( + Point('scheme_pipe_flow') + .tag("date", query_date) + .tag("device_ID", key) + .tag("scheme_Type", scheme_Type) + .tag("scheme_Name", scheme_Name) + .field("monitored_value", data['value']) + .time(data['time']) + ) + points_to_write.append(point) + # write_api.write(bucket=bucket, org=org_name, record=point) + + for key, value in globals.scheme_pressure_ids.items(): + scheme_pressure_result = (query_scheme_curve_by_ID_property(scheme_Type=scheme_Type, scheme_Name=scheme_Name, + query_date=query_date, ID=value, type='node', property='pressure')) + for data in scheme_pressure_result: + point = ( + Point('scheme_pressure') + .tag("date", query_date) + .tag("device_ID", key) + .tag("scheme_Type", scheme_Type) + .tag("scheme_Name", scheme_Name) + .field("monitored_value", data['value']) + .time(data['time']) + ) + points_to_write.append(point) + # write_api.write(bucket=bucket, org=org_name, record=point) + + for key, value in globals.scheme_demand_ids.items(): + scheme_demand_result = (query_scheme_curve_by_ID_property(scheme_Type=scheme_Type, scheme_Name=scheme_Name, + query_date=query_date, ID=value, type='node', property='actualdemand')) + for data in scheme_demand_result: + point = ( + Point('scheme_demand') + .tag("date", query_date) + .tag("device_ID", key) + .tag("scheme_Type", scheme_Type) + .tag("scheme_Name", scheme_Name) + .field("monitored_value", data['value']) + .time(data['time']) + ) + points_to_write.append(point) + # write_api.write(bucket=bucket, org=org_name, record=point) + + for key, value in globals.scheme_quality_ids.items(): + scheme_quality_result = 
(query_scheme_curve_by_ID_property(scheme_Type=scheme_Type, scheme_Name=scheme_Name, + query_date=query_date, ID=value, type='node', property='quality')) + for data in scheme_quality_result: + point = ( + Point('scheme_quality') + .tag("date", query_date) + .tag("device_ID", key) + .tag("scheme_Type", scheme_Type) + .tag("scheme_Name", scheme_Name) + .field("monitored_value", data['value']) + .time(data['time']) + ) + points_to_write.append(point) + # write_api.write(bucket=bucket, org=org_name, record=point) + # 批量写入数据 + if points_to_write: + write_api.write(bucket=bucket, org=org_name, record=points_to_write) + write_api.flush() # 刷新缓存一次 + + # 2025/02/15 def query_SCADA_data_curve(api_query_id: str, start_date: str, end_date: str, bucket: str="SCADA_data", client: InfluxDBClient=client) -> list: """ @@ -2067,8 +2238,7 @@ def query_SCADA_data_curve(api_query_id: str, start_date: str, end_date: str, bu :return: 查询结果的列表 """ if client.ping(): - print("{} -- Successfully connected to InfluxDB.".format( - datetime.now().strftime('%Y-%m-%d %H:%M:%S'))) + pass else: print("{} -- Failed to connect to InfluxDB.".format( datetime.now().strftime('%Y-%m-%d %H:%M:%S'))) @@ -2108,8 +2278,7 @@ def query_scheme_all_record_by_time(scheme_Type: str, scheme_Name: str, query_ti :return: dict: tuple: (node_records, link_records) """ if client.ping(): - print("{} -- Successfully connected to InfluxDB.".format( - datetime.now().strftime('%Y-%m-%d %H:%M:%S'))) + pass else: print("{} -- Failed to connect to InfluxDB.".format( datetime.now().strftime('%Y-%m-%d %H:%M:%S'))) @@ -2123,9 +2292,7 @@ def query_scheme_all_record_by_time(scheme_Type: str, scheme_Name: str, query_ti flux_query = f''' from(bucket: "{bucket}") |> range(start: {utc_start_time.isoformat()}, stop: {utc_stop_time.isoformat()}) - |> filter(fn: (r) => r["scheme_Type"] == "{scheme_Type}") - |> filter(fn: (r) => r["scheme_Name"] == "{scheme_Name}") - |> filter(fn: (r) => r["_measurement"] == "node" or r["_measurement"] == 
"link") + |> filter(fn: (r) => r["scheme_Type"] == "{scheme_Type}" and r["scheme_Name"] == "{scheme_Name}" and r["_measurement"] == "node" or r["_measurement"] == "link") |> pivot( rowKey:["_time"], columnKey:["_field"], @@ -2168,7 +2335,7 @@ def query_scheme_all_record_by_time(scheme_Type: str, scheme_Name: str, query_ti return node_records, link_records -# 2025/03/04 WMH +# 2025/03/04 def query_scheme_all_record_by_time_property(scheme_Type: str, scheme_Name: str, query_time: str, type: str, property: str, bucket: str="scheme_simulation_result", client: InfluxDBClient=client) -> list: """ @@ -2183,8 +2350,7 @@ def query_scheme_all_record_by_time_property(scheme_Type: str, scheme_Name: str, :return: dict: tuple: (node_records, link_records) """ if client.ping(): - print("{} -- Successfully connected to InfluxDB.".format( - datetime.now().strftime('%Y-%m-%d %H:%M:%S'))) + pass else: print("{} -- Failed to connect to InfluxDB.".format( datetime.now().strftime('%Y-%m-%d %H:%M:%S'))) @@ -2205,10 +2371,7 @@ def query_scheme_all_record_by_time_property(scheme_Type: str, scheme_Name: str, flux_query = f''' from(bucket: "{bucket}") |> range(start: {utc_start_time.isoformat()}, stop: {utc_stop_time.isoformat()}) - |> filter(fn: (r) => r["scheme_Type"] == "{scheme_Type}") - |> filter(fn: (r) => r["scheme_Name"] == "{scheme_Name}") - |> filter(fn: (r) => r["_measurement"] == "{measurement}") - |> filter(fn: (r) => r["_field"] == "{property}") + |> filter(fn: (r) => r["scheme_Type"] == "{scheme_Type}" and r["scheme_Name"] == "{scheme_Name}" and r["_measurement"] == "{measurement}" and r["_field"] == "{property}") ''' # 执行查询 tables = query_api.query(flux_query) @@ -2224,12 +2387,13 @@ def query_scheme_all_record_by_time_property(scheme_Type: str, scheme_Name: str, # 2025/02/19 -def query_scheme_curve_by_ID_property(scheme_Type: str, scheme_Name: str, ID: str, type: str, property: str, +def query_scheme_curve_by_ID_property(scheme_Type: str, scheme_Name: str, query_date: str, 
ID: str, type: str, property: str, bucket: str="scheme_simulation_result", client: InfluxDBClient=client) -> list: """ 根据scheme_Type和scheme_Name,查询该模拟方案中,某一node或link的某一属性值的所有时间的结果 :param scheme_Type: 方案类型 :param scheme_Name: 方案名称 + :param query_date: 查询日期,格式为 'YYYY-MM-DD' :param ID: 元素的ID :param type: 元素的类型,node或link :param property: 元素的属性值 @@ -2238,8 +2402,7 @@ def query_scheme_curve_by_ID_property(scheme_Type: str, scheme_Name: str, ID: st :return: 查询结果的列表 """ if client.ping(): - print("{} -- Successfully connected to InfluxDB.".format( - datetime.now().strftime('%Y-%m-%d %H:%M:%S'))) + pass else: print("{} -- Failed to connect to InfluxDB.".format( datetime.now().strftime('%Y-%m-%d %H:%M:%S'))) @@ -2255,11 +2418,7 @@ def query_scheme_curve_by_ID_property(scheme_Type: str, scheme_Name: str, ID: st flux_query = f''' from(bucket: "{bucket}") |> range(start: 2025-01-01T00:00:00Z) - |> filter(fn: (r) => r["_measurement"] == "{measurement}") - |> filter(fn: (r) => r["scheme_Type"] == "{scheme_Type}") - |> filter(fn: (r) => r["scheme_Name"] == "{scheme_Name}") - |> filter(fn: (r) => r["ID"] == "{ID}") - |> filter(fn: (r) => r["_field"] == "{property}") + |> filter(fn: (r) => r["_measurement"] == "{measurement}" and r["scheme_Type"] == "{scheme_Type}" and r["scheme_Name"] == "{scheme_Name}" and r["date"] == "{query_date}" and r["ID"] == "{ID}" and r["_field"] == "{property}") ''' # 执行查询 tables = query_api.query(flux_query) @@ -2275,19 +2434,19 @@ def query_scheme_curve_by_ID_property(scheme_Type: str, scheme_Name: str, ID: st # 2025/02/21 -def query_scheme_all_record(scheme_Type: str, scheme_Name: str, bucket: str="scheme_simulation_result", +def query_scheme_all_record(scheme_Type: str, scheme_Name: str, query_date: str, bucket: str="scheme_simulation_result", client: InfluxDBClient=client) -> tuple: """ 查询指定方案的所有记录,包括‘node'和‘link’,分别以指定格式返回。 :param scheme_Type: 方案类型 :param scheme_Name: 方案名称 + :param query_date: 查询日期,格式为 'YYYY-MM-DD' :param bucket: 数据存储的 bucket 名称。 
:param client: 已初始化的 InfluxDBClient 实例。 :return: dict: tuple: (node_records, link_records) """ if client.ping(): - print("{} -- Successfully connected to InfluxDB.".format( - datetime.now().strftime('%Y-%m-%d %H:%M:%S'))) + pass else: print("{} -- Failed to connect to InfluxDB.".format( datetime.now().strftime('%Y-%m-%d %H:%M:%S'))) @@ -2296,9 +2455,7 @@ def query_scheme_all_record(scheme_Type: str, scheme_Name: str, bucket: str="sch flux_query = f''' from(bucket: "{bucket}") |> range(start: 2025-01-01T00:00:00Z) - |> filter(fn: (r) => r["scheme_Type"] == "{scheme_Type}") - |> filter(fn: (r) => r["scheme_Name"] == "{scheme_Name}") - |> filter(fn: (r) => r["_measurement"] == "node" or r["_measurement"] == "link") + |> filter(fn: (r) => r["scheme_Type"] == "{scheme_Type}" and r["scheme_Name"] == "{scheme_Name}" and r["date"] == "{query_date}" and r["_measurement"] == "node" or r["_measurement"] == "link") |> pivot( rowKey:["_time"], columnKey:["_field"], @@ -2342,13 +2499,14 @@ def query_scheme_all_record(scheme_Type: str, scheme_Name: str, bucket: str="sch return node_records, link_records -# 2025/03/04 WMH -def query_scheme_all_record_property(scheme_Type: str, scheme_Name: str, type: str, property: str, +# 2025/03/04 +def query_scheme_all_record_property(scheme_Type: str, scheme_Name: str, query_date: str, type: str, property: str, bucket: str="scheme_simulation_result", client: InfluxDBClient=client) -> list: """ 查询指定方案的‘node'或‘link’的某一属性值,以指定格式返回。 :param scheme_Type: 方案类型 :param scheme_Name: 方案名称 + :param query_date: 查询日期,格式为 'YYYY-MM-DD' :param type: 查询的类型(决定 measurement) :param property: 查询的字段名称(field) :param bucket: 数据存储的 bucket 名称。 @@ -2356,8 +2514,7 @@ def query_scheme_all_record_property(scheme_Type: str, scheme_Name: str, type: s :return: dict: tuple: (node_records, link_records) """ if client.ping(): - print("{} -- Successfully connected to InfluxDB.".format( - datetime.now().strftime('%Y-%m-%d %H:%M:%S'))) + pass else: print("{} -- Failed to connect to 
InfluxDB.".format( datetime.now().strftime('%Y-%m-%d %H:%M:%S'))) @@ -2373,10 +2530,7 @@ def query_scheme_all_record_property(scheme_Type: str, scheme_Name: str, type: s flux_query = f''' from(bucket: "{bucket}") |> range(start: 2025-01-01T00:00:00Z) - |> filter(fn: (r) => r["scheme_Type"] == "{scheme_Type}") - |> filter(fn: (r) => r["scheme_Name"] == "{scheme_Name}") - |> filter(fn: (r) => r["_measurement"] == "{measurement}") - |> filter(fn: (r) => r["_field"] == "{property}") + |> filter(fn: (r) => r["scheme_Type"] == "{scheme_Type}" and r["scheme_Name"] == "{scheme_Name}" and r["date"] == "{query_date}" and r["_measurement"] == "{measurement}" and r["_field"] == "{property}") ''' # 执行查询 tables = query_api.query(flux_query) @@ -2403,8 +2557,7 @@ def export_SCADA_data_to_csv(start_date: str, end_date: str, bucket: str="SCADA_ :return: """ if client.ping(): - print("{} -- Successfully connected to InfluxDB.".format( - datetime.now().strftime('%Y-%m-%d %H:%M:%S'))) + pass else: print("{} -- Failed to connect to InfluxDB.".format( datetime.now().strftime('%Y-%m-%d %H:%M:%S'))) @@ -2455,8 +2608,7 @@ def export_realtime_simulation_result_to_csv(start_date: str, end_date: str, buc :return: """ if client.ping(): - print("{} -- Successfully connected to InfluxDB.".format( - datetime.now().strftime('%Y-%m-%d %H:%M:%S'))) + pass else: print("{} -- Failed to connect to InfluxDB.".format( datetime.now().strftime('%Y-%m-%d %H:%M:%S'))) @@ -2540,8 +2692,7 @@ def export_scheme_simulation_result_to_csv_time(start_date: str, end_date: str, :return: """ if client.ping(): - print("{} -- Successfully connected to InfluxDB.".format( - datetime.now().strftime('%Y-%m-%d %H:%M:%S'))) + pass else: print("{} -- Failed to connect to InfluxDB.".format( datetime.now().strftime('%Y-%m-%d %H:%M:%S'))) @@ -2618,18 +2769,19 @@ def export_scheme_simulation_result_to_csv_time(start_date: str, end_date: str, # 2025/02/18 -def export_scheme_simulation_result_to_csv_scheme(scheme_Type: str, 
scheme_Name: str, bucket: str="scheme_simulation_result", client: InfluxDBClient=client) -> None: +def export_scheme_simulation_result_to_csv_scheme(scheme_Type: str, scheme_Name: str, query_date: str, + bucket: str="scheme_simulation_result", client: InfluxDBClient=client) -> None: """ 导出influxdb中scheme_simulation_result这个bucket的数据到csv中 :param scheme_Type: 查询的方案类型 :param scheme_Name: 查询的方案名 + :param query_date: 查询日期,格式为 'YYYY-MM-DD' :param bucket: 数据存储的 bucket 名称,默认值为 "SCADA_data" :param client: 已初始化的 InfluxDBClient 实例 :return: """ if client.ping(): - print("{} -- Successfully connected to InfluxDB.".format( - datetime.now().strftime('%Y-%m-%d %H:%M:%S'))) + pass else: print("{} -- Failed to connect to InfluxDB.".format( datetime.now().strftime('%Y-%m-%d %H:%M:%S'))) @@ -2638,9 +2790,7 @@ def export_scheme_simulation_result_to_csv_scheme(scheme_Type: str, scheme_Name: flux_query_link = f''' from(bucket: "{bucket}") |> range(start: 2025-01-01T00:00:00Z) - |> filter(fn: (r) => r["_measurement"] == "link") - |> filter(fn: (r) => r["scheme_Type"] == "{scheme_Type}") - |> filter(fn: (r) => r["scheme_Name"] == "{scheme_Name}") + |> filter(fn: (r) => r["_measurement"] == "link" and r["scheme_Type"] == "{scheme_Type}" and r["scheme_Name"] == "{scheme_Name}" and r["date"] == "{query_date}") ''' # 执行查询 link_tables = query_api.query(flux_query_link) @@ -2662,9 +2812,7 @@ def export_scheme_simulation_result_to_csv_scheme(scheme_Type: str, scheme_Name: flux_query_node = f''' from(bucket: "{bucket}") |> range(start: 2025-01-01T00:00:00Z) - |> filter(fn: (r) => r["_measurement"] == "node") - |> filter(fn: (r) => r["scheme_Type"] == "{scheme_Type}") - |> filter(fn: (r) => r["scheme_Name"] == "{scheme_Name}") + |> filter(fn: (r) => r["_measurement"] == "node" and r["scheme_Type"] == "{scheme_Type}" and r["scheme_Name"] == "{scheme_Name}") ''' # 执行查询 node_tables = query_api.query(flux_query_node) @@ -2727,18 +2875,22 @@ if __name__ == "__main__": query_pg_scada_info_realtime('bb') 
query_pg_scada_info_non_realtime('bb') + query_corresponding_query_id_and_element_id('bb') + + + # 手动执行存储测试 # 示例1:store_realtime_SCADA_data_to_influxdb - # store_realtime_SCADA_data_to_influxdb(get_real_value_time='2025-02-25T23:45:00+08:00') + store_realtime_SCADA_data_to_influxdb(get_real_value_time='2025-03-12T23:45:00+08:00') # 示例2:store_non_realtime_SCADA_data_to_influxdb - # store_non_realtime_SCADA_data_to_influxdb(get_history_data_end_time='2025-02-08T12:00:00+08:00') + # store_non_realtime_SCADA_data_to_influxdb(get_history_data_end_time='2025-03-08T12:00:00+08:00') # 示例3:download_history_data_manually - # download_history_data_manually(begin_time='2025-02-25T00:00:00+08:00', end_time='2025-02-26T00:00:00+08:00') + # download_history_data_manually(begin_time='2025-03-04T00:00:00+08:00', end_time='2025-03-10T00:00:00+08:00') # step3: 查询测试示例 - with InfluxDBClient(url=url, token=token, org=org_name) as client: + # with InfluxDBClient(url=url, token=token, org=org_name) as client: # # 示例1:query_latest_record_by_ID # bucket_name = "realtime_simulation_result" # 数据存储的 bucket 名称 @@ -2764,11 +2916,11 @@ if __name__ == "__main__": # print(curve_result) # 示例4:query_SCADA_data_by_device_ID_and_time - SCADA_result_dict = query_SCADA_data_by_device_ID_and_time(globals.fixed_pump_realtime_ids, query_time='2025-03-08T23:45:00+08:00') - print(SCADA_result_dict) + # SCADA_result_dict = query_SCADA_data_by_device_ID_and_time(globals.fixed_pump_realtime_ids, query_time='2025-03-09T23:45:00+08:00') + # print(SCADA_result_dict) # 示例5:query_SCADA_data_curve - # SCADA_result = query_SCADA_data_curve(api_query_id='3853', start_date='2025-02-14', end_date='2025-02-16') + # SCADA_result = query_SCADA_data_curve(api_query_id='9519', start_date='2025-03-08', end_date='2025-03-08') # print(SCADA_result) # 示例6:export_SCADA_data_to_csv @@ -2781,7 +2933,7 @@ if __name__ == "__main__": # export_scheme_simulation_result_to_csv_time(start_date='2025-02-13', end_date='2025-02-15') # 
示例9:export_scheme_simulation_result_to_csv_scheme - # export_scheme_simulation_result_to_csv_scheme(scheme_Type='burst_Analysis', scheme_Name='scheme1') + # export_scheme_simulation_result_to_csv_scheme(scheme_Type='burst_Analysis', scheme_Name='scheme1', query_date='2025-03-10') # 示例10:query_scheme_all_record_by_time # node_records, link_records = query_scheme_all_record_by_time(scheme_Type='burst_Analysis', scheme_Name='scheme1', query_time="2025-02-14T10:30:00+08:00") @@ -2794,12 +2946,12 @@ if __name__ == "__main__": # print(curve_result) # 示例12:query_all_record_by_date - # node_records, link_records = query_all_record_by_date(query_date='2025-02-14') + # node_records, link_records = query_all_record_by_date(query_date='2025-02-27') # print("Node 数据:", node_records) # print("Link 数据:", link_records) # 示例13:query_scheme_all_record - # node_records, link_records = query_scheme_all_record(scheme_Type='burst_Analysis', scheme_Name='scheme1') + # node_records, link_records = query_scheme_all_record(scheme_Type='burst_Analysis', scheme_Name='scheme1', query_date='2025-03-10') # print("Node 数据:", node_records) # print("Link 数据:", link_records) @@ -2808,7 +2960,7 @@ if __name__ == "__main__": # print(result_records) # 示例15:query_all_record_by_date_property - # result_records = query_all_record_by_date_property(query_date='2025-02-27', type='node', property='head') + # result_records = query_all_record_by_date_property(query_date='2025-02-14', type='node', property='head') # print(result_records) # 示例16:query_scheme_all_record_by_time_property @@ -2817,8 +2969,11 @@ if __name__ == "__main__": # print(result_records) # 示例17:query_scheme_all_record_property - # result_records = query_scheme_all_record_property(scheme_Type='burst_Analysis', scheme_Name='scheme1', type='node', property='head') + # result_records = query_scheme_all_record_property(scheme_Type='burst_Analysis', scheme_Name='scheme1', query_date='2025-03-10', type='node', property='head') # 
print(result_records) + # 示例18:fill_scheme_simulation_result_to_SCADA + # fill_scheme_simulation_result_to_SCADA(scheme_Type='burst_Analysis', scheme_Name='burst_scheme', query_date='2025-03-10') + diff --git a/scada_info.csv b/scada_info.csv index 081b923..6be6e48 100644 --- a/scada_info.csv +++ b/scada_info.csv @@ -56,11 +56,11 @@ ZBBDFQJL000035,demand,ZBBDFQJL000035,ZhongYangXinDu,,ZBBGXSZW000377,P17021,,,,95 J06186,demand,J06186,XinHaiJiaYuan,,ZBBGXSZW000377,P17021,,,,9525,non_realtime,6:00:00,106.4067,29.8278 ZBBDFQJL000052,demand,ZBBDFQJL000052,DongFengJie,,ZBBGXSZW000377,P17021,,,,9526,non_realtime,6:00:00,106.3678,29.766 ZBBDFQJL000049,demand,ZBBDFQJL000049,DingYaXinYu,,ZBBGXSZW000377,P17021,,,,9500,non_realtime,6:00:00,106.3674,29.7654 -GSD2302160925534D50CD17540D_E,demand,GSD2302160925534D50CD17540D_E,ZiYunTai,,ZBBGXSZW000377,P17021,,,,9529,non_realtime,6:00:00,106.387,29.7884 +GSD2302160925534D50CD17540D_E,demand,GSD2302160925534D50CD17540D_End,ZiYunTai,,ZBBGXSZW000377,P17021,,,,9529,non_realtime,6:00:00,106.387,29.7884 ZBBDFQJL000050,demand,ZBBDFQJL000050,XieMaGuangChang,,ZBBGXSZW000377,P17021,,,,9477,non_realtime,6:00:00,106.3652,29.7657 GSD2307192058578BCE265C6EA8,demand,GSD2307192058578BCE265C6EA8,YongJinFu,,ZBBGXSZW000377,P17021,,,,9534,non_realtime,6:00:00,106.3831,29.781 ZBBDFQJL000028,demand,ZBBDFQJL000028,PanXiMingDu,,P16504,,,,,9514,non_realtime,6:00:00,106.4223,29.821 -GSD230113095617514F4A2066D7_E,demand,GSD230113095617514F4A2066D7_E,WanKeJinYuHuaFuGaoCeng,,P16504,,,,,9528,non_realtime,6:00:00,106.4228,29.8113 +GSD230113095617514F4A2066D7_E,demand,GSD230113095617514F4A2066D7_End,WanKeJinYuHuaFuGaoCeng,,P16504,,,,,9528,non_realtime,6:00:00,106.4228,29.8113 ZBBDFQJL000014,demand,ZBBDFQJL000014,KeJiXiao,,P16504,,,,,9503,non_realtime,6:00:00,106.4332,29.8258 ZBBDFQJL000016,demand,ZBBDFQJL000016,LuGouQiao,,P16504,,,,,9527,non_realtime,6:00:00,106.437,29.8285 
ZBBDFQJL000057,demand,ZBBDFQJL000057,LongJiangHuaYuan,,P16504,,,,,9495,non_realtime,6:00:00,106.4316,29.8309 @@ -74,7 +74,7 @@ ZBBDFQJL000005,demand,ZBBDFQJL000005,TaiJiBinJiangYiQi,,P16504,,,,,9484,non_real ZBBDFQJL000006,demand,ZBBDFQJL000006,TianQiHuaYuan,,P16504,,,,,9523,non_realtime,6:00:00,106.4361,29.8189 ZBBDFQJL000004,demand,ZBBDFQJL000004,TaiJiBinJiangErQi,,P16504,,,,,9515,non_realtime,6:00:00,106.4443,29.8297 ZBBDFQJL000039,demand,ZBBDFQJL000039,122Zhong,,P16504,,,,,9516,non_realtime,6:00:00,106.4284,29.8109 -GSD230112144241F42EF6065148_E,demand,GSD230112144241F42EF6065148_E,WanKeJinYuHuaFuYangFang,,P16504,,,,,9530,non_realtime,6:00:00,106.4263,29.8154 +GSD230112144241F42EF6065148_E,demand,GSD230112144241F42EF6065148_End,WanKeJinYuHuaFuYangFang,,P16504,,,,,9530,non_realtime,6:00:00,106.4263,29.8154 J06194,pressure,J06194,,,,,,,,2514,realtime,0:01:00,106.4195183,29.83782213 J06190,pressure,J06190,,,,,,,,2510,realtime,0:01:00,106.4194644,29.83781489 ZBBDFQJL000025_p,pressure,ZBBDFQJL000025,,,,,,,,9536,non_realtime,6:00:00,106.4120554,29.82753087 diff --git a/sensor_placement.py b/sensor_placement.py new file mode 100644 index 0000000..b570741 --- /dev/null +++ b/sensor_placement.py @@ -0,0 +1,557 @@ +# 改进灵敏度法 +import networkx +import numpy as np +import pandas +import wntr +import pandas as pd +import copy +import matplotlib.pyplot as plt +import networkx as nx +from sklearn.cluster import KMeans +from wntr.epanet.toolkit import EpanetException +from numpy.linalg import slogdet +import random +from tjnetwork import * + + +# 2025/03/12 +# Step1: 获取节点坐标 +def getCoor(wn: wntr.network.WaterNetworkModel) -> pandas.DataFrame: + """ + 获取管网模型的节点坐标 + :param wn: 由wntr生成的模型 + :return: 节点坐标 + """ + # site: pandas.Series + # index:节点名称(wn.node_name_list) + # values:每个节点的坐标,格式为 tuple(如 (x, y) 或 (x, y, z)) + site = wn.query_node_attribute('coordinates') + # Coor: pandas.Series + # index:与site相同(节点名称)。 + # values:坐标转换为numpy.ndarray(如array([10.5, 20.3])) + Coor = 
site.apply(lambda x: np.array(x)) # 将节点坐标转换为numpy数组 + # x, y: list[float] + x = [] # 存储所有节点的 x 坐标 + y = [] # 存储所有节点的 y 坐标 + for i in range(0, len(Coor)): + x.append(Coor.values[i][0]) # 将 x 坐标存入 x 列表。 + y.append(Coor.values[i][1]) # 将 y 坐标存入 y 列表 + # xy: dict[str, list], x、y 坐标的字典 + xy = {'x': x, 'y': y} + # Coor_node: pandas.DataFrame, 存储节点 x, y 坐标的 DataFrame + Coor_node = pd.DataFrame(xy, index=wn.node_name_list, columns=['x', 'y']) + return Coor_node + + +# 2025/03/12 +# Step2: KMeans 聚类 +# 将节点用kmeans根据坐标分为k组,存入字典g +def kgroup(coor: pandas.DataFrame, knum: int) -> dict[int, list[str]]: + """ + 使用KMeans聚类,将节点坐标分组 + :param coor: 存储所有节点的坐标数据 + :param knum: 需要分成的聚类数 + :return: 聚类结果字典 + """ + g = {} + # estimator: sklearn.cluster.KMeans,KMeans 聚类模型 + estimator = KMeans(n_clusters=knum) + estimator.fit(coor) + # label_pred: numpy.ndarray(int),每个点的类别标签 + label_pred = estimator.labels_ + for i in range(0, knum): + g[i] = coor[label_pred == i].index.tolist() + return g + + +# 2025/03/12 +# Step3: wn_func类,水力计算 +# wn_func 主要用于计算: +# 水力距离(hydraulic length):即节点之间的水力阻力。 +# 灵敏度分析(sensitivity analysis):用于优化测压点的布置。 +# 一些与水力相关的函数,包括 CtoS:求水力距离,stafun:求状态函数F +# # diff:求F对P的导数,返回灵敏度矩阵A +# # sensitivity:返回灵敏度和总灵敏度 +class wn_func(object): + + # Step3.1: 初始化 + def __init__(self, wn: wntr.network.WaterNetworkModel): + """ + 获取管网模型信息 + :param wn: 由wntr生成的模型 + """ + # self.results: wntr.sim.results.SimulationResults,仿真结果,包含压力、流量、水头等数据 + self.results = wntr.sim.EpanetSimulator(wn).run_sim() # 存储运行结果 + self.wn = wn + # self.q:pandas.DataFrame,管道流量,索引为时间步长,列为管道名称 + self.q = self.results.link['flowrate'] + # ReservoirIndex / Tankindex: list[str],水库 / 水箱节点名称列表 + ReservoirIndex = wn.reservoir_name_list + Tankindex = wn.tank_name_list + # 删除水库节点,删除与直接水库相连的虚拟管道 + # self.pipes: list[str],所有管道的名称 + self.pipes = wn.pipe_name_list + # self.nodes: list[str],所有节点的名称 + self.nodes = wn.node_name_list + # self.coordinates:pandas.Series,节点坐标,索引为节点名,值为 (x, y) 坐标的 tuple + self.coordinates = 
wn.query_node_attribute('coordinates') + # allpumps / allvalves: list[str],所有泵/阀门名称列表 + allpumps = wn.pump_name_list + allvalves = wn.valve_name_list + # pumpstnode / pumpednode / valvestnode / valveednode: list[str],存储泵和阀门 起终点节点的名称 + pumpstnode = [] + pumpednode = [] + valvestnode = [] + valveednode = [] + # Reservoirpipe / Reservoirednode: list[str],记录与水库相关的管道和节点 + Reservoirpipe = [] + Reservoirednode = [] + for pump in allpumps: + pumpstnode.append(wn.links[pump].start_node.name) + pumpednode.append(wn.links[pump].end_node.name) + for valve in allvalves: + valvestnode.append(wn.links[valve].start_node.name) + valveednode.append(wn.links[valve].end_node.name) + for pipe in self.pipes: + if wn.links[pipe].start_node.name in ReservoirIndex: + Reservoirpipe.append(pipe) + Reservoirednode.append(wn.links[pipe].end_node.name) + if wn.links[pipe].start_node.name in Tankindex: + Reservoirpipe.append(pipe) + Reservoirednode.append(wn.links[pipe].end_node.name) + if wn.links[pipe].end_node.name in Tankindex: + Reservoirpipe.append(pipe) + Reservoirednode.append(wn.links[pipe].start_node.name) + # 泵的起终点、tank、reservoir + # self.delnodes: list[str],需要删除的节点(包括水库、泵、阀门连接的节点) + self.delnodes = list( + set(ReservoirIndex).union(Tankindex, pumpstnode, pumpednode, valvestnode, valveednode, Reservoirednode)) + # 泵、起终点为tank、reservoir的管道 + # self.delpipes: list[str],需要删除的管道(包括水库、泵、阀门连接的管道) + self.delpipes = list(set(wn.pump_name_list).union(wn.valve_name_list).union(Reservoirpipe)) + self.pipes = [pipe for pipe in wn.pipe_name_list if pipe not in self.delpipes] + # self.L: list[float],所有管道的长度(以米为单位) + self.L = wn.query_link_attribute('length')[self.pipes].tolist() + self.n = len(self.nodes) + self.m = len(self.pipes) + # self.unit_headloss: list[float],单位水头损失(headloss 数据的第一行,单位:米/km) + self.unit_headloss = self.results.link['headloss'].iloc[0, :].tolist() + ## + self.delnodes1 = list(set(ReservoirIndex).union(Tankindex)) + + # Step3.2: 计算水力距离 + def CtoS(self): + """ + 计算水力距离矩阵 + 
:return: + """ + # 水力距离:当行索引对应的节点为控制点时,列索引对应的节点距离控制点的(路径*水头损失)的最小值 + # nodes:list[str](节点名称) + nodes = copy.deepcopy(self.nodes) + # pipes:list[str](管道名称) + pipes = self.pipes + wn = self.wn + # n / m:int(节点数 / 管道数) + n = self.n + m = self.m + s1 = [0] * m + q = self.q + L = self.L + # H1:pandas.DataFrame,水头数据,索引为时间步长,列为节点名 + H1 = self.results.node['head'].T + # hh:list[float],计算管道两端水头之差 + hh = [] + # 水头损失 + for p in pipes: + h1 = self.wn.links[p].start_node.name + h1 = H1.loc[str(h1)] + h2 = self.wn.links[p].end_node.name + h2 = H1.loc[str(h2)] + hh.append(abs(h1 - h2)) + hh = np.array(hh) + # headloss:pandas.DataFrame,管道水头损失矩阵 + headloss = pd.DataFrame(hh, index=pipes).T + # s1:管道阻力系数,s2:将管道阻力系数与管道的起始节点和终止节点对应 + hf = pd.DataFrame(np.array([0] * (n ** 2)).reshape(n, n), index=nodes, columns=nodes, dtype=float) + weightL = pd.DataFrame(np.array([0] * (n ** 2)).reshape(n, n), index=nodes, columns=nodes, dtype=float) + # s2为对应管道起始节点与终止节点的粗糙度系数矩阵,index代表起始节点,columns代表终止节点 + G = nx.DiGraph() + for i in range(0, m): + pipe = pipes[i] + a = wn.links[pipe].start_node.name + b = wn.links[pipe].end_node.name + if q.loc[0, pipe] > 0: + hf.loc[a, b] = headloss.loc[0, pipe] + weightL.loc[a, b] = headloss.loc[0, pipe] * L[i] + G.add_weighted_edges_from([(a, b, weightL.loc[a, b])]) + + else: + hf.loc[b, a] = headloss.loc[0, pipe] + weightL.loc[b, a] = headloss.loc[0, pipe] * L[i] + G.add_weighted_edges_from([(b, a, weightL.loc[b, a])]) + + hydraulicL = pd.DataFrame(np.array([0] * (n ** 2)).reshape(n, n), index=nodes, columns=nodes, dtype=float) + + for a in nodes: + if a in G.nodes: + d = nx.shortest_path_length(G, source=a, weight='weight') + for b in list(d.keys()): + hydraulicL.loc[a, b] = d[b] + + hydraulicL = hydraulicL.drop(self.delnodes) + hydraulicL = hydraulicL.drop(self.delnodes, axis=1) + + # 求加权水力距离 + return hydraulicL, G + + # Step3.3: 计算灵敏度矩阵 + # 获取关系矩阵 + def get_Conn(self): + """ + 计算管网连接关系矩阵 + :return: + """ + m = self.wn.num_links + n = self.wn.num_nodes + p = 
self.wn.num_pumps + v = self.wn.num_valves + + self.nonjunc_index = [] + self.non_link_index = [] + for r in self.wn.reservoirs(): + self.nonjunc_index.append(r[0]) + for t in self.wn.tanks(): + self.nonjunc_index.append(t[0]) + # Conn:numpy.matrix,节点-管道连接矩阵,起点 -1,终点 1 + Conn = np.mat(np.zeros([n, m - p - v])) # 节点和管道的关系矩阵,行为节点,列为管道,起点为-1,终点为1 + # NConn:numpy.matrix,节点-节点连接矩阵,有管道相连的地方设为 1 + NConn = np.mat(np.zeros([n, n])) # 节点之间的关系,之间有管道为1,反之为0 + # pipes:list[str],去除泵和阀门的管道列表 + pipes = [pipe for pipe in self.wn.pipes() if pipe not in self.wn.pumps() and pipe not in self.wn.valves()] + for pipe_name, pipe in pipes: + start = self.wn.node_name_list.index(pipe.start_node_name) + end = self.wn.node_name_list.index(pipe.end_node_name) + p_index = self.wn.link_name_list.index(pipe_name) + Conn[start, p_index] = -1 + Conn[end, p_index] = 1 + NConn[start, end] = 1 + NConn[end, start] = 1 + self.A = Conn + link_name_list = [link for link in self.wn.link_name_list if + link not in self.wn.pump_name_list and link not in self.wn.valve_name_list] + self.A2 = pd.DataFrame(self.A, index=self.wn.node_name_list, columns=link_name_list) + self.A2 = self.A2.drop(self.delnodes) + for pipe in self.delpipes: + if pipe not in self.wn.pump_name_list and pipe not in self.wn.valve_name_list: + self.A2 = self.A2.drop(columns=pipe) + self.junc_list = self.A2.index + self.A2 = np.mat(self.A2) # 节点管道关系 + self.A3 = NConn + + def Jaco(self, hL: pandas.DataFrame): + """ + 计算灵敏度矩阵(节点压力对粗糙度变化的响应) + :param hL: 水力距离矩阵 + :return: + """ + # global result + # A:numpy.matrix, 节点-管道关系矩阵 + A = self.A2 + wn = self.wn + + try: + result = wntr.sim.EpanetSimulator(wn).run_sim() + except EpanetException: + pass + finally: + h = result.link['headloss'][self.pipes].values[0] + q = result.link['flowrate'][self.pipes].values[0] + l = self.wn.query_link_attribute('length')[self.pipes] + C = self.wn.query_link_attribute('roughness')[self.pipes] + # headloss:numpy.ndarray,水头损失数组 + headloss = np.array(h) + # 调整流量方向 + 
for i in range(0, len(q)): + if q[i] < 0: + A[:, i] = -A[:, i] + # q:numpy.ndarray,流量数组 + q = np.abs(q) + # 两个灵敏度矩阵 + # B / S:numpy.matrix,灵敏度计算的中间矩阵 + B = np.mat(np.diag(q / ((1.852 * headloss) + 1e-10))) + S = np.mat(np.diag(q / C)) + # X:numpy.matrix, 灵敏度矩阵 + X = A * B * A.T + try: + det = np.linalg.det(X) + except RuntimeError as e: + sign, logdet = slogdet(X) # 防止溢出 + det = sign * np.exp(logdet) + if det != 0: + J_H_Cw = X.I * A * S + # J_H_Q = -X.I + J_q_Cw = S - B * A.T * X.I * A * S # 去掉了delnodes和delpipes + # J_q_Q = B * A.T * X.I + else: # 当X不可逆 + J_H_Cw = np.linalg.pinv(X) @ A @ S + # J_H_Q = -np.linalg.pinv(X) + J_q_Cw = S - B * A.T * np.linalg.pinv(X) * A * S + # J_q_Q = B * A.T * np.linalg.pinv(X) + + Sen_pressure = [] + S_pressure = np.abs(J_H_Cw).sum(axis=1).tolist() # 修改为绝对值 + for ss in S_pressure: + Sen_pressure.append(ss[0]) + # 求总灵敏度 + SS_pressure = copy.deepcopy(hL) + for i in range(0, len(Sen_pressure)): + SS_pressure.iloc[i, :] = SS_pressure.iloc[i, :] * Sen_pressure[i] + SS = copy.deepcopy(hL) + for i in range(0, len(Sen_pressure)): + SS.iloc[i, :] = SS.iloc[i, :] * Sen_pressure[i] + # SS[i,j]:节点nodes[i]的灵敏度*该节点到nodes[j]的水力距离 + return SS + + +# 2025/03/12 +# Step4: 传感器布置优化 +# Sensorplacement +# weight:分配权重 +# sensor:传感器布置的位置 +class Sensorplacement(wn_func): + """ + Sensorplacement 类继承了 wn_func 类,并且用于计算和优化传感器布置的位置。 + """ + def __init__(self, wn: wntr.network.WaterNetworkModel, sensornum: int): + """ + + :param wn: 由wntr生成的模型 + :param sensornum: 传感器的数量 + """ + wn_func.__init__(self, wn) + self.sensornum = sensornum + + # 1.某个节点到所有节点的加权距离之和 + # 2.某个节点到该组内所有节点的加权距离之和 + def sensor(self, SS: pandas.DataFrame, G: networkx.Graph, group: dict[int, list[str]]): + """ + sensor 方法是用来根据灵敏度矩阵 SS 和加权图 G 来确定传感器布置位置的 + :param SS: 灵敏度矩阵,每个节点的行和列代表不同节点,矩阵元素表示节点间的灵敏度。SS.iloc[i, :] 表示第 i 行对应节点 i 到所有其他节点的灵敏度 + :param G: 加权图,表示管网的拓扑结构,每个节点通过管道连接。图的边的权重通常是根据水力距离或者流量等计算的 + :param group: 节点分组,字典的键是分组编号,值是该组的节点名称列表 + :return: + """ + # 传感器布置个数以及位置 + # W = self.weight() 
+ n = self.n - len(self.delnodes) + nodes = copy.deepcopy(self.nodes) + for node in self.delnodes: + nodes.remove(node) + # sumSS:list[float],每个节点到其他节点的灵敏度之和。SS.iloc[i, :] 返回第 i 个节点与所有其他节点的灵敏度值,sum(SS.iloc[i, :]) 计算这些灵敏度值的总和。 + sumSS = [] + for i in range(0, n): + sumSS.append(sum(SS.iloc[i, :])) + # 一个整数范围,表示每个节点的索引,用作sumSS_ DataFrame的索引 + indices = range(0, n) + # sumSS_:pandas.DataFrame,将 sumSS 转换成 DataFrame 格式,并且将节点的总灵敏度保存到 CSV 文件 sumSS_data.csv 中 + sumSS_ = pd.DataFrame(np.array(sumSS), index=indices) + sumSS_.to_csv('sumSS_data.csv') # 存储节点总灵敏度 + # sumSS:pandas.DataFrame,sumSS 被转换为 DataFrame 类型,并且按总灵敏度(即灵敏度之和)降序排列。此时,sumSS 是按节点的灵敏度之和排序的 DataFrame + sumSS = pd.DataFrame(np.array(sumSS), index=nodes) + sumSS = sumSS.sort_values(by=[0], ascending=[False]) + # sensorindex:list[str],用于存储根据灵敏度排序选出的传感器位置的节点名称,存储根据总灵敏度排序的节点列表,用于传感器布置 + sensorindex = [] + # sensorindex_2:list[str],用于存储每组内根据灵敏度排序选出的传感器位置的节点名称,存储每个组内根据灵敏度排序选择的传感器节点 + sensorindex_2 = [] + # group_S:dict[int, pandas.DataFrame],存储每个组内的灵敏度矩阵 + group_S = {} + # group_sumSS:dict[int, list[float]],存储每个组内节点的总灵敏度,值为每个组内节点灵敏度之和的列表 + group_sumSS = {} + for i in range(0, len(group)): + for node in self.delnodes: + # 这里的group[i]是每个组的节点列表,代码首先去除已经被标记为删除的节点self.delnodes + if node in group[i]: + group[i].remove(node) + group_S[i] = SS.loc[group[i], group[i]] + # 对每个组内的节点,计算组内节点的总灵敏度(group_sumSS[i])。它将每个组内节点的灵敏度值相加,并且按灵敏度降序排序 + group_sumSS[i] = [] + for j in range(0, len(group[i])): + group_sumSS[i].append(sum(group_S[i].iloc[j, :])) + group_sumSS[i] = pd.DataFrame(np.array(group_sumSS[i]), index=group[i]) + group_sumSS[i] = group_sumSS[i].sort_values(by=[0], ascending=[False]) + pass + + # 1.选sumSS最大的节点,然后把这个节点所在的那个组删掉,就可以不再从这个组选点。再重新排序选sumSS最大的; + # 2.在每组内选group_sumSS最大的节点 + # 在这个循环中,首先选择灵敏度最高的节点Smaxnode并添加到sensorindex。然后根据灵敏度排序,删除已选的节点并继续选择下一个灵敏度最大的节点。这个过程用于选择传感器的位置 + sensornum = self.sensornum + for i in range(0, sensornum): + # Smaxnode:str,最大灵敏度节点,sumSS.index[0] 表示灵敏度最高的节点 + Smaxnode = sumSS.index[0] + 
sensorindex.append(Smaxnode) + sensorindex_2.append(group_sumSS[i].index[0]) + + for key, value in group.items(): + if Smaxnode in value: + sumSS = sumSS.drop(index=group[key]) + continue + + sumSS = sumSS.sort_values(by=[0], ascending=[False]) + + return sensorindex, sensorindex_2 + + +# 2025/03/13 +def get_sensor_coord(name: str, sensor_num: int) -> dict[str, float]: + """ + 获取布置测压点的坐标,初始测压点布置根据灵敏度来布置,计算初始情况下的校准过程的error + :param name: 数据库名称 + :param sensor_num: 测压点数目 + :return: 测压点坐标字典 + """ + # inp_file_real:str,输入文件名,表示原始水力模型文件的路径,该文件格式为 EPANET 输入文件(.inp),包含管网的结构信息、节点、管道、泵等数据 + inp_file_real = f'./db_inp/{name}.db.inp' + # sensornum:int,需要布置的传感器数量 + # sensornum = sensor_num + # wn_real:wntr.network.WaterNetworkModel,加载 EPANET 水力模型 + wn_real = wntr.network.WaterNetworkModel(inp_file_real) # 真实粗糙度的原始管网 + # sim_real:wntr.sim.EpanetSimulator,创建一个水力仿真器对象 + sim_real = wntr.sim.EpanetSimulator(wn_real) + # results_real:wntr.sim.results.SimulationResults,运行仿真并返回结果 + results_real = sim_real.run_sim() + + # real_C:list[float],包含所有管道粗糙度的列表 + real_C = wn_real.query_link_attribute('roughness').tolist() + # wn_fun1:wn_func(继承自 object),创建 wn_func 类的实例,传入 wn_real 水力模型对象。wn_func 用于计算管网相关的水力属性,比如水力距离、灵敏度等 + wn_fun1 = wn_func(wn_real) + # nodes:list[str],管网的节点名称列表 + nodes = wn_fun1.nodes + # delnodes:list[str],被删除的节点(如水库、泵、阀门连接的节点等) + delnodes = wn_fun1.delnodes + # Coor_node:pandas.DataFrame + Coor_node = getCoor(wn_real) + Coor_node = Coor_node.drop(wn_fun1.delnodes) + nodes = [node for node in wn_fun1.nodes if node not in delnodes] + # coordinates:pandas.Series,存储所有节点的坐标,类型为 Series,索引为节点名称,值为 (x, y) 坐标对 + coordinates = wn_fun1.coordinates + + # 随机产生监测点 + # junctionnum:int,nodes 的长度,表示节点的数量 + junctionnum = len(nodes) + # random_numbers:list[int],使用 random.sample 随机选择 sensornum(20)个节点的编号。它返回一个不重复的随机编号列表 + # random_numbers = random.sample(range(junctionnum), sensor_num) + # for i in range(sensor_num): + # # print(random_numbers[i]) + + wn_fun1.get_Conn() + # 
hL:pandas.DataFrame,水力距离矩阵,表示每个节点到其他节点的水力阻力 + # G:networkx.DiGraph,加权有向图,表示管网的拓扑结构,节点之间的边带有权重 + hL, G = wn_fun1.CtoS() + # SS:pandas.DataFrame,灵敏度矩阵,表示每个节点对管网变化(如粗糙度、流量等)的响应 + SS = wn_fun1.Jaco(hL) + # group:dict[int, list[str]],使用 kgroup 函数将节点按坐标分成若干组,每组包含的节点数不一定相同。group 是一个字典,键为分组编号,值为节点名列表 + group = kgroup(Coor_node, sensor_num) + # wn_fun:Sensorplacement(继承自wn_func) + # 创建Sensorplacement类的实例,传入水力网络模型wn_real和传感器数量sensornum。Sensorplacement用于计算和布置传感器 + wn_fun = Sensorplacement(wn_real, sensor_num) + wn_fun.__dict__.update(wn_fun1.__dict__) + # sensorindex:list[str],初始传感器布置位置的节点名称 + # sensorindex_2:list[str],根据分组选择的传感器位置 + sensorindex, sensorindex_2 = wn_fun.sensor(SS, G, group) # 初始的sensorindex + # print(str(sensor_num), "个测压点,测压点位置:", sensorindex) + sensor_coord = {} + # 重新打开数据库 + if is_project_open(name=name): + close_project(name=name) + open_project(name=name) + for node_id in sensorindex: + sensor_coord[node_id] = get_node_coord(name=name, node_id=node_id) + close_project(name=name) + # print(sensor_coord) + return sensor_coord + + +if __name__ == '__main__': + sensor_coord = get_sensor_coord(name='bb', sensor_num=20) + print(sensor_coord) + # ''' + # 初始测压点布置根据灵敏度来布置,计算初始情况下的校准过程的error + # ''' + # + # # inp_file_real:str,输入文件名,表示原始水力模型文件的路径,该文件格式为 EPANET 输入文件(.inp),包含管网的结构信息、节点、管道、泵等数据 + # inp_file_real = './db_inp/bb.db.inp' + # # sensornum:int,需要布置的传感器数量 + # sensornum = 20 + # # wn_real:wntr.network.WaterNetworkModel,加载 EPANET 水力模型 + # wn_real = wntr.network.WaterNetworkModel(inp_file_real) # 真实粗糙度的原始管网 + # # sim_real:wntr.sim.EpanetSimulator,创建一个水力仿真器对象 + # sim_real = wntr.sim.EpanetSimulator(wn_real) + # # results_real:wntr.sim.results.SimulationResults,运行仿真并返回结果 + # results_real = sim_real.run_sim() + # + # # real_C:list[float],包含所有管道粗糙度的列表 + # real_C = wn_real.query_link_attribute('roughness').tolist() + # # wn_fun1:wn_func(继承自 object),创建 wn_func 类的实例,传入 wn_real 水力模型对象。wn_func 用于计算管网相关的水力属性,比如水力距离、灵敏度等 + # wn_fun1 = wn_func(wn_real) + # # 
nodes:list[str],管网的节点名称列表 + # nodes = wn_fun1.nodes + # # delnodes:list[str],被删除的节点(如水库、泵、阀门连接的节点等) + # delnodes = wn_fun1.delnodes + # # Coor_node:pandas.DataFrame + # Coor_node = getCoor(wn_real) + # Coor_node = Coor_node.drop(wn_fun1.delnodes) + # nodes = [node for node in wn_fun1.nodes if node not in delnodes] + # # coordinates:pandas.Series,存储所有节点的坐标,类型为 Series,索引为节点名称,值为 (x, y) 坐标对 + # coordinates = wn_fun1.coordinates + # + # # 随机产生监测点 + # # junctionnum:int,nodes 的长度,表示节点的数量 + # junctionnum = len(nodes) + # # random_numbers:list[int],使用 random.sample 随机选择 sensornum(20)个节点的编号。它返回一个不重复的随机编号列表 + # random_numbers = random.sample(range(junctionnum), sensornum) + # for i in range(sensornum): + # print(random_numbers[i]) + # + # wn_fun1.get_Conn() + # # hL:pandas.DataFrame,水力距离矩阵,表示每个节点到其他节点的水力阻力 + # # G:networkx.DiGraph,加权有向图,表示管网的拓扑结构,节点之间的边带有权重 + # hL, G = wn_fun1.CtoS() + # # SS:pandas.DataFrame,灵敏度矩阵,表示每个节点对管网变化(如粗糙度、流量等)的响应 + # SS = wn_fun1.Jaco(hL) + # # group:dict[int, list[str]],使用 kgroup 函数将节点按坐标分成若干组,每组包含的节点数不一定相同。group 是一个字典,键为分组编号,值为节点名列表 + # group = kgroup(Coor_node, sensornum) + # # wn_fun:Sensorplacement(继承自wn_func) + # # 创建Sensorplacement类的实例,传入水力网络模型wn_real和传感器数量sensornum。Sensorplacement用于计算和布置传感器 + # wn_fun = Sensorplacement(wn_real, sensornum) + # wn_fun.__dict__.update(wn_fun1.__dict__) + # # sensorindex:list[str],初始传感器布置位置的节点名称 + # # sensorindex_2:list[str],根据分组选择的传感器位置 + # sensorindex, sensorindex_2 = wn_fun.sensor(SS, G, group) # 初始的sensorindex + # print(str(sensornum), "个测压点,测压点位置:", sensorindex) + + # # 分区画图 + # colorlist = ['lightpink', 'coral', 'rosybrown', 'olive', 'powderblue', 'lightskyblue', 'steelblue', 'peachpuff','brown','silver','indigo','lime','gold','violet','maroon','navy','teal','magenta','cyan', + # 'burlywood', 'tan', 'slategrey', 'thistle', 'lightseagreen', 'lightgreen', 'red','blue','yellow','orange','purple','grey','green','pink','lightblue','beige','chartreuse','turquoise','lavender','fuchsia','coral'] + # G = 
wn_real.to_graph() + # G = G.to_undirected() # 变为无向图 + # pos = nx.get_node_attributes(G, 'pos') + # pass + # for i in range(0, sensornum): + # ax = plt.gca() + # ax.set_title(inp_file_real + str(sensornum)) + # nodes = nx.draw_networkx_nodes(G, pos, nodelist=group[i], node_color=colorlist[i], node_size=20) + # nodes = nx.draw_networkx_nodes(G, pos, + # nodelist=sensorindex_2, node_color='black', node_size=70, node_shape='*' + # ) + # edges = nx.draw_networkx_edges(G, pos) + # ax.spines['top'].set_visible(False) + # ax.spines['right'].set_visible(False) + # ax.spines['bottom'].set_visible(False) + # ax.spines['left'].set_visible(False) + # plt.savefig(inp_file_real + str(sensornum) + ".png") + # plt.show() + # + # wntr.graphics.plot_network(wn_real, node_attribute=sensorindex_2, node_size=50, node_labels=False, + # title=inp_file_real + '_Projetion' + str(sensornum)) + # plt.savefig(inp_file_real + '_S' + str(sensornum) + ".png") + # plt.show() \ No newline at end of file diff --git a/simulation.py b/simulation.py index 7baff42..5e7a10d 100644 --- a/simulation.py +++ b/simulation.py @@ -656,11 +656,15 @@ def run_simulation(name: str, simulation_type: str, modify_pattern_start_time: s # 修改工频泵的pattern fixed_pump_SCADA_data_dict = influxdb_api.query_SCADA_data_by_device_ID_and_time( query_ids_list=list(globals.fixed_pumps_id.values()), query_time=modify_pattern_start_time) + # print(fixed_pump_SCADA_data_dict) fixed_pump_dict = {key: fixed_pump_SCADA_data_dict[value] for key, value in globals.fixed_pumps_id.items()} + # print(fixed_pump_dict) for fixed_pump_name, value in fixed_pump_dict.items(): if value: pump_pattern = get_pattern(name_c, get_pump(name_c, fixed_pump_name)['pattern']) + print(pump_pattern) pump_pattern['factors'][modify_index] = float(value) + print(pump_pattern['factors'][modify_index]) cs = ChangeSet() cs.append(pump_pattern) set_pattern(name_c, cs) @@ -887,14 +891,15 @@ def run_simulation(name: str, simulation_type: str, modify_pattern_start_time: 
s tmp_file = './temp/simulation.result.out' shutil.copy(f'./temp/{name_c}.db.opt', tmp_file) output = Output(tmp_file) - node_result = output.node_results() link_result = output.link_results() + link_flow = [] + for link in link_result: + link_flow.append(link['result'][-1]['flow']) + print(link_flow) num_periods_result = output.times()['num_periods'] - print("simulation_type", simulation_type) print("before store result") - # print(num_periods_result) # print(node_result) # 存储 @@ -903,8 +908,7 @@ def run_simulation(name: str, simulation_type: str, modify_pattern_start_time: s elif simulation_type.upper() == 'EXTENDED': influxdb_api.store_scheme_simulation_result_to_influxdb(node_result, link_result, modify_pattern_start_time, num_periods_result, scheme_Type, scheme_Name) - - print("after store result") + influxdb_api.fill_scheme_simulation_result_to_SCADA(scheme_Type=scheme_Type, scheme_Name=scheme_Name) if __name__ == "__main__": @@ -921,28 +925,29 @@ if __name__ == "__main__": globals.source_outflow_region_patterns, globals.realtime_region_pipe_flow_and_demand_patterns = get_realtime_region_patterns('bb', globals.source_outflow_region_id, globals.realtime_region_pipe_flow_and_demand_id) # 打印字典内容以验证 - print("Reservoirs ID:", globals.reservoirs_id) - print("Tanks ID:", globals.tanks_id) - print("Fixed Pumps ID:", globals.fixed_pumps_id) - print("Variable Pumps ID:", globals.variable_pumps_id) - print("Pressure ID:", globals.pressure_id) - print("Demand ID:", globals.demand_id) - print("Quality ID:", globals.quality_id) - print("Source Outflow Pattern ID:", globals.source_outflow_pattern_id) - print("Realtime Pipe Flow Pattern ID:", globals.realtime_pipe_flow_pattern_id) - print("Pipe Flow Region Patterns:", globals.pipe_flow_region_patterns) - print("Source Outflow Region:", region_result) - print('Source Outflow Region ID:', globals.source_outflow_region_id) - print('Source Outflow Region Patterns:', globals.source_outflow_region_patterns) - print("Non Realtime 
Region Patterns:", globals.non_realtime_region_patterns) - print("Realtime Region Pipe Flow And Demand ID:", globals.realtime_region_pipe_flow_and_demand_id) - print("Realtime Region Pipe Flow And Demand Patterns:", globals.realtime_region_pipe_flow_and_demand_patterns) + # print("Reservoirs ID:", globals.reservoirs_id) + # print("Tanks ID:", globals.tanks_id) + # print("Fixed Pumps ID:", globals.fixed_pumps_id) + # print("Variable Pumps ID:", globals.variable_pumps_id) + # print("Pressure ID:", globals.pressure_id) + # print("Demand ID:", globals.demand_id) + # print("Quality ID:", globals.quality_id) + # print("Source Outflow Pattern ID:", globals.source_outflow_pattern_id) + # print("Realtime Pipe Flow Pattern ID:", globals.realtime_pipe_flow_pattern_id) + # print("Pipe Flow Region Patterns:", globals.pipe_flow_region_patterns) + # print("Source Outflow Region:", region_result) + # print('Source Outflow Region ID:', globals.source_outflow_region_id) + # print('Source Outflow Region Patterns:', globals.source_outflow_region_patterns) + # print("Non Realtime Region Patterns:", globals.non_realtime_region_patterns) + # print("Realtime Region Pipe Flow And Demand ID:", globals.realtime_region_pipe_flow_and_demand_id) + # print("Realtime Region Pipe Flow And Demand Patterns:", globals.realtime_region_pipe_flow_and_demand_patterns) + # dump_inp(name='bb', inp="sensor_placement.inp", version='2') # 模拟示例1 run_simulation(name='bb', simulation_type="realtime", modify_pattern_start_time='2025-02-25T23:45:00+08:00') # 模拟示例2 - # run_simulation(name='bb', simulation_type="extended", modify_pattern_start_time='2025-02-14T10:30:00+08:00', modify_total_duration=900, - # scheme_Type="burst_Analysis", scheme_Name="scheme1") + # run_simulation(name='bb', simulation_type="extended", modify_pattern_start_time='2025-03-10T12:00:00+08:00', + # modify_total_duration=1800, scheme_Type="burst_Analysis", scheme_Name="scheme1") # 查询示例1:query_SCADA_ID_corresponding_info # result = 
query_SCADA_ID_corresponding_info(name='bb', SCADA_ID='P10755')