From edc6d6d9c29cb668741f0ee6e1c0fa328acda24e Mon Sep 17 00:00:00 2001 From: DingZQ Date: Sat, 19 Apr 2025 11:27:12 +0800 Subject: [PATCH] Refine --- influxdb_api.py | 645 +++++++++++++++++++++++++++++++++--------------- 1 file changed, 448 insertions(+), 197 deletions(-) diff --git a/influxdb_api.py b/influxdb_api.py index aa76ee3..ea89b85 100644 --- a/influxdb_api.py +++ b/influxdb_api.py @@ -200,7 +200,6 @@ def delete_buckets(org_name: str) -> None: :return: None """ client = get_new_client() - # 定义需要删除的 bucket 名称列表 buckets_to_delete = ['SCADA_data', 'realtime_simulation_result', 'scheme_simulation_result'] buckets_api = client.buckets_api() @@ -218,7 +217,6 @@ def delete_buckets(org_name: str) -> None: print(f"Skipping bucket {bucket.name}. Not in the deletion list.") else: print("未找到 buckets 属性,无法迭代 buckets。") - client.close() @@ -230,11 +228,30 @@ def create_and_initialize_buckets(org_name: str) -> None: :return: """ client = get_new_client() - + if not client.ping(): + print("{} -- Failed to connect to InfluxDB.".format(datetime.now().strftime('%Y-%m-%d %H:%M:%S'))) # 先删除原有的,然后再进行初始化 delete_buckets(org_name) bucket_api = BucketsApi(client) - write_api = client.write_api() + # 本地变量,用于记录成功写入的数据点数量 + points_written = 0 + lock = threading.Lock() + + # 回调函数中使用 nonlocal 来修改外层的变量 points_written + def success_callback(batch, response): + nonlocal points_written + count = len(batch) if isinstance(batch, list) else 1 + with lock: + points_written += count + + def error_callback(exception): + print("Error writing batch:", exception) + + write_api = client.write_api( + write_options=WriteOptions(batch_size=1000, flush_interval=1000), + success_callback=success_callback, + error_callback=error_callback + ) org_api = OrganizationsApi(client) # 获取 org_id org = next((o for o in org_api.find_organizations() if o.name == org_name), None) @@ -267,7 +284,7 @@ def create_and_initialize_buckets(org_name: str) -> None: .tag("device_ID", None) \ .field("monitored_value", 0.0) \ .field("datacleaning_value", 0.0) \ - .field("simulation_value", None) \ + .field("simulation_value", 0.0) \ .time("2024-11-21T00:00:00Z", write_precision='s') points_to_write.append(point) # write_api.write(bucket="SCADA_data", org=org_name, record=point) @@ -337,7 +354,7 @@ def create_and_initialize_buckets(org_name: str) -> None: .tag("scheme_Name", None) \ .field("monitored_value", 0.0) \ .field("datacleaning_value", 0.0) \ - .field("scheme_simulation_value", None) \ + .field("scheme_simulation_value", 0.0) \ .time("2024-11-21T00:00:00Z", write_precision='s') points_to_write.append(SCADA_point) # write_api.write(bucket="scheme_simulation_result", org=org_name, record=link_point) @@ -345,11 +362,13 @@ def create_and_initialize_buckets(org_name: str) -> None: # write_api.write(bucket="scheme_simulation_result", org=org_name, record=SCADA_point) # print("Initialized scheme_simulation_result with default structure.") # 批量写入数据 + print("points to write:", len(points_to_write)) if points_to_write: write_api.write(bucket=bucket, org=org_name, record=points_to_write) write_api.flush() # 刷新缓存一次 print("All buckets created and initialized successfully.") - + time.sleep(10) + print("Total points written:", points_written) client.close() @@ -361,13 +380,27 @@ def store_realtime_SCADA_data_to_influxdb(get_real_value_time: str, bucket: str :return: """ client = get_new_client() - if not client.ping(): print("{} -- Failed to connect to InfluxDB.".format(datetime.now().strftime('%Y-%m-%d %H:%M:%S'))) + # 本地变量,用于记录成功写入的数据点数量 + points_written = 0 + lock = threading.Lock() - write_options = create_write_options() + # 回调函数中使用 nonlocal 来修改外层的变量 points_written + def success_callback(batch, response): + nonlocal points_written + count = len(batch) if isinstance(batch, list) else 1 + with lock: + points_written += count - write_api = client.write_api(write_options=write_options) + def error_callback(exception): + print("Error writing batch:", exception) + # 使用异步写入模式配置写入选项和回调函数 + write_api = client.write_api( + write_options=WriteOptions(batch_size=1000, flush_interval=1000), + success_callback=success_callback, + error_callback=error_callback + ) # 创建一个临时存储点数据的列表 points_to_write = [] @@ -424,7 +457,7 @@ def store_realtime_SCADA_data_to_influxdb(get_real_value_time: str, bucket: str if time_difference > 60: # 超过1分钟 monitored_value = None else: # 小于等于3分钟 - monitored_value = data['monitored_value'] + monitored_value = float(data['monitored_value']) # 创建Point对象 point = ( Point('reservoir_liquid_level_realtime') @@ -452,14 +485,14 @@ def store_realtime_SCADA_data_to_influxdb(get_real_value_time: str, bucket: str if time_difference > 60: # 超过1分钟 monitored_value = None else: # 小于等于3分钟 - monitored_value = data['monitored_value'] + monitored_value = float(data['monitored_value']) # 创建Point对象 point = ( Point('tank_liquid_level_realtime') .tag("date", datetime.fromisoformat(get_real_value_time).strftime('%Y-%m-%d')) .tag("description", data['description']) .tag("device_ID", data['device_ID']) - .field("monitored_value", (monitored_value)) + .field("monitored_value", monitored_value) .field("datacleaning_value", None) .field("simulation_value", None) .time(get_real_value_time_utc, write_precision='s') @@ -480,7 +513,7 @@ def store_realtime_SCADA_data_to_influxdb(get_real_value_time: str, bucket: str if time_difference > 60: # 超过1分钟 monitored_value = None else: # 小于等于3分钟 - monitored_value = data['monitored_value'] + monitored_value = float(data['monitored_value']) # 创建Point对象 point = ( Point('fixed_pump_realtime') @@ -508,7 +541,7 @@ def store_realtime_SCADA_data_to_influxdb(get_real_value_time: str, bucket: str if time_difference > 60: # 超过1分钟 monitored_value = None else: # 小于等于3分钟 - monitored_value = data['monitored_value'] + monitored_value = float(data['monitored_value']) # 创建Point对象 point = ( Point('variable_pump_realtime') @@ -536,7 +569,7 @@ def store_realtime_SCADA_data_to_influxdb(get_real_value_time: str, bucket: str if time_difference > 60: # 超过1分钟 monitored_value = None else: # 小于等于3分钟 - monitored_value = data['monitored_value'] + monitored_value = float(data['monitored_value']) # 创建Point对象 point = ( Point('source_outflow_realtime') @@ -564,7 +597,7 @@ def store_realtime_SCADA_data_to_influxdb(get_real_value_time: str, bucket: str if time_difference > 60: # 超过1分钟 monitored_value = None else: # 小于等于3分钟 - monitored_value = data['monitored_value'] + monitored_value = float(data['monitored_value']) # 创建Point对象 point = ( Point('pipe_flow_realtime') @@ -592,7 +625,7 @@ def store_realtime_SCADA_data_to_influxdb(get_real_value_time: str, bucket: str if time_difference > 60: # 超过1分钟 monitored_value = None else: # 小于等于3分钟 - monitored_value = data['monitored_value'] + monitored_value = float(data['monitored_value']) # 创建Point对象 point = ( Point('pressure_realtime') @@ -620,7 +653,7 @@ def store_realtime_SCADA_data_to_influxdb(get_real_value_time: str, bucket: str if time_difference > 60: # 超过1分钟 monitored_value = None else: # 小于等于3分钟 - monitored_value = data['monitored_value'] + monitored_value = float(data['monitored_value']) # 创建Point对象 point = ( Point('demand_realtime') @@ -648,7 +681,7 @@ def store_realtime_SCADA_data_to_influxdb(get_real_value_time: str, bucket: str if time_difference > 60: # 超过1分钟 monitored_value = None else: # 小于等于3分钟 - monitored_value = data['monitored_value'] + monitored_value = float(data['monitored_value']) # 创建Point对象 point = ( Point('quality_realtime') @@ -664,10 +697,12 @@ def store_realtime_SCADA_data_to_influxdb(get_real_value_time: str, bucket: str # write_api.write(bucket=bucket, org=org_name, record=point) # write_api.flush() # 批量写入数据 + print("points to write:", len(points_to_write)) if points_to_write: write_api.write(bucket=bucket, org=org_name, record=points_to_write) - write_api.flush() # 刷新缓存一次 - + write_api.flush() # + time.sleep(10) + print("Total points written:", points_written) client.close() @@ -691,13 +726,33 @@ def store_non_realtime_SCADA_data_to_influxdb(get_history_data_end_time: str, bu :return: """ client = get_new_client() - if not client.ping(): print("{} -- Failed to connect to InfluxDB.".format(datetime.now().strftime('%Y-%m-%d %H:%M:%S'))) - write_options = create_write_options() - write_api = client.write_api(write_options=write_options) + # 本地变量,用于记录成功写入的数据点数量 + points_written = 0 + lock = threading.Lock() + # 回调函数中使用 nonlocal 来修改外层的变量 points_written + def success_callback(batch, response): + nonlocal points_written + count = len(batch) if isinstance(batch, list) else 1 + with lock: + points_written += count + + def error_callback(exception): + print("Error writing batch:", exception) + + # write_options = WriteOptions( + # jitter_interval=200, # 添加抖动以避免同时写入 + # max_retry_delay=30000 # 最大重试延迟(毫秒) + # ) + # 使用异步写入模式配置写入选项和回调函数 + write_api = client.write_api( + write_options=WriteOptions(batch_size=1000, flush_interval=1000), + success_callback=success_callback, + error_callback=error_callback + ) # 创建一个临时存储点数据的列表 points_to_write = [] @@ -790,7 +845,7 @@ def store_non_realtime_SCADA_data_to_influxdb(get_history_data_end_time: str, bu .tag("date", data['time'].strftime('%Y-%m-%d')) .tag("description", data['description']) .tag("device_ID", data['device_ID']) - .field("monitored_value", data['monitored_value']) + .field("monitored_value", float(data['monitored_value'])) .field("datacleaning_value", None) .field("simulation_value", None) .time(data['time'], write_precision='s') @@ -805,7 +860,7 @@ def store_non_realtime_SCADA_data_to_influxdb(get_history_data_end_time: str, bu .tag("date", data['time'].strftime('%Y-%m-%d')) .tag("description", data['description']) .tag("device_ID", data['device_ID']) - .field("monitored_value", data['monitored_value']) + .field("monitored_value", float(data['monitored_value'])) .field("datacleaning_value", None) .field("simulation_value", None) .time(data['time'], write_precision='s') @@ -820,7 +875,7 @@ def store_non_realtime_SCADA_data_to_influxdb(get_history_data_end_time: str, bu .tag("date", data['time'].strftime('%Y-%m-%d')) .tag("description", data['description']) .tag("device_ID", data['device_ID']) - .field("monitored_value", data['monitored_value']) + .field("monitored_value", float(data['monitored_value'])) .field("datacleaning_value", None) .field("simulation_value", None) .time(data['time'], write_precision='s') @@ -835,7 +890,7 @@ def store_non_realtime_SCADA_data_to_influxdb(get_history_data_end_time: str, bu .tag("date", data['time'].strftime('%Y-%m-%d')) .tag("description", data['description']) .tag("device_ID", data['device_ID']) - .field("monitored_value", data['monitored_value']) + .field("monitored_value", float(data['monitored_value'])) .field("datacleaning_value", None) .field("simulation_value", None) .time(data['time'], write_precision='s') @@ -850,7 +905,7 @@ def store_non_realtime_SCADA_data_to_influxdb(get_history_data_end_time: str, bu .tag("date", data['time'].strftime('%Y-%m-%d')) .tag("description", data['description']) .tag("device_ID", data['device_ID']) - .field("monitored_value", data['monitored_value']) + .field("monitored_value", float(data['monitored_value'])) .field("datacleaning_value", None) .field("simulation_value", None) .time(data['time'], write_precision='s') @@ -865,7 +920,7 @@ def store_non_realtime_SCADA_data_to_influxdb(get_history_data_end_time: str, bu .tag("date", data['time'].strftime('%Y-%m-%d')) .tag("description", data['description']) .tag("device_ID", data['device_ID']) - .field("monitored_value", data['monitored_value']) + .field("monitored_value", float(data['monitored_value'])) .field("datacleaning_value", None) .field("simulation_value", None) .time(data['time'], write_precision='s') @@ -880,7 +935,7 @@ def store_non_realtime_SCADA_data_to_influxdb(get_history_data_end_time: str, bu .tag("date", data['time'].strftime('%Y-%m-%d')) .tag("description", data['description']) .tag("device_ID", data['device_ID']) - .field("monitored_value", data['monitored_value']) + .field("monitored_value", float(data['monitored_value'])) .field("datacleaning_value", None) .field("simulation_value", None) .time(data['time'], write_precision='s') @@ -895,7 +950,7 @@ def store_non_realtime_SCADA_data_to_influxdb(get_history_data_end_time: str, bu .tag("date", data['time'].strftime('%Y-%m-%d')) .tag("description", data['description']) .tag("device_ID", data['device_ID']) - .field("monitored_value", data['monitored_value']) + .field("monitored_value", float(data['monitored_value'])) .field("datacleaning_value", None) .field("simulation_value", None) .time(data['time'], write_precision='s') @@ -910,7 +965,7 @@ def store_non_realtime_SCADA_data_to_influxdb(get_history_data_end_time: str, bu .tag("date", data['time'].strftime('%Y-%m-%d')) .tag("description", data['description']) .tag("device_ID", data['device_ID']) - .field("monitored_value", data['monitored_value']) + .field("monitored_value", float(data['monitored_value'])) .field("datacleaning_value", None) .field("simulation_value", None) .time(data['time'], write_precision='s') @@ -918,10 +973,12 @@ def store_non_realtime_SCADA_data_to_influxdb(get_history_data_end_time: str, bu points_to_write.append(point) # write_api.write(bucket=bucket, org=org_name, record=point) # 批量写入数据 + print("points to write:", len(points_to_write)) if points_to_write: write_api.write(bucket=bucket, org=org_name, record=points_to_write) write_api.flush() # 刷新缓存一次 - + time.sleep(10) + print("Total points written:", points_written) client.close() @@ -935,12 +992,33 @@ def download_history_data_manually(begin_time: str, end_time: str, bucket: str = :return: """ client = get_new_client() - if not client.ping(): print("{} -- Failed to connect to InfluxDB.".format(datetime.now().strftime('%Y-%m-%d %H:%M:%S'))) - write_options = create_write_options() - write_api = client.write_api(write_options=write_options) + # 本地变量,用于记录成功写入的数据点数量 + points_written = 0 + lock = threading.Lock() + + # 回调函数中使用 nonlocal 来修改外层的变量 points_written + def success_callback(batch, response): + nonlocal points_written + count = len(batch) if isinstance(batch, list) else 1 + with lock: + points_written += count + + def error_callback(exception): + print("Error writing batch:", exception) + # write_options = WriteOptions( + # jitter_interval=200, # 添加抖动以避免同时写入 + # max_retry_delay=30000 # 最大重试延迟(毫秒) + # ) + # write_api = client.write_api(write_options=SYNCHRONOUS, success_callback=success_callback, error_callback=error_callback) + # 使用异步写入模式配置写入选项和回调函数 + write_api = client.write_api( + write_options=WriteOptions(batch_size=1000, flush_interval=1000), + success_callback=success_callback, + error_callback=error_callback + ) # 创建一个临时存储点数据的列表 points_to_write = [] @@ -1084,7 +1162,7 @@ def download_history_data_manually(begin_time: str, end_time: str, bucket: str = .tag("date", data['time'].strftime('%Y-%m-%d')) .tag("description", data['description']) .tag("device_ID", data['device_ID']) - .field("monitored_value", data['monitored_value']) + .field("monitored_value", float(data['monitored_value'])) .field("datacleaning_value", None) .field("simulation_value", None) .time(data['time'], write_precision='s') @@ -1099,7 +1177,7 @@ def download_history_data_manually(begin_time: str, end_time: str, bucket: str = .tag("date", data['time'].strftime('%Y-%m-%d')) .tag("description", data['description']) .tag("device_ID", data['device_ID']) - .field("monitored_value", data['monitored_value']) + .field("monitored_value", float(data['monitored_value'])) .field("datacleaning_value", None) .field("simulation_value", None) .time(data['time'], write_precision='s') @@ -1114,7 +1192,7 @@ def download_history_data_manually(begin_time: str, end_time: str, bucket: str = .tag("date", data['time'].strftime('%Y-%m-%d')) .tag("description", data['description']) .tag("device_ID", data['device_ID']) - .field("monitored_value", data['monitored_value']) + .field("monitored_value", float(data['monitored_value'])) .field("datacleaning_value", None) .field("simulation_value", None) .time(data['time'], write_precision='s') @@ -1129,7 +1207,7 @@ def download_history_data_manually(begin_time: str, end_time: str, bucket: str = .tag("date", data['time'].strftime('%Y-%m-%d')) .tag("description", data['description']) .tag("device_ID", data['device_ID']) - .field("monitored_value", data['monitored_value']) + .field("monitored_value", float(data['monitored_value'])) .field("datacleaning_value", None) .field("simulation_value", None) .time(data['time'], write_precision='s') @@ -1144,7 +1222,7 @@ def download_history_data_manually(begin_time: str, end_time: str, bucket: str = .tag("date", data['time'].strftime('%Y-%m-%d')) .tag("description", data['description']) .tag("device_ID", data['device_ID']) - .field("monitored_value", data['monitored_value']) + .field("monitored_value", float(data['monitored_value'])) .field("datacleaning_value", None) .field("simulation_value", None) .time(data['time'], write_precision='s') @@ -1159,7 +1237,7 @@ def download_history_data_manually(begin_time: str, end_time: str, bucket: str = .tag("date", data['time'].strftime('%Y-%m-%d')) .tag("description", data['description']) .tag("device_ID", data['device_ID']) - .field("monitored_value", data['monitored_value']) + .field("monitored_value", float(data['monitored_value'])) .field("datacleaning_value", None) .field("simulation_value", None) .time(data['time'], write_precision='s') @@ -1174,7 +1252,7 @@ def download_history_data_manually(begin_time: str, end_time: str, bucket: str = .tag("date", data['time'].strftime('%Y-%m-%d')) .tag("description", data['description']) .tag("device_ID", data['device_ID']) - .field("monitored_value", data['monitored_value']) + .field("monitored_value", float(data['monitored_value'])) .field("datacleaning_value", None) .field("simulation_value", None) .time(data['time'], write_precision='s') @@ -1189,7 +1267,7 @@ def download_history_data_manually(begin_time: str, end_time: str, bucket: str = .tag("date", data['time'].strftime('%Y-%m-%d')) .tag("description", data['description']) .tag("device_ID", data['device_ID']) - .field("monitored_value", data['monitored_value']) + .field("monitored_value", float(data['monitored_value'])) .field("datacleaning_value", None) .field("simulation_value", None) .time(data['time'], write_precision='s') @@ -1204,7 +1282,7 @@ def download_history_data_manually(begin_time: str, end_time: str, bucket: str = .tag("date", data['time'].strftime('%Y-%m-%d')) .tag("description", data['description']) .tag("device_ID", data['device_ID']) - .field("monitored_value", data['monitored_value']) + .field("monitored_value", float(data['monitored_value'])) .field("datacleaning_value", None) .field("simulation_value", None) .time(data['time'], write_precision='s') @@ -1219,7 +1297,7 @@ def download_history_data_manually(begin_time: str, end_time: str, bucket: str = .tag("date", data['time'].strftime('%Y-%m-%d')) .tag("description", data['description']) .tag("device_ID", data['device_ID']) - .field("monitored_value", data['monitored_value']) + .field("monitored_value", float(data['monitored_value'])) .field("datacleaning_value", None) .field("simulation_value", None) .time(data['time'], write_precision='s') @@ -1234,7 +1312,7 @@ def download_history_data_manually(begin_time: str, end_time: str, bucket: str = .tag("date", data['time'].strftime('%Y-%m-%d')) .tag("description", data['description']) .tag("device_ID", data['device_ID']) - .field("monitored_value", data['monitored_value']) + .field("monitored_value", float(data['monitored_value'])) .field("datacleaning_value", None) .field("simulation_value", None) .time(data['time'], write_precision='s') @@ -1249,7 +1327,7 @@ def download_history_data_manually(begin_time: str, end_time: str, bucket: str = .tag("date", data['time'].strftime('%Y-%m-%d')) .tag("description", data['description']) .tag("device_ID", data['device_ID']) - .field("monitored_value", data['monitored_value']) + .field("monitored_value", float(data['monitored_value'])) .field("datacleaning_value", None) .field("simulation_value", None) .time(data['time'], write_precision='s') @@ -1264,7 +1342,7 @@ def download_history_data_manually(begin_time: str, end_time: str, bucket: str = .tag("date", data['time'].strftime('%Y-%m-%d')) .tag("description", data['description']) .tag("device_ID", data['device_ID']) - .field("monitored_value", data['monitored_value']) + .field("monitored_value", float(data['monitored_value'])) .field("datacleaning_value", None) .field("simulation_value", None) .time(data['time'], write_precision='s') @@ -1279,7 +1357,7 @@ def download_history_data_manually(begin_time: str, end_time: str, bucket: str = .tag("date", data['time'].strftime('%Y-%m-%d')) .tag("description", data['description']) .tag("device_ID", data['device_ID']) - .field("monitored_value", data['monitored_value']) + .field("monitored_value", float(data['monitored_value'])) .field("datacleaning_value", None) .field("simulation_value", None) .time(data['time'], write_precision='s') @@ -1294,7 +1372,7 @@ def download_history_data_manually(begin_time: str, end_time: str, bucket: str = .tag("date", data['time'].strftime('%Y-%m-%d')) .tag("description", data['description']) .tag("device_ID", data['device_ID']) - .field("monitored_value", data['monitored_value']) + .field("monitored_value", float(data['monitored_value'])) .field("datacleaning_value", None) .field("simulation_value", None) .time(data['time'], write_precision='s') @@ -1309,7 +1387,7 @@ def download_history_data_manually(begin_time: str, end_time: str, bucket: str = .tag("date", data['time'].strftime('%Y-%m-%d')) .tag("description", data['description']) .tag("device_ID", data['device_ID']) - .field("monitored_value", data['monitored_value']) + .field("monitored_value", float(data['monitored_value'])) .field("datacleaning_value", None) .field("simulation_value", None) .time(data['time'], write_precision='s') @@ -1324,7 +1402,7 @@ def download_history_data_manually(begin_time: str, end_time: str, bucket: str = .tag("date", data['time'].strftime('%Y-%m-%d')) .tag("description", data['description']) .tag("device_ID", data['device_ID']) - .field("monitored_value", data['monitored_value']) + .field("monitored_value", float(data['monitored_value'])) .field("datacleaning_value", None) .field("simulation_value", None) .time(data['time'], write_precision='s') @@ -1339,7 +1417,7 @@ def download_history_data_manually(begin_time: str, end_time: str, bucket: str = .tag("date", data['time'].strftime('%Y-%m-%d')) .tag("description", data['description']) .tag("device_ID", data['device_ID']) - .field("monitored_value", data['monitored_value']) + .field("monitored_value", float(data['monitored_value'])) .field("datacleaning_value", None) .field("simulation_value", None) .time(data['time'], write_precision='s') @@ -1347,10 +1425,12 @@ def download_history_data_manually(begin_time: str, end_time: str, bucket: str = points_to_write.append(point) # write_api.write(bucket=bucket, org=org_name, record=point) # 批量写入数据 + print("points to write:", len(points_to_write)) if points_to_write: write_api.write(bucket=bucket, org=org_name, record=points_to_write) write_api.flush() # 刷新缓存一次 - + time.sleep(10) + print("Total points written:", points_written) client.close() ########################SCADA############################################################################################################ @@ -1418,6 +1498,8 @@ def query_all_SCADA_records_by_date(query_date: str, bucket: str="SCADA_data") - return SCADA_results + + def query_SCADA_data_by_device_ID_and_time(query_ids_list: List[str], query_time: str, bucket: str="SCADA_data") -> Dict[str, float]: """ 根据SCADA设备的ID和时间查询值 @@ -1427,7 +1509,6 @@ def query_SCADA_data_by_device_ID_and_time(query_ids_list: List[str], query_time :return: """ client = get_new_client() - if not client.ping(): print("{} -- Failed to connect to InfluxDB.".format(datetime.now().strftime('%Y-%m-%d %H:%M:%S'))) @@ -1463,9 +1544,59 @@ def query_SCADA_data_by_device_ID_and_time(query_ids_list: List[str], query_time except Exception as e: print(f"Error querying InfluxDB for device ID {device_id}: {e}") SCADA_result_dict[device_id] = None - client.close() + + return SCADA_result_dict + +def query_scheme_SCADA_data_by_device_ID_and_time(query_ids_list: List[str], query_time: str, scheme_Type: str, + scheme_Name: str, bucket: str="scheme_simulation_result") -> Dict[str, float]: + """ + 根据SCADA设备的ID和时间查询方案中的值 + :param query_ids_list: SCADA设备ID的列表 + :param query_time: 输入的北京时间,格式为 '2024-11-24T17:30:00+08:00'。 + :param bucket: InfluxDB 的 bucket 名称,默认值为 "SCADA_data"。 + :return: + """ + client = get_new_client() + if not client.ping(): + print("{} -- Failed to connect to InfluxDB.".format(datetime.now().strftime('%Y-%m-%d %H:%M:%S'))) + + query_api = client.query_api() + # 将北京时间转换为 UTC 时间 + beijing_time = datetime.fromisoformat(query_time) + utc_time = beijing_time.astimezone(timezone.utc) + utc_start_time = utc_time - timedelta(seconds=1) + utc_stop_time = utc_time + timedelta(seconds=1) + # 构建查询字典 + SCADA_result_dict = {} + for device_id in query_ids_list: + # 构建 Flux 查询语句 + flux_query = f''' + from(bucket: "{bucket}") + |> range(start: {utc_start_time.isoformat()}, stop: {utc_stop_time.isoformat()}) + |> filter(fn: (r) => r["device_ID"] == "{device_id}" and r["_field"] == "monitored_value" and r["scheme_Type"] == "{scheme_Type}" and r["scheme_Name"] == "{scheme_Name}") + ''' + # 执行查询 + try: + result = query_api.query(flux_query) + # 从查询结果中提取 monitored_value + if result: + # 假设返回的结果为一行数据 + for table in result: + for record in table.records: + # 获取字段 "_value" 即为 monitored_value + monitored_value = record.get_value() + SCADA_result_dict[device_id] = monitored_value + else: + # 如果没有结果,默认设置为 None 或其他值 + SCADA_result_dict[device_id] = None + except Exception as e: + print(f"Error querying InfluxDB for device ID {device_id}: {e}") + SCADA_result_dict[device_id] = None + + client.close() + return SCADA_result_dict # 2025/03/14 @@ -1479,64 +1610,56 @@ def query_SCADA_data_by_device_ID_and_timerange(query_ids_list: List[str], start :return: """ client = get_new_client() - if not client.ping(): print("{} -- Failed to connect to InfluxDB.".format(datetime.now().strftime('%Y-%m-%d %H:%M:%S'))) query_api = client.query_api() - print('start_time', start_time) print('end_time', end_time) - # 将北京时间转换为 UTC 时间 + # beijing_start_time = datetime.fromisoformat(start_time) + # utc_start_time = beijing_start_time.astimezone(timezone.utc) - timedelta(seconds=1) + # print(utc_start_time) + # beijing_end_time = datetime.fromisoformat(end_time) + # utc_end_time = beijing_end_time.astimezone(timezone.utc) + timedelta(seconds=1) + # print(utc_end_time) beijing_start_time = datetime.fromisoformat(start_time) print('beijing_start_time', beijing_start_time) - utc_start_time = time_api.to_utc_time(beijing_start_time) print('utc_start_time', utc_start_time) - beijing_end_time = datetime.fromisoformat(end_time) print('beijing_end_time', beijing_end_time) - utc_stop_time = time_api.to_utc_time(beijing_end_time) print('utc_stop_time', utc_stop_time) - SCADA_dict = {} for device_id in query_ids_list: - # 构建 Flux 查询语句 + # flux_query = f''' + # from(bucket: "{bucket}") + # |> range(start: {utc_start_time.isoformat()}, stop: {utc_stop_time.isoformat()}) + # |> filter(fn: (r) => r["_measurement"] == "SCADA_data" and r["device_ID"] == {device_id} and r["_field"] == "monitored_value") + # |> pivot(rowKey: ["_time"], columnKey: ["device_ID"], valueColumn: "_value") + # |> sort(columns: ["_time"]) + # ''' flux_query = f''' - from(bucket: "{bucket}") - |> range(start: {utc_start_time.isoformat()}, stop: {utc_stop_time.isoformat()}) - |> filter(fn: (r) => r["device_ID"] == "{device_id}" and r["_field"] == "monitored_value") - |> sort(columns: ["_time"]) - ''' - # 执行查询 - try: - result = query_api.query(flux_query) - # 从查询结果中提取 monitored_value - if result: - # 假设返回的结果为一行数据 - records_list = [] - - for table in result: - for record in table.records: - # 获取记录的时间和监测值 - records_list.append({ - "time": record["_time"], - "value": record["_value"] - }) - - SCADA_dict[device_id] = records_list - - except Exception as e: - print(f"Error querying InfluxDB for device ID {device_id}: {e}") - SCADA_dict[device_id] = None - + from(bucket: "{bucket}") + |> range(start: {utc_start_time.isoformat()}, stop: {utc_stop_time.isoformat()}) + |> filter(fn: (r) => r["device_ID"] == "{device_id}" and r["_field"] == "monitored_value") + |> sort(columns: ["_time"]) + ''' + # 执行查询,返回一个 FluxTable 列表 + tables = query_api.query(flux_query) + records_list = [] + for table in tables: + for record in table.records: + # 获取记录的时间和监测值 + records_list.append({ + "time": record["_time"], + "value": record["_value"] + }) + SCADA_dict[device_id] = records_list client.close() - return SCADA_dict - # DingZQ, 2025-02-15 def query_SCADA_data_by_device_ID_and_date(query_ids_list: List[str], query_date: str, bucket: str="SCADA_data") -> list[dict[str, float]]: """ @@ -1553,6 +1676,56 @@ def query_SCADA_data_by_device_ID_and_date(query_ids_list: List[str], query_date return query_SCADA_data_by_device_ID_and_time_range(query_ids_list, str(start_time), str(end_time), bucket) +# 2025/04/17 +def query_cleaned_SCADA_data_by_device_ID_and_timerange(query_ids_list: List[str], start_time: str, end_time: str, bucket: str="SCADA_data"): + """ + 查询指定时间范围内,多个SCADA设备的清洗后的数据 + :param query_ids_list: SCADA设备ID的列表 + :param start_time: 输入的北京时间,格式为 '2024-11-24T17:30:00+08:00'。 + :param end_time: 输入的北京时间,格式为 '2024-11-24T17:30:00+08:00'。 + :param bucket: InfluxDB 的 bucket 名称,默认值为 "SCADA_data"。 + :return: + """ + client = get_new_client() + if not client.ping(): + print("{} -- Failed to connect to InfluxDB.".format(datetime.now().strftime('%Y-%m-%d %H:%M:%S'))) + + query_api = client.query_api() + print('start_time', start_time) + print('end_time', end_time) + # 将北京时间转换为 UTC 时间 + beijing_start_time = datetime.fromisoformat(start_time) + print('beijing_start_time', beijing_start_time) + utc_start_time = time_api.to_utc_time(beijing_start_time) + print('utc_start_time', utc_start_time) + beijing_end_time = datetime.fromisoformat(end_time) + print('beijing_end_time', beijing_end_time) + utc_stop_time = time_api.to_utc_time(beijing_end_time) + print('utc_stop_time', utc_stop_time) + SCADA_dict = {} + for device_id in query_ids_list: + flux_query = f''' + from(bucket: "{bucket}") + |> range(start: {utc_start_time.isoformat()}, stop: {utc_stop_time.isoformat()}) + |> filter(fn: (r) => r["device_ID"] == "{device_id}" and r["_field"] == "datacleaning_value") + |> sort(columns: ["_time"]) + ''' + # 执行查询,返回一个 FluxTable 列表 + tables = query_api.query(flux_query) + print(tables) + records_list = [] + for table in tables: + for record in table.records: + # 获取记录的时间和监测值 + records_list.append({ + "time": record["_time"], + "value": record["_value"] + }) + SCADA_dict[device_id] = records_list + client.close() + return SCADA_dict + + # 2025/02/01 def store_realtime_simulation_result_to_influxdb(node_result_list: List[Dict[str, any]], link_result_list: List[Dict[str, any]], result_start_time: str, bucket: str = "realtime_simulation_result"): @@ -1565,14 +1738,31 @@ def store_realtime_simulation_result_to_influxdb(node_result_list: List[Dict[str :return: """ client = get_new_client() - if not client.ping(): print("{} -- Failed to connect to InfluxDB.".format(datetime.now().strftime('%Y-%m-%d %H:%M:%S'))) + # 本地变量,用于记录成功写入的数据点数量 + points_written = 0 + lock = threading.Lock() + + # 回调函数中使用 nonlocal 来修改外层的变量 points_written + def success_callback(batch, response): + nonlocal points_written + count = len(batch) if isinstance(batch, list) else 1 + with lock: + points_written += count + + def error_callback(exception): + print("Error writing batch:", exception) + # 开始写入数据 try: - write_options = create_write_options() - write_api = client.write_api(write_options=write_options) + # 使用异步写入模式配置写入选项和回调函数 + write_api = client.write_api( + write_options=WriteOptions(batch_size=1000, flush_interval=1000), + success_callback=success_callback, + error_callback=error_callback + ) # 创建一个临时存储点数据的列表 points_to_write = [] date_str = result_start_time.split('T')[0] @@ -1619,13 +1809,16 @@ def store_realtime_simulation_result_to_influxdb(node_result_list: List[Dict[str # write_api.flush() # print(f"成功将 {len(link_result_list)} 条link数据写入 InfluxDB。") # 批量写入数据 + print("points to write:", len(points_to_write)) if points_to_write: write_api.write(bucket=bucket, org=org_name, record=points_to_write) write_api.flush() # 刷新缓存一次 except Exception as e: raise RuntimeError(f"数据写入 InfluxDB 时发生错误: {e}") - client.close() - + client.close() + + time.sleep(10) + print("Total points written:", points_written) client.close() @@ -1639,7 +1832,6 @@ def query_latest_record_by_ID(ID: str, type: str, bucket: str="realtime_simulati :return: dict: 最新记录的数据,如果没有找到则返回 None。 """ client = get_new_client() - if not client.ping(): print("{} -- Failed to connect to InfluxDB.".format(datetime.now().strftime('%Y-%m-%d %H:%M:%S'))) @@ -1702,32 +1894,7 @@ def query_latest_record_by_ID(ID: str, type: str, bucket: str="realtime_simulati "reaction": record["reaction"], "friction": record["friction"] } - elif type == "scada": - flux_query = f''' - from(bucket: "SCADA_data") - |> range(start: -30d) // 查找最近一月的记录 - |> filter(fn: (r) => r["_measurement"] == "pressure_realtime") - |> filter(fn: (r) => r["device_ID"] == "{ID}") - |> pivot( - rowKey:["_time"], - columnKey:["_field"], - valueColumn:"_value" - ) - |> sort(columns: ["_time"], desc: true) - |> limit(n: 1) - ''' - tables = query_api.query(flux_query) - # 解析查询结果 - for table in tables: - for record in table.records: - return { - "time": record["_time"], - "device_ID": ID, - "value": record["monitored_value"], - } - client.close() - return None # 如果没有找到记录 @@ -1740,7 +1907,6 @@ def query_all_records_by_time(query_time: str, bucket: str="realtime_simulation_ :return: dict: tuple: (node_records, link_records) """ client = get_new_client() - if not client.ping(): print("{} -- Failed to connect to InfluxDB.".format(datetime.now().strftime('%Y-%m-%d %H:%M:%S'))) @@ -1794,9 +1960,7 @@ def query_all_records_by_time(query_time: str, bucket: str="realtime_simulation_ "reaction": record["reaction"], "friction": record["friction"] }) - client.close() - return node_records, link_records @@ -1844,9 +2008,7 @@ def query_all_record_by_time_property(query_time: str, type: str, property: str, "ID": record["ID"], "value": record["_value"] }) - client.close() - return result_records @@ -1859,7 +2021,6 @@ def query_all_records_by_date(query_date: str, bucket: str="realtime_simulation_ :return: dict: tuple: (node_records, link_records) """ client = get_new_client() - # 记录开始时间 time_cost_start = time.perf_counter() print('{} -- query_all_records_by_date started.'.format(datetime.now(pytz.timezone('Asia/Shanghai')).strftime('%Y-%m-%d %H:%M:%S'))) @@ -1892,10 +2053,8 @@ def query_all_records_by_date(query_date: str, bucket: str="realtime_simulation_ valueColumn:"_value" ) ''' - # 执行查询 tables = query_api.query(flux_query) - node_records = [] link_records = [] # 解析查询结果 @@ -1927,12 +2086,15 @@ def query_all_records_by_date(query_date: str, bucket: str="realtime_simulation_ "reaction": record["reaction"], "friction": record["friction"] }) - time_cost_end = time.perf_counter() print('{} -- query_all_records_by_date finished, cost time: {:.2f} s.'.format( datetime.now(pytz.timezone('Asia/Shanghai')).strftime('%Y-%m-%d %H:%M:%S'), time_cost_end - time_cost_start)) client.close() + return node_records, link_records + datetime.now(pytz.timezone('Asia/Shanghai')).strftime('%Y-%m-%d %H:%M:%S'), + time_cost_end - time_cost_start)) + client.close() return node_records, link_records # 2025/04/12 DingZQ @@ -2115,10 +2277,10 @@ def query_all_record_by_date_property(query_date: str, type: str, property: str, :return: list(dict): result_records """ client = get_new_client() - # 记录开始时间 time_cost_start = time.perf_counter() - print('{} -- query_all_record_by_date_property started.'.format(datetime.now(pytz.timezone('Asia/Shanghai')).strftime('%Y-%m-%d %H:%M:%S'))) + print('{} -- Hydraulic simulation started.'.format( + datetime.now(pytz.timezone('Asia/Shanghai')).strftime('%Y-%m-%d %H:%M:%S'))) if not client.ping(): print("{} -- Failed to connect to InfluxDB.".format(datetime.now().strftime('%Y-%m-%d %H:%M:%S'))) @@ -2151,12 +2313,11 @@ def query_all_record_by_date_property(query_date: str, type: str, property: str, "time": record["_time"], "value": record["_value"] }) - time_cost_end = time.perf_counter() - print('{} -- query_all_record_by_date_property finished, cost time: {:.2f} s.'.format(datetime.now(pytz.timezone('Asia/Shanghai')).strftime('%Y-%m-%d %H:%M:%S'), time_cost_end - time_cost_start)) - + print('{} -- Hydraulic simulation finished, cost time: {:.2f} s.'.format( + datetime.now(pytz.timezone('Asia/Shanghai')).strftime('%Y-%m-%d %H:%M:%S'), + time_cost_end - time_cost_start)) client.close() - return result_records @@ -2173,7 +2334,6 @@ def query_curve_by_ID_property_daterange(ID: str, type: str, property: str, star :return: 查询结果的列表 """ client = get_new_client() - if not client.ping(): print("{} -- Failed to connect to InfluxDB.".format(datetime.now().strftime('%Y-%m-%d %H:%M:%S'))) @@ -2208,9 +2368,7 @@ def query_curve_by_ID_property_daterange(ID: str, type: str, property: str, star "time": record["_time"], "value": record["_value"] }) - client.close() - return results @@ -2230,13 +2388,33 @@ def store_scheme_simulation_result_to_influxdb(node_result_list: List[Dict[str, :return: """ client = get_new_client() - if not client.ping(): print("{} -- Failed to connect to InfluxDB.".format(datetime.now().strftime('%Y-%m-%d %H:%M:%S'))) try: - write_options = create_write_options() - write_api = client.write_api(write_options=write_options) + # 本地变量,用于记录成功写入的数据点数量 + points_written = 0 + lock = threading.Lock() + + # 回调函数中使用 nonlocal 来修改外层的变量 points_written + def success_callback(batch, response): + nonlocal points_written + count = len(batch) if isinstance(batch, list) else 1 + with lock: + points_written += count + + def error_callback(exception): + print("Error writing batch:", exception) + # write_options = WriteOptions( + # jitter_interval=200, # 添加抖动以避免同时写入 + # max_retry_delay=30000 # 最大重试延迟(毫秒) + # ) + # 使用异步写入模式配置写入选项和回调函数 + write_api = client.write_api( + write_options=WriteOptions(batch_size=1000, flush_interval=1000), + success_callback=success_callback, + error_callback=error_callback + ) # 创建一个临时存储点数据的列表 points_to_write = [] date_str = scheme_start_time.split('T')[0] @@ -2292,13 +2470,14 @@ def store_scheme_simulation_result_to_influxdb(node_result_list: List[Dict[str, # write_api.write(bucket=bucket, org=org_name, record=link_point) # write_api.flush() # 批量写入数据 + print("points to write:", len(points_to_write)) if points_to_write: write_api.write(bucket=bucket, org=org_name, record=points_to_write) write_api.flush() # 刷新缓存一次 except Exception as e: raise RuntimeError(f"数据写入 InfluxDB 时发生错误: {e}") - client.close() - + time.sleep(10) + print("Total points written:", points_written) client.close() @@ -2348,8 +2527,7 @@ def query_corresponding_query_id_and_element_id(name: str) -> None: # def auto_get_burst_flow(): -# 2025/03/22 -# def manually_get_burst_flow(): + # 2025/03/11 @@ -2363,11 +2541,32 @@ def fill_scheme_simulation_result_to_SCADA(scheme_Type: str = None, scheme_Name: :return: """ client = get_new_client() + if not client.ping(): + print("{} -- Failed to connect to InfluxDB.".format(datetime.now().strftime('%Y-%m-%d %H:%M:%S'))) - if not client.ping(): print("{} -- Failed to connect to InfluxDB.".format(datetime.now().strftime('%Y-%m-%d %H:%M:%S'))) + # 本地变量,用于记录成功写入的数据点数量 + points_written = 0 + lock = threading.Lock() - write_options = create_write_options() - write_api = client.write_api(write_options=write_options) + # 回调函数中使用 nonlocal 来修改外层的变量 points_written + def success_callback(batch, response): + nonlocal points_written + count = len(batch) if isinstance(batch, list) else 1 + with lock: + points_written += count + + def error_callback(exception): + print("Error writing batch:", exception) + + # write_options = WriteOptions( + # jitter_interval=200, # 添加抖动以避免同时写入 + # max_retry_delay=30000 # 最大重试延迟(毫秒) + # ) + write_api = client.write_api( + write_options=WriteOptions(batch_size=1000, flush_interval=1000), + success_callback=success_callback, + error_callback=error_callback + ) # 创建一个临时存储点数据的列表 points_to_write = [] # 查找associated_element_id的对应值 @@ -2452,10 +2651,12 @@ def fill_scheme_simulation_result_to_SCADA(scheme_Type: str = None, scheme_Name: points_to_write.append(point) # write_api.write(bucket=bucket, org=org_name, record=point) # 批量写入数据 + print("points to write:", len(points_to_write)) if points_to_write: write_api.write(bucket=bucket, org=org_name, record=points_to_write) write_api.flush() # 刷新缓存一次 - + time.sleep(10) + print("Total points written:", points_written) client.close() @@ -2470,8 +2671,8 @@ def query_SCADA_data_curve(api_query_id: str, start_date: str, end_date: str, bu :return: 查询结果的列表 """ client = get_new_client() - - if not client.ping(): print("{} -- Failed to connect to InfluxDB.".format(datetime.now().strftime('%Y-%m-%d %H:%M:%S'))) + if not client.ping(): + print("{} -- Failed to connect to InfluxDB.".format(datetime.now().strftime('%Y-%m-%d %H:%M:%S'))) query_api = client.query_api() # 将 start_date 的北京时间转换为 UTC 时间范围 @@ -2481,7 +2682,7 @@ def query_SCADA_data_curve(api_query_id: str, start_date: str, end_date: str, bu flux_query = f''' from(bucket: "{bucket}") |> range(start: {start_time}, stop: {stop_time}) - |> filter(fn: (r) => r["device_ID"] == "{api_query_id}") + |> filter(fn: (r) => r["device_ID"] == "{api_query_id}" and r["_field"] == "monitored_value") ''' # 执行查询 tables = query_api.query(flux_query) @@ -2493,9 +2694,7 @@ def query_SCADA_data_curve(api_query_id: str, start_date: str, end_date: str, bu "time": record["_time"], "value": record["_value"] }) - client.close() - return results @@ -2510,8 +2709,8 @@ def query_scheme_all_record_by_time(scheme_Type: str, scheme_Name: str, query_ti :return: dict: tuple: (node_records, link_records) """ client = get_new_client() - - if not client.ping(): print("{} -- Failed to connect to InfluxDB.".format(datetime.now().strftime('%Y-%m-%d %H:%M:%S'))) + if not client.ping(): + print("{} -- Failed to connect to InfluxDB.".format(datetime.now().strftime('%Y-%m-%d %H:%M:%S'))) query_api = client.query_api() # 将北京时间转换为 UTC 时间 @@ -2563,9 +2762,7 @@ def query_scheme_all_record_by_time(scheme_Type: str, scheme_Name: str, query_ti "reaction": record["reaction"], "friction": record["friction"] }) - client.close() - return node_records, link_records @@ -2583,8 +2780,8 @@ def query_scheme_all_record_by_time_property(scheme_Type: str, scheme_Name: str, :return: dict: tuple: (node_records, link_records) """ client = get_new_client() - - if not client.ping(): print("{} -- Failed to connect to InfluxDB.".format(datetime.now().strftime('%Y-%m-%d %H:%M:%S'))) + if not client.ping(): + print("{} -- Failed to connect to InfluxDB.".format(datetime.now().strftime('%Y-%m-%d %H:%M:%S'))) query_api = client.query_api() # 确定 measurement @@ -2615,9 +2812,7 @@ def query_scheme_all_record_by_time_property(scheme_Type: str, scheme_Name: str, "ID": record["ID"], "value": record["_value"] }) - - client.close() - + client.close() return result_records @@ -2636,7 +2831,6 @@ def query_scheme_curve_by_ID_property(scheme_Type: str, scheme_Name: str, query_ :return: 查询结果的列表 """ client = get_new_client() - if not client.ping(): print("{} -- Failed to connect to InfluxDB.".format(datetime.now().strftime('%Y-%m-%d %H:%M:%S'))) @@ -2667,15 +2861,11 @@ def query_scheme_curve_by_ID_property(scheme_Type: str, scheme_Name: str, query_ "time": record["_time"], "value": record["_value"] }) - client.close() - return results -# D # 2025/02/21 -# query_date 是模拟的时间,不是查询的时s def query_scheme_all_record(scheme_Type: str, scheme_Name: str, query_date: str, bucket: str="scheme_simulation_result") -> tuple: """ 查询指定方案的所有记录,包括‘node'和‘link’,分别以指定格式返回。 @@ -2686,7 +2876,6 @@ def query_scheme_all_record(scheme_Type: str, scheme_Name: str, query_date: str, :return: dict: tuple: (node_records, link_records) """ client = get_new_client() - if not client.ping(): print("{} -- Failed to connect to InfluxDB.".format(datetime.now().strftime('%Y-%m-%d %H:%M:%S'))) @@ -2742,9 +2931,7 @@ def query_scheme_all_record(scheme_Type: str, scheme_Name: str, query_date: str, "reaction": record["reaction"], "friction": record["friction"] }) - client.close() - return node_records, link_records @@ -2792,9 +2979,7 @@ def query_scheme_all_record_property(scheme_Type: str, scheme_Name: str, query_d "ID": record["ID"], "value": record["_value"] }) - client.close() - return result_records @@ -2808,7 +2993,6 @@ def export_SCADA_data_to_csv(start_date: str, end_date: str, bucket: str="SCADA_ :return: """ client = get_new_client() - if not client.ping(): print("{} -- Failed to connect to InfluxDB.".format(datetime.now().strftime('%Y-%m-%d %H:%M:%S'))) @@ -2846,7 +3030,6 @@ def export_SCADA_data_to_csv(start_date: str, end_date: str, bucket: str="SCADA_ writer.writeheader() writer.writerows(rows) print(f"Data exported to {csv_filename} successfully.") - client.close() @@ -2860,7 +3043,6 @@ def export_realtime_simulation_result_to_csv(start_date: str, end_date: str, buc :return: """ client = get_new_client() - if not client.ping(): print("{} -- Failed to connect to InfluxDB.".format(datetime.now().strftime('%Y-%m-%d %H:%M:%S'))) @@ -2931,7 +3113,6 @@ def export_realtime_simulation_result_to_csv(start_date: str, end_date: str, buc writer.writeheader() writer.writerows(node_rows) print(f"Data exported to {csv_filename_link} and {csv_filename_node} successfully.") - client.close() @@ -2945,7 +3126,6 @@ def export_scheme_simulation_result_to_csv_time(start_date: str, end_date: str, :return: """ client = get_new_client() - if not client.ping(): print("{} -- Failed to connect to InfluxDB.".format(datetime.now().strftime('%Y-%m-%d %H:%M:%S'))) @@ -3019,7 +3199,6 @@ def export_scheme_simulation_result_to_csv_time(start_date: str, end_date: str, writer.writeheader() writer.writerows(node_rows) print(f"Data exported to {csv_filename_link} and {csv_filename_node} successfully.") - client.close() @@ -3034,7 +3213,6 @@ def export_scheme_simulation_result_to_csv_scheme(scheme_Type: str, scheme_Name: :return: """ client = get_new_client() - if not client.ping(): print("{} -- Failed to connect to InfluxDB.".format(datetime.now().strftime('%Y-%m-%d %H:%M:%S'))) @@ -3107,10 +3285,72 @@ def export_scheme_simulation_result_to_csv_scheme(scheme_Type: str, scheme_Name: writer.writeheader() writer.writerows(node_rows) print(f"Data exported to {csv_filename_link} and {csv_filename_node} successfully.") - client.close() +def upload_cleaned_SCADA_data_to_influxdb(file_path: str, bucket: str="SCADA_data") -> None: + """ + 将清洗后的SCADA数据导入influxdb,有标准化导入格式 + :param file_path: 导入数据的文件 + :param bucket: 数据存储的 bucket 名称,默认值为 "SCADA_data" + :return: + """ + + data_list = [] + with open(file_path, mode='r', encoding='utf-8-sig') as csv_file: + csv_reader = csv.DictReader(csv_file) + for row in csv_reader: + # 解析日期和时间字段 + datetime_value = datetime.strptime(row['time'], '%Y-%m-%d %H:%M:%S%z') + # 处理datacleaning_value为空的情况 + datacleaning_value = float(row['datacleaning_value']) if row['datacleaning_value'] else None + # 处理monitored_value字段类型错误 + try: + monitored_value = float(row['monitored_value']) if row['monitored_value'] else None + except ValueError: + monitored_value = None # 如果转换失败,则设为None(或其他适当的默认值) + + data_list.append({ + 'measurement': row['measurement'], + 'device_ID': row['device_ID'], + 'date': datetime_value.strftime('%Y-%m-%d'), + 'description': row['description'], + 'monitored_value': monitored_value, + 'datacleaning_value': datacleaning_value, + 'datetime': datetime_value + }) + + client = get_new_client() + if not client.ping(): + print("{} -- Failed to connect to InfluxDB.".format(datetime.now().strftime('%Y-%m-%d %H:%M:%S'))) + + try: + write_api = client.write_api(write_options=SYNCHRONOUS) + # 写入数据 + for data in data_list: + # 创建Point对象 + point = ( + Point(data['measurement']) # measurement为mpointName + .tag("device_ID", data['device_ID']) # tag key为mpointId + .tag("date", data['date']) # 具体日期tag,方便查询 + .tag('description', data['description']) + .field("monitored_value", data['monitored_value']) # field key为dataValue + .field('datacleaning_value', data['datacleaning_value']) + .time(data['datetime']) # 时间以datetime为准 + ) + + write_api.write(bucket=bucket, record=point) + + except InfluxDBError as e: + print(f"InfluxDB错误: {str(e)}") + except Exception as e: + print(f"未知错误: {str(e)}") + finally: + if 'write_api' in locals(): + write_api.close() + client.close() + + # 示例调用 if __name__ == "__main__": url = influxdb_info.url @@ -3142,10 +3382,9 @@ if __name__ == "__main__": # store_non_realtime_SCADA_data_to_influxdb(get_history_data_end_time='2025-03-08T12:00:00+08:00') # 示例3:download_history_data_manually - # download_history_data_manually(begin_time='2025-03-21T00:00:00+08:00', end_time='2025-03-22T00:00:00+08:00') + # download_history_data_manually(begin_time='2025-04-16T00:00:00+08:00', end_time='2025-04-16T23:59:00+08:00') # step3: 查询测试示例 - # with InfluxDBClient(url=url, token=token, org=org_name) as client: # 示例1:query_latest_record_by_ID # bucket_name = "realtime_simulation_result" # 数据存储的 bucket 名称 @@ -3161,7 +3400,7 @@ if __name__ == "__main__": # print("未找到符合条件的记录。") # 示例2:query_all_record_by_time - # node_records, link_records = query_all_record_by_time(query_time="2025-02-14T10:30:00+08:00") + # node_records, link_records = query_all_record_by_time(query_time="2025-04-04T00:00:00+08:00") # print("Node 数据:", node_records) # print("Link 数据:", link_records) @@ -3175,11 +3414,11 @@ if __name__ == "__main__": # print(SCADA_result_dict) # 示例5:query_SCADA_data_curve - # SCADA_result = query_SCADA_data_curve(api_query_id='9519', start_date='2025-03-08', end_date='2025-03-08') + # SCADA_result = query_SCADA_data_curve(api_query_id='9485', start_date='2024-03-25', end_date='2024-03-25') # print(SCADA_result) # 示例6:export_SCADA_data_to_csv - # export_SCADA_data_to_csv(start_date='2025-02-13', end_date='2025-02-15') + # export_SCADA_data_to_csv(start_date='2025-03-30', end_date='2025-03-30') # 示例7:export_realtime_simulation_result_to_csv # export_realtime_simulation_result_to_csv(start_date='2025-02-13', end_date='2025-02-15') @@ -3211,7 +3450,7 @@ if __name__ == "__main__": # print("Link 数据:", link_records) # 示例14:query_all_record_by_time_property - # result_records = query_all_record_by_time_property(query_time='2025-02-25T23:45:00+08:00', type='node', property='head') + # result_records = query_all_record_by_time_property(query_time='2025-03-30T12:00:00+08:00', type='node', property='pressure') # print(result_records) # 示例15:query_all_record_by_date_property @@ -3228,11 +3467,23 @@ if __name__ == "__main__": # print(result_records) # 示例18:fill_scheme_simulation_result_to_SCADA - # fill_scheme_simulation_result_to_SCADA(scheme_Type='burst_Analysis', scheme_Name='burst_scheme', query_date='2025-03-10') + # fill_scheme_simulation_result_to_SCADA(scheme_Type='burst_Analysis', scheme_Name='burst0330', query_date='2025-03-30') # 示例19:query_SCADA_data_by_device_ID_and_timerange - # result = query_SCADA_data_by_device_ID_and_timerange(query_ids_list=globals.fixed_pump_realtime_ids, start_time='2025-03-09T12:00:00+08:00', - # end_time='2025-03-09T12:10:00+08:00') + # result = query_SCADA_data_by_device_ID_and_timerange(query_ids_list=globals.pressure_non_realtime_ids, start_time='2025-04-16T00:00:00+08:00', + # end_time='2025-04-16T23:59:00+08:00') + # print(result) + + # 示例:manually_get_burst_flow + # leakage = manually_get_burst_flow(scheme_Type='burst_Analysis', scheme_Name='burst_scheme', scheme_start_time='2025-03-10T12:00:00+08:00') + # print(leakage) + + # 示例:upload_cleaned_SCADA_data_to_influxdb + # upload_cleaned_SCADA_data_to_influxdb(file_path='./标准cleaned_demand_data.csv') + + # 示例:query_cleaned_SCADA_data_by_device_ID_and_timerange + # result = query_cleaned_SCADA_data_by_device_ID_and_timerange(query_ids_list=['9485'], start_time='2024-03-24T00:00:00+08:00', + # end_time='2024-03-26T23:59:00+08:00') # print(result)