From 86e06e1717d06fabbea953ec45f803b66b3c0f35 Mon Sep 17 00:00:00 2001 From: DingZQ Date: Thu, 24 Apr 2025 21:02:31 +0800 Subject: [PATCH] Refine influxdb api write api option --- auto_realtime.py | 10 ++++------ influxdb_api.py | 32 ++++++++++++++++++++++++++------ simulation.py | 10 +++------- 3 files changed, 33 insertions(+), 19 deletions(-) diff --git a/auto_realtime.py b/auto_realtime.py index 7e05416..26788bb 100644 --- a/auto_realtime.py +++ b/auto_realtime.py @@ -31,8 +31,7 @@ def store_realtime_SCADA_data_job() -> None: get_real_value_time: str = get_next_time() # get_real_value_time 类型为 str,格式为'2025-02-01T18:45:00+08:00' # 调用函数执行任务 influxdb_api.store_realtime_SCADA_data_to_influxdb(get_real_value_time) - print('{} -- Successfully store realtime SCADA data.'.format( - datetime.now().strftime('%Y-%m-%d %H:%M:%S'))) + print('{} -- Successfully store realtime SCADA data.'.format(datetime.now().strftime('%Y-%m-%d %H:%M:%S'))) # 2025/02/06 @@ -74,10 +73,9 @@ def run_simulation_job() -> None: globals.realtime_region_pipe_flow_and_demand_id) modify_pattern_start_time: str = get_next_15minute_time() # 获取下一个15分钟时间点 # print(modify_pattern_start_time) - simulation.run_simulation(name='bb', simulation_type="realtime", - modify_pattern_start_time=modify_pattern_start_time) - print('{} -- Successfully run simulation and store realtime simulation result.'.format( - datetime.now().strftime('%Y-%m-%d %H:%M:%S'))) + simulation.run_simulation(name='bb', simulation_type="realtime", modify_pattern_start_time=modify_pattern_start_time) + + print('{} -- Successfully run simulation and store realtime simulation result.'.format(datetime.now().strftime('%Y-%m-%d %H:%M:%S'))) else: print(f"{current_time.strftime('%Y-%m-%d %H:%M:%S')} -- Skipping the simulation task.") diff --git a/influxdb_api.py b/influxdb_api.py index 4166e4e..44100f8 100644 --- a/influxdb_api.py +++ b/influxdb_api.py @@ -383,6 +383,7 @@ def store_realtime_SCADA_data_to_influxdb(get_real_value_time: str, bucket: str client = get_new_client() if not client.ping(): print("{} -- Failed to connect to InfluxDB.".format(datetime.now().strftime('%Y-%m-%d %H:%M:%S'))) + # 本地变量,用于记录成功写入的数据点数量 points_written = 0 lock = threading.Lock() @@ -396,12 +397,14 @@ def store_realtime_SCADA_data_to_influxdb(get_real_value_time: str, bucket: str def error_callback(exception): print("Error writing batch:", exception) + # 使用异步写入模式配置写入选项和回调函数 write_api = client.write_api( - write_options=WriteOptions(batch_size=1000, flush_interval=1000), + write_options= create_write_options(), success_callback=success_callback, error_callback=error_callback ) + # 创建一个临时存储点数据的列表 points_to_write = [] @@ -702,6 +705,7 @@ def store_realtime_SCADA_data_to_influxdb(get_real_value_time: str, bucket: str if points_to_write: write_api.write(bucket=bucket, org=org_name, record=points_to_write) write_api.flush() # + time.sleep(10) print("Total points written:", points_written) client.close() @@ -750,10 +754,11 @@ def store_non_realtime_SCADA_data_to_influxdb(get_history_data_end_time: str, bu # ) # 使用异步写入模式配置写入选项和回调函数 write_api = client.write_api( - write_options=WriteOptions(batch_size=1000, flush_interval=1000), + write_options=create_write_options(), success_callback=success_callback, error_callback=error_callback ) + # 创建一个临时存储点数据的列表 points_to_write = [] @@ -978,8 +983,11 @@ def store_non_realtime_SCADA_data_to_influxdb(get_history_data_end_time: str, bu if points_to_write: write_api.write(bucket=bucket, org=org_name, record=points_to_write) write_api.flush() # 刷新缓存一次 + time.sleep(10) + print("Total points written:", points_written) + client.close() @@ -1016,7 +1024,7 @@ def download_history_data_manually(begin_time: str, end_time: str, bucket: str = # write_api = client.write_api(write_options=SYNCHRONOUS, success_callback=success_callback, error_callback=error_callback) # 使用异步写入模式配置写入选项和回调函数 write_api = client.write_api( - write_options=WriteOptions(batch_size=1000, flush_interval=1000), + write_options=create_write_options(), success_callback=success_callback, error_callback=error_callback ) @@ -1430,8 +1438,11 @@ def download_history_data_manually(begin_time: str, end_time: str, bucket: str = if points_to_write: write_api.write(bucket=bucket, org=org_name, record=points_to_write) write_api.flush() # 刷新缓存一次 + time.sleep(10) + print("Total points written:", points_written) + client.close() ########################SCADA############################################################################################################ @@ -1762,7 +1773,7 @@ def store_realtime_simulation_result_to_influxdb(node_result_list: List[Dict[str try: # 使用异步写入模式配置写入选项和回调函数 write_api = client.write_api( - write_options=WriteOptions(batch_size=1000, flush_interval=1000), + write_options=create_write_options(), success_callback=success_callback, error_callback=error_callback ) @@ -1823,7 +1834,9 @@ def store_realtime_simulation_result_to_influxdb(node_result_list: List[Dict[str raise RuntimeError(f"数据写入 InfluxDB 时发生错误: {e}") time.sleep(10) + print("Total points written:", points_written) + client.close() @@ -2412,7 +2425,7 @@ def store_scheme_simulation_result_to_influxdb(node_result_list: List[Dict[str, # ) # 使用异步写入模式配置写入选项和回调函数 write_api = client.write_api( - write_options=WriteOptions(batch_size=1000, flush_interval=1000), + write_options=create_write_options(), success_callback=success_callback, error_callback=error_callback ) @@ -2476,9 +2489,13 @@ def store_scheme_simulation_result_to_influxdb(node_result_list: List[Dict[str, write_api.write(bucket=bucket, org=org_name, record=points_to_write) write_api.flush() # 刷新缓存一次 except Exception as e: + client.close() raise RuntimeError(f"数据写入 InfluxDB 时发生错误: {e}") + time.sleep(10) + print("Total points written:", points_written) + client.close() @@ -2564,7 +2581,7 @@ def fill_scheme_simulation_result_to_SCADA(scheme_Type: str = None, scheme_Name: # max_retry_delay=30000 # 最大重试延迟(毫秒) # ) write_api = client.write_api( - write_options=WriteOptions(batch_size=1000, flush_interval=1000), + write_options=create_write_options(), success_callback=success_callback, error_callback=error_callback ) @@ -2656,8 +2673,11 @@ def fill_scheme_simulation_result_to_SCADA(scheme_Type: str = None, scheme_Name: if points_to_write: write_api.write(bucket=bucket, org=org_name, record=points_to_write) write_api.flush() # 刷新缓存一次 + time.sleep(10) + print("Total points written:", points_written) + client.close() diff --git a/simulation.py b/simulation.py index 155c418..b2b6c9a 100644 --- a/simulation.py +++ b/simulation.py @@ -574,8 +574,7 @@ def run_simulation(name: str, simulation_type: str, modify_pattern_start_time: s print('modify_junction_base_demand', modify_junction_base_demand) - print('{} -- Hydraulic simulation started.'.format( - datetime.now(pytz.timezone('Asia/Shanghai')).strftime('%Y-%m-%d %H:%M:%S'))) + print('{} -- Hydraulic simulation started.'.format(datetime.now(pytz.timezone('Asia/Shanghai')).strftime('%Y-%m-%d %H:%M:%S'))) # 判断是实时模拟还是多步长模拟 # if simulation_type.upper() == 'REALTIME': # 实时模拟(修改原数据库) @@ -892,9 +891,7 @@ def run_simulation(name: str, simulation_type: str, modify_pattern_start_time: s # 运行并返回结果 result = run_project(name_c) time_cost_end = time.perf_counter() - print('{} -- Hydraulic simulation finished, cost time: {:.2f} s.'.format( - datetime.now(pytz.timezone('Asia/Shanghai')).strftime('%Y-%m-%d %H:%M:%S'), - time_cost_end - time_cost_start)) + print('{} -- Hydraulic simulation finished, cost time: {:.2f} s.'.format( datetime.now(pytz.timezone('Asia/Shanghai')).strftime('%Y-%m-%d %H:%M:%S'), time_cost_end - time_cost_start)) # DingZQ 下面这几句一定要这样,不然读取不了 time.sleep(5) # wait 5 seconds @@ -921,8 +918,7 @@ def run_simulation(name: str, simulation_type: str, modify_pattern_start_time: s if simulation_type.upper() == 'REALTIME': influxdb_api.store_realtime_simulation_result_to_influxdb(node_result, link_result, modify_pattern_start_time) elif simulation_type.upper() == 'EXTENDED': - influxdb_api.store_scheme_simulation_result_to_influxdb(node_result, link_result, modify_pattern_start_time, - num_periods_result, scheme_Type, scheme_Name) + influxdb_api.store_scheme_simulation_result_to_influxdb(node_result, link_result, modify_pattern_start_time, num_periods_result, scheme_Type, scheme_Name) influxdb_api.fill_scheme_simulation_result_to_SCADA(scheme_Type=scheme_Type, scheme_Name=scheme_Name) print("after store result")