From 00b5e0a3bbb9d4b56457764c7706a5e1284723be Mon Sep 17 00:00:00 2001 From: DingZQ Date: Wed, 23 Apr 2025 21:52:56 +0800 Subject: [PATCH] Updte influxdb and online_Analysis --- influxdb_api.py | 128 ++++++++++++++++++++++++++++++++++++++++----- online_Analysis.py | 10 ++-- 2 files changed, 120 insertions(+), 18 deletions(-) diff --git a/influxdb_api.py b/influxdb_api.py index d6f783a..747b460 100644 --- a/influxdb_api.py +++ b/influxdb_api.py @@ -1,4 +1,5 @@ from influxdb_client import InfluxDBClient, BucketsApi, WriteApi, OrganizationsApi, Point, QueryApi, WriteOptions +from influxdb_client.client.exceptions import InfluxDBError from typing import List, Dict from datetime import datetime, timedelta, timezone from influxdb_client.client.write_api import SYNCHRONOUS, ASYNCHRONOUS @@ -102,8 +103,7 @@ def query_pg_scada_info_non_realtime(name: str) -> None: open_project(name) dic_time = get_time(name) globals.hydraulic_timestep = dic_time['HYDRAULIC TIMESTEP'] - # DingZQ, 2025-03-21 - #close_project(name) + close_project(name) # 连接数据库 conn_string = f"dbname={name} host=127.0.0.1" try: @@ -284,7 +284,8 @@ def create_and_initialize_buckets(org_name: str) -> None: .tag("device_ID", None) \ .field("monitored_value", 0.0) \ .field("datacleaning_value", 0.0) \ - .field("simulation_value", 0.0) \ + .field("datafilling_value", 0.0) \ + .field("cleaned_value", 0.0) \ .time("2024-11-21T00:00:00Z", write_precision='s') points_to_write.append(point) # write_api.write(bucket="SCADA_data", org=org_name, record=point) @@ -1499,7 +1500,6 @@ def query_all_SCADA_records_by_date(query_date: str, bucket: str="SCADA_data") - return SCADA_results - def query_SCADA_data_by_device_ID_and_time(query_ids_list: List[str], query_time: str, bucket: str="SCADA_data") -> Dict[str, float]: """ 根据SCADA设备的ID和时间查询值 @@ -1545,7 +1545,6 @@ def query_SCADA_data_by_device_ID_and_time(query_ids_list: List[str], query_time print(f"Error querying InfluxDB for device ID {device_id}: {e}") SCADA_result_dict[device_id] = None client.close() - return SCADA_result_dict @@ -1594,9 +1593,7 @@ def query_scheme_SCADA_data_by_device_ID_and_time(query_ids_list: List[str], que except Exception as e: print(f"Error querying InfluxDB for device ID {device_id}: {e}") SCADA_result_dict[device_id] = None - client.close() - return SCADA_result_dict # 2025/03/14 @@ -1675,11 +1672,10 @@ def query_SCADA_data_by_device_ID_and_date(query_ids_list: List[str], query_date return query_SCADA_data_by_device_ID_and_time_range(query_ids_list, str(start_time), str(end_time), bucket) - # 2025/04/17 -def query_cleaned_SCADA_data_by_device_ID_and_timerange(query_ids_list: List[str], start_time: str, end_time: str, bucket: str="SCADA_data"): +def query_cleaning_SCADA_data_by_device_ID_and_timerange(query_ids_list: List[str], start_time: str, end_time: str, bucket: str="SCADA_data"): """ - 查询指定时间范围内,多个SCADA设备的清洗后的数据 + 查询指定时间范围内,多个SCADA设备的修复的单个的数据 :param query_ids_list: SCADA设备ID的列表 :param start_time: 输入的北京时间,格式为 '2024-11-24T17:30:00+08:00'。 :param end_time: 输入的北京时间,格式为 '2024-11-24T17:30:00+08:00'。 @@ -1726,6 +1722,106 @@ def query_cleaned_SCADA_data_by_device_ID_and_timerange(query_ids_list: List[str return SCADA_dict +# 2025/04/22 +def query_filling_SCADA_data_by_device_ID_and_timerange(query_ids_list: List[str], start_time: str, end_time: str, bucket: str="SCADA_data"): + """ + 查询指定时间范围内,多个SCADA设备的填补的单个的数据 + :param query_ids_list: SCADA设备ID的列表 + :param start_time: 输入的北京时间,格式为 '2024-11-24T17:30:00+08:00'。 + :param end_time: 输入的北京时间,格式为 '2024-11-24T17:30:00+08:00'。 + :param bucket: InfluxDB 的 bucket 名称,默认值为 "SCADA_data"。 + :return: + """ + client = get_new_client() + if not client.ping(): + print("{} -- Failed to connect to InfluxDB.".format(datetime.now().strftime('%Y-%m-%d %H:%M:%S'))) + + query_api = client.query_api() + print('start_time', start_time) + print('end_time', end_time) + # 将北京时间转换为 UTC 时间 + beijing_start_time = datetime.fromisoformat(start_time) + print('beijing_start_time', beijing_start_time) + utc_start_time = time_api.to_utc_time(beijing_start_time) + print('utc_start_time', utc_start_time) + beijing_end_time = datetime.fromisoformat(end_time) + print('beijing_end_time', beijing_end_time) + utc_stop_time = time_api.to_utc_time(beijing_end_time) + print('utc_stop_time', utc_stop_time) + SCADA_dict = {} + for device_id in query_ids_list: + flux_query = f''' + from(bucket: "{bucket}") + |> range(start: {utc_start_time.isoformat()}, stop: {utc_stop_time.isoformat()}) + |> filter(fn: (r) => r["device_ID"] == "{device_id}" and r["_field"] == "datafilling_value") + |> sort(columns: ["_time"]) + ''' + # 执行查询,返回一个 FluxTable 列表 + tables = query_api.query(flux_query) + print(tables) + records_list = [] + for table in tables: + for record in table.records: + # 获取记录的时间和监测值 + records_list.append({ + "time": record["_time"], + "value": record["_value"] + }) + SCADA_dict[device_id] = records_list + client.close() + return SCADA_dict + + +# 2025/04/22 +def query_cleaned_SCADA_data_by_device_ID_and_timerange(query_ids_list: List[str], start_time: str, end_time: str, bucket: str="SCADA_data"): + """ + 查询指定时间范围内,多个SCADA设备的清洗完毕后的完整数据 + :param query_ids_list: SCADA设备ID的列表 + :param start_time: 输入的北京时间,格式为 '2024-11-24T17:30:00+08:00'。 + :param end_time: 输入的北京时间,格式为 '2024-11-24T17:30:00+08:00'。 + :param bucket: InfluxDB 的 bucket 名称,默认值为 "SCADA_data"。 + :return: + """ + client = get_new_client() + if not client.ping(): + print("{} -- Failed to connect to InfluxDB.".format(datetime.now().strftime('%Y-%m-%d %H:%M:%S'))) + + query_api = client.query_api() + print('start_time', start_time) + print('end_time', end_time) + # 将北京时间转换为 UTC 时间 + beijing_start_time = datetime.fromisoformat(start_time) + print('beijing_start_time', beijing_start_time) + utc_start_time = time_api.to_utc_time(beijing_start_time) + print('utc_start_time', utc_start_time) + beijing_end_time = datetime.fromisoformat(end_time) + print('beijing_end_time', beijing_end_time) + utc_stop_time = time_api.to_utc_time(beijing_end_time) + print('utc_stop_time', utc_stop_time) + SCADA_dict = {} + for device_id in query_ids_list: + flux_query = f''' + from(bucket: "{bucket}") + |> range(start: {utc_start_time.isoformat()}, stop: {utc_stop_time.isoformat()}) + |> filter(fn: (r) => r["device_ID"] == "{device_id}" and r["_field"] == "cleaned_value") + |> sort(columns: ["_time"]) + ''' + # 执行查询,返回一个 FluxTable 列表 + tables = query_api.query(flux_query) + print(tables) + records_list = [] + for table in tables: + for record in table.records: + # 获取记录的时间和监测值 + records_list.append({ + "time": record["_time"], + "value": record["_value"] + }) + SCADA_dict[device_id] = records_list + client.close() + return SCADA_dict + + # 2025/02/01 def store_realtime_simulation_result_to_influxdb(node_result_list: List[Dict[str, any]], link_result_list: List[Dict[str, any]], result_start_time: str, bucket: str = "realtime_simulation_result"): @@ -2027,7 +2123,8 @@ def query_all_records_by_date(query_date: str, bucket: str="realtime_simulation_ client = get_new_client() # 记录开始时间 time_cost_start = time.perf_counter() - print('{} -- query_all_records_by_date started.'.format(datetime.now(pytz.timezone('Asia/Shanghai')).strftime('%Y-%m-%d %H:%M:%S'))) + print('{} -- Hydraulic simulation started.'.format( + datetime.now(pytz.timezone('Asia/Shanghai')).strftime('%Y-%m-%d %H:%M:%S'))) if not client.ping(): print("{} -- Failed to connect to InfluxDB.".format(datetime.now().strftime('%Y-%m-%d %H:%M:%S'))) @@ -3304,6 +3401,8 @@ def upload_cleaned_SCADA_data_to_influxdb(file_path: str, bucket: str="SCADA_dat datetime_value = datetime.strptime(row['time'], '%Y-%m-%d %H:%M:%S%z') # 处理datacleaning_value为空的情况 datacleaning_value = float(row['datacleaning_value']) if row['datacleaning_value'] else None + datafilling_value = float(row['datafilling_value']) if row['datafilling_value'] else None + cleaned_value = float(row['cleaned_value']) if row['cleaned_value'] else None # 处理monitored_value字段类型错误 try: monitored_value = float(row['monitored_value']) if row['monitored_value'] else None @@ -3317,6 +3416,8 @@ def upload_cleaned_SCADA_data_to_influxdb(file_path: str, bucket: str="SCADA_dat 'description': row['description'], 'monitored_value': monitored_value, 'datacleaning_value': datacleaning_value, + 'datafilling_value': datafilling_value, + 'cleaned_value': cleaned_value, 'datetime': datetime_value }) @@ -3328,7 +3429,6 @@ def upload_cleaned_SCADA_data_to_influxdb(file_path: str, bucket: str="SCADA_dat write_api = client.write_api(write_options=SYNCHRONOUS) # 写入数据 for data in data_list: - print(data) # 创建Point对象 point = ( Point(data['measurement']) # measurement为mpointName @@ -3337,6 +3437,8 @@ def upload_cleaned_SCADA_data_to_influxdb(file_path: str, bucket: str="SCADA_dat .tag('description', data['description']) .field("monitored_value", data['monitored_value']) # field key为dataValue .field('datacleaning_value', data['datacleaning_value']) + .field('datafilling_value', data['datafilling_value']) + .field('cleaned_value', data['cleaned_value']) .time(data['datetime']) # 时间以datetime为准 ) @@ -3383,7 +3485,7 @@ if __name__ == "__main__": # store_non_realtime_SCADA_data_to_influxdb(get_history_data_end_time='2025-03-08T12:00:00+08:00') # 示例3:download_history_data_manually - # download_history_data_manually(begin_time='2025-04-16T00:00:00+08:00', end_time='2025-04-16T23:59:00+08:00') + # download_history_data_manually(begin_time='2025-04-17T00:00:00+08:00', end_time='2025-04-17T23:59:00+08:00') # step3: 查询测试示例 diff --git a/online_Analysis.py b/online_Analysis.py index 0d06d5a..a3e1f29 100644 --- a/online_Analysis.py +++ b/online_Analysis.py @@ -781,11 +781,11 @@ def submit_scada_info(name: str, coord_id: str) -> None: id, type, associated_element_id, associated_pattern, associated_pipe_flow_id, {associated_columns}, API_query_id, transmission_mode, transmission_frequency, - X_coor, Y_coor, coord + reliability, X_coor, Y_coor, coord ) VALUES ( %s, %s, %s, %s, %s, {associated_placeholders}, - %s, %s, %s, %s, %s, %s + %s, %s, %s, %s, %s, %s, %s ); """).format( associated_columns=sql.SQL(", ").join(sql.Identifier(col) for col in associated_columns), @@ -797,7 +797,7 @@ def submit_scada_info(name: str, coord_id: str) -> None: cleaned_row.get('associated_pattern'), cleaned_row.get('associated_pipe_flow_id'), *associated_values, cleaned_row.get('API_query_id'), cleaned_row['transmission_mode'], cleaned_row['transmission_frequency'], - x_coor, y_coor, coord + cleaned_row['reliability'], x_coor, y_coor, coord )) conn.commit() print("数据成功导入到 'scada_info' 表格。") @@ -1073,8 +1073,8 @@ if __name__ == '__main__': # print(f"方案名不存在,可以使用。") # 示例1:burst_analysis - # burst_analysis(name='bb', modify_pattern_start_time='2025-03-30T12:00:00+08:00', - # burst_ID='ZBBGXSZK001105', burst_size=25, modify_total_duration=1800, scheme_Name='burst0330') + # burst_analysis(name='bb', modify_pattern_start_time='2025-04-17T00:00:00+08:00', + # burst_ID='GSD230112144241FA18292A84CB', burst_size=400, modify_total_duration=1800, scheme_Name='GSD230112144241FA18292A84CB_400') # 示例:create_user create_user(name='bb', username='admin', password='123456')