Refine
This commit is contained in:
103
influxdb_api.py
103
influxdb_api.py
@@ -182,6 +182,7 @@ def delete_buckets(org_name: str) -> None:
|
||||
:return: None
|
||||
"""
|
||||
client = get_new_client()
|
||||
|
||||
# 定义需要删除的 bucket 名称列表
|
||||
buckets_to_delete = ['SCADA_data', 'realtime_simulation_result', 'scheme_simulation_result']
|
||||
buckets_api = client.buckets_api()
|
||||
@@ -199,6 +200,7 @@ def delete_buckets(org_name: str) -> None:
|
||||
print(f"Skipping bucket {bucket.name}. Not in the deletion list.")
|
||||
else:
|
||||
print("未找到 buckets 属性,无法迭代 buckets。")
|
||||
|
||||
client.close()
|
||||
|
||||
|
||||
@@ -210,6 +212,7 @@ def create_and_initialize_buckets(org_name: str) -> None:
|
||||
:return:
|
||||
"""
|
||||
client = get_new_client()
|
||||
|
||||
# 先删除原有的,然后再进行初始化
|
||||
delete_buckets(org_name)
|
||||
bucket_api = BucketsApi(client)
|
||||
@@ -328,6 +331,7 @@ def create_and_initialize_buckets(org_name: str) -> None:
|
||||
write_api.write(bucket=bucket, org=org_name, record=points_to_write)
|
||||
write_api.flush() # 刷新缓存一次
|
||||
print("All buckets created and initialized successfully.")
|
||||
|
||||
client.close()
|
||||
|
||||
|
||||
@@ -339,6 +343,7 @@ def store_realtime_SCADA_data_to_influxdb(get_real_value_time: str, bucket: str
|
||||
:return:
|
||||
"""
|
||||
client = get_new_client()
|
||||
|
||||
if not client.ping():
|
||||
print("{} -- Failed to connect to InfluxDB.".format(datetime.now().strftime('%Y-%m-%d %H:%M:%S')))
|
||||
|
||||
@@ -646,6 +651,7 @@ def store_realtime_SCADA_data_to_influxdb(get_real_value_time: str, bucket: str
|
||||
if points_to_write:
|
||||
write_api.write(bucket=bucket, org=org_name, record=points_to_write)
|
||||
write_api.flush() # 刷新缓存一次
|
||||
|
||||
client.close()
|
||||
|
||||
|
||||
@@ -669,6 +675,7 @@ def store_non_realtime_SCADA_data_to_influxdb(get_history_data_end_time: str, bu
|
||||
:return:
|
||||
"""
|
||||
client = get_new_client()
|
||||
|
||||
if not client.ping():
|
||||
print("{} -- Failed to connect to InfluxDB.".format(datetime.now().strftime('%Y-%m-%d %H:%M:%S')))
|
||||
|
||||
@@ -900,6 +907,7 @@ def store_non_realtime_SCADA_data_to_influxdb(get_history_data_end_time: str, bu
|
||||
if points_to_write:
|
||||
write_api.write(bucket=bucket, org=org_name, record=points_to_write)
|
||||
write_api.flush() # 刷新缓存一次
|
||||
|
||||
client.close()
|
||||
|
||||
|
||||
@@ -913,6 +921,7 @@ def download_history_data_manually(begin_time: str, end_time: str, bucket: str =
|
||||
:return:
|
||||
"""
|
||||
client = get_new_client()
|
||||
|
||||
if not client.ping():
|
||||
print("{} -- Failed to connect to InfluxDB.".format(datetime.now().strftime('%Y-%m-%d %H:%M:%S')))
|
||||
|
||||
@@ -1331,6 +1340,8 @@ def download_history_data_manually(begin_time: str, end_time: str, bucket: str =
|
||||
write_api.write(bucket=bucket, org=org_name, record=points_to_write)
|
||||
write_api.flush() # 刷新缓存一次
|
||||
|
||||
client.close()
|
||||
|
||||
########################SCADA############################################################################################################
|
||||
|
||||
# DingZQ, 2025-03-08
|
||||
@@ -1405,6 +1416,7 @@ def query_SCADA_data_by_device_ID_and_time(query_ids_list: List[str], query_time
|
||||
:return:
|
||||
"""
|
||||
client = get_new_client()
|
||||
|
||||
if not client.ping():
|
||||
print("{} -- Failed to connect to InfluxDB.".format(datetime.now().strftime('%Y-%m-%d %H:%M:%S')))
|
||||
|
||||
@@ -1440,7 +1452,9 @@ def query_SCADA_data_by_device_ID_and_time(query_ids_list: List[str], query_time
|
||||
except Exception as e:
|
||||
print(f"Error querying InfluxDB for device ID {device_id}: {e}")
|
||||
SCADA_result_dict[device_id] = None
|
||||
|
||||
client.close()
|
||||
|
||||
return SCADA_result_dict
|
||||
|
||||
# 2025/03/14
|
||||
@@ -1454,6 +1468,7 @@ def query_SCADA_data_by_device_ID_and_timerange(query_ids_list: List[str], start
|
||||
:return:
|
||||
"""
|
||||
client = get_new_client()
|
||||
|
||||
if not client.ping():
|
||||
print("{} -- Failed to connect to InfluxDB.".format(datetime.now().strftime('%Y-%m-%d %H:%M:%S')))
|
||||
|
||||
@@ -1485,7 +1500,9 @@ def query_SCADA_data_by_device_ID_and_timerange(query_ids_list: List[str], start
|
||||
"value": record["_value"]
|
||||
})
|
||||
SCADA_dict[device_id] = records_list
|
||||
|
||||
client.close()
|
||||
|
||||
return SCADA_dict
|
||||
|
||||
|
||||
@@ -1517,6 +1534,7 @@ def store_realtime_simulation_result_to_influxdb(node_result_list: List[Dict[str
|
||||
:return:
|
||||
"""
|
||||
client = get_new_client()
|
||||
|
||||
if not client.ping():
|
||||
print("{} -- Failed to connect to InfluxDB.".format(datetime.now().strftime('%Y-%m-%d %H:%M:%S')))
|
||||
|
||||
@@ -1578,6 +1596,8 @@ def store_realtime_simulation_result_to_influxdb(node_result_list: List[Dict[str
|
||||
write_api.flush() # 刷新缓存一次
|
||||
except Exception as e:
|
||||
raise RuntimeError(f"数据写入 InfluxDB 时发生错误: {e}")
|
||||
client.close()
|
||||
|
||||
client.close()
|
||||
|
||||
|
||||
@@ -1591,6 +1611,7 @@ def query_latest_record_by_ID(ID: str, type: str, bucket: str="realtime_simulati
|
||||
:return: dict: 最新记录的数据,如果没有找到则返回 None。
|
||||
"""
|
||||
client = get_new_client()
|
||||
|
||||
if not client.ping():
|
||||
print("{} -- Failed to connect to InfluxDB.".format(datetime.now().strftime('%Y-%m-%d %H:%M:%S')))
|
||||
|
||||
@@ -1691,6 +1712,7 @@ def query_all_record_by_time(query_time: str, bucket: str="realtime_simulation_r
|
||||
:return: dict: tuple: (node_records, link_records)
|
||||
"""
|
||||
client = get_new_client()
|
||||
|
||||
if not client.ping():
|
||||
print("{} -- Failed to connect to InfluxDB.".format(datetime.now().strftime('%Y-%m-%d %H:%M:%S')))
|
||||
|
||||
@@ -1744,7 +1766,9 @@ def query_all_record_by_time(query_time: str, bucket: str="realtime_simulation_r
|
||||
"reaction": record["reaction"],
|
||||
"friction": record["friction"]
|
||||
})
|
||||
|
||||
client.close()
|
||||
|
||||
return node_records, link_records
|
||||
|
||||
|
||||
@@ -1792,7 +1816,9 @@ def query_all_record_by_time_property(query_time: str, type: str, property: str,
|
||||
"ID": record["ID"],
|
||||
"value": record["_value"]
|
||||
})
|
||||
|
||||
client.close()
|
||||
|
||||
return result_records
|
||||
|
||||
|
||||
@@ -1805,10 +1831,10 @@ def query_all_record_by_date(query_date: str, bucket: str="realtime_simulation_r
|
||||
:return: dict: tuple: (node_records, link_records)
|
||||
"""
|
||||
client = get_new_client()
|
||||
|
||||
# 记录开始时间
|
||||
time_cost_start = time.perf_counter()
|
||||
print('{} -- Hydraulic simulation started.'.format(
|
||||
datetime.now(pytz.timezone('Asia/Shanghai')).strftime('%Y-%m-%d %H:%M:%S')))
|
||||
print('{} -- query_all_record_by_date started.'.format(datetime.now(pytz.timezone('Asia/Shanghai')).strftime('%Y-%m-%d %H:%M:%S')))
|
||||
|
||||
if not client.ping():
|
||||
print("{} -- Failed to connect to InfluxDB.".format(datetime.now().strftime('%Y-%m-%d %H:%M:%S')))
|
||||
@@ -1817,6 +1843,10 @@ def query_all_record_by_date(query_date: str, bucket: str="realtime_simulation_r
|
||||
# 将 start_date 的北京时间转换为 UTC 时间
|
||||
start_time = (datetime.strptime(query_date, "%Y-%m-%d") - timedelta(days=1)).replace(hour=16, minute=0, second=0, tzinfo=timezone.utc).isoformat()
|
||||
stop_time = datetime.strptime(query_date, "%Y-%m-%d").replace(hour=15, minute=59, second=59, tzinfo=timezone.utc).isoformat()
|
||||
|
||||
print("start_time", start_time)
|
||||
print("stop_time", stop_time)
|
||||
|
||||
# 构建 Flux 查询语句
|
||||
flux_query = f'''
|
||||
from(bucket: "{bucket}")
|
||||
@@ -1828,8 +1858,10 @@ def query_all_record_by_date(query_date: str, bucket: str="realtime_simulation_r
|
||||
valueColumn:"_value"
|
||||
)
|
||||
'''
|
||||
|
||||
# 执行查询
|
||||
tables = query_api.query(flux_query)
|
||||
|
||||
node_records = []
|
||||
link_records = []
|
||||
# 解析查询结果
|
||||
@@ -1861,11 +1893,12 @@ def query_all_record_by_date(query_date: str, bucket: str="realtime_simulation_r
|
||||
"reaction": record["reaction"],
|
||||
"friction": record["friction"]
|
||||
})
|
||||
|
||||
time_cost_end = time.perf_counter()
|
||||
print('{} -- Hydraulic simulation finished, cost time: {:.2f} s.'.format(
|
||||
datetime.now(pytz.timezone('Asia/Shanghai')).strftime('%Y-%m-%d %H:%M:%S'),
|
||||
time_cost_end - time_cost_start))
|
||||
print('{} -- query_all_record_by_date finished, cost time: {:.2f} s.'.format( datetime.now(pytz.timezone('Asia/Shanghai')).strftime('%Y-%m-%d %H:%M:%S'), time_cost_end - time_cost_start))
|
||||
|
||||
client.close()
|
||||
|
||||
return node_records, link_records
|
||||
|
||||
# 2025/03/15 DingZQ
|
||||
@@ -1946,6 +1979,8 @@ def query_all_records_by_date_with_type(query_date: str, query_type: str, bucket
|
||||
})
|
||||
time_cost_end = time.perf_counter()
|
||||
|
||||
client.close()
|
||||
|
||||
return result_records
|
||||
|
||||
# 2025/02/21
|
||||
@@ -1959,10 +1994,10 @@ def query_all_record_by_date_property(query_date: str, type: str, property: str,
|
||||
:return: list(dict): result_records
|
||||
"""
|
||||
client = get_new_client()
|
||||
|
||||
# 记录开始时间
|
||||
time_cost_start = time.perf_counter()
|
||||
print('{} -- Hydraulic simulation started.'.format(
|
||||
datetime.now(pytz.timezone('Asia/Shanghai')).strftime('%Y-%m-%d %H:%M:%S')))
|
||||
print('{} -- query_all_record_by_date_property started.'.format(datetime.now(pytz.timezone('Asia/Shanghai')).strftime('%Y-%m-%d %H:%M:%S')))
|
||||
if not client.ping():
|
||||
print("{} -- Failed to connect to InfluxDB.".format(datetime.now().strftime('%Y-%m-%d %H:%M:%S')))
|
||||
|
||||
@@ -1995,11 +2030,12 @@ def query_all_record_by_date_property(query_date: str, type: str, property: str,
|
||||
"time": record["_time"],
|
||||
"value": record["_value"]
|
||||
})
|
||||
|
||||
time_cost_end = time.perf_counter()
|
||||
print('{} -- Hydraulic simulation finished, cost time: {:.2f} s.'.format(
|
||||
datetime.now(pytz.timezone('Asia/Shanghai')).strftime('%Y-%m-%d %H:%M:%S'),
|
||||
time_cost_end - time_cost_start))
|
||||
print('{} -- query_all_record_by_date_property finished, cost time: {:.2f} s.'.format(datetime.now(pytz.timezone('Asia/Shanghai')).strftime('%Y-%m-%d %H:%M:%S'), time_cost_end - time_cost_start))
|
||||
|
||||
client.close()
|
||||
|
||||
return result_records
|
||||
|
||||
|
||||
@@ -2016,6 +2052,7 @@ def query_curve_by_ID_property_daterange(ID: str, type: str, property: str, star
|
||||
:return: 查询结果的列表
|
||||
"""
|
||||
client = get_new_client()
|
||||
|
||||
if not client.ping():
|
||||
print("{} -- Failed to connect to InfluxDB.".format(datetime.now().strftime('%Y-%m-%d %H:%M:%S')))
|
||||
|
||||
@@ -2050,7 +2087,9 @@ def query_curve_by_ID_property_daterange(ID: str, type: str, property: str, star
|
||||
"time": record["_time"],
|
||||
"value": record["_value"]
|
||||
})
|
||||
|
||||
client.close()
|
||||
|
||||
return results
|
||||
|
||||
|
||||
@@ -2070,6 +2109,7 @@ def store_scheme_simulation_result_to_influxdb(node_result_list: List[Dict[str,
|
||||
:return:
|
||||
"""
|
||||
client = get_new_client()
|
||||
|
||||
if not client.ping():
|
||||
print("{} -- Failed to connect to InfluxDB.".format(datetime.now().strftime('%Y-%m-%d %H:%M:%S')))
|
||||
|
||||
@@ -2139,6 +2179,8 @@ def store_scheme_simulation_result_to_influxdb(node_result_list: List[Dict[str,
|
||||
write_api.flush() # 刷新缓存一次
|
||||
except Exception as e:
|
||||
raise RuntimeError(f"数据写入 InfluxDB 时发生错误: {e}")
|
||||
client.close()
|
||||
|
||||
client.close()
|
||||
|
||||
|
||||
@@ -2203,8 +2245,8 @@ def fill_scheme_simulation_result_to_SCADA(scheme_Type: str = None, scheme_Name:
|
||||
:return:
|
||||
"""
|
||||
client = get_new_client()
|
||||
if not client.ping():
|
||||
print("{} -- Failed to connect to InfluxDB.".format(datetime.now().strftime('%Y-%m-%d %H:%M:%S')))
|
||||
|
||||
if not client.ping(): print("{} -- Failed to connect to InfluxDB.".format(datetime.now().strftime('%Y-%m-%d %H:%M:%S')))
|
||||
|
||||
write_options = WriteOptions(
|
||||
jitter_interval=200, # 添加抖动以避免同时写入
|
||||
@@ -2298,6 +2340,7 @@ def fill_scheme_simulation_result_to_SCADA(scheme_Type: str = None, scheme_Name:
|
||||
if points_to_write:
|
||||
write_api.write(bucket=bucket, org=org_name, record=points_to_write)
|
||||
write_api.flush() # 刷新缓存一次
|
||||
|
||||
client.close()
|
||||
|
||||
|
||||
@@ -2312,8 +2355,8 @@ def query_SCADA_data_curve(api_query_id: str, start_date: str, end_date: str, bu
|
||||
:return: 查询结果的列表
|
||||
"""
|
||||
client = get_new_client()
|
||||
if not client.ping():
|
||||
print("{} -- Failed to connect to InfluxDB.".format(datetime.now().strftime('%Y-%m-%d %H:%M:%S')))
|
||||
|
||||
if not client.ping(): print("{} -- Failed to connect to InfluxDB.".format(datetime.now().strftime('%Y-%m-%d %H:%M:%S')))
|
||||
|
||||
query_api = client.query_api()
|
||||
# 将 start_date 的北京时间转换为 UTC 时间范围
|
||||
@@ -2335,7 +2378,9 @@ def query_SCADA_data_curve(api_query_id: str, start_date: str, end_date: str, bu
|
||||
"time": record["_time"],
|
||||
"value": record["_value"]
|
||||
})
|
||||
|
||||
client.close()
|
||||
|
||||
return results
|
||||
|
||||
|
||||
@@ -2350,8 +2395,8 @@ def query_scheme_all_record_by_time(scheme_Type: str, scheme_Name: str, query_ti
|
||||
:return: dict: tuple: (node_records, link_records)
|
||||
"""
|
||||
client = get_new_client()
|
||||
if not client.ping():
|
||||
print("{} -- Failed to connect to InfluxDB.".format(datetime.now().strftime('%Y-%m-%d %H:%M:%S')))
|
||||
|
||||
if not client.ping(): print("{} -- Failed to connect to InfluxDB.".format(datetime.now().strftime('%Y-%m-%d %H:%M:%S')))
|
||||
|
||||
query_api = client.query_api()
|
||||
# 将北京时间转换为 UTC 时间
|
||||
@@ -2403,7 +2448,9 @@ def query_scheme_all_record_by_time(scheme_Type: str, scheme_Name: str, query_ti
|
||||
"reaction": record["reaction"],
|
||||
"friction": record["friction"]
|
||||
})
|
||||
|
||||
client.close()
|
||||
|
||||
return node_records, link_records
|
||||
|
||||
|
||||
@@ -2421,8 +2468,8 @@ def query_scheme_all_record_by_time_property(scheme_Type: str, scheme_Name: str,
|
||||
:return: dict: tuple: (node_records, link_records)
|
||||
"""
|
||||
client = get_new_client()
|
||||
if not client.ping():
|
||||
print("{} -- Failed to connect to InfluxDB.".format(datetime.now().strftime('%Y-%m-%d %H:%M:%S')))
|
||||
|
||||
if not client.ping(): print("{} -- Failed to connect to InfluxDB.".format(datetime.now().strftime('%Y-%m-%d %H:%M:%S')))
|
||||
|
||||
query_api = client.query_api()
|
||||
# 确定 measurement
|
||||
@@ -2453,7 +2500,9 @@ def query_scheme_all_record_by_time_property(scheme_Type: str, scheme_Name: str,
|
||||
"ID": record["ID"],
|
||||
"value": record["_value"]
|
||||
})
|
||||
client.close()
|
||||
|
||||
client.close()
|
||||
|
||||
return result_records
|
||||
|
||||
|
||||
@@ -2472,6 +2521,7 @@ def query_scheme_curve_by_ID_property(scheme_Type: str, scheme_Name: str, query_
|
||||
:return: 查询结果的列表
|
||||
"""
|
||||
client = get_new_client()
|
||||
|
||||
if not client.ping():
|
||||
print("{} -- Failed to connect to InfluxDB.".format(datetime.now().strftime('%Y-%m-%d %H:%M:%S')))
|
||||
|
||||
@@ -2502,7 +2552,9 @@ def query_scheme_curve_by_ID_property(scheme_Type: str, scheme_Name: str, query_
|
||||
"time": record["_time"],
|
||||
"value": record["_value"]
|
||||
})
|
||||
|
||||
client.close()
|
||||
|
||||
return results
|
||||
|
||||
|
||||
@@ -2517,6 +2569,7 @@ def query_scheme_all_record(scheme_Type: str, scheme_Name: str, query_date: str,
|
||||
:return: dict: tuple: (node_records, link_records)
|
||||
"""
|
||||
client = get_new_client()
|
||||
|
||||
if not client.ping():
|
||||
print("{} -- Failed to connect to InfluxDB.".format(datetime.now().strftime('%Y-%m-%d %H:%M:%S')))
|
||||
|
||||
@@ -2567,7 +2620,9 @@ def query_scheme_all_record(scheme_Type: str, scheme_Name: str, query_date: str,
|
||||
"reaction": record["reaction"],
|
||||
"friction": record["friction"]
|
||||
})
|
||||
|
||||
client.close()
|
||||
|
||||
return node_records, link_records
|
||||
|
||||
|
||||
@@ -2615,7 +2670,9 @@ def query_scheme_all_record_property(scheme_Type: str, scheme_Name: str, query_d
|
||||
"ID": record["ID"],
|
||||
"value": record["_value"]
|
||||
})
|
||||
|
||||
client.close()
|
||||
|
||||
return result_records
|
||||
|
||||
|
||||
@@ -2629,6 +2686,7 @@ def export_SCADA_data_to_csv(start_date: str, end_date: str, bucket: str="SCADA_
|
||||
:return:
|
||||
"""
|
||||
client = get_new_client()
|
||||
|
||||
if not client.ping():
|
||||
print("{} -- Failed to connect to InfluxDB.".format(datetime.now().strftime('%Y-%m-%d %H:%M:%S')))
|
||||
|
||||
@@ -2666,6 +2724,7 @@ def export_SCADA_data_to_csv(start_date: str, end_date: str, bucket: str="SCADA_
|
||||
writer.writeheader()
|
||||
writer.writerows(rows)
|
||||
print(f"Data exported to {csv_filename} successfully.")
|
||||
|
||||
client.close()
|
||||
|
||||
|
||||
@@ -2679,6 +2738,7 @@ def export_realtime_simulation_result_to_csv(start_date: str, end_date: str, buc
|
||||
:return:
|
||||
"""
|
||||
client = get_new_client()
|
||||
|
||||
if not client.ping():
|
||||
print("{} -- Failed to connect to InfluxDB.".format(datetime.now().strftime('%Y-%m-%d %H:%M:%S')))
|
||||
|
||||
@@ -2749,6 +2809,7 @@ def export_realtime_simulation_result_to_csv(start_date: str, end_date: str, buc
|
||||
writer.writeheader()
|
||||
writer.writerows(node_rows)
|
||||
print(f"Data exported to {csv_filename_link} and {csv_filename_node} successfully.")
|
||||
|
||||
client.close()
|
||||
|
||||
|
||||
@@ -2762,6 +2823,7 @@ def export_scheme_simulation_result_to_csv_time(start_date: str, end_date: str,
|
||||
:return:
|
||||
"""
|
||||
client = get_new_client()
|
||||
|
||||
if not client.ping():
|
||||
print("{} -- Failed to connect to InfluxDB.".format(datetime.now().strftime('%Y-%m-%d %H:%M:%S')))
|
||||
|
||||
@@ -2835,6 +2897,7 @@ def export_scheme_simulation_result_to_csv_time(start_date: str, end_date: str,
|
||||
writer.writeheader()
|
||||
writer.writerows(node_rows)
|
||||
print(f"Data exported to {csv_filename_link} and {csv_filename_node} successfully.")
|
||||
|
||||
client.close()
|
||||
|
||||
|
||||
@@ -2849,6 +2912,7 @@ def export_scheme_simulation_result_to_csv_scheme(scheme_Type: str, scheme_Name:
|
||||
:return:
|
||||
"""
|
||||
client = get_new_client()
|
||||
|
||||
if not client.ping():
|
||||
print("{} -- Failed to connect to InfluxDB.".format(datetime.now().strftime('%Y-%m-%d %H:%M:%S')))
|
||||
|
||||
@@ -2921,6 +2985,7 @@ def export_scheme_simulation_result_to_csv_scheme(scheme_Type: str, scheme_Name:
|
||||
writer.writeheader()
|
||||
writer.writerows(node_rows)
|
||||
print(f"Data exported to {csv_filename_link} and {csv_filename_node} successfully.")
|
||||
|
||||
client.close()
|
||||
|
||||
|
||||
|
||||
Reference in New Issue
Block a user