From 2f14395d1e9f4f6fc260108d3b67feee46c2bc59 Mon Sep 17 00:00:00 2001 From: DingZQ Date: Fri, 4 Apr 2025 09:44:26 +0800 Subject: [PATCH] Refine --- influxdb_api.py | 103 +++++++++++++++++++++++++++++++++++++++--------- 1 file changed, 84 insertions(+), 19 deletions(-) diff --git a/influxdb_api.py b/influxdb_api.py index 0f90415..f47aa7c 100644 --- a/influxdb_api.py +++ b/influxdb_api.py @@ -182,6 +182,7 @@ def delete_buckets(org_name: str) -> None: :return: None """ client = get_new_client() + # 定义需要删除的 bucket 名称列表 buckets_to_delete = ['SCADA_data', 'realtime_simulation_result', 'scheme_simulation_result'] buckets_api = client.buckets_api() @@ -199,6 +200,7 @@ def delete_buckets(org_name: str) -> None: print(f"Skipping bucket {bucket.name}. Not in the deletion list.") else: print("未找到 buckets 属性,无法迭代 buckets。") + client.close() @@ -210,6 +212,7 @@ def create_and_initialize_buckets(org_name: str) -> None: :return: """ client = get_new_client() + # 先删除原有的,然后再进行初始化 delete_buckets(org_name) bucket_api = BucketsApi(client) @@ -328,6 +331,7 @@ def create_and_initialize_buckets(org_name: str) -> None: write_api.write(bucket=bucket, org=org_name, record=points_to_write) write_api.flush() # 刷新缓存一次 print("All buckets created and initialized successfully.") + client.close() @@ -339,6 +343,7 @@ def store_realtime_SCADA_data_to_influxdb(get_real_value_time: str, bucket: str :return: """ client = get_new_client() + if not client.ping(): print("{} -- Failed to connect to InfluxDB.".format(datetime.now().strftime('%Y-%m-%d %H:%M:%S'))) @@ -646,6 +651,7 @@ def store_realtime_SCADA_data_to_influxdb(get_real_value_time: str, bucket: str if points_to_write: write_api.write(bucket=bucket, org=org_name, record=points_to_write) write_api.flush() # 刷新缓存一次 + client.close() @@ -669,6 +675,7 @@ def store_non_realtime_SCADA_data_to_influxdb(get_history_data_end_time: str, bu :return: """ client = get_new_client() + if not client.ping(): print("{} -- Failed to connect to InfluxDB.".format(datetime.now().strftime('%Y-%m-%d %H:%M:%S'))) @@ -900,6 +907,7 @@ def store_non_realtime_SCADA_data_to_influxdb(get_history_data_end_time: str, bu if points_to_write: write_api.write(bucket=bucket, org=org_name, record=points_to_write) write_api.flush() # 刷新缓存一次 + client.close() @@ -913,6 +921,7 @@ def download_history_data_manually(begin_time: str, end_time: str, bucket: str = :return: """ client = get_new_client() + if not client.ping(): print("{} -- Failed to connect to InfluxDB.".format(datetime.now().strftime('%Y-%m-%d %H:%M:%S'))) @@ -1331,6 +1340,8 @@ def download_history_data_manually(begin_time: str, end_time: str, bucket: str = write_api.write(bucket=bucket, org=org_name, record=points_to_write) write_api.flush() # 刷新缓存一次 + client.close() + ########################SCADA############################################################################################################ # DingZQ, 2025-03-08 @@ -1405,6 +1416,7 @@ def query_SCADA_data_by_device_ID_and_time(query_ids_list: List[str], query_time :return: """ client = get_new_client() + if not client.ping(): print("{} -- Failed to connect to InfluxDB.".format(datetime.now().strftime('%Y-%m-%d %H:%M:%S'))) @@ -1440,7 +1452,9 @@ def query_SCADA_data_by_device_ID_and_time(query_ids_list: List[str], query_time except Exception as e: print(f"Error querying InfluxDB for device ID {device_id}: {e}") SCADA_result_dict[device_id] = None + client.close() + return SCADA_result_dict # 2025/03/14 @@ -1454,6 +1468,7 @@ def query_SCADA_data_by_device_ID_and_timerange(query_ids_list: List[str], start :return: """ client = get_new_client() + if not client.ping(): print("{} -- Failed to connect to InfluxDB.".format(datetime.now().strftime('%Y-%m-%d %H:%M:%S'))) @@ -1485,7 +1500,9 @@ def query_SCADA_data_by_device_ID_and_timerange(query_ids_list: List[str], start "value": record["_value"] }) SCADA_dict[device_id] = records_list + client.close() + return SCADA_dict @@ -1517,6 +1534,7 @@ def store_realtime_simulation_result_to_influxdb(node_result_list: List[Dict[str :return: """ client = get_new_client() + if not client.ping(): print("{} -- Failed to connect to InfluxDB.".format(datetime.now().strftime('%Y-%m-%d %H:%M:%S'))) @@ -1578,6 +1596,8 @@ def store_realtime_simulation_result_to_influxdb(node_result_list: List[Dict[str write_api.flush() # 刷新缓存一次 except Exception as e: raise RuntimeError(f"数据写入 InfluxDB 时发生错误: {e}") + client.close() + client.close() @@ -1591,6 +1611,7 @@ def query_latest_record_by_ID(ID: str, type: str, bucket: str="realtime_simulati :return: dict: 最新记录的数据,如果没有找到则返回 None。 """ client = get_new_client() + if not client.ping(): print("{} -- Failed to connect to InfluxDB.".format(datetime.now().strftime('%Y-%m-%d %H:%M:%S'))) @@ -1691,6 +1712,7 @@ def query_all_record_by_time(query_time: str, bucket: str="realtime_simulation_r :return: dict: tuple: (node_records, link_records) """ client = get_new_client() + if not client.ping(): print("{} -- Failed to connect to InfluxDB.".format(datetime.now().strftime('%Y-%m-%d %H:%M:%S'))) @@ -1744,7 +1766,9 @@ def query_all_record_by_time(query_time: str, bucket: str="realtime_simulation_r "reaction": record["reaction"], "friction": record["friction"] }) + client.close() + return node_records, link_records @@ -1792,7 +1816,9 @@ def query_all_record_by_time_property(query_time: str, type: str, property: str, "ID": record["ID"], "value": record["_value"] }) + client.close() + return result_records @@ -1805,10 +1831,10 @@ def query_all_record_by_date(query_date: str, bucket: str="realtime_simulation_r :return: dict: tuple: (node_records, link_records) """ client = get_new_client() + # 记录开始时间 time_cost_start = time.perf_counter() - print('{} -- Hydraulic simulation started.'.format( - datetime.now(pytz.timezone('Asia/Shanghai')).strftime('%Y-%m-%d %H:%M:%S'))) + print('{} -- query_all_record_by_date started.'.format(datetime.now(pytz.timezone('Asia/Shanghai')).strftime('%Y-%m-%d %H:%M:%S'))) if not client.ping(): print("{} -- Failed to connect to InfluxDB.".format(datetime.now().strftime('%Y-%m-%d %H:%M:%S'))) @@ -1817,6 +1843,10 @@ def query_all_record_by_date(query_date: str, bucket: str="realtime_simulation_r # 将 start_date 的北京时间转换为 UTC 时间 start_time = (datetime.strptime(query_date, "%Y-%m-%d") - timedelta(days=1)).replace(hour=16, minute=0, second=0, tzinfo=timezone.utc).isoformat() stop_time = datetime.strptime(query_date, "%Y-%m-%d").replace(hour=15, minute=59, second=59, tzinfo=timezone.utc).isoformat() + + print("start_time", start_time) + print("stop_time", stop_time) + # 构建 Flux 查询语句 flux_query = f''' from(bucket: "{bucket}") @@ -1828,8 +1858,10 @@ def query_all_record_by_date(query_date: str, bucket: str="realtime_simulation_r valueColumn:"_value" ) ''' + # 执行查询 tables = query_api.query(flux_query) + node_records = [] link_records = [] # 解析查询结果 @@ -1861,11 +1893,12 @@ def query_all_record_by_date(query_date: str, bucket: str="realtime_simulation_r "reaction": record["reaction"], "friction": record["friction"] }) + time_cost_end = time.perf_counter() - print('{} -- Hydraulic simulation finished, cost time: {:.2f} s.'.format( - datetime.now(pytz.timezone('Asia/Shanghai')).strftime('%Y-%m-%d %H:%M:%S'), - time_cost_end - time_cost_start)) + print('{} -- query_all_record_by_date finished, cost time: {:.2f} s.'.format( datetime.now(pytz.timezone('Asia/Shanghai')).strftime('%Y-%m-%d %H:%M:%S'), time_cost_end - time_cost_start)) + client.close() + return node_records, link_records # 2025/03/15 DingZQ @@ -1946,6 +1979,8 @@ def query_all_records_by_date_with_type(query_date: str, query_type: str, bucket }) time_cost_end = time.perf_counter() + client.close() + return result_records # 2025/02/21 @@ -1959,10 +1994,10 @@ def query_all_record_by_date_property(query_date: str, type: str, property: str, :return: list(dict): result_records """ client = get_new_client() + # 记录开始时间 time_cost_start = time.perf_counter() - print('{} -- Hydraulic simulation started.'.format( - datetime.now(pytz.timezone('Asia/Shanghai')).strftime('%Y-%m-%d %H:%M:%S'))) + print('{} -- query_all_record_by_date_property started.'.format(datetime.now(pytz.timezone('Asia/Shanghai')).strftime('%Y-%m-%d %H:%M:%S'))) if not client.ping(): print("{} -- Failed to connect to InfluxDB.".format(datetime.now().strftime('%Y-%m-%d %H:%M:%S'))) @@ -1995,11 +2030,12 @@ def query_all_record_by_date_property(query_date: str, type: str, property: str, "time": record["_time"], "value": record["_value"] }) + time_cost_end = time.perf_counter() - print('{} -- Hydraulic simulation finished, cost time: {:.2f} s.'.format( - datetime.now(pytz.timezone('Asia/Shanghai')).strftime('%Y-%m-%d %H:%M:%S'), - time_cost_end - time_cost_start)) + print('{} -- query_all_record_by_date_property finished, cost time: {:.2f} s.'.format(datetime.now(pytz.timezone('Asia/Shanghai')).strftime('%Y-%m-%d %H:%M:%S'), time_cost_end - time_cost_start)) + client.close() + return result_records @@ -2016,6 +2052,7 @@ def query_curve_by_ID_property_daterange(ID: str, type: str, property: str, star :return: 查询结果的列表 """ client = get_new_client() + if not client.ping(): print("{} -- Failed to connect to InfluxDB.".format(datetime.now().strftime('%Y-%m-%d %H:%M:%S'))) @@ -2050,7 +2087,9 @@ def query_curve_by_ID_property_daterange(ID: str, type: str, property: str, star "time": record["_time"], "value": record["_value"] }) + client.close() + return results @@ -2070,6 +2109,7 @@ def store_scheme_simulation_result_to_influxdb(node_result_list: List[Dict[str, :return: """ client = get_new_client() + if not client.ping(): print("{} -- Failed to connect to InfluxDB.".format(datetime.now().strftime('%Y-%m-%d %H:%M:%S'))) @@ -2139,6 +2179,8 @@ def store_scheme_simulation_result_to_influxdb(node_result_list: List[Dict[str, write_api.flush() # 刷新缓存一次 except Exception as e: raise RuntimeError(f"数据写入 InfluxDB 时发生错误: {e}") + client.close() + client.close() @@ -2203,8 +2245,8 @@ def fill_scheme_simulation_result_to_SCADA(scheme_Type: str = None, scheme_Name: :return: """ client = get_new_client() - if not client.ping(): - print("{} -- Failed to connect to InfluxDB.".format(datetime.now().strftime('%Y-%m-%d %H:%M:%S'))) + + if not client.ping(): print("{} -- Failed to connect to InfluxDB.".format(datetime.now().strftime('%Y-%m-%d %H:%M:%S'))) write_options = WriteOptions( jitter_interval=200, # 添加抖动以避免同时写入 @@ -2298,6 +2340,7 @@ def fill_scheme_simulation_result_to_SCADA(scheme_Type: str = None, scheme_Name: if points_to_write: write_api.write(bucket=bucket, org=org_name, record=points_to_write) write_api.flush() # 刷新缓存一次 + client.close() @@ -2312,8 +2355,8 @@ def query_SCADA_data_curve(api_query_id: str, start_date: str, end_date: str, bu :return: 查询结果的列表 """ client = get_new_client() - if not client.ping(): - print("{} -- Failed to connect to InfluxDB.".format(datetime.now().strftime('%Y-%m-%d %H:%M:%S'))) + + if not client.ping(): print("{} -- Failed to connect to InfluxDB.".format(datetime.now().strftime('%Y-%m-%d %H:%M:%S'))) query_api = client.query_api() # 将 start_date 的北京时间转换为 UTC 时间范围 @@ -2335,7 +2378,9 @@ def query_SCADA_data_curve(api_query_id: str, start_date: str, end_date: str, bu "time": record["_time"], "value": record["_value"] }) + client.close() + return results @@ -2350,8 +2395,8 @@ def query_scheme_all_record_by_time(scheme_Type: str, scheme_Name: str, query_ti :return: dict: tuple: (node_records, link_records) """ client = get_new_client() - if not client.ping(): - print("{} -- Failed to connect to InfluxDB.".format(datetime.now().strftime('%Y-%m-%d %H:%M:%S'))) + + if not client.ping(): print("{} -- Failed to connect to InfluxDB.".format(datetime.now().strftime('%Y-%m-%d %H:%M:%S'))) query_api = client.query_api() # 将北京时间转换为 UTC 时间 @@ -2403,7 +2448,9 @@ def query_scheme_all_record_by_time(scheme_Type: str, scheme_Name: str, query_ti "reaction": record["reaction"], "friction": record["friction"] }) + client.close() + return node_records, link_records @@ -2421,8 +2468,8 @@ def query_scheme_all_record_by_time_property(scheme_Type: str, scheme_Name: str, :return: dict: tuple: (node_records, link_records) """ client = get_new_client() - if not client.ping(): - print("{} -- Failed to connect to InfluxDB.".format(datetime.now().strftime('%Y-%m-%d %H:%M:%S'))) + + if not client.ping(): print("{} -- Failed to connect to InfluxDB.".format(datetime.now().strftime('%Y-%m-%d %H:%M:%S'))) query_api = client.query_api() # 确定 measurement @@ -2453,7 +2500,9 @@ def query_scheme_all_record_by_time_property(scheme_Type: str, scheme_Name: str, "ID": record["ID"], "value": record["_value"] }) - client.close() + + client.close() + return result_records @@ -2472,6 +2521,7 @@ def query_scheme_curve_by_ID_property(scheme_Type: str, scheme_Name: str, query_ :return: 查询结果的列表 """ client = get_new_client() + if not client.ping(): print("{} -- Failed to connect to InfluxDB.".format(datetime.now().strftime('%Y-%m-%d %H:%M:%S'))) @@ -2502,7 +2552,9 @@ def query_scheme_curve_by_ID_property(scheme_Type: str, scheme_Name: str, query_ "time": record["_time"], "value": record["_value"] }) + client.close() + return results @@ -2517,6 +2569,7 @@ def query_scheme_all_record(scheme_Type: str, scheme_Name: str, query_date: str, :return: dict: tuple: (node_records, link_records) """ client = get_new_client() + if not client.ping(): print("{} -- Failed to connect to InfluxDB.".format(datetime.now().strftime('%Y-%m-%d %H:%M:%S'))) @@ -2567,7 +2620,9 @@ def query_scheme_all_record(scheme_Type: str, scheme_Name: str, query_date: str, "reaction": record["reaction"], "friction": record["friction"] }) + client.close() + return node_records, link_records @@ -2615,7 +2670,9 @@ def query_scheme_all_record_property(scheme_Type: str, scheme_Name: str, query_d "ID": record["ID"], "value": record["_value"] }) + client.close() + return result_records @@ -2629,6 +2686,7 @@ def export_SCADA_data_to_csv(start_date: str, end_date: str, bucket: str="SCADA_ :return: """ client = get_new_client() + if not client.ping(): print("{} -- Failed to connect to InfluxDB.".format(datetime.now().strftime('%Y-%m-%d %H:%M:%S'))) @@ -2666,6 +2724,7 @@ def export_SCADA_data_to_csv(start_date: str, end_date: str, bucket: str="SCADA_ writer.writeheader() writer.writerows(rows) print(f"Data exported to {csv_filename} successfully.") + client.close() @@ -2679,6 +2738,7 @@ def export_realtime_simulation_result_to_csv(start_date: str, end_date: str, buc :return: """ client = get_new_client() + if not client.ping(): print("{} -- Failed to connect to InfluxDB.".format(datetime.now().strftime('%Y-%m-%d %H:%M:%S'))) @@ -2749,6 +2809,7 @@ def export_realtime_simulation_result_to_csv(start_date: str, end_date: str, buc writer.writeheader() writer.writerows(node_rows) print(f"Data exported to {csv_filename_link} and {csv_filename_node} successfully.") + client.close() @@ -2762,6 +2823,7 @@ def export_scheme_simulation_result_to_csv_time(start_date: str, end_date: str, :return: """ client = get_new_client() + if not client.ping(): print("{} -- Failed to connect to InfluxDB.".format(datetime.now().strftime('%Y-%m-%d %H:%M:%S'))) @@ -2835,6 +2897,7 @@ def export_scheme_simulation_result_to_csv_time(start_date: str, end_date: str, writer.writeheader() writer.writerows(node_rows) print(f"Data exported to {csv_filename_link} and {csv_filename_node} successfully.") + client.close() @@ -2849,6 +2912,7 @@ def export_scheme_simulation_result_to_csv_scheme(scheme_Type: str, scheme_Name: :return: """ client = get_new_client() + if not client.ping(): print("{} -- Failed to connect to InfluxDB.".format(datetime.now().strftime('%Y-%m-%d %H:%M:%S'))) @@ -2921,6 +2985,7 @@ def export_scheme_simulation_result_to_csv_scheme(scheme_Type: str, scheme_Name: writer.writeheader() writer.writerows(node_rows) print(f"Data exported to {csv_filename_link} and {csv_filename_node} successfully.") + client.close()