Refine influxdb api write api option

This commit is contained in:
DingZQ
2025-04-24 21:02:31 +08:00
parent 183c5a2bad
commit 86e06e1717
3 changed files with 33 additions and 19 deletions

View File

@@ -31,8 +31,7 @@ def store_realtime_SCADA_data_job() -> None:
get_real_value_time: str = get_next_time() # get_real_value_time 类型为 str格式为'2025-02-01T18:45:00+08:00' get_real_value_time: str = get_next_time() # get_real_value_time 类型为 str格式为'2025-02-01T18:45:00+08:00'
# 调用函数执行任务 # 调用函数执行任务
influxdb_api.store_realtime_SCADA_data_to_influxdb(get_real_value_time) influxdb_api.store_realtime_SCADA_data_to_influxdb(get_real_value_time)
print('{} -- Successfully store realtime SCADA data.'.format( print('{} -- Successfully store realtime SCADA data.'.format(datetime.now().strftime('%Y-%m-%d %H:%M:%S')))
datetime.now().strftime('%Y-%m-%d %H:%M:%S')))
# 2025/02/06 # 2025/02/06
@@ -74,10 +73,9 @@ def run_simulation_job() -> None:
globals.realtime_region_pipe_flow_and_demand_id) globals.realtime_region_pipe_flow_and_demand_id)
modify_pattern_start_time: str = get_next_15minute_time() # 获取下一个15分钟时间点 modify_pattern_start_time: str = get_next_15minute_time() # 获取下一个15分钟时间点
# print(modify_pattern_start_time) # print(modify_pattern_start_time)
simulation.run_simulation(name='bb', simulation_type="realtime", simulation.run_simulation(name='bb', simulation_type="realtime", modify_pattern_start_time=modify_pattern_start_time)
modify_pattern_start_time=modify_pattern_start_time)
print('{} -- Successfully run simulation and store realtime simulation result.'.format( print('{} -- Successfully run simulation and store realtime simulation result.'.format(datetime.now().strftime('%Y-%m-%d %H:%M:%S')))
datetime.now().strftime('%Y-%m-%d %H:%M:%S')))
else: else:
print(f"{current_time.strftime('%Y-%m-%d %H:%M:%S')} -- Skipping the simulation task.") print(f"{current_time.strftime('%Y-%m-%d %H:%M:%S')} -- Skipping the simulation task.")

View File

@@ -383,6 +383,7 @@ def store_realtime_SCADA_data_to_influxdb(get_real_value_time: str, bucket: str
client = get_new_client() client = get_new_client()
if not client.ping(): if not client.ping():
print("{} -- Failed to connect to InfluxDB.".format(datetime.now().strftime('%Y-%m-%d %H:%M:%S'))) print("{} -- Failed to connect to InfluxDB.".format(datetime.now().strftime('%Y-%m-%d %H:%M:%S')))
# 本地变量,用于记录成功写入的数据点数量 # 本地变量,用于记录成功写入的数据点数量
points_written = 0 points_written = 0
lock = threading.Lock() lock = threading.Lock()
@@ -396,12 +397,14 @@ def store_realtime_SCADA_data_to_influxdb(get_real_value_time: str, bucket: str
def error_callback(exception): def error_callback(exception):
print("Error writing batch:", exception) print("Error writing batch:", exception)
# 使用异步写入模式配置写入选项和回调函数 # 使用异步写入模式配置写入选项和回调函数
write_api = client.write_api( write_api = client.write_api(
write_options=WriteOptions(batch_size=1000, flush_interval=1000), write_options= create_write_options(),
success_callback=success_callback, success_callback=success_callback,
error_callback=error_callback error_callback=error_callback
) )
# 创建一个临时存储点数据的列表 # 创建一个临时存储点数据的列表
points_to_write = [] points_to_write = []
@@ -702,6 +705,7 @@ def store_realtime_SCADA_data_to_influxdb(get_real_value_time: str, bucket: str
if points_to_write: if points_to_write:
write_api.write(bucket=bucket, org=org_name, record=points_to_write) write_api.write(bucket=bucket, org=org_name, record=points_to_write)
write_api.flush() # write_api.flush() #
time.sleep(10) time.sleep(10)
print("Total points written:", points_written) print("Total points written:", points_written)
client.close() client.close()
@@ -750,10 +754,11 @@ def store_non_realtime_SCADA_data_to_influxdb(get_history_data_end_time: str, bu
# ) # )
# 使用异步写入模式配置写入选项和回调函数 # 使用异步写入模式配置写入选项和回调函数
write_api = client.write_api( write_api = client.write_api(
write_options=WriteOptions(batch_size=1000, flush_interval=1000), write_options=create_write_options(),
success_callback=success_callback, success_callback=success_callback,
error_callback=error_callback error_callback=error_callback
) )
# 创建一个临时存储点数据的列表 # 创建一个临时存储点数据的列表
points_to_write = [] points_to_write = []
@@ -978,8 +983,11 @@ def store_non_realtime_SCADA_data_to_influxdb(get_history_data_end_time: str, bu
if points_to_write: if points_to_write:
write_api.write(bucket=bucket, org=org_name, record=points_to_write) write_api.write(bucket=bucket, org=org_name, record=points_to_write)
write_api.flush() # 刷新缓存一次 write_api.flush() # 刷新缓存一次
time.sleep(10) time.sleep(10)
print("Total points written:", points_written) print("Total points written:", points_written)
client.close() client.close()
@@ -1016,7 +1024,7 @@ def download_history_data_manually(begin_time: str, end_time: str, bucket: str =
# write_api = client.write_api(write_options=SYNCHRONOUS, success_callback=success_callback, error_callback=error_callback) # write_api = client.write_api(write_options=SYNCHRONOUS, success_callback=success_callback, error_callback=error_callback)
# 使用异步写入模式配置写入选项和回调函数 # 使用异步写入模式配置写入选项和回调函数
write_api = client.write_api( write_api = client.write_api(
write_options=WriteOptions(batch_size=1000, flush_interval=1000), write_options=create_write_options(),
success_callback=success_callback, success_callback=success_callback,
error_callback=error_callback error_callback=error_callback
) )
@@ -1430,8 +1438,11 @@ def download_history_data_manually(begin_time: str, end_time: str, bucket: str =
if points_to_write: if points_to_write:
write_api.write(bucket=bucket, org=org_name, record=points_to_write) write_api.write(bucket=bucket, org=org_name, record=points_to_write)
write_api.flush() # 刷新缓存一次 write_api.flush() # 刷新缓存一次
time.sleep(10) time.sleep(10)
print("Total points written:", points_written) print("Total points written:", points_written)
client.close() client.close()
########################SCADA############################################################################################################ ########################SCADA############################################################################################################
@@ -1762,7 +1773,7 @@ def store_realtime_simulation_result_to_influxdb(node_result_list: List[Dict[str
try: try:
# 使用异步写入模式配置写入选项和回调函数 # 使用异步写入模式配置写入选项和回调函数
write_api = client.write_api( write_api = client.write_api(
write_options=WriteOptions(batch_size=1000, flush_interval=1000), write_options=create_write_options(),
success_callback=success_callback, success_callback=success_callback,
error_callback=error_callback error_callback=error_callback
) )
@@ -1823,7 +1834,9 @@ def store_realtime_simulation_result_to_influxdb(node_result_list: List[Dict[str
raise RuntimeError(f"数据写入 InfluxDB 时发生错误: {e}") raise RuntimeError(f"数据写入 InfluxDB 时发生错误: {e}")
time.sleep(10) time.sleep(10)
print("Total points written:", points_written) print("Total points written:", points_written)
client.close() client.close()
@@ -2412,7 +2425,7 @@ def store_scheme_simulation_result_to_influxdb(node_result_list: List[Dict[str,
# ) # )
# 使用异步写入模式配置写入选项和回调函数 # 使用异步写入模式配置写入选项和回调函数
write_api = client.write_api( write_api = client.write_api(
write_options=WriteOptions(batch_size=1000, flush_interval=1000), write_options=create_write_options(),
success_callback=success_callback, success_callback=success_callback,
error_callback=error_callback error_callback=error_callback
) )
@@ -2476,9 +2489,13 @@ def store_scheme_simulation_result_to_influxdb(node_result_list: List[Dict[str,
write_api.write(bucket=bucket, org=org_name, record=points_to_write) write_api.write(bucket=bucket, org=org_name, record=points_to_write)
write_api.flush() # 刷新缓存一次 write_api.flush() # 刷新缓存一次
except Exception as e: except Exception as e:
client.close()
raise RuntimeError(f"数据写入 InfluxDB 时发生错误: {e}") raise RuntimeError(f"数据写入 InfluxDB 时发生错误: {e}")
time.sleep(10) time.sleep(10)
print("Total points written:", points_written) print("Total points written:", points_written)
client.close() client.close()
@@ -2564,7 +2581,7 @@ def fill_scheme_simulation_result_to_SCADA(scheme_Type: str = None, scheme_Name:
# max_retry_delay=30000 # 最大重试延迟(毫秒) # max_retry_delay=30000 # 最大重试延迟(毫秒)
# ) # )
write_api = client.write_api( write_api = client.write_api(
write_options=WriteOptions(batch_size=1000, flush_interval=1000), write_options=create_write_options(),
success_callback=success_callback, success_callback=success_callback,
error_callback=error_callback error_callback=error_callback
) )
@@ -2656,8 +2673,11 @@ def fill_scheme_simulation_result_to_SCADA(scheme_Type: str = None, scheme_Name:
if points_to_write: if points_to_write:
write_api.write(bucket=bucket, org=org_name, record=points_to_write) write_api.write(bucket=bucket, org=org_name, record=points_to_write)
write_api.flush() # 刷新缓存一次 write_api.flush() # 刷新缓存一次
time.sleep(10) time.sleep(10)
print("Total points written:", points_written) print("Total points written:", points_written)
client.close() client.close()

View File

@@ -574,8 +574,7 @@ def run_simulation(name: str, simulation_type: str, modify_pattern_start_time: s
print('modify_junction_base_demand', modify_junction_base_demand) print('modify_junction_base_demand', modify_junction_base_demand)
print('{} -- Hydraulic simulation started.'.format( print('{} -- Hydraulic simulation started.'.format(datetime.now(pytz.timezone('Asia/Shanghai')).strftime('%Y-%m-%d %H:%M:%S')))
datetime.now(pytz.timezone('Asia/Shanghai')).strftime('%Y-%m-%d %H:%M:%S')))
# 判断是实时模拟还是多步长模拟 # 判断是实时模拟还是多步长模拟
# if simulation_type.upper() == 'REALTIME': # 实时模拟(修改原数据库) # if simulation_type.upper() == 'REALTIME': # 实时模拟(修改原数据库)
@@ -892,9 +891,7 @@ def run_simulation(name: str, simulation_type: str, modify_pattern_start_time: s
# 运行并返回结果 # 运行并返回结果
result = run_project(name_c) result = run_project(name_c)
time_cost_end = time.perf_counter() time_cost_end = time.perf_counter()
print('{} -- Hydraulic simulation finished, cost time: {:.2f} s.'.format( print('{} -- Hydraulic simulation finished, cost time: {:.2f} s.'.format( datetime.now(pytz.timezone('Asia/Shanghai')).strftime('%Y-%m-%d %H:%M:%S'), time_cost_end - time_cost_start))
datetime.now(pytz.timezone('Asia/Shanghai')).strftime('%Y-%m-%d %H:%M:%S'),
time_cost_end - time_cost_start))
# DingZQ 下面这几句一定要这样,不然读取不了 # DingZQ 下面这几句一定要这样,不然读取不了
time.sleep(5) # wait 5 seconds time.sleep(5) # wait 5 seconds
@@ -921,8 +918,7 @@ def run_simulation(name: str, simulation_type: str, modify_pattern_start_time: s
if simulation_type.upper() == 'REALTIME': if simulation_type.upper() == 'REALTIME':
influxdb_api.store_realtime_simulation_result_to_influxdb(node_result, link_result, modify_pattern_start_time) influxdb_api.store_realtime_simulation_result_to_influxdb(node_result, link_result, modify_pattern_start_time)
elif simulation_type.upper() == 'EXTENDED': elif simulation_type.upper() == 'EXTENDED':
influxdb_api.store_scheme_simulation_result_to_influxdb(node_result, link_result, modify_pattern_start_time, influxdb_api.store_scheme_simulation_result_to_influxdb(node_result, link_result, modify_pattern_start_time, num_periods_result, scheme_Type, scheme_Name)
num_periods_result, scheme_Type, scheme_Name)
influxdb_api.fill_scheme_simulation_result_to_SCADA(scheme_Type=scheme_Type, scheme_Name=scheme_Name) influxdb_api.fill_scheme_simulation_result_to_SCADA(scheme_Type=scheme_Type, scheme_Name=scheme_Name)
print("after store result") print("after store result")