diff --git a/influxdb_api.py b/influxdb_api.py index e983a86..1fe73f8 100644 --- a/influxdb_api.py +++ b/influxdb_api.py @@ -173,6 +173,20 @@ def get_new_client() -> InfluxDBClient: """每次调用返回一个新的 InfluxDBClient 实例。""" return InfluxDBClient(url=url, token=token, org=org_name, timeout=600*1000) # 600 seconds +# 2025/04/11, DingZQ +def create_write_options() -> WriteOptions: + ''' + 创建一个写入选项 + ''' + return WriteOptions( + jitter_interval=200, # 添加抖动以避免同时写入 + max_retry_delay=30000, # 最大重试延迟(毫秒) + batch_size=10_000, # 每批次发送10,000个点 + flush_interval=10_000, # 10秒强制刷新 + retry_interval=5_000 # 失败重试间隔5秒 + ) + + # 2025/02/01 def delete_buckets(org_name: str) -> None: @@ -347,10 +361,8 @@ def store_realtime_SCADA_data_to_influxdb(get_real_value_time: str, bucket: str if not client.ping(): print("{} -- Failed to connect to InfluxDB.".format(datetime.now().strftime('%Y-%m-%d %H:%M:%S'))) - write_options = WriteOptions( - jitter_interval=200, # 添加抖动以避免同时写入 - max_retry_delay=30000 # 最大重试延迟(毫秒) - ) + write_options = create_write_options() + write_api = client.write_api(write_options=write_options) # 创建一个临时存储点数据的列表 points_to_write = [] @@ -679,11 +691,9 @@ def store_non_realtime_SCADA_data_to_influxdb(get_history_data_end_time: str, bu if not client.ping(): print("{} -- Failed to connect to InfluxDB.".format(datetime.now().strftime('%Y-%m-%d %H:%M:%S'))) - write_options = WriteOptions( - jitter_interval=200, # 添加抖动以避免同时写入 - max_retry_delay=30000 # 最大重试延迟(毫秒) - ) + write_options = create_write_options() write_api = client.write_api(write_options=write_options) + # 创建一个临时存储点数据的列表 points_to_write = [] @@ -925,10 +935,7 @@ def download_history_data_manually(begin_time: str, end_time: str, bucket: str = if not client.ping(): print("{} -- Failed to connect to InfluxDB.".format(datetime.now().strftime('%Y-%m-%d %H:%M:%S'))) - write_options = WriteOptions( - jitter_interval=200, # 添加抖动以避免同时写入 - max_retry_delay=30000 # 最大重试延迟(毫秒) - ) + write_options = create_write_options() write_api = client.write_api(write_options=write_options) # 创建一个临时存储点数据的列表 points_to_write = [] @@ -1560,10 +1567,7 @@ def store_realtime_simulation_result_to_influxdb(node_result_list: List[Dict[str # 开始写入数据 try: - write_options = WriteOptions( - jitter_interval=200, # 添加抖动以避免同时写入 - max_retry_delay=30000 # 最大重试延迟(毫秒) - ) + write_options = create_write_options() write_api = client.write_api(write_options=write_options) # 创建一个临时存储点数据的列表 points_to_write = [] @@ -2140,10 +2144,7 @@ def store_scheme_simulation_result_to_influxdb(node_result_list: List[Dict[str, print("{} -- Failed to connect to InfluxDB.".format(datetime.now().strftime('%Y-%m-%d %H:%M:%S'))) try: - write_options = WriteOptions( - jitter_interval=200, # 添加抖动以避免同时写入 - max_retry_delay=30000 # 最大重试延迟(毫秒) - ) + write_options = create_write_options() write_api = client.write_api(write_options=write_options) # 创建一个临时存储点数据的列表 points_to_write = [] @@ -2274,10 +2275,7 @@ def fill_scheme_simulation_result_to_SCADA(scheme_Type: str = None, scheme_Name: if not client.ping(): print("{} -- Failed to connect to InfluxDB.".format(datetime.now().strftime('%Y-%m-%d %H:%M:%S'))) - write_options = WriteOptions( - jitter_interval=200, # 添加抖动以避免同时写入 - max_retry_delay=30000 # 最大重试延迟(毫秒) - ) + write_options = create_write_options() write_api = client.write_api(write_options=write_options) # 创建一个临时存储点数据的列表 points_to_write = [] @@ -2583,6 +2581,7 @@ def query_scheme_curve_by_ID_property(scheme_Type: str, scheme_Name: str, query_ return results +# D # 2025/02/21 # query_date 是模拟的时间,不是查询的时s