From 05569f8859518152a1c8340e927b2de6b9e0cd62 Mon Sep 17 00:00:00 2001 From: DingZQ Date: Tue, 4 Mar 2025 20:59:45 +0800 Subject: [PATCH] Update influxdb_api --- influxdb_api.py | 812 +++++++++++++++++++++++++++++++++++++++--------- simulation.py | 199 ++---------- 2 files changed, 677 insertions(+), 334 deletions(-) diff --git a/influxdb_api.py b/influxdb_api.py index 1131117..d56f382 100644 --- a/influxdb_api.py +++ b/influxdb_api.py @@ -79,7 +79,6 @@ def query_pg_scada_info_realtime(name: str) -> None: WHERE transmission_mode = 'realtime'; """) records = cur.fetchall() - # 清空全局列表 globals.reservoir_liquid_level_realtime_ids.clear() globals.tank_liquid_level_realtime_ids.clear() @@ -90,7 +89,6 @@ def query_pg_scada_info_realtime(name: str) -> None: globals.pressure_realtime_ids.clear() globals.demand_realtime_ids.clear() globals.quality_realtime_ids.clear() - # 根据 type 分类存储 api_query_id for record in records: record_type, api_query_id = record @@ -113,7 +111,6 @@ def query_pg_scada_info_realtime(name: str) -> None: globals.demand_realtime_ids.append(api_query_id) elif record_type == "quality": globals.quality_realtime_ids.append(api_query_id) - # 打印结果,方便调试 # print("Query completed. Results:") # print("Reservoir Liquid Level IDs:", globals.reservoir_liquid_level_realtime_ids) @@ -125,7 +122,6 @@ def query_pg_scada_info_realtime(name: str) -> None: # print("Pressure IDs:", globals.pressure_realtime_ids) # print("Demand IDs:", globals.demand_realtime_ids) # print("Quality IDs:", globals.quality_realtime_ids) - except Exception as e: print(f"查询时发生错误:{e}") @@ -142,7 +138,6 @@ def query_pg_scada_info_non_realtime(name: str) -> None: open_project(name) dic_time = get_time(name) globals.hydraulic_timestep = dic_time['HYDRAULIC TIMESTEP'] - close_project(name) # 连接数据库 conn_string = f"dbname={name} host=127.0.0.1" @@ -156,7 +151,6 @@ def query_pg_scada_info_non_realtime(name: str) -> None: WHERE transmission_mode = 'non_realtime'; """) records = cur.fetchall() - # 清空全局列表 globals.reservoir_liquid_level_non_realtime_ids.clear() globals.fixed_pump_non_realtime_ids.clear() @@ -166,10 +160,8 @@ def query_pg_scada_info_non_realtime(name: str) -> None: globals.pressure_non_realtime_ids.clear() globals.demand_non_realtime_ids.clear() globals.quality_non_realtime_ids.clear() - # 用于计算 transmission_frequency 最大值 transmission_frequencies = [] - # 根据 type 分类存储 api_query_id for record in records: record_type, api_query_id, freq = record @@ -190,14 +182,11 @@ def query_pg_scada_info_non_realtime(name: str) -> None: globals.demand_non_realtime_ids.append(api_query_id) elif record_type == "quality": globals.quality_non_realtime_ids.append(api_query_id) - # 收集 transmission_frequency,用于计算最大值 if freq is not None: transmission_frequencies.append(freq) - # 计算 transmission_frequency 最大值 globals.transmission_frequency = max(transmission_frequencies) if transmission_frequencies else None - # 打印结果,方便调试 # print("Query completed. Results:") # print("Reservoir Liquid Level Non-Realtime IDs:", globals.reservoir_liquid_level_non_realtime_ids) @@ -226,10 +215,8 @@ def delete_buckets(client: InfluxDBClient, org_name: str) -> None: """ # 定义需要删除的 bucket 名称列表 buckets_to_delete = ['SCADA_data', 'realtime_simulation_result', 'scheme_simulation_result'] - buckets_api = client.buckets_api() buckets_obj = buckets_api.find_buckets(org=org_name) - # 确保 buckets_obj 拥有 buckets 属性 if hasattr(buckets_obj, 'buckets'): for bucket in buckets_obj.buckets: @@ -255,25 +242,21 @@ def create_and_initialize_buckets(client: InfluxDBClient, org_name: str) -> None """ # 先删除原有的,然后再进行初始化 delete_buckets(client, org_name) - bucket_api = BucketsApi(client) write_api = client.write_api() org_api = OrganizationsApi(client) - # 获取 org_id org = next((o for o in org_api.find_organizations() if o.name == org_name), None) if not org: raise ValueError(f"Organization '{org_name}' not found.") org_id = org.id print(f"Using Organization ID: {org_id}") - # 定义 Buckets 信息 buckets = [ {"name": "SCADA_data", "retention_rules": []}, {"name": "realtime_simulation_result", "retention_rules": []}, {"name": "scheme_simulation_result", "retention_rules": []} ] - # 创建 Buckets 并初始化数据 for bucket in buckets: # 创建 Bucket @@ -283,7 +266,6 @@ def create_and_initialize_buckets(client: InfluxDBClient, org_name: str) -> None org_id=org_id ) print(f"Bucket '{bucket['name']}' created with ID: {created_bucket.id}") - # 根据 Bucket 初始化数据 if bucket["name"] == "SCADA_data": point = Point("SCADA") \ @@ -296,7 +278,6 @@ def create_and_initialize_buckets(client: InfluxDBClient, org_name: str) -> None .time("2024-11-21T00:00:00Z") write_api.write(bucket="SCADA_data", org=org_name, record=point) print("Initialized SCADA_data with default structure.") - elif bucket["name"] == "realtime_simulation_result": # realtime_simulation_result link_point = Point("link") \ .tag("date", None) \ @@ -311,7 +292,6 @@ def create_and_initialize_buckets(client: InfluxDBClient, org_name: str) -> None .field("reaction", 0.0) \ .field("friction", 0.0) \ .time("2024-11-21T00:00:00Z") - node_point = Point("node") \ .tag("date", None) \ .tag("ID", None) \ @@ -322,11 +302,9 @@ def create_and_initialize_buckets(client: InfluxDBClient, org_name: str) -> None .field("totalExternalOutflow", 0.0) \ .field("quality", 0.0) \ .time("2024-11-21T00:00:00Z") - write_api.write(bucket="realtime_simulation_result", org=org_name, record=link_point) write_api.write(bucket="realtime_simulation_result", org=org_name, record=node_point) print("Initialized realtime_simulation_result with default structure.") - elif bucket["name"] == "scheme_simulation_result": link_point = Point("link") \ .tag("date", None) \ @@ -341,7 +319,6 @@ def create_and_initialize_buckets(client: InfluxDBClient, org_name: str) -> None .field("setting", 0.0) \ .field("quality", 0.0) \ .time("2024-11-21T00:00:00Z") - node_point = Point("node") \ .tag("date", None) \ .tag("ID", None) \ @@ -354,12 +331,9 @@ def create_and_initialize_buckets(client: InfluxDBClient, org_name: str) -> None .field("totalExternalOutflow", 0.0) \ .field("quality", 0.0) \ .time("2024-11-21T00:00:00Z") - write_api.write(bucket="scheme_simulation_result", org=org_name, record=link_point) write_api.write(bucket="scheme_simulation_result", org=org_name, record=node_point) print("Initialized scheme_simulation_result with default structure.") - - print("All buckets created and initialized successfully.") @@ -415,26 +389,21 @@ def store_realtime_SCADA_data_to_influxdb(get_real_value_time: str, bucket: str time.sleep(10) else: try_count = 100 - # 写入数据 if reservoir_liquid_level_realtime_data_list: for data in reservoir_liquid_level_realtime_data_list: # 将 data['time'] 和 get_realValue_time 转换为 datetime 对象 data_time = datetime.fromisoformat(data['time']) get_real_value_time_dt = datetime.fromisoformat(get_real_value_time).replace(tzinfo=None) - # 将获取的时间转换为 UTC 时间 get_real_value_time_utc = get_real_value_time_dt.astimezone(timezone.utc) - # 计算时间差(绝对值) time_difference = abs((data_time - get_real_value_time_dt).total_seconds()) - # 判断时间差是否超过1分钟 if time_difference > 60: # 超过1分钟 monitored_value = None else: # 小于等于3分钟 monitored_value = data['monitored_value'] - # 创建Point对象 point = ( Point('reservoir_liquid_level_realtime') @@ -448,25 +417,20 @@ def store_realtime_SCADA_data_to_influxdb(get_real_value_time: str, bucket: str ) write_api.write(bucket=bucket, org=org_name, record=point) write_api.flush() - if tank_liquid_level_realtime_data_list: for data in tank_liquid_level_realtime_data_list: # 将 data['time'] 和 get_realValue_time 转换为 datetime 对象 data_time = datetime.fromisoformat(data['time']) get_real_value_time_dt = datetime.fromisoformat(get_real_value_time).replace(tzinfo=None) - # 将获取的时间转换为 UTC 时间 get_real_value_time_utc = get_real_value_time_dt.astimezone(timezone.utc) - # 计算时间差(绝对值) time_difference = abs((data_time - get_real_value_time_dt).total_seconds()) - # 判断时间差是否超过1分钟 if time_difference > 60: # 超过1分钟 monitored_value = None else: # 小于等于3分钟 monitored_value = data['monitored_value'] - # 创建Point对象 point = ( Point('tank_liquid_level_realtime') @@ -480,25 +444,20 @@ def store_realtime_SCADA_data_to_influxdb(get_real_value_time: str, bucket: str ) write_api.write(bucket=bucket, org=org_name, record=point) write_api.flush() - if fixed_pump_realtime_data_list: for data in fixed_pump_realtime_data_list: # 将 data['time'] 和 get_realValue_time 转换为 datetime 对象 data_time = datetime.fromisoformat(data['time']) get_real_value_time_dt = datetime.fromisoformat(get_real_value_time).replace(tzinfo=None) - # 将获取的时间转换为 UTC 时间 get_real_value_time_utc = get_real_value_time_dt.astimezone(timezone.utc) - # 计算时间差(绝对值) time_difference = abs((data_time - get_real_value_time_dt).total_seconds()) - # 判断时间差是否超过1分钟 if time_difference > 60: # 超过1分钟 monitored_value = None else: # 小于等于3分钟 monitored_value = data['monitored_value'] - # 创建Point对象 point = ( Point('fixed_pump_realtime') @@ -512,25 +471,20 @@ def store_realtime_SCADA_data_to_influxdb(get_real_value_time: str, bucket: str ) write_api.write(bucket=bucket, org=org_name, record=point) write_api.flush() - if variable_pump_realtime_data_list: for data in variable_pump_realtime_data_list: # 将 data['time'] 和 get_realValue_time 转换为 datetime 对象 data_time = datetime.fromisoformat(data['time']) get_real_value_time_dt = datetime.fromisoformat(get_real_value_time).replace(tzinfo=None) - # 将获取的时间转换为 UTC 时间 get_real_value_time_utc = get_real_value_time_dt.astimezone(timezone.utc) - # 计算时间差(绝对值) time_difference = abs((data_time - get_real_value_time_dt).total_seconds()) - # 判断时间差是否超过1分钟 if time_difference > 60: # 超过1分钟 monitored_value = None else: # 小于等于3分钟 monitored_value = data['monitored_value'] - # 创建Point对象 point = ( Point('variable_pump_realtime') @@ -544,25 +498,20 @@ def store_realtime_SCADA_data_to_influxdb(get_real_value_time: str, bucket: str ) write_api.write(bucket=bucket, org=org_name, record=point) write_api.flush() - if source_outflow_realtime_data_list: for data in source_outflow_realtime_data_list: # 将 data['time'] 和 get_realValue_time 转换为 datetime 对象 data_time = datetime.fromisoformat(data['time']) get_real_value_time_dt = datetime.fromisoformat(get_real_value_time).replace(tzinfo=None) - # 将获取的时间转换为 UTC 时间 get_real_value_time_utc = get_real_value_time_dt.astimezone(timezone.utc) - # 计算时间差(绝对值) time_difference = abs((data_time - get_real_value_time_dt).total_seconds()) - # 判断时间差是否超过1分钟 if time_difference > 60: # 超过1分钟 monitored_value = None else: # 小于等于3分钟 monitored_value = data['monitored_value'] - # 创建Point对象 point = ( Point('source_outflow_realtime') @@ -576,25 +525,20 @@ def store_realtime_SCADA_data_to_influxdb(get_real_value_time: str, bucket: str ) write_api.write(bucket=bucket, org=org_name, record=point) write_api.flush() - if pipe_flow_realtime_data_list: for data in pipe_flow_realtime_data_list: # 将 data['time'] 和 get_realValue_time 转换为 datetime 对象 data_time = datetime.fromisoformat(data['time']) get_real_value_time_dt = datetime.fromisoformat(get_real_value_time).replace(tzinfo=None) - # 将获取的时间转换为 UTC 时间 get_real_value_time_utc = get_real_value_time_dt.astimezone(timezone.utc) - # 计算时间差(绝对值) time_difference = abs((data_time - get_real_value_time_dt).total_seconds()) - # 判断时间差是否超过1分钟 if time_difference > 60: # 超过1分钟 monitored_value = None else: # 小于等于3分钟 monitored_value = data['monitored_value'] - # 创建Point对象 point = ( Point('pipe_flow_realtime') @@ -608,25 +552,20 @@ def store_realtime_SCADA_data_to_influxdb(get_real_value_time: str, bucket: str ) write_api.write(bucket=bucket, org=org_name, record=point) write_api.flush() - if pressure_realtime_data_list: for data in pressure_realtime_data_list: # 将 data['time'] 和 get_realValue_time 转换为 datetime 对象 data_time = datetime.fromisoformat(data['time']) get_real_value_time_dt = datetime.fromisoformat(get_real_value_time).replace(tzinfo=None) - # 将获取的时间转换为 UTC 时间 get_real_value_time_utc = get_real_value_time_dt.astimezone(timezone.utc) - # 计算时间差(绝对值) time_difference = abs((data_time - get_real_value_time_dt).total_seconds()) - # 判断时间差是否超过1分钟 if time_difference > 60: # 超过1分钟 monitored_value = None else: # 小于等于3分钟 monitored_value = data['monitored_value'] - # 创建Point对象 point = ( Point('pressure_realtime') @@ -640,25 +579,20 @@ def store_realtime_SCADA_data_to_influxdb(get_real_value_time: str, bucket: str ) write_api.write(bucket=bucket, org=org_name, record=point) write_api.flush() - if demand_realtime_data_list: for data in demand_realtime_data_list: # 将 data['time'] 和 get_realValue_time 转换为 datetime 对象 data_time = datetime.fromisoformat(data['time']) get_real_value_time_dt = datetime.fromisoformat(get_real_value_time).replace(tzinfo=None) - # 将获取的时间转换为 UTC 时间 get_real_value_time_utc = get_real_value_time_dt.astimezone(timezone.utc) - # 计算时间差(绝对值) time_difference = abs((data_time - get_real_value_time_dt).total_seconds()) - # 判断时间差是否超过1分钟 if time_difference > 60: # 超过1分钟 monitored_value = None else: # 小于等于3分钟 monitored_value = data['monitored_value'] - # 创建Point对象 point = ( Point('demand_realtime') @@ -672,25 +606,20 @@ def store_realtime_SCADA_data_to_influxdb(get_real_value_time: str, bucket: str ) write_api.write(bucket=bucket, org=org_name, record=point) write_api.flush() - if quality_realtime_data_list: for data in quality_realtime_data_list: # 将 data['time'] 和 get_realValue_time 转换为 datetime 对象 data_time = datetime.fromisoformat(data['time']) get_real_value_time_dt = datetime.fromisoformat(get_real_value_time).replace(tzinfo=None) - # 将获取的时间转换为 UTC 时间 get_real_value_time_utc = get_real_value_time_dt.astimezone(timezone.utc) - # 计算时间差(绝对值) time_difference = abs((data_time - get_real_value_time_dt).total_seconds()) - # 判断时间差是否超过1分钟 if time_difference > 60: # 超过1分钟 monitored_value = None else: # 小于等于3分钟 monitored_value = data['monitored_value'] - # 创建Point对象 point = ( Point('quality_realtime') @@ -741,7 +670,6 @@ def store_non_realtime_SCADA_data_to_influxdb(get_history_data_end_time: str, bu print("{} -- Failed to connect to InfluxDB.".format( datetime.now().strftime('%Y-%m-%d %H:%M:%S'))) write_api = client.write_api(write_options=SYNCHRONOUS) - # 将end_date字符串转换为datetime对象 end_date_dt = datetime.strptime(convert_time_format(get_history_data_end_time), '%Y-%m-%d %H:%M:%S') end_date = end_date_dt.strftime('%Y-%m-%d %H:%M:%S') @@ -751,7 +679,6 @@ def store_non_realtime_SCADA_data_to_influxdb(get_history_data_end_time: str, bu begin_date = get_history_data_start_time.strftime('%Y-%m-%d %H:%M:%S') # print(begin_date) # print(end_date) - reservoir_liquid_level_non_realtime_data_list = [] tank_liquid_level_non_realtime_data_list = [] fixed_pump_non_realtime_data_list = [] @@ -761,8 +688,6 @@ def store_non_realtime_SCADA_data_to_influxdb(get_history_data_end_time: str, bu pressure_non_realtime_data_list = [] demand_non_realtime_data_list = [] quality_non_realtime_data_list = [] - - try_count = 0 while try_count < 5: try: @@ -774,57 +699,48 @@ def store_non_realtime_SCADA_data_to_influxdb(get_history_data_end_time: str, bu ids=','.join(globals.reservoir_liquid_level_non_realtime_ids), begin_date=begin_date, end_date=end_date, downsample='1m') - if globals.tank_liquid_level_non_realtime_ids: tank_liquid_level_non_realtime_data_list = get_data.get_history_data( ids=','.join(globals.tank_liquid_level_non_realtime_ids), begin_date=begin_date, end_date=end_date, downsample='1m') - if globals.fixed_pump_non_realtime_ids: fixed_pump_non_realtime_data_list = get_data.get_history_data( ids=','.join(globals.fixed_pump_non_realtime_ids), begin_date=begin_date, end_date=end_date, downsample='1m') - if globals.variable_pump_non_realtime_ids: variable_pump_non_realtime_data_list = get_data.get_history_data( ids=','.join(globals.variable_pump_non_realtime_ids), begin_date=begin_date, end_date=end_date, downsample='1m') - if globals.source_outflow_non_realtime_ids: source_outflow_non_realtime_data_list = get_data.get_history_data( ids=','.join(globals.source_outflow_non_realtime_ids), begin_date=begin_date, end_date=end_date, downsample='1m') - if globals.pipe_flow_non_realtime_ids: pipe_flow_non_realtime_data_list = get_data.get_history_data( ids=','.join(globals.pipe_flow_non_realtime_ids), begin_date=begin_date, end_date=end_date, downsample='1m') # print(pipe_flow_non_realtime_data_list) - if globals.pressure_non_realtime_ids: pressure_non_realtime_data_list = get_data.get_history_data( ids=','.join(globals.pressure_non_realtime_ids), begin_date=begin_date, end_date=end_date, downsample='1m') # print(pressure_non_realtime_data_list) - if globals.demand_non_realtime_ids: demand_non_realtime_data_list = get_data.get_history_data( ids=','.join(globals.demand_non_realtime_ids), begin_date=begin_date, end_date=end_date, downsample='1m') - if globals.quality_non_realtime_ids: quality_non_realtime_data_list = get_data.get_history_data( ids=','.join(globals.quality_non_realtime_ids), begin_date=begin_date, end_date=end_date, downsample='1m') - except Exception as e: print(f"Attempt {try_count} failed with error: {e}") if try_count < 5: @@ -832,11 +748,9 @@ def store_non_realtime_SCADA_data_to_influxdb(get_history_data_end_time: str, bu time.sleep(10) else: print("Max retries reached. Exiting.") - else: print("Data fetched successfully.") break # 成功后退出循环 - if reservoir_liquid_level_non_realtime_data_list: for data in reservoir_liquid_level_non_realtime_data_list: # 创建Point对象 @@ -851,7 +765,6 @@ def store_non_realtime_SCADA_data_to_influxdb(get_history_data_end_time: str, bu .time(data['time']) ) write_api.write(bucket=bucket, org=org_name, record=point) - if tank_liquid_level_non_realtime_data_list: for data in tank_liquid_level_non_realtime_data_list: # 创建Point对象 @@ -866,7 +779,6 @@ def store_non_realtime_SCADA_data_to_influxdb(get_history_data_end_time: str, bu .time(data['time']) ) write_api.write(bucket=bucket, org=org_name, record=point) - if fixed_pump_non_realtime_data_list: for data in fixed_pump_non_realtime_data_list: # 创建Point对象 @@ -881,7 +793,6 @@ def store_non_realtime_SCADA_data_to_influxdb(get_history_data_end_time: str, bu .time(data['time']) ) write_api.write(bucket=bucket, org=org_name, record=point) - if variable_pump_non_realtime_data_list: for data in variable_pump_non_realtime_data_list: # 创建Point对象 @@ -896,7 +807,6 @@ def store_non_realtime_SCADA_data_to_influxdb(get_history_data_end_time: str, bu .time(data['time']) ) write_api.write(bucket=bucket, org=org_name, record=point) - if source_outflow_non_realtime_data_list: for data in source_outflow_non_realtime_data_list: # 创建Point对象 @@ -911,7 +821,6 @@ def store_non_realtime_SCADA_data_to_influxdb(get_history_data_end_time: str, bu .time(data['time']) ) write_api.write(bucket=bucket, org=org_name, record=point) - # if pipe_flow_non_realtime_data_list: for data in pipe_flow_non_realtime_data_list: # 创建Point对象 @@ -926,7 +835,6 @@ def store_non_realtime_SCADA_data_to_influxdb(get_history_data_end_time: str, bu .time(data['time']) ) write_api.write(bucket=bucket, org=org_name, record=point) - if pressure_non_realtime_data_list: for data in pressure_non_realtime_data_list: # 创建Point对象 @@ -941,7 +849,6 @@ def store_non_realtime_SCADA_data_to_influxdb(get_history_data_end_time: str, bu .time(data['time']) ) write_api.write(bucket=bucket, org=org_name, record=point) - if demand_non_realtime_data_list: for data in demand_non_realtime_data_list: # 创建Point对象 @@ -956,7 +863,410 @@ def store_non_realtime_SCADA_data_to_influxdb(get_history_data_end_time: str, bu .time(data['time']) ) write_api.write(bucket=bucket, org=org_name, record=point) + if quality_non_realtime_data_list: + for data in quality_non_realtime_data_list: + # 创建Point对象 + point = ( + Point('quality_non_realtime') + .tag("date", data['time'].strftime('%Y-%m-%d')) + .tag("description", data['description']) + .tag("device_ID", data['device_ID']) + .field("monitored_value", data['monitored_value']) + .field("datacleaning_value", None) + .field("simulation_value", None) + .time(data['time']) + ) + write_api.write(bucket=bucket, org=org_name, record=point) + +# 2025/03/01 +def download_history_data_manually(begin_time: str, end_time: str, bucket: str = "SCADA_data", + client: InfluxDBClient = client) -> None: + """ + 获取某个时间段内所有SCADA设备的历史数据,非实时执行,手动补充数据版 + :param begin_time: 获取历史数据的开始时间,格式如'2024-11-25T09:00:00+08:00' + :param end_time: 获取历史数据的结束时间,格式如'2024-11-25T09:00:00+08:00' + :param bucket: InfluxDB 的 bucket 名称,默认值为 "SCADA_data" + :param client: 已初始化的 InfluxDBClient 实例 + :return: + """ + if client.ping(): + print("{} -- Successfully connected to InfluxDB.".format( + datetime.now().strftime('%Y-%m-%d %H:%M:%S'))) + else: + print("{} -- Failed to connect to InfluxDB.".format( + datetime.now().strftime('%Y-%m-%d %H:%M:%S'))) + write_api = client.write_api(write_options=SYNCHRONOUS) + begin_date = convert_time_format(begin_time) + end_date = convert_time_format(end_time) + + reservoir_liquid_level_realtime_data_list = [] + tank_liquid_level_realtime_data_list = [] + fixed_pump_realtime_data_list =[] + variable_pump_realtime_data_list =[] + source_outflow_realtime_data_list = [] + pipe_flow_realtime_data_list = [] + pressure_realtime_data_list =[] + demand_realtime_data_list = [] + quality_realtime_data_list = [] + + reservoir_liquid_level_non_realtime_data_list = [] + tank_liquid_level_non_realtime_data_list = [] + fixed_pump_non_realtime_data_list = [] + variable_pump_non_realtime_data_list = [] + source_outflow_non_realtime_data_list = [] + pipe_flow_non_realtime_data_list = [] + pressure_non_realtime_data_list = [] + demand_non_realtime_data_list = [] + quality_non_realtime_data_list = [] + + try_count = 0 + while try_count < 5: + try: + try_count += 1 + if globals.reservoir_liquid_level_realtime_ids: + reservoir_liquid_level_realtime_data_list = get_data.get_history_data( + ids=','.join(globals.reservoir_liquid_level_realtime_ids), + begin_date=begin_date, end_date=end_date, + downsample='1m') + if globals.tank_liquid_level_realtime_ids: + tank_liquid_level_realtime_data_list = get_data.get_history_data( + ids=','.join(globals.tank_liquid_level_realtime_ids), + begin_date=begin_date, end_date=end_date, + downsample='1m') + if globals.fixed_pump_realtime_ids: + fixed_pump_realtime_data_list = get_data.get_history_data( + ids=','.join(globals.fixed_pump_realtime_ids), + begin_date=begin_date, end_date=end_date, + downsample='1m') + if globals.variable_pump_realtime_ids: + variable_pump_realtime_data_list = get_data.get_history_data( + ids=','.join(globals.variable_pump_realtime_ids), + begin_date=begin_date, end_date=end_date, + downsample='1m') + if globals.source_outflow_realtime_ids: + source_outflow_realtime_data_list = get_data.get_history_data( + ids=','.join(globals.source_outflow_realtime_ids), + begin_date=begin_date, end_date=end_date, + downsample='1m') + if globals.pipe_flow_realtime_ids: + pipe_flow_realtime_data_list = get_data.get_history_data( + ids=','.join(globals.pipe_flow_realtime_ids), + begin_date=begin_date, end_date=end_date, + downsample='1m') + if globals.pressure_realtime_ids: + pressure_realtime_data_list = get_data.get_history_data( + ids=','.join(globals.pressure_realtime_ids), + begin_date=begin_date, end_date=end_date, + downsample='1m') + if globals.demand_realtime_ids: + demand_realtime_data_list = get_data.get_history_data( + ids=','.join(globals.demand_realtime_ids), + begin_date=begin_date, end_date=end_date, + downsample='1m') + if globals.quality_realtime_ids: + quality_realtime_data_list = get_data.get_history_data( + ids=','.join(globals.quality_realtime_ids), + begin_date=begin_date, end_date=end_date, + downsample='1m') + # reservoir_liquid_level_non_realtime_data_list = get_data.get_history_data( + # ids=','.join(reservoir_liquid_level_non_realtime_ids), begin_date=begin_date, end_date=end_date, downsample='1m') + if globals.reservoir_liquid_level_non_realtime_ids: + reservoir_liquid_level_non_realtime_data_list = get_data.get_history_data( + ids=','.join(globals.reservoir_liquid_level_non_realtime_ids), + begin_date=begin_date, end_date=end_date, + downsample='1m') + if globals.tank_liquid_level_non_realtime_ids: + tank_liquid_level_non_realtime_data_list = get_data.get_history_data( + ids=','.join(globals.tank_liquid_level_non_realtime_ids), + begin_date=begin_date, end_date=end_date, + downsample='1m') + if globals.fixed_pump_non_realtime_ids: + fixed_pump_non_realtime_data_list = get_data.get_history_data( + ids=','.join(globals.fixed_pump_non_realtime_ids), + begin_date=begin_date, end_date=end_date, + downsample='1m') + if globals.variable_pump_non_realtime_ids: + variable_pump_non_realtime_data_list = get_data.get_history_data( + ids=','.join(globals.variable_pump_non_realtime_ids), + begin_date=begin_date, end_date=end_date, + downsample='1m') + if globals.source_outflow_non_realtime_ids: + source_outflow_non_realtime_data_list = get_data.get_history_data( + ids=','.join(globals.source_outflow_non_realtime_ids), + begin_date=begin_date, end_date=end_date, + downsample='1m') + if globals.pipe_flow_non_realtime_ids: + pipe_flow_non_realtime_data_list = get_data.get_history_data( + ids=','.join(globals.pipe_flow_non_realtime_ids), + begin_date=begin_date, end_date=end_date, + downsample='1m') + # print(pipe_flow_non_realtime_data_list) + if globals.pressure_non_realtime_ids: + pressure_non_realtime_data_list = get_data.get_history_data( + ids=','.join(globals.pressure_non_realtime_ids), + begin_date=begin_date, end_date=end_date, + downsample='1m') + # print(pressure_non_realtime_data_list) + if globals.demand_non_realtime_ids: + demand_non_realtime_data_list = get_data.get_history_data( + ids=','.join(globals.demand_non_realtime_ids), + begin_date=begin_date, end_date=end_date, + downsample='1m') + if globals.quality_non_realtime_ids: + quality_non_realtime_data_list = get_data.get_history_data( + ids=','.join(globals.quality_non_realtime_ids), + begin_date=begin_date, end_date=end_date, + downsample='1m') + except Exception as e: + print(f"Attempt {try_count} failed with error: {e}") + if try_count < 5: + print("Retrying in 10 seconds...") + time.sleep(10) + else: + print("Max retries reached. Exiting.") + else: + print("Data fetched successfully.") + break # 成功后退出循环 + + if reservoir_liquid_level_realtime_data_list: + for data in reservoir_liquid_level_realtime_data_list: + # 创建Point对象 + point = ( + Point('reservoir_liquid_level_realtime') + .tag("date", data['time'].strftime('%Y-%m-%d')) + .tag("description", data['description']) + .tag("device_ID", data['device_ID']) + .field("monitored_value", data['monitored_value']) + .field("datacleaning_value", None) + .field("simulation_value", None) + .time(data['time']) + ) + write_api.write(bucket=bucket, org=org_name, record=point) + if tank_liquid_level_realtime_data_list: + for data in tank_liquid_level_realtime_data_list: + # 创建Point对象 + point = ( + Point('tank_liquid_level_realtime') + .tag("date", data['time'].strftime('%Y-%m-%d')) + .tag("description", data['description']) + .tag("device_ID", data['device_ID']) + .field("monitored_value", data['monitored_value']) + .field("datacleaning_value", None) + .field("simulation_value", None) + .time(data['time']) + ) + write_api.write(bucket=bucket, org=org_name, record=point) + if fixed_pump_realtime_data_list: + for data in fixed_pump_realtime_data_list: + # 创建Point对象 + point = ( + Point('fixed_pump_realtime') + .tag("date", data['time'].strftime('%Y-%m-%d')) + .tag("description", data['description']) + .tag("device_ID", data['device_ID']) + .field("monitored_value", data['monitored_value']) + .field("datacleaning_value", None) + .field("simulation_value", None) + .time(data['time']) + ) + write_api.write(bucket=bucket, org=org_name, record=point) + if variable_pump_realtime_data_list: + for data in variable_pump_realtime_data_list: + # 创建Point对象 + point = ( + Point('variable_pump_realtime') + .tag("date", data['time'].strftime('%Y-%m-%d')) + .tag("description", data['description']) + .tag("device_ID", data['device_ID']) + .field("monitored_value", data['monitored_value']) + .field("datacleaning_value", None) + .field("simulation_value", None) + .time(data['time']) + ) + write_api.write(bucket=bucket, org=org_name, record=point) + if source_outflow_realtime_data_list: + for data in source_outflow_realtime_data_list: + # 创建Point对象 + point = ( + Point('source_outflow_realtime') + .tag("date", data['time'].strftime('%Y-%m-%d')) + .tag("description", data['description']) + .tag("device_ID", data['device_ID']) + .field("monitored_value", data['monitored_value']) + .field("datacleaning_value", None) + .field("simulation_value", None) + .time(data['time']) + ) + write_api.write(bucket=bucket, org=org_name, record=point) + if pipe_flow_realtime_data_list: + for data in pipe_flow_realtime_data_list: + # 创建Point对象 + point = ( + Point('pipe_flow_realtime') + .tag("date", data['time'].strftime('%Y-%m-%d')) + .tag("description", data['description']) + .tag("device_ID", data['device_ID']) + .field("monitored_value", data['monitored_value']) + .field("datacleaning_value", None) + .field("simulation_value", None) + .time(data['time']) + ) + write_api.write(bucket=bucket, org=org_name, record=point) + if pressure_realtime_data_list: + for data in pressure_realtime_data_list: + # 创建Point对象 + point = ( + Point('pressure_realtime') + .tag("date", data['time'].strftime('%Y-%m-%d')) + .tag("description", data['description']) + .tag("device_ID", data['device_ID']) + .field("monitored_value", data['monitored_value']) + .field("datacleaning_value", None) + .field("simulation_value", None) + .time(data['time']) + ) + write_api.write(bucket=bucket, org=org_name, record=point) + if demand_realtime_data_list: + for data in demand_realtime_data_list: + # 创建Point对象 + point = ( + Point('demand_realtime') + .tag("date", data['time'].strftime('%Y-%m-%d')) + .tag("description", data['description']) + .tag("device_ID", data['device_ID']) + .field("monitored_value", data['monitored_value']) + .field("datacleaning_value", None) + .field("simulation_value", None) + .time(data['time']) + ) + write_api.write(bucket=bucket, org=org_name, record=point) + if quality_realtime_data_list: + for data in quality_realtime_data_list: + # 创建Point对象 + point = ( + Point('quality_realtime') + .tag("date", data['time'].strftime('%Y-%m-%d')) + .tag("description", data['description']) + .tag("device_ID", data['device_ID']) + .field("monitored_value", data['monitored_value']) + .field("datacleaning_value", None) + .field("simulation_value", None) + .time(data['time']) + ) + write_api.write(bucket=bucket, org=org_name, record=point) + if reservoir_liquid_level_non_realtime_data_list: + for data in reservoir_liquid_level_non_realtime_data_list: + # 创建Point对象 + point = ( + Point('reservoir_liquid_level_non_realtime') + .tag("date", data['time'].strftime('%Y-%m-%d')) + .tag("description", data['description']) + .tag("device_ID", data['device_ID']) + .field("monitored_value", data['monitored_value']) + .field("datacleaning_value", None) + .field("simulation_value", None) + .time(data['time']) + ) + write_api.write(bucket=bucket, org=org_name, record=point) + if tank_liquid_level_non_realtime_data_list: + for data in tank_liquid_level_non_realtime_data_list: + # 创建Point对象 + point = ( + Point('tank_liquid_level_non_realtime') + .tag("date", data['time'].strftime('%Y-%m-%d')) + .tag("description", data['description']) + .tag("device_ID", data['device_ID']) + .field("monitored_value", data['monitored_value']) + .field("datacleaning_value", None) + .field("simulation_value", None) + .time(data['time']) + ) + write_api.write(bucket=bucket, org=org_name, record=point) + if fixed_pump_non_realtime_data_list: + for data in fixed_pump_non_realtime_data_list: + # 创建Point对象 + point = ( + Point('fixed_pump_non_realtime') + .tag("date", data['time'].strftime('%Y-%m-%d')) + .tag("description", data['description']) + .tag("device_ID", data['device_ID']) + .field("monitored_value", data['monitored_value']) + .field("datacleaning_value", None) + .field("simulation_value", None) + .time(data['time']) + ) + write_api.write(bucket=bucket, org=org_name, record=point) + if variable_pump_non_realtime_data_list: + for data in variable_pump_non_realtime_data_list: + # 创建Point对象 + point = ( + Point('variable_pump_non_realtime') + .tag("date", data['time'].strftime('%Y-%m-%d')) + .tag("description", data['description']) + .tag("device_ID", data['device_ID']) + .field("monitored_value", data['monitored_value']) + .field("datacleaning_value", None) + .field("simulation_value", None) + .time(data['time']) + ) + write_api.write(bucket=bucket, org=org_name, record=point) + if source_outflow_non_realtime_data_list: + for data in source_outflow_non_realtime_data_list: + # 创建Point对象 + point = ( + Point('source_outflow_non_realtime') + .tag("date", data['time'].strftime('%Y-%m-%d')) + .tag("description", data['description']) + .tag("device_ID", data['device_ID']) + .field("monitored_value", data['monitored_value']) + .field("datacleaning_value", None) + .field("simulation_value", None) + .time(data['time']) + ) + write_api.write(bucket=bucket, org=org_name, record=point) + if pipe_flow_non_realtime_data_list: + for data in pipe_flow_non_realtime_data_list: + # 创建Point对象 + point = ( + Point('pipe_flow_non_realtime') + .tag("date", data['time'].strftime('%Y-%m-%d')) + .tag("description", data['description']) + .tag("device_ID", data['device_ID']) + .field("monitored_value", data['monitored_value']) + .field("datacleaning_value", None) + .field("simulation_value", None) + .time(data['time']) + ) + write_api.write(bucket=bucket, org=org_name, record=point) + if pressure_non_realtime_data_list: + for data in pressure_non_realtime_data_list: + # 创建Point对象 + point = ( + Point('pressure_non_realtime') + .tag("date", data['time'].strftime('%Y-%m-%d')) + .tag("description", data['description']) + .tag("device_ID", data['device_ID']) + .field("monitored_value", data['monitored_value']) + .field("datacleaning_value", None) + .field("simulation_value", None) + .time(data['time']) + ) + write_api.write(bucket=bucket, org=org_name, record=point) + if demand_non_realtime_data_list: + for data in demand_non_realtime_data_list: + # 创建Point对象 + point = ( + Point('demand_non_realtime') + .tag("date", data['time'].strftime('%Y-%m-%d')) + .tag("description", data['description']) + .tag("device_ID", data['device_ID']) + .field("monitored_value", data['monitored_value']) + .field("datacleaning_value", None) + .field("simulation_value", None) + .time(data['time']) + ) + write_api.write(bucket=bucket, org=org_name, record=point) if quality_non_realtime_data_list: for data in quality_non_realtime_data_list: # 创建Point对象 @@ -996,10 +1306,8 @@ def query_SCADA_data_by_device_ID_and_time(query_ids_list: List[str], query_time # DingZQ temp change delta to 5 from 1 utc_start_time = utc_time - timedelta(seconds=5) utc_stop_time = utc_time + timedelta(seconds=5) - # 构建查询字典 SCADA_result_dict = {} - for device_id in query_ids_list: # 构建 Flux 查询语句 flux_query = f''' @@ -1008,11 +1316,9 @@ def query_SCADA_data_by_device_ID_and_time(query_ids_list: List[str], query_time |> filter(fn: (r) => r["device_ID"] == "{device_id}") |> filter(fn: (r) => r["_field"] == "monitored_value") ''' - # 执行查询 try: result = query_api.query(flux_query) - # 从查询结果中提取 monitored_value if result: # 假设返回的结果为一行数据 @@ -1027,7 +1333,6 @@ def query_SCADA_data_by_device_ID_and_time(query_ids_list: List[str], query_time except Exception as e: print(f"Error querying InfluxDB for device ID {device_id}: {e}") SCADA_result_dict[device_id] = None - return SCADA_result_dict # DingZQ, 2025-02-15 @@ -1136,7 +1441,6 @@ def store_realtime_simulation_result_to_influxdb(node_result_list: List[Dict[str # 提取节点信息和结果数据 node_id = result.get('node') data_list = result.get('result', []) - for data in data_list: # 构建 Point 数据,多个 field 存在于一个数据点中 node_point = Point("node") \ @@ -1149,7 +1453,6 @@ def store_realtime_simulation_result_to_influxdb(node_result_list: List[Dict[str .field("totalExternalOutflow", None) \ .field("quality", data.get('quality', 0.0)) \ .time(time_beijing) - # 写入数据到 InfluxDB,多个 field 在同一个 point 中 write_api.write(bucket=bucket, org=org_name, record=node_point) write_api.flush() @@ -1157,7 +1460,6 @@ def store_realtime_simulation_result_to_influxdb(node_result_list: List[Dict[str for result in link_result_list: link_id = result.get('link') data_list = result.get('result', []) - for data in data_list: link_point = Point("link") \ .tag("date", date_str) \ @@ -1174,7 +1476,6 @@ def store_realtime_simulation_result_to_influxdb(node_result_list: List[Dict[str write_api.write(bucket=bucket, org=org_name, record=link_point) write_api.flush() print(f"成功将 {len(link_result_list)} 条link数据写入 InfluxDB。") - except Exception as e: raise RuntimeError(f"数据写入 InfluxDB 时发生错误: {e}") @@ -1215,7 +1516,6 @@ def query_latest_record_by_ID(ID: str, type: str, bucket: str="realtime_simulati # 解析查询结果 for table in tables: for record in table.records: - return { "time": record["_time"], "nodeID": ID, @@ -1335,7 +1635,6 @@ def query_all_record_by_time(query_time: str, bucket: str="realtime_simulation_r "actualdemand": record["actualdemand"], "quality": record["quality"] }) - # 处理 link 数据 elif measurement == "link": link_records.append({ @@ -1350,9 +1649,61 @@ def query_all_record_by_time(query_time: str, bucket: str="realtime_simulation_r "reaction": record["reaction"], "friction": record["friction"] }) - return node_records, link_records + +# 2025/03/03 WMH +def query_all_record_by_time_property(query_time: str, type: str, property: str, bucket: str="realtime_simulation_result", + client: InfluxDBClient=client) -> list: + """ + 查询指定北京时间的所有记录,查询 'node' 或 'link' 的某一属性值,以指定格式返回。 + :param query_time: (str): 输入的北京时间,格式为 '2024-11-24T17:30:00+08:00'。 + :param type: (str): 查询的类型(决定 measurement) + :param property: (str): 查询的字段名称(field) + :param bucket: (str): 数据存储的 bucket 名称。 + :param client: (InfluxDBClient): 已初始化的 InfluxDBClient 实例。 + :return: list(dict): result_records + """ + if client.ping(): + print("{} -- Successfully connected to InfluxDB.".format( + datetime.now().strftime('%Y-%m-%d %H:%M:%S'))) + else: + print("{} -- Failed to connect to InfluxDB.".format( + datetime.now().strftime('%Y-%m-%d %H:%M:%S'))) + query_api = client.query_api() + # 确定 measurement + if type == "node": + measurement = "node" + elif type == "link": + measurement = "link" + else: + raise ValueError(f"不支持的类型: {type}") + # 将北京时间转换为 UTC 时间 + beijing_time = datetime.fromisoformat(query_time) + utc_time = beijing_time.astimezone(timezone.utc) + utc_start_time = utc_time - timedelta(seconds=1) + utc_stop_time = utc_time + timedelta(seconds=1) + # 构建 Flux 查询语句 + flux_query = f''' + from(bucket: "{bucket}") + |> range(start: {utc_start_time.isoformat()}, stop: {utc_stop_time.isoformat()}) + |> filter(fn: (r) => r["_measurement"] == "{measurement}") + |> filter(fn: (r) => r["_field"] == "{property}") + ''' + # 执行查询 + tables = query_api.query(flux_query) + result_records = [] + # 解析查询结果 + for table in tables: + for record in table.records: + # print(record.values) # 打印完整记录内容 + result_records.append({ + "ID": record["ID"], + "value": record["_value"] + }) + return result_records + + # 2025/02/21 def query_all_record_by_date(query_date: str, bucket: str="realtime_simulation_result", client: InfluxDBClient=client) -> tuple: """ @@ -1362,7 +1713,6 @@ def query_all_record_by_date(query_date: str, bucket: str="realtime_simulation_r :param client: 已初始化的InfluxDBClient 实例。 :return: dict: tuple: (node_records, link_records) """ - if client.ping(): print("{} -- Successfully connected to InfluxDB.".format( datetime.now().strftime('%Y-%m-%d %H:%M:%S'))) @@ -1396,8 +1746,6 @@ def query_all_record_by_date(query_date: str, bucket: str="realtime_simulation_r for record in table.records: # print(record.values) # 打印完整记录内容 measurement = record["_measurement"] - print(measurement) - # 处理 node 数据 if measurement == "node": node_records.append({ @@ -1422,9 +1770,60 @@ def query_all_record_by_date(query_date: str, bucket: str="realtime_simulation_r "reaction": record["reaction"], "friction": record["friction"] }) - return node_records, link_records + +# 2025/02/21 WMH +def query_all_record_by_date_property(query_date: str, type: str, property: str, + bucket: str="realtime_simulation_result", client: InfluxDBClient=client) -> list: + """ + 查询指定日期的‘node’或‘link’的某一属性值的所有记录,以指定的格式返回 + :param query_date: 输入的日期,格式为‘2025-02-14’ + :param type: (str): 查询的类型(决定 measurement) + :param property: (str): 查询的字段名称(field) + :param bucket: 数据存储的bucket名称 + :param client: 已初始化的InfluxDBClient 实例。 + :return: list(dict): result_records + """ + if client.ping(): + print("{} -- Successfully connected to InfluxDB.".format( + datetime.now().strftime('%Y-%m-%d %H:%M:%S'))) + else: + print("{} -- Failed to connect to InfluxDB.".format( + datetime.now().strftime('%Y-%m-%d %H:%M:%S'))) + query_api = client.query_api() + # 确定 measurement + if type == "node": + measurement = "node" + elif type == "link": + measurement = "link" + else: + raise ValueError(f"不支持的类型: {type}") + # 将 start_date 的北京时间转换为 UTC 时间 + start_time = (datetime.strptime(query_date, "%Y-%m-%d") - timedelta(days=1)).replace(hour=16, minute=0, second=0, tzinfo=timezone.utc).isoformat() + # 构建 Flux 查询语句 + flux_query = f''' + from(bucket: "{bucket}") + |> range(start: {start_time}) + |> filter(fn: (r) => r["_measurement"] == "{measurement}") + |> filter(fn: (r) => r["date"] == "{query_date}") + |> filter(fn: (r) => r["_field"] == "{property}") + ''' + # 执行查询 + tables = query_api.query(flux_query) + result_records = [] + # 解析查询结果 + for table in tables: + for record in table.records: + # print(record.values) # 打印完整记录内容 + result_records.append({ + "time": record["_time"], + "ID": record["ID"], + "value": record["_value"] + }) + return result_records + + # 2025/02/01 def query_curve_by_ID_property_daterange(ID: str, type: str, property: str, start_date: str, end_date: str, bucket: str="realtime_simulation_result", client: InfluxDBClient=client) -> list: """ @@ -1452,16 +1851,13 @@ def query_curve_by_ID_property_daterange(ID: str, type: str, property: str, star measurement = "link" else: raise ValueError(f"不支持的类型: {type}") - # 解析日期范围(当天的 UTC 开始和结束时间) # previous_day = datetime.strptime(start_date, "%Y-%m-%d") - timedelta(days=1) # start_time = previous_day.isoformat() + "T16:00:00Z" # stop_time = datetime.strptime(end_date, "%Y-%m-%d").isoformat() + "T15:59:59Z" - # 将 start_date 的北京时间转换为 UTC 时间范围 start_time = (datetime.strptime(start_date, "%Y-%m-%d") - timedelta(days=1)).replace(hour=16, minute=0, second=0, tzinfo=timezone.utc).isoformat() stop_time = datetime.strptime(end_date, "%Y-%m-%d").replace(hour=15, minute=59, second=59, tzinfo=timezone.utc).isoformat() - # 构建 Flux 查询语句 flux_query = f''' from(bucket: "{bucket}") @@ -1470,10 +1866,8 @@ def query_curve_by_ID_property_daterange(ID: str, type: str, property: str, star |> filter(fn: (r) => r["ID"] == "{ID}") |> filter(fn: (r) => r["_field"] == "{property}") ''' - # 执行查询 tables = query_api.query(flux_query) - # 解析查询结果 results = [] for table in tables: @@ -1482,7 +1876,6 @@ def query_curve_by_ID_property_daterange(ID: str, type: str, property: str, star "time": record["_time"], "value": record["_value"] }) - return results def query_buckets(client: InfluxDBClient=client) -> list[str]: @@ -1511,10 +1904,7 @@ def query_measurements(bucket: str, client: InfluxDBClient=client) -> list[str]: measurements = [row.values["_value"] for table in result for row in table.records] return measurements - - - -# WMH 2025/02/13 +# 2025/02/13 def store_scheme_simulation_result_to_influxdb(node_result_list: List[Dict[str, any]], link_result_list: List[Dict[str, any]], scheme_start_time: str, num_periods: int = 1, scheme_Type: str = None, scheme_Name: str = None, bucket: str = "scheme_simulation_result", client: InfluxDBClient = client): @@ -1536,16 +1926,12 @@ def store_scheme_simulation_result_to_influxdb(node_result_list: List[Dict[str, else: print("{} -- Failed to connect to InfluxDB.".format( datetime.now().strftime('%Y-%m-%d %H:%M:%S'))) - try: write_api = client.write_api() - date_str = scheme_start_time.split('T')[0] time_beijing = datetime.strptime(scheme_start_time, '%Y-%m-%dT%H:%M:%S%z') - timestep_parts = globals.hydraulic_timestep.split(':') timestep = timedelta(hours=int(timestep_parts[0]), minutes=int(timestep_parts[1]), seconds=int(timestep_parts[2])) - for node_result in node_result_list: # 提取节点信息和数据结果 node_id = node_result.get('node') @@ -1570,7 +1956,6 @@ def store_scheme_simulation_result_to_influxdb(node_result_list: List[Dict[str, # 写入数据到 InfluxDB,多个 field 在同一个 point 中 write_api.write(bucket=bucket, org=org_name, record=node_point) write_api.flush() - for link_result in link_result_list: link_id = link_result.get('link') for period_index in range(num_periods): @@ -1593,7 +1978,6 @@ def store_scheme_simulation_result_to_influxdb(node_result_list: List[Dict[str, .time(scheme_time) write_api.write(bucket=bucket, org=org_name, record=link_point) write_api.flush() - except Exception as e: raise RuntimeError(f"数据写入 InfluxDB 时发生错误: {e}") @@ -1708,9 +2092,64 @@ def query_scheme_all_record_by_time(scheme_Type: str, scheme_Name: str, query_ti "reaction": record["reaction"], "friction": record["friction"] }) - return node_records, link_records + +# 2025/03/04 WMH +def query_scheme_all_record_by_time_property(scheme_Type: str, scheme_Name: str, query_time: str, type: str, property: str, + bucket: str="scheme_simulation_result", client: InfluxDBClient=client) -> list: + """ + 查询指定方案某一时刻‘node'或‘link’某一属性值,以指定格式返回。 + :param scheme_Type: 方案类型 + :param scheme_Name: 方案名称 + :param query_time: 输入的北京时间,格式为 '2024-11-24T17:30:00+08:00'。 + :param type: 查询的类型(决定 measurement) + :param property: 查询的字段名称(field) + :param bucket: 数据存储的 bucket 名称。 + :param client: 已初始化的 InfluxDBClient 实例。 + :return: dict: tuple: (node_records, link_records) + """ + if client.ping(): + print("{} -- Successfully connected to InfluxDB.".format( + datetime.now().strftime('%Y-%m-%d %H:%M:%S'))) + else: + print("{} -- Failed to connect to InfluxDB.".format( + datetime.now().strftime('%Y-%m-%d %H:%M:%S'))) + query_api = client.query_api() + # 确定 measurement + if type == "node": + measurement = "node" + elif type == "link": + measurement = "link" + else: + raise ValueError(f"不支持的类型: {type}") + # 将北京时间转换为 UTC 时间 + beijing_time = datetime.fromisoformat(query_time) + utc_time = beijing_time.astimezone(timezone.utc) + utc_start_time = utc_time - timedelta(seconds=1) + utc_stop_time = utc_time + timedelta(seconds=1) + # 构建 Flux 查询语句 + flux_query = f''' + from(bucket: "{bucket}") + |> range(start: {utc_start_time.isoformat()}, stop: {utc_stop_time.isoformat()}) + |> filter(fn: (r) => r["scheme_Type"] == "{scheme_Type}") + |> filter(fn: (r) => r["scheme_Name"] == "{scheme_Name}") + |> filter(fn: (r) => r["_measurement"] == "{measurement}") + |> filter(fn: (r) => r["_field"] == "{property}") + ''' + # 执行查询 + tables = query_api.query(flux_query) + result_records = [] + # 解析查询结果 + for table in tables: + for record in table.records: + result_records.append({ + "ID": record["ID"], + "value": record["_value"] + }) + return result_records + + # 2025/02/19 def query_scheme_curve_by_ID_property(scheme_Type: str, scheme_Name: str, ID: str, type: str, property: str, bucket: str="scheme_simulation_result", client: InfluxDBClient=client) -> list: @@ -1829,6 +2268,57 @@ def query_scheme_all_record(scheme_Type: str, scheme_Name: str, bucket: str="sch return node_records, link_records + +# 2025/03/04 WMH +def query_scheme_all_record_property(scheme_Type: str, scheme_Name: str, type: str, property: str, + bucket: str="scheme_simulation_result", client: InfluxDBClient=client) -> list: + """ + 查询指定方案的‘node'或‘link’的某一属性值,以指定格式返回。 + :param scheme_Type: 方案类型 + :param scheme_Name: 方案名称 + :param type: 查询的类型(决定 measurement) + :param property: 查询的字段名称(field) + :param bucket: 数据存储的 bucket 名称。 + :param client: 已初始化的 InfluxDBClient 实例。 + :return: dict: tuple: (node_records, link_records) + """ + if client.ping(): + print("{} -- Successfully connected to InfluxDB.".format( + datetime.now().strftime('%Y-%m-%d %H:%M:%S'))) + else: + print("{} -- Failed to connect to InfluxDB.".format( + datetime.now().strftime('%Y-%m-%d %H:%M:%S'))) + query_api = client.query_api() + # 确定 measurement + if type == "node": + measurement = "node" + elif type == "link": + measurement = "link" + else: + raise ValueError(f"不支持的类型: {type}") + # 构建 Flux 查询语句 + flux_query = f''' + from(bucket: "{bucket}") + |> range(start: 2025-01-01T00:00:00Z) + |> filter(fn: (r) => r["scheme_Type"] == "{scheme_Type}") + |> filter(fn: (r) => r["scheme_Name"] == "{scheme_Name}") + |> filter(fn: (r) => r["_measurement"] == "{measurement}") + |> filter(fn: (r) => r["_field"] == "{property}") + ''' + # 执行查询 + tables = query_api.query(flux_query) + result_records = [] + # 解析查询结果 + for table in tables: + for record in table.records: + result_records.append({ + "time": record["_time"], + "ID": record["ID"], + "value": record["_value"] + }) + return result_records + + # 2025/02/16 def export_SCADA_data_to_csv(start_date: str, end_date: str, bucket: str="SCADA_data", client: InfluxDBClient=client) -> None: """ @@ -1878,9 +2368,9 @@ def export_SCADA_data_to_csv(start_date: str, end_date: str, bucket: str="SCADA_ writer = csv.DictWriter(file, fieldnames=['time', 'measurement', 'date', 'description', 'device_ID', 'monitored_value', 'datacleaning_value', 'simulation_value']) writer.writeheader() writer.writerows(rows) - print(f"Data exported to {csv_filename} successfully.") + # 2025/02/17 def export_realtime_simulation_result_to_csv(start_date: str, end_date: str, bucket: str="realtime_simulation_result", client: InfluxDBClient=client) -> None: """ @@ -1950,7 +2440,6 @@ def export_realtime_simulation_result_to_csv(start_date: str, end_date: str, buc row = {'time': key[0], "ID": key[1]} row.update(node_data.get(key, {})) node_rows.append(row) - # 动态生成 CSV 文件名 csv_filename_link = f"realtime_simulation_link_result_{start_date}至{end_date}.csv" csv_filename_node = f"realtime_simulation_node_result_{start_date}至{end_date}.csv" @@ -1964,9 +2453,9 @@ def export_realtime_simulation_result_to_csv(start_date: str, end_date: str, buc 'demanddeficit', 'totalExternalOutflow', 'quality']) writer.writeheader() writer.writerows(node_rows) - print(f"Data exported to {csv_filename_link} and {csv_filename_node} successfully.") + # 2025/02/18 def export_scheme_simulation_result_to_csv_time(start_date: str, end_date: str, bucket: str="scheme_simulation_result", client: InfluxDBClient=client) -> None: """ @@ -2031,7 +2520,6 @@ def export_scheme_simulation_result_to_csv_time(start_date: str, end_date: str, node_data[key]['date'] = record.values.get('date', None) node_data[key]['scheme_Type'] = record.values.get('scheme_Type', None) node_data[key]['scheme_Name'] = record.values.get('scheme_Name', None) - for key in set(link_data.keys()): row = {'time': key[0], "ID": key[1]} row.update(link_data.get(key, {})) @@ -2040,7 +2528,6 @@ def export_scheme_simulation_result_to_csv_time(start_date: str, end_date: str, row = {'time': key[0], "ID": key[1]} row.update(node_data.get(key, {})) node_rows.append(row) - # 动态生成 CSV 文件名 csv_filename_link = f"scheme_simulation_link_result_{start_date}至{end_date}.csv" csv_filename_node = f"scheme_simulation_node_result_{start_date}至{end_date}.csv" @@ -2054,9 +2541,9 @@ def export_scheme_simulation_result_to_csv_time(start_date: str, end_date: str, 'demanddeficit', 'totalExternalOutflow', 'quality']) writer.writeheader() writer.writerows(node_rows) - print(f"Data exported to {csv_filename_link} and {csv_filename_node} successfully.") + # 2025/02/18 def export_scheme_simulation_result_to_csv_scheme(scheme_Type: str, scheme_Name: str, bucket: str="scheme_simulation_result", client: InfluxDBClient=client) -> None: """ @@ -2122,7 +2609,6 @@ def export_scheme_simulation_result_to_csv_scheme(scheme_Type: str, scheme_Name: node_data[key]['date'] = record.values.get('date', None) node_data[key]['scheme_Type'] = record.values.get('scheme_Type', None) node_data[key]['scheme_Name'] = record.values.get('scheme_Name', None) - for key in set(link_data.keys()): row = {'time': key[0], "ID": key[1]} row.update(link_data.get(key, {})) @@ -2131,7 +2617,6 @@ def export_scheme_simulation_result_to_csv_scheme(scheme_Type: str, scheme_Name: row = {'time': key[0], "ID": key[1]} row.update(node_data.get(key, {})) node_rows.append(row) - # 动态生成 CSV 文件名 csv_filename_link = f"scheme_simulation_link_result_{scheme_Name}_of_{scheme_Type}.csv" csv_filename_node = f"scheme_simulation_node_result_{scheme_Name}_of_{scheme_Type}.csv" @@ -2145,9 +2630,9 @@ def export_scheme_simulation_result_to_csv_scheme(scheme_Type: str, scheme_Name: 'demanddeficit', 'totalExternalOutflow', 'quality']) writer.writeheader() writer.writerows(node_rows) - print(f"Data exported to {csv_filename_link} and {csv_filename_node} successfully.") + # 示例调用 if __name__ == "__main__": url = influxdb_info.url @@ -2157,26 +2642,31 @@ if __name__ == "__main__": client = InfluxDBClient(url=url, token=token) # step1: 检查连接状态,初始化influxdb的buckets - try: - delete_buckets(client, org_name) - create_and_initialize_buckets(client, org_name) - except Exception as e: - print(f"连接失败: {e}") - finally: - client.close() + # try: + # # delete_buckets(client, org_name) + # create_and_initialize_buckets(client, org_name) + # except Exception as e: + # print(f"连接失败: {e}") + # finally: + # client.close() # step2: 先查询pg数据库中scada_info的信息,然后存储SCADA数据到SCADA_data这个bucket里 - # query_pg_scada_info_realtime('bb') - # query_pg_scada_info_non_realtime('bb') + query_pg_scada_info_realtime('bb') + query_pg_scada_info_non_realtime('bb') - # 手动执行 - # store_realtime_SCADA_data_to_influxdb(get_real_value_time='2025-02-07T16:52:00+08:00') + # 手动执行存储测试 + # 示例1:store_realtime_SCADA_data_to_influxdb + # store_realtime_SCADA_data_to_influxdb(get_real_value_time='2025-02-25T23:45:00+08:00') + + # 示例2:store_non_realtime_SCADA_data_to_influxdb # store_non_realtime_SCADA_data_to_influxdb(get_history_data_end_time='2025-02-08T12:00:00+08:00') - # step3: 查询测试示例 - with InfluxDBClient(url=url, token=token, org=org_name, timeout=1000*10000) as client: + # 示例3:download_history_data_manually + # download_history_data_manually(begin_time='2025-02-25T00:00:00+08:00', end_time='2025-02-26T00:00:00+08:00') + + # step3: 查询测试示例 + # with InfluxDBClient(url=url, token=token, org=org_name) as client: - # create_and_initialize_buckets(client=client, org_name=org_name) # # 示例1:query_latest_record_by_ID # bucket_name = "realtime_simulation_result" # 数据存储的 bucket 名称 # node_id = "ZBBDTZDP000022" # 查询的节点 ID @@ -2191,7 +2681,7 @@ if __name__ == "__main__": # print("未找到符合条件的记录。") # 示例2:query_all_record_by_time - # node_records, link_records = query_all_record_by_time(query_time="2024-11-25T06:00:00+08:00") + # node_records, link_records = query_all_record_by_time(query_time="2025-02-14T10:30:00+08:00") # print("Node 数据:", node_records) # print("Link 数据:", link_records) @@ -2201,7 +2691,7 @@ if __name__ == "__main__": # print(curve_result) # 示例4:query_SCADA_data_by_device_ID_and_time - # SCADA_result_dict = query_SCADA_data_by_device_ID_and_time(globals.variable_pump_realtime_ids, query_time='2025-02-14T23:58:00+08:00') + # SCADA_result_dict = query_SCADA_data_by_device_ID_and_time(globals.variable_pump_realtime_ids, query_time='2025-02-25T23:45:00+08:00') # print(SCADA_result_dict) # 示例5:query_SCADA_data_curve @@ -2240,8 +2730,22 @@ if __name__ == "__main__": # print("Node 数据:", node_records) # print("Link 数据:", link_records) - results = query_all_record_by_date('2025-02-23', client=client) - # results = query_all_record_by_time('2025-02-12T14:15:00+08:00', client=client) - print(results) + # 示例14:query_all_record_by_time_property + # result_records = query_all_record_by_time_property(query_time='2025-02-25T23:45:00+08:00', type='node', property='head') + # print(result_records) + + # 示例15:query_all_record_by_date_property + # result_records = query_all_record_by_date_property(query_date='2025-02-27', type='node', property='head') + # print(result_records) + + # 示例16:query_scheme_all_record_by_time_property + # result_records = query_scheme_all_record_by_time_property(scheme_Type='burst_Analysis', scheme_Name='scheme1', + # query_time='2025-02-14T10:30:00+08:00', type='node', property='head') + # print(result_records) + + # 示例17:query_scheme_all_record_property + # result_records = query_scheme_all_record_property(scheme_Type='burst_Analysis', scheme_Name='scheme1', type='node', property='head') + # print(result_records) + diff --git a/simulation.py b/simulation.py index 7f6bbac..9e2a58a 100644 --- a/simulation.py +++ b/simulation.py @@ -38,11 +38,9 @@ def query_corresponding_element_id_and_query_id(name: str) -> None: WHERE transmission_mode = 'realtime'; """) records = cur.fetchall() - # 遍历查询结果,并根据 type 将数据存储到相应的字典中 for record in records: type_, associated_element_id, api_query_id = record - if type_ == 'reservoir_liquid_level': globals.reservoirs_id[associated_element_id] = api_query_id elif type_ == 'tank_liquid_level': @@ -84,11 +82,9 @@ def query_corresponding_pattern_id_and_query_id(name: str) -> None: AND type IN ('source_outflow', 'pipe_flow'); """) records = cur.fetchall() - # 遍历查询结果,并根据 type 将数据存储到相应的字典中 for record in records: type_, associated_pattern, api_query_id = record - if type_ == 'source_outflow': globals.source_outflow_pattern_id[associated_pattern] = api_query_id elif type_ == 'pipe_flow': @@ -109,7 +105,6 @@ def query_non_realtime_region(name: str) -> dict: source_outflow_regions = [] # 用于存储所有 region(包含重复的) # 构建连接字符串 conn_string = f"dbname={name} host=127.0.0.1" - try: # 连接到数据库 with psycopg.connect(conn_string) as conn: @@ -121,26 +116,21 @@ def query_non_realtime_region(name: str) -> dict: WHERE transmission_mode = 'non_realtime' AND type = 'pipe_flow'; """) - records = cur.fetchall() col_names = [desc.name for desc in cur.description] - # 找出所有以 'associated_source_outflow_id' 开头的列 source_outflow_cols = [col for col in col_names if col.startswith('associated_source_outflow_id')] logging.info(f"Identified source_outflow columns: {source_outflow_cols}") - for record in records: # 提取所有以 'associated_source_outflow_id' 开头的列的值,排除 None values = [record[col_names.index(col)] for col in source_outflow_cols if record[col_names.index(col)] is not None] - # 如果该记录有相关的值,则将其作为一个 region if values: # 将值排序以确保相同的组合顺序一致(如果顺序不重要) # 如果顺序重要,请删除排序步骤 region_tuple = tuple(sorted(values)) source_outflow_regions.append(region_tuple) - # 移除重复的 regions unique_regions = [] seen = set() @@ -148,18 +138,15 @@ def query_non_realtime_region(name: str) -> dict: if region not in seen: seen.add(region) unique_regions.append(region) - # 为每个唯一的 region 分配一个 region 键 for idx, region in enumerate(unique_regions, 1): region_key = f"region{idx}" globals.source_outflow_region[region_key] = list(region) - logging.info("查询并处理数据成功。") except psycopg.Error as e: logging.error(f"数据库连接或查询出错: {e}") except Exception as ex: logging.error(f"处理数据时出错: {ex}") - return globals.source_outflow_region @@ -176,7 +163,6 @@ def query_non_realtime_region_patterns(name: str, source_outflow_region: dict, c globals.non_realtime_region_patterns = {region: [] for region in globals.source_outflow_region.keys()} region_tuple_to_key = {frozenset(ids): region for region, ids in globals.source_outflow_region.items()} conn_string = f"dbname={name} host=127.0.0.1" - try: with psycopg.connect(conn_string) as conn: with conn.cursor() as cur: @@ -186,31 +172,24 @@ def query_non_realtime_region_patterns(name: str, source_outflow_region: dict, c FROM scada_info WHERE transmission_mode = 'non_realtime' """) - records = cur.fetchall() col_names = [desc.name for desc in cur.description] - # 找出所有以指定前缀开头的列 source_outflow_cols = [col for col in col_names if col.startswith(column_prefix)] logging.info(f"Identified source_outflow columns: {source_outflow_cols}") - # 确保 'associated_pattern' 列存在 if 'associated_pattern' not in col_names: logging.error("'associated_pattern' column not found in scada_info table.") return globals.non_realtime_region_patterns - # 获取 'associated_pattern' 列的索引 pattern_idx = col_names.index('associated_pattern') - for record in records: # 提取所有以 'associated_source_outflow_id' 开头的列的值,排除 None values = [record[col_names.index(col)] for col in source_outflow_cols if record[col_names.index(col)] is not None] - if values: # 将值转换为 frozenset 以便与 region_tuple_to_key 进行匹配 region_frozenset = frozenset(values) - # 检查是否存在匹配的 region region_key = region_tuple_to_key.get(region_frozenset) if region_key: @@ -218,20 +197,16 @@ def query_non_realtime_region_patterns(name: str, source_outflow_region: dict, c associated_pattern = record[pattern_idx] if associated_pattern is not None: globals.non_realtime_region_patterns[region_key].append(associated_pattern) - logging.info("生成 regions_patterns 成功。") except psycopg.Error as e: logging.error(f"数据库连接或查询出错: {e}") except Exception as ex: logging.error(f"处理数据时出错: {ex}") - # 获取pipe_flow_region_patterns中的所有区域 exclude_regions = set(region for regions in globals.pipe_flow_region_patterns.values() for region in regions) - # 从non_realtime_region_patterns中去除这些区域 for region_key, regions in globals.non_realtime_region_patterns.items(): globals.non_realtime_region_patterns[region_key] = [region for region in regions if region not in exclude_regions] - return globals.non_realtime_region_patterns @@ -249,7 +224,6 @@ def query_realtime_region_pipe_flow_and_demand_id(name: str, source_outflow_regi # 创建一个映射,从 frozenset(ids) 到 region_key region_tuple_to_key = {frozenset(ids): region for region, ids in globals.source_outflow_region.items()} conn_string = f"dbname={name} host=127.0.0.1" - try: with psycopg.connect(conn_string) as conn: with conn.cursor() as cur: @@ -260,31 +234,24 @@ def query_realtime_region_pipe_flow_and_demand_id(name: str, source_outflow_regi WHERE transmission_mode = 'realtime' AND type IN ('pipe_flow', 'demand'); """) - records = cur.fetchall() col_names = [desc.name for desc in cur.description] - # 找出所有以指定前缀开头的列 source_outflow_cols = [col for col in col_names if col.startswith(column_prefix)] logging.info(f"Identified source_outflow columns: {source_outflow_cols}") - # 确保 'api_query_id' 列存在 if 'api_query_id' not in col_names: logging.error("'api_query_id' column not found in scada_info table.") return globals.realtime_region_pipe_flow_and_demand_id - # 获取 'api_query_id' 列的索引 api_query_id_idx = col_names.index('api_query_id') - for record in records: # 提取所有以 'associated_source_outflow_id' 开头的列的值,排除 None values = [record[col_names.index(col)] for col in source_outflow_cols if record[col_names.index(col)] is not None] - if values: # 将值转换为 frozenset 以便与 region_tuple_to_key 进行匹配 region_frozenset = frozenset(values) - # 检查是否存在匹配的 region region_key = region_tuple_to_key.get(region_frozenset) if region_key: @@ -292,13 +259,11 @@ def query_realtime_region_pipe_flow_and_demand_id(name: str, source_outflow_regi api_query_id = record[api_query_id_idx] if api_query_id is not None: globals.realtime_region_pipe_flow_and_demand_id[region_key].append(api_query_id) - logging.info("生成 realtime_region_pipe_flow_and_demand_id 成功。") except psycopg.Error as e: logging.error(f"数据库连接或查询出错: {e}") except Exception as ex: logging.error(f"处理数据时出错: {ex}") - return globals.realtime_region_pipe_flow_and_demand_id @@ -316,7 +281,6 @@ def query_pipe_flow_region_patterns(name: str, column_prefix: str = 'associated_ :return: pipe_flow_region_patterns 字典 """ conn_string = f"dbname={name} host=127.0.0.1" - try: with psycopg.connect(conn_string) as conn: with conn.cursor() as cur: @@ -327,18 +291,14 @@ def query_pipe_flow_region_patterns(name: str, column_prefix: str = 'associated_ WHERE type = 'demand' AND transmission_mode = 'non_realtime'; """) - records = cur.fetchall() col_names = [desc.name for desc in cur.description] - # 获取列索引 pattern_idx = col_names.index('associated_pattern') pipe_flow_id_idx = col_names.index('associated_pipe_flow_id') - for record in records: associated_pattern = record[pattern_idx] associated_pipe_flow_id = record[pipe_flow_id_idx] - if associated_pipe_flow_id: # 根据 associated_pipe_flow_id 查询对应的记录 cur.execute(""" @@ -346,26 +306,20 @@ def query_pipe_flow_region_patterns(name: str, column_prefix: str = 'associated_ FROM scada_info WHERE associated_element_id = %s; """, (associated_pipe_flow_id,)) - pipe_flow_record = cur.fetchone() if pipe_flow_record: pipe_flow_associated_pattern = pipe_flow_record[0] transmission_mode = pipe_flow_record[1] - if transmission_mode == 'realtime': # 将 associated_pattern 记录到字典中 if pipe_flow_associated_pattern not in globals.pipe_flow_region_patterns: globals.pipe_flow_region_patterns[pipe_flow_associated_pattern] = [] - globals.pipe_flow_region_patterns[pipe_flow_associated_pattern].append(associated_pattern) - - logging.info("生成 pipe_flow_region_patterns 成功。") except psycopg.Error as e: logging.error(f"数据库连接或查询出错: {e}") except Exception as ex: logging.error(f"处理数据时出错: {ex}") - return globals.pipe_flow_region_patterns @@ -389,10 +343,8 @@ def query_SCADA_ID_corresponding_info(name: str, SCADA_ID: str) -> dict: WHERE id = %s """ cur.execute(query, (SCADA_ID,)) # 执行查询并传递参数 - # 获取查询结果 result = cur.fetchone() - if result: # 将结果转换为字典 associated_info = { @@ -425,13 +377,10 @@ def get_source_outflow_region_id(name: str, source_outflow_region: dict, all_ids = set() for ids in globals.source_outflow_region.values(): all_ids.update(ids) - if not all_ids: logging.warning("No associated_source_outflow_id found in source_outflow_region.") return globals.source_outflow_region_id - conn_string = f"dbname={name} host=127.0.0.1" - try: with psycopg.connect(conn_string) as conn: with conn.cursor() as cur: @@ -443,7 +392,6 @@ def get_source_outflow_region_id(name: str, source_outflow_region: dict, """ cur.execute(query, (list(all_ids),)) rows = cur.fetchall() - # 构建 associated_source_outflow_id 到 api_query_id 的映射 id_to_api_query_id = {} for row in rows: @@ -451,7 +399,6 @@ def get_source_outflow_region_id(name: str, source_outflow_region: dict, api_query_id = row[1] if associated_id in all_ids and api_query_id is not None: id_to_api_query_id[associated_id] = str(api_query_id) - # 替换 source_outflow_region 中的 associated_source_outflow_id 为 api_query_id for region, ids in globals.source_outflow_region.items(): for id_ in ids: @@ -460,12 +407,10 @@ def get_source_outflow_region_id(name: str, source_outflow_region: dict, globals.source_outflow_region_id[region].append(api_id) else: logging.warning(f"No api_query_id found for associated_source_outflow_id: {id_}") - except psycopg.Error as e: logging.error(f"数据库连接或查询出错: {e}") except Exception as ex: logging.error(f"处理数据时出错: {ex}") - return globals.source_outflow_region_id @@ -483,7 +428,6 @@ def get_realtime_region_patterns(name: str, source_outflow_region_id: dict, real globals.source_outflow_region_patterns = {region: [] for region in globals.source_outflow_region_id.keys()} globals.realtime_region_pipe_flow_and_demand_patterns = {region: [] for region in globals.realtime_region_pipe_flow_and_demand_id.keys()} - conn_string = f"dbname={name} host=127.0.0.1" try: with psycopg.connect(conn_string) as conn: @@ -503,7 +447,6 @@ def get_realtime_region_patterns(name: str, source_outflow_region_id: dict, real globals.source_outflow_region_patterns[region] = [ associated_pattern for _, associated_pattern in results if associated_pattern ] - # 获取 realtime_region_pipe_flow_and_demand_id 的 api_query_id 并查询 associated_pattern realtime_api_ids = globals.realtime_region_pipe_flow_and_demand_id[region] if realtime_api_ids: @@ -517,13 +460,11 @@ def get_realtime_region_patterns(name: str, source_outflow_region_id: dict, real globals.realtime_region_pipe_flow_and_demand_patterns[region] = [ associated_pattern for _, associated_pattern in results if associated_pattern ] - logging.info("生成 source_outflow_region_patterns 和 realtime_region_pipe_flow_and_demand_patterns 成功。") except psycopg.Error as e: logging.error(f"数据库连接或查询出错: {e}") except Exception as ex: logging.error(f"处理数据时出错: {ex}") - return globals.source_outflow_region_patterns, globals.realtime_region_pipe_flow_and_demand_patterns @@ -556,6 +497,7 @@ def get_pattern_index_str(current_time: str) -> str: str_i = '{}:{}:00'.format(hrN_str, minN_str) return str_i + def from_seconds_to_clock (secs: int)->str: """ 从秒格式化为“HH:MM:00”字符串 @@ -620,7 +562,6 @@ def run_simulation(name: str, simulation_type: str, modify_pattern_start_time: s time_cost_start = time.perf_counter() print('{} -- Hydraulic simulation started.'.format( datetime.now(pytz.timezone('Asia/Shanghai')).strftime('%Y-%m-%d %H:%M:%S'))) - # 重新打开数据库 if is_project_open(name): close_project(name) @@ -646,12 +587,10 @@ def run_simulation(name: str, simulation_type: str, modify_pattern_start_time: s time_obj = datetime.strptime(globals.hydraulic_timestep, '%H:%M:%S') # 转换为分钟浮点数 globals.PATTERN_TIME_STEP = float(time_obj.hour * 60 + time_obj.minute + time_obj.second / 60) - # 对输入的时间参数进行处理 pattern_start_time = convert_time_format(modify_pattern_start_time) # 获取模拟开始时间是对应pattern的第几个数 modify_index = get_pattern_index(pattern_start_time) - # 遍历水泵的pattern_id,并根据输入的pump_pattern修改pattern的值 # for pump_pattern_id in pump_pattern_ids: # # 检查pump_pattern中pump_pattern_id对应的第一个频率值是否为有效数字(非空、非NaN)。如果该值有效,则继续执行代码块。 @@ -664,7 +603,6 @@ def run_simulation(name: str, simulation_type: str, modify_pattern_start_time: s # cs = ChangeSet() # cs.append(pump_pattern) # set_pattern(name_c, cs) - # 修改模拟开始的时间 str_pattern_start = get_pattern_index_str(convert_time_format(modify_pattern_start_time)) dic_time = get_time(name_c) @@ -675,17 +613,14 @@ def run_simulation(name: str, simulation_type: str, modify_pattern_start_time: s cs = ChangeSet() cs.operations.append(dic_time) set_time(name_c, cs) - # 根据SCADA实时数据进行修改,如果没有对应的SCADA数据,如未来的时间点,则不改变pg数据库的数据 if globals.reservoirs_id: # reservoirs_id = {'ZBBDJSCP000002': '2497', 'R00003': '2571'} # 1.获取reservoir的SCADA数据,形式如{'2497': '3.1231', '2571': '2.7387'} reservoir_SCADA_data_dict = influxdb_api.query_SCADA_data_by_device_ID_and_time( query_ids_list=list(globals.reservoirs_id.values()), query_time=modify_pattern_start_time) - # 2.构建出新字典,形式如{'ZBBDJSCP000002': '3.1231', 'R00003': '2.7387'} reservoir_dict = {key: reservoir_SCADA_data_dict[value] for key, value in globals.reservoirs_id.items()} - # 3.修改reservoir液位模式 for reservoir_name, value in reservoir_dict.items(): if value and float(value) != 0: @@ -695,14 +630,11 @@ def run_simulation(name: str, simulation_type: str, modify_pattern_start_time: s cs = ChangeSet() cs.append(reservoir_pattern) set_pattern(name_c, cs) - if globals.tanks_id: # 修改tank初始液位 tank_SCADA_data_dict = influxdb_api.query_SCADA_data_by_device_ID_and_time( query_ids_list=list(globals.tanks_id.values()), query_time=modify_pattern_start_time) - tank_dict = {key: tank_SCADA_data_dict[value] for key, value in globals.tanks_id.items()} - for tank_name, value in tank_dict.items(): if value and float(value) != 0: tank = get_tank(name_c, tank_name) @@ -710,14 +642,11 @@ def run_simulation(name: str, simulation_type: str, modify_pattern_start_time: s cs = ChangeSet() cs.append(tank) set_tank(name_c, cs) - if globals.fixed_pumps_id: # 修改工频泵的pattern fixed_pump_SCADA_data_dict = influxdb_api.query_SCADA_data_by_device_ID_and_time( query_ids_list=list(globals.fixed_pumps_id.values()), query_time=modify_pattern_start_time) - fixed_pump_dict = {key: fixed_pump_SCADA_data_dict[value] for key, value in globals.fixed_pumps_id.items()} - for fixed_pump_name, value in fixed_pump_dict.items(): if value: pump_pattern = get_pattern(name_c, get_pump(name_c, fixed_pump_name)['pattern']) @@ -725,31 +654,23 @@ def run_simulation(name: str, simulation_type: str, modify_pattern_start_time: s cs = ChangeSet() cs.append(pump_pattern) set_pattern(name_c, cs) - if globals.variable_pumps_id: # 修改变频泵的pattern variable_pump_SCADA_data_dict = influxdb_api.query_SCADA_data_by_device_ID_and_time( query_ids_list=list(globals.variable_pumps_id.values()), query_time=modify_pattern_start_time) - # print(variable_pump_SCADA_data_dict) - variable_pump_dict = {key: variable_pump_SCADA_data_dict[value] for key, value in globals.variable_pumps_id.items()} - # print(variable_pump_dict) - for variable_pump_name, value in variable_pump_dict.items(): if value: - pump_pattern = get_pattern(name_c, get_pump(name_c, fixed_pump_name)['pattern']) + pump_pattern = get_pattern(name_c, get_pump(name_c, variable_pump_name)['pattern']) pump_pattern['factors'][modify_index] = float(value) / 50 cs = ChangeSet() cs.append(pump_pattern) set_pattern(name_c, cs) - if globals.demand_id: # 基于实时数据,修改大用户节点的pattern demand_SCADA_data_dict = influxdb_api.query_SCADA_data_by_device_ID_and_time( query_ids_list=list(globals.demand_id.values()), query_time=modify_pattern_start_time) - demand_dict = {key: demand_SCADA_data_dict[value] for key, value in globals.demand_id.items()} - for demand_name, value in demand_dict.items(): if value: demand_pattern = get_pattern(name_c, get_demand(name_c, demand_name)['pattern']) @@ -760,19 +681,15 @@ def run_simulation(name: str, simulation_type: str, modify_pattern_start_time: s cs = ChangeSet() cs.append(demand_pattern) set_pattern(name_c, cs) - # 水质、压力实时数据使用方法待补充 ############################# - if globals.source_outflow_pattern_id: # 基于实时的出厂流量计数据,修改出厂流量计绑定的pattern source_outflow_SCADA_data_dict = influxdb_api.query_SCADA_data_by_device_ID_and_time( query_ids_list=list(globals.source_outflow_pattern_id.values()), query_time=modify_pattern_start_time) # print(source_outflow_SCADA_data_dict) - source_outflow_dict = {key: source_outflow_SCADA_data_dict[value] for key, value in globals.source_outflow_pattern_id.items()} # print(source_outflow_dict) - for pattern_name in source_outflow_dict.keys(): # print(pattern_name) history_source_outflow_list = get_history_pattern_info(name_c, pattern_name) @@ -780,36 +697,28 @@ def run_simulation(name: str, simulation_type: str, modify_pattern_start_time: s # print(source_outflow_dict[pattern_name]) if source_outflow_dict[pattern_name]: realtime_source_outflow = float(source_outflow_dict[pattern_name]) - multiply_factor = realtime_source_outflow / history_source_outflow - pattern = get_pattern(name_c, pattern_name) pattern['factors'][modify_index] *= multiply_factor cs = ChangeSet() cs.append(pattern) set_pattern(name_c, cs) - if globals.realtime_pipe_flow_pattern_id: # 基于实时的pipe_flow类数据,修改pipe_flow类绑定的pattern realtime_pipe_flow_SCADA_data_dict = influxdb_api.query_SCADA_data_by_device_ID_and_time( query_ids_list=list(globals.realtime_pipe_flow_pattern_id.values()), query_time=modify_pattern_start_time) - realtime_pipe_flow_dict = {key: realtime_pipe_flow_SCADA_data_dict[value] for key, value in globals.realtime_pipe_flow_pattern_id.items()} - for pattern_name in realtime_pipe_flow_dict.keys(): history_pipe_flow_list = get_history_pattern_info(name_c, pattern_name) history_pipe_flow = history_pipe_flow_list[modify_index] if realtime_pipe_flow_dict[pattern_name]: realtime_pipe_flow = float(realtime_pipe_flow_dict[pattern_name]) - multiply_factor = realtime_pipe_flow / history_pipe_flow - pattern = get_pattern(name_c, pattern_name) pattern['factors'][modify_index] *= multiply_factor cs = ChangeSet() cs.append(pattern) set_pattern(name_c, cs) - if globals.pipe_flow_region_patterns: # 基于实时的pipe_flow类数据,修改pipe_flow分区流量计范围内的non_realtime的demand绑定的pattern temp_realtime_pipe_flow_pattern_id = {} @@ -818,20 +727,15 @@ def run_simulation(name: str, simulation_type: str, modify_pattern_start_time: s # 获取对应的实时值 query_api_id = globals.realtime_pipe_flow_pattern_id.get(pipe_flow_region) temp_realtime_pipe_flow_pattern_id[pipe_flow_region] = query_api_id - temp_realtime_pipe_flow_SCADA_data_dict = influxdb_api.query_SCADA_data_by_device_ID_and_time( query_ids_list=list(temp_realtime_pipe_flow_pattern_id.values()), query_time=modify_pattern_start_time) - temp_realtime_pipe_flow_dict = {key: temp_realtime_pipe_flow_SCADA_data_dict[value] for key, value in temp_realtime_pipe_flow_pattern_id.items()} - for pattern_name in temp_realtime_pipe_flow_dict.keys(): temp_history_pipe_flow_list = get_history_pattern_info(name_c, pattern_name) temp_history_pipe_flow = temp_history_pipe_flow_list[modify_index] if temp_realtime_pipe_flow_dict[pattern_name]: temp_realtime_pipe_flow = float(temp_realtime_pipe_flow_dict[pattern_name]) - temp_multiply_factor = temp_realtime_pipe_flow / temp_history_pipe_flow - temp_non_realtime_demand_pattern_list = globals.pipe_flow_region_patterns[pattern_name] for demand_pattern_name in temp_non_realtime_demand_pattern_list: pattern = get_pattern(name_c, demand_pattern_name) @@ -839,7 +743,6 @@ def run_simulation(name: str, simulation_type: str, modify_pattern_start_time: s cs = ChangeSet() cs.append(pattern) set_pattern(name_c, cs) - if globals.source_outflow_region: # 根据associated_source_outflow_id进行分区,各分区用(出厂的流量计 - 实时的pipe_flow和demand)进行数据更新 for region in globals.source_outflow_region.keys(): @@ -848,13 +751,10 @@ def run_simulation(name: str, simulation_type: str, modify_pattern_start_time: s temp_source_outflow_region_patterns = globals.source_outflow_region_patterns.get(region, []) temp_realtime_region_pipe_flow_and_demand_patterns = globals.realtime_region_pipe_flow_and_demand_patterns.get(region, []) temp_non_realtime_region_patterns = globals.non_realtime_region_patterns.get(region, []) - region_source_outflow_data_dict = influxdb_api.query_SCADA_data_by_device_ID_and_time( query_ids_list=temp_source_outflow_region_id, query_time=modify_pattern_start_time) - region_realtime_region_pipe_flow_and_demand_data_dict = influxdb_api.query_SCADA_data_by_device_ID_and_time( query_ids_list=temp_realtime_region_pipe_flow_and_demand_id, query_time=modify_pattern_start_time) - # 2025/02/12 确保 region_source_outflow_data_dict 和 # region_realtime_region_pipe_flow_and_demand_data_dict中的每个值都不是 None 且不为 0 region_source_outflow_valid_values = [float(value) for value in region_source_outflow_data_dict.values() if value not in [None, 0]] @@ -866,13 +766,11 @@ def run_simulation(name: str, simulation_type: str, modify_pattern_start_time: s for source_outflow_pattern_name in temp_source_outflow_region_patterns: temp_history_source_outflow_list = get_history_pattern_info(name_c, source_outflow_pattern_name) history_region_total_source_outflow += temp_history_source_outflow_list[modify_index] - region_total_realtime_region_pipe_flow_and_demand = sum(valid_values) history_region_total_realtime_region_pipe_flow_and_demand = 0 for pipe_flow_and_demand_pattern_name in temp_realtime_region_pipe_flow_and_demand_patterns: temp_history_pipe_flow_and_demand_list = get_history_pattern_info(name_c, pipe_flow_and_demand_pattern_name) history_region_total_realtime_region_pipe_flow_and_demand += temp_history_pipe_flow_and_demand_list[modify_index] - temp_multiply_factor = (region_total_source_outflow - region_total_realtime_region_pipe_flow_and_demand) / (history_region_total_source_outflow - history_region_total_realtime_region_pipe_flow_and_demand) for non_realtime_region_pattern_name in temp_non_realtime_region_patterns: pattern = get_pattern(name_c, non_realtime_region_pattern_name) @@ -880,7 +778,6 @@ def run_simulation(name: str, simulation_type: str, modify_pattern_start_time: s cs = ChangeSet() cs.append(pattern) set_pattern(name_c, cs) - # 根据输入的参数进行数据修改,后面修改的可以覆盖前面的,用于EXTENDED类的方案模拟 # 修改清水池(reservoir)液位的pattern if modify_reservoir_head_pattern: @@ -897,7 +794,6 @@ def run_simulation(name: str, simulation_type: str, modify_pattern_start_time: s cs = ChangeSet() cs.append(reservoir_pattern) set_pattern(name_c, cs) - # 修改调节池(tank)初始液位 if modify_tank_initial_level: for tank_name in modify_tank_initial_level.keys(): @@ -907,7 +803,6 @@ def run_simulation(name: str, simulation_type: str, modify_pattern_start_time: s cs = ChangeSet() cs.append(tank) set_tank(name_c, cs) - # 修改节点(junction)基础水量(demand) if modify_junction_base_demand: for junction_name in modify_junction_base_demand.keys(): @@ -917,7 +812,6 @@ def run_simulation(name: str, simulation_type: str, modify_pattern_start_time: s cs = ChangeSet() cs.append(junction) set_demand(name_c, cs) - # 修改节点(junction)的水量模式(pattern) if modify_junction_damand_pattern: for pattern_name in modify_junction_damand_pattern.keys(): @@ -929,7 +823,6 @@ def run_simulation(name: str, simulation_type: str, modify_pattern_start_time: s cs = ChangeSet() cs.append(junction_pattern) set_pattern(name_c, cs) - # 修改工频水泵(fixed_pump)的pattern if modify_fixed_pump_pattern: for pump_name in modify_fixed_pump_pattern.keys(): @@ -941,7 +834,6 @@ def run_simulation(name: str, simulation_type: str, modify_pattern_start_time: s cs = ChangeSet() cs.append(pump_pattern) set_pattern(name_c, cs) - # 修改变频水泵(variable_pump)的pattern if modify_variable_pump_pattern: for pump_name in modify_variable_pump_pattern.keys(): @@ -954,7 +846,6 @@ def run_simulation(name: str, simulation_type: str, modify_pattern_start_time: s cs = ChangeSet() cs.append(pump_pattern) set_pattern(name_c, cs) - # 修改阀门(valve)的状态setting和status if modify_valve_opening: for valve_name in modify_valve_opening.keys(): @@ -972,62 +863,12 @@ def run_simulation(name: str, simulation_type: str, modify_pattern_start_time: s cs = ChangeSet() cs.append(valve_status) set_status(name_c, cs) - - # 根据高压出厂流量,更改高压用水模式 - # hp_flow_SCADA_data_dict = influxdb_api.query_SCADA_data_by_device_ID_and_time( - # query_ids_list=list(hp_flow_pattern_id.values()), query_time=modify_pattern_start_time) - # - # hp_flow_dict = {key: hp_flow_SCADA_data_dict[value] for key, value in hp_flow_pattern_id.items()} - # - # all_valid = all(value and float(value) != 0 for value in hp_flow_dict.values()) - # - # if all_valid: - # hp_total_SCADA_flow = sum(float(value) for value in hp_flow_dict.values()) - # hp_total_history_flow = 0 - # for pattern_name in hp_flow_dict.keys(): - # history_flow_list = get_history_pattern_info(name_c, pattern_name) - # hp_total_history_flow += history_flow_list[modify_index] - # - # multiply_factor1 = hp_total_SCADA_flow / hp_total_history_flow - # hp_pattern_list = regions_patterns['hp'] - # for pattern_name in hp_pattern_list: - # pattern = get_pattern(name_c, pattern_name) - # pattern['factors'][modify_index] *= multiply_factor1 - # cs = ChangeSet() - # cs.append(pattern) - # set_pattern(name_c, cs) - # - # # 根据低压出厂流量,更改低压用水模式 - # lp_flow_SCADA_data_dict = influxdb_api.query_SCADA_data_by_device_ID_and_time( - # query_ids_list=list(lp_flow_pattern_id.values()), query_time=modify_pattern_start_time) - # - # lp_flow_dict = {key: lp_flow_SCADA_data_dict[value] for key, value in lp_flow_pattern_id.items()} - # - # all_valid2 = all(value and float(value) != 0 for value in lp_flow_dict.values()) - # - # if all_valid2: - # lp_total_SCADA_flow = sum(float(value) for value in lp_flow_dict.values()) - # lp_total_history_flow = 0 - # for pattern_name in lp_flow_dict.keys(): - # history_flow_list = get_history_pattern_info(name_c, pattern_name) - # lp_total_history_flow += history_flow_list[modify_index] - # - # multiply_factor2 = lp_total_SCADA_flow / lp_total_history_flow - # lp_pattern_list = regions_patterns['lp'] - # for pattern_name in lp_pattern_list: - # pattern = get_pattern(name_c, pattern_name) - # pattern['factors'][modify_index] *= multiply_factor2 - # cs = ChangeSet() - # cs.append(pattern) - # set_pattern(name_c, cs) # 运行并返回结果 result = run_project(name_c) - time_cost_end = time.perf_counter() print('{} -- Hydraulic simulation finished, cost time: {:.2f} s.'.format( datetime.now(pytz.timezone('Asia/Shanghai')).strftime('%Y-%m-%d %H:%M:%S'), time_cost_end - time_cost_start)) - close_project(name_c) # DingZQ 下面这几句一定要这样,不然读取不了 @@ -1040,7 +881,6 @@ def run_simulation(name: str, simulation_type: str, modify_pattern_start_time: s node_result = output.node_results() link_result = output.link_results() num_periods_result = output.times()['num_periods'] - # print(num_periods_result) # print(node_result) # 存储 @@ -1065,25 +905,25 @@ if __name__ == "__main__": globals.source_outflow_region_patterns, globals.realtime_region_pipe_flow_and_demand_patterns = get_realtime_region_patterns('bb', globals.source_outflow_region_id, globals.realtime_region_pipe_flow_and_demand_id) # 打印字典内容以验证 - # print("Reservoirs ID:", globals.reservoirs_id) - # print("Tanks ID:", globals.tanks_id) - # print("Fixed Pumps ID:", globals.fixed_pumps_id) - # print("Variable Pumps ID:", globals.variable_pumps_id) - # print("Pressure ID:", globals.pressure_id) - # print("Demand ID:", globals.demand_id) - # print("Quality ID:", globals.quality_id) - # print("Source Outflow Pattern ID:", globals.source_outflow_pattern_id) - # print("Realtime Pipe Flow Pattern ID:", globals.realtime_pipe_flow_pattern_id) - # print("Pipe Flow Region Patterns:", globals.pipe_flow_region_patterns) - # print("Source Outflow Region:", region_result) - # print('Source Outflow Region ID:', globals.source_outflow_region_id) - # print('Source Outflow Region Patterns:', globals.source_outflow_region_patterns) - # print("Non Realtime Region Patterns:", globals.non_realtime_region_patterns) - # print("Realtime Region Pipe Flow And Demand ID:", globals.realtime_region_pipe_flow_and_demand_id) - # print("Realtime Region Pipe Flow And Demand Patterns:", globals.realtime_region_pipe_flow_and_demand_patterns) + print("Reservoirs ID:", globals.reservoirs_id) + print("Tanks ID:", globals.tanks_id) + print("Fixed Pumps ID:", globals.fixed_pumps_id) + print("Variable Pumps ID:", globals.variable_pumps_id) + print("Pressure ID:", globals.pressure_id) + print("Demand ID:", globals.demand_id) + print("Quality ID:", globals.quality_id) + print("Source Outflow Pattern ID:", globals.source_outflow_pattern_id) + print("Realtime Pipe Flow Pattern ID:", globals.realtime_pipe_flow_pattern_id) + print("Pipe Flow Region Patterns:", globals.pipe_flow_region_patterns) + print("Source Outflow Region:", region_result) + print('Source Outflow Region ID:', globals.source_outflow_region_id) + print('Source Outflow Region Patterns:', globals.source_outflow_region_patterns) + print("Non Realtime Region Patterns:", globals.non_realtime_region_patterns) + print("Realtime Region Pipe Flow And Demand ID:", globals.realtime_region_pipe_flow_and_demand_id) + print("Realtime Region Pipe Flow And Demand Patterns:", globals.realtime_region_pipe_flow_and_demand_patterns) # 模拟示例1 - # run_simulation(name='bb', simulation_type="realtime", modify_pattern_start_time='2025-02-14T10:30:00+08:00') + run_simulation(name='bb', simulation_type="realtime", modify_pattern_start_time='2025-02-25T23:45:00+08:00') # 模拟示例2 # run_simulation(name='bb', simulation_type="extended", modify_pattern_start_time='2025-02-14T10:30:00+08:00', modify_total_duration=900, # scheme_Type="burst_Analysis", scheme_Name="scheme1") @@ -1104,4 +944,3 @@ if __name__ == "__main__": -