From af55aca4696c636b495c9493341590d905ac646c Mon Sep 17 00:00:00 2001 From: DingZQ Date: Sun, 16 Mar 2025 22:03:52 +0800 Subject: [PATCH] Refine --- influxdb_api.py | 480 +++++++++++++++++++----------------------------- 1 file changed, 185 insertions(+), 295 deletions(-) diff --git a/influxdb_api.py b/influxdb_api.py index 492e3da..9ed12c0 100644 --- a/influxdb_api.py +++ b/influxdb_api.py @@ -239,7 +239,7 @@ def create_and_initialize_buckets(client: InfluxDBClient, org_name: str) -> None .field("monitored_value", 0.0) \ .field("datacleaning_value", 0.0) \ .field("simulation_value", None) \ - .time("2024-11-21T00:00:00Z") + .time("2024-11-21T00:00:00Z", write_precision='s') points_to_write.append(point) # write_api.write(bucket="SCADA_data", org=org_name, record=point) # print("Initialized SCADA_data with default structure.") @@ -256,7 +256,7 @@ def create_and_initialize_buckets(client: InfluxDBClient, org_name: str) -> None .field("quality", 0.0) \ .field("reaction", 0.0) \ .field("friction", 0.0) \ - .time("2024-11-21T00:00:00Z") + .time("2024-11-21T00:00:00Z", write_precision='s') points_to_write.append(link_point) node_point = Point("node") \ .tag("date", None) \ @@ -267,7 +267,7 @@ def create_and_initialize_buckets(client: InfluxDBClient, org_name: str) -> None .field("demanddeficit", 0.0) \ .field("totalExternalOutflow", 0.0) \ .field("quality", 0.0) \ - .time("2024-11-21T00:00:00Z") + .time("2024-11-21T00:00:00Z", write_precision='s') points_to_write.append(node_point) # write_api.write(bucket="realtime_simulation_result", org=org_name, record=link_point) # write_api.write(bucket="realtime_simulation_result", org=org_name, record=node_point) @@ -285,7 +285,7 @@ def create_and_initialize_buckets(client: InfluxDBClient, org_name: str) -> None .field("status", None) \ .field("setting", 0.0) \ .field("quality", 0.0) \ - .time("2024-11-21T00:00:00Z") + .time("2024-11-21T00:00:00Z", write_precision='s') points_to_write.append(link_point) node_point = Point("node") \ .tag("date", None) \ @@ -298,7 +298,7 @@ def create_and_initialize_buckets(client: InfluxDBClient, org_name: str) -> None .field("demanddeficit", 0.0) \ .field("totalExternalOutflow", 0.0) \ .field("quality", 0.0) \ - .time("2024-11-21T00:00:00Z") + .time("2024-11-21T00:00:00Z", write_precision='s') points_to_write.append(node_point) SCADA_point = Point("SCADA") \ .tag("date", None) \ @@ -309,7 +309,7 @@ def create_and_initialize_buckets(client: InfluxDBClient, org_name: str) -> None .field("monitored_value", 0.0) \ .field("datacleaning_value", 0.0) \ .field("scheme_simulation_value", None) \ - .time("2024-11-21T00:00:00Z") + .time("2024-11-21T00:00:00Z", write_precision='s') points_to_write.append(SCADA_point) # write_api.write(bucket="scheme_simulation_result", org=org_name, record=link_point) # write_api.write(bucket="scheme_simulation_result", org=org_name, record=node_point) @@ -330,16 +330,13 @@ def store_realtime_SCADA_data_to_influxdb(get_real_value_time: str, bucket: str :param client: (InfluxDBClient): 已初始化的 InfluxDBClient 实例。 :return: """ - if client.ping(): - pass - else: - print("{} -- Failed to connect to InfluxDB.".format( datetime.now().strftime('%Y-%m-%d %H:%M:%S'))) + if not client.ping(): + print("{} -- Failed to connect to InfluxDB.".format(datetime.now().strftime('%Y-%m-%d %H:%M:%S'))) write_options = WriteOptions( jitter_interval=200, # 添加抖动以避免同时写入 max_retry_delay=30000 # 最大重试延迟(毫秒) ) - write_api = client.write_api(write_options=write_options) # 创建一个临时存储点数据的列表 points_to_write = [] @@ -407,7 +404,7 @@ def 
store_realtime_SCADA_data_to_influxdb(get_real_value_time: str, bucket: str .field("monitored_value", monitored_value) .field("datacleaning_value", None) .field("simulation_value", None) - .time(get_real_value_time_utc) + .time(get_real_value_time_utc, write_precision='s') ) points_to_write.append(point) # write_api.write(bucket=bucket, org=org_name, record=point) @@ -435,7 +432,7 @@ def store_realtime_SCADA_data_to_influxdb(get_real_value_time: str, bucket: str .field("monitored_value", (monitored_value)) .field("datacleaning_value", None) .field("simulation_value", None) - .time(get_real_value_time_utc) + .time(get_real_value_time_utc, write_precision='s') ) points_to_write.append(point) # write_api.write(bucket=bucket, org=org_name, record=point) @@ -463,7 +460,7 @@ def store_realtime_SCADA_data_to_influxdb(get_real_value_time: str, bucket: str .field("monitored_value", monitored_value) .field("datacleaning_value", None) .field("simulation_value", None) - .time(get_real_value_time_utc) + .time(get_real_value_time_utc, write_precision='s') ) points_to_write.append(point) # write_api.write(bucket=bucket, org=org_name, record=point) @@ -491,7 +488,7 @@ def store_realtime_SCADA_data_to_influxdb(get_real_value_time: str, bucket: str .field("monitored_value", monitored_value) .field("datacleaning_value", None) .field("simulation_value", None) - .time(get_real_value_time_utc) + .time(get_real_value_time_utc, write_precision='s') ) points_to_write.append(point) # write_api.write(bucket=bucket, org=org_name, record=point) @@ -519,7 +516,7 @@ def store_realtime_SCADA_data_to_influxdb(get_real_value_time: str, bucket: str .field("monitored_value", monitored_value) .field("datacleaning_value", None) .field("simulation_value", None) - .time(get_real_value_time_utc) + .time(get_real_value_time_utc, write_precision='s') ) points_to_write.append(point) # write_api.write(bucket=bucket, org=org_name, record=point) @@ -547,7 +544,7 @@ def store_realtime_SCADA_data_to_influxdb(get_real_value_time: str, bucket: str .field("monitored_value", monitored_value) .field("datacleaning_value", None) .field("simulation_value", None) - .time(get_real_value_time_utc) + .time(get_real_value_time_utc, write_precision='s') ) points_to_write.append(point) # write_api.write(bucket=bucket, org=org_name, record=point) @@ -575,7 +572,7 @@ def store_realtime_SCADA_data_to_influxdb(get_real_value_time: str, bucket: str .field("monitored_value", monitored_value) .field("datacleaning_value", None) .field("simulation_value", None) - .time(get_real_value_time_utc) + .time(get_real_value_time_utc, write_precision='s') ) points_to_write.append(point) # write_api.write(bucket=bucket, org=org_name, record=point) @@ -603,7 +600,7 @@ def store_realtime_SCADA_data_to_influxdb(get_real_value_time: str, bucket: str .field("monitored_value", monitored_value) .field("datacleaning_value", None) .field("simulation_value", None) - .time(get_real_value_time_utc) + .time(get_real_value_time_utc, write_precision='s') ) points_to_write.append(point) # write_api.write(bucket=bucket, org=org_name, record=point) @@ -631,7 +628,7 @@ def store_realtime_SCADA_data_to_influxdb(get_real_value_time: str, bucket: str .field("monitored_value", monitored_value) .field("datacleaning_value", None) .field("simulation_value", None) - .time(get_real_value_time_utc) + .time(get_real_value_time_utc, write_precision='s') ) points_to_write.append(point) # write_api.write(bucket=bucket, org=org_name, record=point) @@ -662,11 +659,9 @@ def 
store_non_realtime_SCADA_data_to_influxdb(get_history_data_end_time: str, bu :param client: (InfluxDBClient): 已初始化的 InfluxDBClient 实例。 :return: """ - if client.ping(): - pass - else: - print("{} -- Failed to connect to InfluxDB.".format( - datetime.now().strftime('%Y-%m-%d %H:%M:%S'))) + if not client.ping(): + print("{} -- Failed to connect to InfluxDB.".format(datetime.now().strftime('%Y-%m-%d %H:%M:%S'))) + write_options = WriteOptions( jitter_interval=200, # 添加抖动以避免同时写入 max_retry_delay=30000 # 最大重试延迟(毫秒) @@ -767,7 +762,7 @@ def store_non_realtime_SCADA_data_to_influxdb(get_history_data_end_time: str, bu .field("monitored_value", data['monitored_value']) .field("datacleaning_value", None) .field("simulation_value", None) - .time(data['time']) + .time(data['time'], write_precision='s') ) points_to_write.append(point) # write_api.write(bucket=bucket, org=org_name, record=point) @@ -782,7 +777,7 @@ def store_non_realtime_SCADA_data_to_influxdb(get_history_data_end_time: str, bu .field("monitored_value", data['monitored_value']) .field("datacleaning_value", None) .field("simulation_value", None) - .time(data['time']) + .time(data['time'], write_precision='s') ) points_to_write.append(point) # write_api.write(bucket=bucket, org=org_name, record=point) @@ -797,7 +792,7 @@ def store_non_realtime_SCADA_data_to_influxdb(get_history_data_end_time: str, bu .field("monitored_value", data['monitored_value']) .field("datacleaning_value", None) .field("simulation_value", None) - .time(data['time']) + .time(data['time'], write_precision='s') ) points_to_write.append(point) # write_api.write(bucket=bucket, org=org_name, record=point) @@ -812,7 +807,7 @@ def store_non_realtime_SCADA_data_to_influxdb(get_history_data_end_time: str, bu .field("monitored_value", data['monitored_value']) .field("datacleaning_value", None) .field("simulation_value", None) - .time(data['time']) + .time(data['time'], write_precision='s') ) points_to_write.append(point) # write_api.write(bucket=bucket, org=org_name, record=point) @@ -827,7 +822,7 @@ def store_non_realtime_SCADA_data_to_influxdb(get_history_data_end_time: str, bu .field("monitored_value", data['monitored_value']) .field("datacleaning_value", None) .field("simulation_value", None) - .time(data['time']) + .time(data['time'], write_precision='s') ) points_to_write.append(point) # write_api.write(bucket=bucket, org=org_name, record=point) @@ -842,7 +837,7 @@ def store_non_realtime_SCADA_data_to_influxdb(get_history_data_end_time: str, bu .field("monitored_value", data['monitored_value']) .field("datacleaning_value", None) .field("simulation_value", None) - .time(data['time']) + .time(data['time'], write_precision='s') ) points_to_write.append(point) # write_api.write(bucket=bucket, org=org_name, record=point) @@ -857,7 +852,7 @@ def store_non_realtime_SCADA_data_to_influxdb(get_history_data_end_time: str, bu .field("monitored_value", data['monitored_value']) .field("datacleaning_value", None) .field("simulation_value", None) - .time(data['time']) + .time(data['time'], write_precision='s') ) points_to_write.append(point) # write_api.write(bucket=bucket, org=org_name, record=point) @@ -872,7 +867,7 @@ def store_non_realtime_SCADA_data_to_influxdb(get_history_data_end_time: str, bu .field("monitored_value", data['monitored_value']) .field("datacleaning_value", None) .field("simulation_value", None) - .time(data['time']) + .time(data['time'], write_precision='s') ) points_to_write.append(point) # write_api.write(bucket=bucket, org=org_name, record=point) @@ -887,7 +882,7 
@@ def store_non_realtime_SCADA_data_to_influxdb(get_history_data_end_time: str, bu .field("monitored_value", data['monitored_value']) .field("datacleaning_value", None) .field("simulation_value", None) - .time(data['time']) + .time(data['time'], write_precision='s') ) points_to_write.append(point) # write_api.write(bucket=bucket, org=org_name, record=point) @@ -908,11 +903,9 @@ def download_history_data_manually(begin_time: str, end_time: str, bucket: str = :param client: 已初始化的 InfluxDBClient 实例 :return: """ - if client.ping(): - pass - else: - print("{} -- Failed to connect to InfluxDB.".format( - datetime.now().strftime('%Y-%m-%d %H:%M:%S'))) + if not client.ping(): + print("{} -- Failed to connect to InfluxDB.".format(datetime.now().strftime('%Y-%m-%d %H:%M:%S'))) + write_options = WriteOptions( jitter_interval=200, # 添加抖动以避免同时写入 max_retry_delay=30000 # 最大重试延迟(毫秒) @@ -1064,7 +1057,7 @@ def download_history_data_manually(begin_time: str, end_time: str, bucket: str = .field("monitored_value", data['monitored_value']) .field("datacleaning_value", None) .field("simulation_value", None) - .time(data['time']) + .time(data['time'], write_precision='s') ) points_to_write.append(point) # write_api.write(bucket=bucket, org=org_name, record=point) @@ -1079,7 +1072,7 @@ def download_history_data_manually(begin_time: str, end_time: str, bucket: str = .field("monitored_value", data['monitored_value']) .field("datacleaning_value", None) .field("simulation_value", None) - .time(data['time']) + .time(data['time'], write_precision='s') ) points_to_write.append(point) # write_api.write(bucket=bucket, org=org_name, record=point) @@ -1094,7 +1087,7 @@ def download_history_data_manually(begin_time: str, end_time: str, bucket: str = .field("monitored_value", data['monitored_value']) .field("datacleaning_value", None) .field("simulation_value", None) - .time(data['time']) + .time(data['time'], write_precision='s') ) points_to_write.append(point) # write_api.write(bucket=bucket, org=org_name, record=point) @@ -1109,7 +1102,7 @@ def download_history_data_manually(begin_time: str, end_time: str, bucket: str = .field("monitored_value", data['monitored_value']) .field("datacleaning_value", None) .field("simulation_value", None) - .time(data['time']) + .time(data['time'], write_precision='s') ) points_to_write.append(point) # write_api.write(bucket=bucket, org=org_name, record=point) @@ -1124,7 +1117,7 @@ def download_history_data_manually(begin_time: str, end_time: str, bucket: str = .field("monitored_value", data['monitored_value']) .field("datacleaning_value", None) .field("simulation_value", None) - .time(data['time']) + .time(data['time'], write_precision='s') ) points_to_write.append(point) # write_api.write(bucket=bucket, org=org_name, record=point) @@ -1139,7 +1132,7 @@ def download_history_data_manually(begin_time: str, end_time: str, bucket: str = .field("monitored_value", data['monitored_value']) .field("datacleaning_value", None) .field("simulation_value", None) - .time(data['time']) + .time(data['time'], write_precision='s') ) points_to_write.append(point) # write_api.write(bucket=bucket, org=org_name, record=point) @@ -1154,7 +1147,7 @@ def download_history_data_manually(begin_time: str, end_time: str, bucket: str = .field("monitored_value", data['monitored_value']) .field("datacleaning_value", None) .field("simulation_value", None) - .time(data['time']) + .time(data['time'], write_precision='s') ) points_to_write.append(point) # write_api.write(bucket=bucket, org=org_name, record=point) @@ -1169,7 
+1162,7 @@ def download_history_data_manually(begin_time: str, end_time: str, bucket: str = .field("monitored_value", data['monitored_value']) .field("datacleaning_value", None) .field("simulation_value", None) - .time(data['time']) + .time(data['time'], write_precision='s') ) points_to_write.append(point) # write_api.write(bucket=bucket, org=org_name, record=point) @@ -1184,7 +1177,7 @@ def download_history_data_manually(begin_time: str, end_time: str, bucket: str = .field("monitored_value", data['monitored_value']) .field("datacleaning_value", None) .field("simulation_value", None) - .time(data['time']) + .time(data['time'], write_precision='s') ) points_to_write.append(point) # write_api.write(bucket=bucket, org=org_name, record=point) @@ -1199,7 +1192,7 @@ def download_history_data_manually(begin_time: str, end_time: str, bucket: str = .field("monitored_value", data['monitored_value']) .field("datacleaning_value", None) .field("simulation_value", None) - .time(data['time']) + .time(data['time'], write_precision='s') ) points_to_write.append(point) # write_api.write(bucket=bucket, org=org_name, record=point) @@ -1214,7 +1207,7 @@ def download_history_data_manually(begin_time: str, end_time: str, bucket: str = .field("monitored_value", data['monitored_value']) .field("datacleaning_value", None) .field("simulation_value", None) - .time(data['time']) + .time(data['time'], write_precision='s') ) points_to_write.append(point) # write_api.write(bucket=bucket, org=org_name, record=point) @@ -1229,7 +1222,7 @@ def download_history_data_manually(begin_time: str, end_time: str, bucket: str = .field("monitored_value", data['monitored_value']) .field("datacleaning_value", None) .field("simulation_value", None) - .time(data['time']) + .time(data['time'], write_precision='s') ) points_to_write.append(point) # write_api.write(bucket=bucket, org=org_name, record=point) @@ -1244,7 +1237,7 @@ def download_history_data_manually(begin_time: str, end_time: str, bucket: str = .field("monitored_value", data['monitored_value']) .field("datacleaning_value", None) .field("simulation_value", None) - .time(data['time']) + .time(data['time'], write_precision='s') ) points_to_write.append(point) # write_api.write(bucket=bucket, org=org_name, record=point) @@ -1259,7 +1252,7 @@ def download_history_data_manually(begin_time: str, end_time: str, bucket: str = .field("monitored_value", data['monitored_value']) .field("datacleaning_value", None) .field("simulation_value", None) - .time(data['time']) + .time(data['time'], write_precision='s') ) points_to_write.append(point) # write_api.write(bucket=bucket, org=org_name, record=point) @@ -1274,7 +1267,7 @@ def download_history_data_manually(begin_time: str, end_time: str, bucket: str = .field("monitored_value", data['monitored_value']) .field("datacleaning_value", None) .field("simulation_value", None) - .time(data['time']) + .time(data['time'], write_precision='s') ) points_to_write.append(point) # write_api.write(bucket=bucket, org=org_name, record=point) @@ -1289,7 +1282,7 @@ def download_history_data_manually(begin_time: str, end_time: str, bucket: str = .field("monitored_value", data['monitored_value']) .field("datacleaning_value", None) .field("simulation_value", None) - .time(data['time']) + .time(data['time'], write_precision='s') ) points_to_write.append(point) # write_api.write(bucket=bucket, org=org_name, record=point) @@ -1304,7 +1297,7 @@ def download_history_data_manually(begin_time: str, end_time: str, bucket: str = .field("monitored_value", 
data['monitored_value']) .field("datacleaning_value", None) .field("simulation_value", None) - .time(data['time']) + .time(data['time'], write_precision='s') ) points_to_write.append(point) # write_api.write(bucket=bucket, org=org_name, record=point) @@ -1319,7 +1312,7 @@ def download_history_data_manually(begin_time: str, end_time: str, bucket: str = .field("monitored_value", data['monitored_value']) .field("datacleaning_value", None) .field("simulation_value", None) - .time(data['time']) + .time(data['time'], write_precision='s') ) points_to_write.append(point) # write_api.write(bucket=bucket, org=org_name, record=point) @@ -1338,11 +1331,9 @@ def query_SCADA_data_by_device_ID_and_time(query_ids_list: List[str], query_time :param client: 已初始化的 InfluxDBClient 实例。 :return: """ - if client.ping(): - pass - else: - print("{} -- Failed to connect to InfluxDB.".format( - datetime.now().strftime('%Y-%m-%d %H:%M:%S'))) + if not client.ping(): + print("{} -- Failed to connect to InfluxDB.".format(datetime.now().strftime('%Y-%m-%d %H:%M:%S'))) + query_api = client.query_api() # 将北京时间转换为 UTC 时间 beijing_time = datetime.fromisoformat(query_time) @@ -1377,123 +1368,51 @@ def query_SCADA_data_by_device_ID_and_time(query_ids_list: List[str], query_time SCADA_result_dict[device_id] = None return SCADA_result_dict -# DingZQ, 2025-02-15 -def query_SCADA_data_by_device_ID_and_time_range(query_ids_list: List[str], start_time: str, end_time: str, bucket: str="SCADA_data", client: InfluxDBClient=client) -> list[dict[str, float]]: +# 2025/03/14 +def query_SCADA_data_by_device_ID_and_timerange(query_ids_list: List[str], start_time: str, end_time: str, + bucket: str="SCADA_data", client: InfluxDBClient=client): """ - 根据SCADA设备的ID和时间查询值 - :param query_ids_list: SCADA设备ID的列表, 是api_query 而不是 普通的Id + 查询指定时间范围内,多个SCADA设备的数据,用于漏损定位 + :param query_ids_list: SCADA设备ID的列表 :param start_time: 输入的北京时间,格式为 '2024-11-24T17:30:00+08:00'。 :param end_time: 输入的北京时间,格式为 '2024-11-24T17:30:00+08:00'。 :param bucket: InfluxDB 的 bucket 名称,默认值为 "SCADA_data"。 :param client: 已初始化的 InfluxDBClient 实例。 :return: """ - if client.ping(): print("{} -- Successfully connected to InfluxDB.".format( datetime.now().strftime('%Y-%m-%d %H:%M:%S'))) - else: print("{} -- Failed to connect to InfluxDB.".format( datetime.now().strftime('%Y-%m-%d %H:%M:%S'))) + if not client.ping(): + print("{} -- Failed to connect to InfluxDB.".format(datetime.now().strftime('%Y-%m-%d %H:%M:%S'))) query_api = client.query_api() # 将北京时间转换为 UTC 时间 - - bg_start_time = time_api.parse_beijing_time(start_time) - utc_start_time = bg_start_time.astimezone(timezone.utc) - bg_end_time = time_api.parse_beijing_time(end_time) - utc_end_time = bg_end_time.astimezone(timezone.utc) - - # 构建查询字典 - SCADA_results = [] - + beijing_start_time = datetime.fromisoformat(start_time) + utc_start_time = beijing_start_time.astimezone(timezone.utc) - timedelta(seconds=1) + print(utc_start_time) + beijing_end_time = datetime.fromisoformat(end_time) + utc_end_time = beijing_end_time.astimezone(timezone.utc) + timedelta(seconds=1) + print(utc_end_time) + SCADA_dict = {} for device_id in query_ids_list: - # 构建 Flux 查询语句 flux_query = f''' - from(bucket: "{bucket}") - |> range(start: {utc_start_time.isoformat()}, stop: {utc_end_time.isoformat()}) - |> filter(fn: (r) => r["device_ID"] == "{device_id}") - |> filter(fn: (r) => r["_field"] == "monitored_value") - |> sort(columns: ["_time"], desc: false) - ''' + from(bucket: "{bucket}") + |> range(start: {utc_start_time.isoformat()}, stop: 
{utc_end_time.isoformat()}) + |> filter(fn: (r) => r["_measurement"] == "SCADA_data" and r["device_ID"] == "{device_id}" and r["_field"] == "monitored_value") + |> sort(columns: ["_time"]) + ''' + # 执行查询,返回一个 FluxTable 列表 + tables = query_api.query(flux_query) + records_list = [] + for table in tables: + for record in table.records: + # 获取记录的时间和监测值 + records_list.append({ + "time": record["_time"], + "value": record["_value"] + }) + SCADA_dict[device_id] = records_list - # 执行查询 - try: - result = query_api.query(flux_query) - - # 从查询结果中提取 monitored_value - if result: - # 假设返回的结果为一行数据 - for table in result: - for record in table.records: - # 获取字段 "_value" 即为 monitored_value - monitored_value = record.get_value() - rec = { - "ID": device_id, - "time": record.get_time(), - "value": monitored_value - } - SCADA_results.append(rec) - - except Exception as e: - print(f"Error querying InfluxDB for device ID {device_id}: {e}") - - return SCADA_results - -# DingZQ, 2025-03-08 -def query_all_SCADA_records_by_date(query_date: str, bucket: str="SCADA_data", client: InfluxDBClient=client) -> list[dict[str, float]]: - - """ - 根据日期查询所有SCADA数据 - - :param query_date: 输入的日期,格式为 '2024-11-24', 日期是北京时间的日期 - :param bucket: InfluxDB 的 bucket 名称,默认值为 "SCADA_data"。 - :param client: 已初始化的 InfluxDBClient 实例。 - - :return: - """ - - if client.ping(): print("{} -- Successfully connected to InfluxDB.".format( datetime.now().strftime('%Y-%m-%d %H:%M:%S'))) - else: print("{} -- Failed to connect to InfluxDB.".format( datetime.now().strftime('%Y-%m-%d %H:%M:%S'))) - - query_api = client.query_api() - # 将北京时间转换为 UTC 时间 - - bg_start_time, bg_end_time = time_api.parse_beijing_date_range(query_date) - # bg_end_time = bg_start_time + timedelta(hours=2) # 服务器性能不行,暂时返回2个小时的数据 - utc_start_time = bg_start_time.astimezone(timezone.utc) - utc_end_time = bg_end_time.astimezone(timezone.utc) - - print(f"utc_start_time: {utc_start_time}, utc_end_time: {utc_end_time}") - - # 构建查询字典 - SCADA_results = [] - - # 构建 Flux 查询语句 - flux_query = f''' - from(bucket: "{bucket}") - |> range(start: {utc_start_time.isoformat()}, stop: {utc_end_time.isoformat()}) - |> filter(fn: (r) => r["_field"] == "monitored_value") - |> sort(columns: ["_time"], desc: false) - ''' - - # 执行查询 - try: - result = query_api.query(flux_query) - - # 从查询结果中提取 monitored_value - if result: - # 假设返回的结果为一行数据 - for table in result: - for record in table.records: - # 获取字段 "_value" 即为 monitored_value - monitored_value = record.get_value() - rec = { - "ID": record['device_ID'], # 是api_query 而不是 普通的Id - "time": record.get_time(), - record['_measurement']: monitored_value - } - SCADA_results.append(rec) - - except Exception as e: - print(f"Error querying InfluxDB for date {query_date}: {e}") - - return SCADA_results + return SCADA_dict # DingZQ, 2025-02-15 @@ -1510,6 +1429,7 @@ def query_SCADA_data_by_device_ID_and_date(query_ids_list: List[str], query_date - return query_SCADA_data_by_device_ID_and_time_range(query_ids_list, str(start_time), str(end_time), bucket, client) + return query_SCADA_data_by_device_ID_and_timerange(query_ids_list, str(start_time), str(end_time), bucket, client) + # 2025/02/01 def store_realtime_simulation_result_to_influxdb(node_result_list: List[Dict[str, any]], link_result_list: List[Dict[str, any]], result_start_time: str, @@ -1523,11 +1443,9 @@ def store_realtime_simulation_result_to_influxdb(node_result_list: List[Dict[str :param client: (InfluxDBClient): 已初始化的 InfluxDBClient 实例。 :return: """ - if client.ping(): - pass - else: - print("{} -- Failed to connect to InfluxDB.".format( - 
datetime.now().strftime('%Y-%m-%d %H:%M:%S'))) + if not client.ping(): + print("{} -- Failed to connect to InfluxDB.".format(datetime.now().strftime('%Y-%m-%d %H:%M:%S'))) + # 开始写入数据 try: write_options = WriteOptions( @@ -1554,7 +1472,7 @@ def store_realtime_simulation_result_to_influxdb(node_result_list: List[Dict[str .field("demanddeficit", None) \ .field("totalExternalOutflow", None) \ .field("quality", data.get('quality', 0.0)) \ - .time(time_beijing) + .time(time_beijing, write_precision='s') points_to_write.append(node_point) # 写入数据到 InfluxDB,多个 field 在同一个 point 中 # write_api.write(bucket=bucket, org=org_name, record=node_point) @@ -1575,7 +1493,7 @@ def store_realtime_simulation_result_to_influxdb(node_result_list: List[Dict[str .field("setting", data.get('setting', 0.0)) \ .field("reaction", data.get('reaction', 0.0)) \ .field("friction", data.get('friction', 0.0)) \ - .time(time_beijing) + .time(time_beijing, write_precision='s') points_to_write.append(link_point) # write_api.write(bucket=bucket, org=org_name, record=link_point) # write_api.flush() @@ -1598,16 +1516,14 @@ def query_latest_record_by_ID(ID: str, type: str, bucket: str="realtime_simulati :param client: (InfluxDBClient): 已初始化的 InfluxDB 客户端实例。 :return: dict: 最新记录的数据,如果没有找到则返回 None。 """ - if client.ping(): - pass - else: - print("{} -- Failed to connect to InfluxDB.".format( - datetime.now().strftime('%Y-%m-%d %H:%M:%S'))) + if not client.ping(): + print("{} -- Failed to connect to InfluxDB.".format(datetime.now().strftime('%Y-%m-%d %H:%M:%S'))) + query_api = client.query_api() if type == "node": flux_query = f''' from(bucket: "{bucket}") - |> range(start: -7d) // 查找最近七天的记录 + |> range(start: -1d, stop: now()) // 查找最近一天的记录 |> filter(fn: (r) => r["_measurement"] == "node" and r["ID"] == "{ID}") |> pivot( rowKey:["_time"], @@ -1624,7 +1540,7 @@ def query_latest_record_by_ID(ID: str, type: str, bucket: str="realtime_simulati for record in table.records: return { "time": record["_time"], - "nodeID": ID, + "ID": ID, "head": record["head"], "pressure": record["pressure"], "actualdemand": record["actualdemand"], @@ -1635,7 +1551,7 @@ elif type == "link": flux_query = f''' from(bucket: "{bucket}") - |> range(start: -7d) // 查找最近七天的记录 + |> range(start: -1d, stop: now()) // 查找最近一天的记录 |> filter(fn: (r) => r["_measurement"] == "link" and r["ID"] == "{ID}") |> pivot( rowKey:["_time"], @@ -1652,7 +1568,7 @@ for record in table.records: return { "time": record["_time"], - "linkID": ID, + "ID": ID, "flow": record["flow"], "velocity": record["velocity"], "headloss": record["headloss"], @@ -1698,11 +1614,9 @@ def query_all_record_by_time(query_time: str, bucket: str="realtime_simulation_r :param client: (InfluxDBClient): 已初始化的 InfluxDBClient 实例。 :return: dict: tuple: (node_records, link_records) """ - if client.ping(): - pass - else: - print("{} -- Failed to connect to InfluxDB.".format( - datetime.now().strftime('%Y-%m-%d %H:%M:%S'))) + if not client.ping(): + print("{} -- Failed to connect to InfluxDB.".format(datetime.now().strftime('%Y-%m-%d %H:%M:%S'))) + query_api = client.query_api() # 将北京时间转换为 UTC 时间 beijing_time = datetime.fromisoformat(query_time) @@ -1743,7 +1657,7 @@ def query_all_record_by_time(query_time: str, bucket: str="realtime_simulation_r elif measurement == "link": link_records.append({ "time": record["_time"], - "linkID": record["ID"], + "ID": record["ID"], 
"flow": record["flow"], "velocity": record["velocity"], "headloss": record["headloss"], @@ -1768,11 +1682,9 @@ def query_all_record_by_time_property(query_time: str, type: str, property: str, :param client: (InfluxDBClient): 已初始化的 InfluxDBClient 实例。 :return: list(dict): result_records """ - if client.ping(): - pass - else: - print("{} -- Failed to connect to InfluxDB.".format( - datetime.now().strftime('%Y-%m-%d %H:%M:%S'))) + if not client.ping(): + print("{} -- Failed to connect to InfluxDB.".format(datetime.now().strftime('%Y-%m-%d %H:%M:%S'))) + query_api = client.query_api() # 确定 measurement if type == "node": @@ -1820,18 +1732,17 @@ def query_all_record_by_date(query_date: str, bucket: str="realtime_simulation_r print('{} -- Hydraulic simulation started.'.format( datetime.now(pytz.timezone('Asia/Shanghai')).strftime('%Y-%m-%d %H:%M:%S'))) - if client.ping(): - pass - else: - print("{} -- Failed to connect to InfluxDB.".format( - datetime.now().strftime('%Y-%m-%d %H:%M:%S'))) + if not client.ping(): + print("{} -- Failed to connect to InfluxDB.".format(datetime.now().strftime('%Y-%m-%d %H:%M:%S'))) + query_api = client.query_api() # 将 start_date 的北京时间转换为 UTC 时间 start_time = (datetime.strptime(query_date, "%Y-%m-%d") - timedelta(days=1)).replace(hour=16, minute=0, second=0, tzinfo=timezone.utc).isoformat() + stop_time = datetime.strptime(query_date, "%Y-%m-%d").replace(hour=15, minute=59, second=59, tzinfo=timezone.utc).isoformat() # 构建 Flux 查询语句 flux_query = f''' from(bucket: "{bucket}") - |> range(start: {start_time}) + |> range(start: {start_time}, stop: {stop_time}) |> filter(fn: (r) => r["_measurement"] == "node" or r["_measurement"] == "link" and r["date"] == "{query_date}") |> pivot( rowKey:["_time"], @@ -1862,7 +1773,7 @@ def query_all_record_by_date(query_date: str, bucket: str="realtime_simulation_r elif measurement == "link": link_records.append({ "time": record["_time"], - "linkID": record["ID"], + "ID": record["ID"], "flow": record["flow"], "velocity": record["velocity"], "headloss": record["headloss"], @@ -1980,11 +1891,9 @@ def query_all_record_by_date_property(query_date: str, type: str, property: str, time_cost_start = time.perf_counter() print('{} -- Hydraulic simulation started.'.format( datetime.now(pytz.timezone('Asia/Shanghai')).strftime('%Y-%m-%d %H:%M:%S'))) - if client.ping(): - pass - else: - print("{} -- Failed to connect to InfluxDB.".format( - datetime.now().strftime('%Y-%m-%d %H:%M:%S'))) + if not client.ping(): + print("{} -- Failed to connect to InfluxDB.".format(datetime.now().strftime('%Y-%m-%d %H:%M:%S'))) + query_api = client.query_api() # 确定 measurement if type == "node": @@ -1995,11 +1904,12 @@ def query_all_record_by_date_property(query_date: str, type: str, property: str, raise ValueError(f"不支持的类型: {type}") # 将 start_date 的北京时间转换为 UTC 时间 start_time = (datetime.strptime(query_date, "%Y-%m-%d") - timedelta(days=1)).replace(hour=16, minute=0, second=0, tzinfo=timezone.utc).isoformat() + stop_time = datetime.strptime(query_date, "%Y-%m-%d").replace(hour=15, minute=59, second=59, tzinfo=timezone.utc).isoformat() # 构建 Flux 查询语句 flux_query = f''' from(bucket: "{bucket}") - |> range(start: {start_time}) - |> filter(fn: (r) => r["_measurement"] == "{measurement}" and r["date"] == "{query_date}" and r["_field"] == "{property}") + |> range(start: {start_time}, stop: {stop_time}) + |> filter(fn: (r) => r["_measurement"] == "{measurement}" and r["date"] == "{query_date}" and r["_field"] == "{property}") ''' # 执行查询 tables = query_api.query(flux_query) @@ 
-2033,11 +1943,9 @@ def query_curve_by_ID_property_daterange(ID: str, type: str, property: str, star :param client: (InfluxDBClient): 已初始化的 InfluxDBClient 实例 :return: 查询结果的列表 """ - if client.ping(): - pass - else: - print("{} -- Failed to connect to InfluxDB.".format( - datetime.now().strftime('%Y-%m-%d %H:%M:%S'))) + if not client.ping(): + print("{} -- Failed to connect to InfluxDB.".format(datetime.now().strftime('%Y-%m-%d %H:%M:%S'))) + query_api = client.query_api() # 确定 measurement if type == "node": @@ -2123,7 +2031,7 @@ def store_scheme_simulation_result_to_influxdb(node_result_list: List[Dict[str, .field("demanddeficit", None) \ .field("totalExternalOutflow", None) \ .field("quality", data.get('quality', 0.0)) \ - .time(scheme_time) + .time(scheme_time, write_precision='s') points_to_write.append(node_point) # 写入数据到 InfluxDB,多个 field 在同一个 point 中 # write_api.write(bucket=bucket, org=org_name, record=node_point) @@ -2147,7 +2055,7 @@ def store_scheme_simulation_result_to_influxdb(node_result_list: List[Dict[str, .field("setting", data.get('setting', 0.0)) \ .field("reaction", data.get('reaction', 0.0)) \ .field("friction", data.get('friction', 0.0)) \ - .time(scheme_time) + .time(scheme_time, write_precision='s') points_to_write.append(link_point) # write_api.write(bucket=bucket, org=org_name, record=link_point) # write_api.flush() @@ -2212,19 +2120,13 @@ def fill_scheme_simulation_result_to_SCADA(scheme_Type: str = None, scheme_Name: :param client: 已初始化的 InfluxDBClient 实例 :return: """ - print("fill_scheme_simulation_result_to_SCADA") + if not client.ping(): + print("{} -- Failed to connect to InfluxDB.".format(datetime.now().strftime('%Y-%m-%d %H:%M:%S'))) - if client.ping(): - pass - else: - print("{} -- Failed to connect to InfluxDB.".format( - datetime.now().strftime('%Y-%m-%d %H:%M:%S'))) write_options = WriteOptions( jitter_interval=200, # 添加抖动以避免同时写入 max_retry_delay=30000 # 最大重试延迟(毫秒) ) - - write_api = client.write_api(write_options=write_options) # 创建一个临时存储点数据的列表 points_to_write = [] @@ -2241,13 +2143,11 @@ def fill_scheme_simulation_result_to_SCADA(scheme_Type: str = None, scheme_Name: .tag("scheme_Type", scheme_Type) .tag("scheme_Name", scheme_Name) .field("monitored_value", data['value']) - .time(data['time']) + .time(data['time'], write_precision='s') ) points_to_write.append(point) # write_api.write(bucket=bucket, org=org_name, record=point) - print("fill_scheme_simulation_result_to_SCADA 2163") - for key, value in globals.scheme_pipe_flow_ids.items(): scheme_pipe_flow_result = (query_scheme_curve_by_ID_property(scheme_Type=scheme_Type, scheme_Name=scheme_Name, query_date=query_date, ID=value, type='link', property='flow')) @@ -2259,13 +2159,11 @@ def fill_scheme_simulation_result_to_SCADA(scheme_Type: str = None, scheme_Name: .tag("scheme_Type", scheme_Type) .tag("scheme_Name", scheme_Name) .field("monitored_value", data['value']) - .time(data['time']) + .time(data['time'], write_precision='s') ) points_to_write.append(point) # write_api.write(bucket=bucket, org=org_name, record=point) - print("fill_scheme_simulation_result_to_SCADA - 2181") - for key, value in globals.scheme_pressure_ids.items(): scheme_pressure_result = (query_scheme_curve_by_ID_property(scheme_Type=scheme_Type, scheme_Name=scheme_Name, query_date=query_date, ID=value, type='node', property='pressure')) @@ -2277,7 +2175,7 @@ def fill_scheme_simulation_result_to_SCADA(scheme_Type: str = None, scheme_Name: .tag("scheme_Type", scheme_Type) .tag("scheme_Name", scheme_Name) .field("monitored_value", 
data['value']) - .time(data['time']) + .time(data['time'], write_precision='s') ) points_to_write.append(point) # write_api.write(bucket=bucket, org=org_name, record=point) @@ -2293,7 +2191,7 @@ def fill_scheme_simulation_result_to_SCADA(scheme_Type: str = None, scheme_Name: .tag("scheme_Type", scheme_Type) .tag("scheme_Name", scheme_Name) .field("monitored_value", data['value']) - .time(data['time']) + .time(data['time'], write_precision='s') ) points_to_write.append(point) # write_api.write(bucket=bucket, org=org_name, record=point) @@ -2309,13 +2207,10 @@ def fill_scheme_simulation_result_to_SCADA(scheme_Type: str = None, scheme_Name: .tag("scheme_Type", scheme_Type) .tag("scheme_Name", scheme_Name) .field("monitored_value", data['value']) - .time(data['time']) + .time(data['time'], write_precision='s') ) points_to_write.append(point) # write_api.write(bucket=bucket, org=org_name, record=point) - - print('fill_scheme_simulation_result_to_SCADA - 2231') - # 批量写入数据 if points_to_write: write_api.write(bucket=bucket, org=org_name, record=points_to_write) @@ -2333,11 +2228,9 @@ def query_SCADA_data_curve(api_query_id: str, start_date: str, end_date: str, bu :param client: 已初始化的 InfluxDBClient 实例 :return: 查询结果的列表 """ - if client.ping(): - pass - else: - print("{} -- Failed to connect to InfluxDB.".format( - datetime.now().strftime('%Y-%m-%d %H:%M:%S'))) + if not client.ping(): + print("{} -- Failed to connect to InfluxDB.".format(datetime.now().strftime('%Y-%m-%d %H:%M:%S'))) + query_api = client.query_api() # 将 start_date 的北京时间转换为 UTC 时间范围 start_time = (datetime.strptime(start_date, "%Y-%m-%d") - timedelta(days=1)).replace(hour=16, minute=0, second=0, tzinfo=timezone.utc).isoformat() @@ -2373,10 +2266,9 @@ def query_scheme_all_record_by_time(scheme_Type: str, scheme_Name: str, query_ti :param client: 已初始化的 InfluxDBClient 实例。 :return: dict: tuple: (node_records, link_records) """ - if client.ping(): - pass - else: + if not client.ping(): print("{} -- Failed to connect to InfluxDB.".format(datetime.now().strftime('%Y-%m-%d %H:%M:%S'))) + query_api = client.query_api() # 将北京时间转换为 UTC 时间 beijing_time = datetime.fromisoformat(query_time) @@ -2417,7 +2309,7 @@ def query_scheme_all_record_by_time(scheme_Type: str, scheme_Name: str, query_ti elif measurement == "link": link_records.append({ "time": record["_time"], - "linkID": record["ID"], + "ID": record["ID"], "flow": record["flow"], "velocity": record["velocity"], "headloss": record["headloss"], @@ -2444,11 +2336,9 @@ def query_scheme_all_record_by_time_property(scheme_Type: str, scheme_Name: str, :param client: 已初始化的 InfluxDBClient 实例。 :return: dict: tuple: (node_records, link_records) """ - if client.ping(): - pass - else: - print("{} -- Failed to connect to InfluxDB.".format( - datetime.now().strftime('%Y-%m-%d %H:%M:%S'))) + if not client.ping(): + print("{} -- Failed to connect to InfluxDB.".format(datetime.now().strftime('%Y-%m-%d %H:%M:%S'))) + query_api = client.query_api() # 确定 measurement if type == "node": @@ -2496,11 +2386,9 @@ def query_scheme_curve_by_ID_property(scheme_Type: str, scheme_Name: str, query_ :param client: 已初始化的 InfluxDBClient 实例 :return: 查询结果的列表 """ - if client.ping(): - pass - else: - print("{} -- Failed to connect to InfluxDB.".format( - datetime.now().strftime('%Y-%m-%d %H:%M:%S'))) + if not client.ping(): + print("{} -- Failed to connect to InfluxDB.".format(datetime.now().strftime('%Y-%m-%d %H:%M:%S'))) + query_api = client.query_api() # 确定 measurement if type == "node": @@ -2509,11 +2397,14 @@ def 
query_scheme_curve_by_ID_property(scheme_Type: str, scheme_Name: str, query_ measurement = "link" else: raise ValueError(f"不支持的类型: {type}") + start_time = (datetime.strptime(query_date, "%Y-%m-%d") - timedelta(days=1)).replace(hour=16, minute=0, second=0, tzinfo=timezone.utc).isoformat() + stop_time = datetime.strptime(query_date, "%Y-%m-%d").replace(hour=15, minute=59, second=59, tzinfo=timezone.utc).isoformat() + # 构建 Flux 查询语句 flux_query = f''' from(bucket: "{bucket}") - |> range(start: 2025-01-01T00:00:00Z) - |> filter(fn: (r) => r["_measurement"] == "{measurement}" and r["scheme_Type"] == "{scheme_Type}" and r["scheme_Name"] == "{scheme_Name}" and r["date"] == "{query_date}" and r["ID"] == "{ID}" and r["_field"] == "{property}") + |> range(start: {start_time}, stop: {stop_time}) + |> filter(fn: (r) => r["_measurement"] == "{measurement}" and r["scheme_Type"] == "{scheme_Type}" and r["scheme_Name"] == "{scheme_Name}" and r["ID"] == "{ID}" and r["_field"] == "{property}") ''' # 执行查询 tables = query_api.query(flux_query) @@ -2540,17 +2431,17 @@ def query_scheme_all_record(scheme_Type: str, scheme_Name: str, query_date: str, :param client: 已初始化的 InfluxDBClient 实例。 :return: dict: tuple: (node_records, link_records) """ - if client.ping(): - pass - else: - print("{} -- Failed to connect to InfluxDB.".format( - datetime.now().strftime('%Y-%m-%d %H:%M:%S'))) + if not client.ping(): + print("{} -- Failed to connect to InfluxDB.".format(datetime.now().strftime('%Y-%m-%d %H:%M:%S'))) + query_api = client.query_api() + start_time = (datetime.strptime(query_date, "%Y-%m-%d") - timedelta(days=1)).replace(hour=16, minute=0, second=0, tzinfo=timezone.utc).isoformat() + stop_time = datetime.strptime(query_date, "%Y-%m-%d").replace(hour=15, minute=59, second=59, tzinfo=timezone.utc).isoformat() # 构建 Flux 查询语句 flux_query = f''' from(bucket: "{bucket}") - |> range(start: 2025-01-01T00:00:00Z) - |> filter(fn: (r) => r["scheme_Type"] == "{scheme_Type}" and r["scheme_Name"] == "{scheme_Name}" and r["date"] == "{query_date}" and r["_measurement"] == "node" or r["_measurement"] == "link") + |> range(start: {start_time}, stop: {stop_time}) + |> filter(fn: (r) => r["scheme_Type"] == "{scheme_Type}" and r["scheme_Name"] == "{scheme_Name}" and (r["_measurement"] == "node" or r["_measurement"] == "link")) |> pivot( rowKey:["_time"], columnKey:["_field"], @@ -2580,7 +2471,7 @@ def query_scheme_all_record(scheme_Type: str, scheme_Name: str, query_date: str, elif measurement == "link": link_records.append({ "time": record["_time"], - "linkID": record["ID"], + "ID": record["ID"], "flow": record["flow"], "velocity": record["velocity"], "headloss": record["headloss"], @@ -2609,11 +2500,9 @@ def query_scheme_all_record_property(scheme_Type: str, scheme_Name: str, query_d :param client: 已初始化的 InfluxDBClient 实例。 :return: dict: tuple: (node_records, link_records) """ - if client.ping(): - pass - else: - print("{} -- Failed to connect to InfluxDB.".format( - datetime.now().strftime('%Y-%m-%d %H:%M:%S'))) + if not client.ping(): + print("{} -- Failed to connect to InfluxDB.".format(datetime.now().strftime('%Y-%m-%d %H:%M:%S'))) + query_api = client.query_api() # 确定 measurement if type == "node": @@ -2622,10 +2511,12 @@ def query_scheme_all_record_property(scheme_Type: str, scheme_Name: str, query_d measurement = "link" else: raise ValueError(f"不支持的类型: {type}") + start_time = (datetime.strptime(query_date, "%Y-%m-%d") - timedelta(days=1)).replace(hour=16, minute=0, second=0, tzinfo=timezone.utc).isoformat() + stop_time = 
datetime.strptime(query_date, "%Y-%m-%d").replace(hour=15, minute=59, second=59, tzinfo=timezone.utc).isoformat() # 构建 Flux 查询语句 flux_query = f''' from(bucket: "{bucket}") - |> range(start: 2025-01-01T00:00:00Z) + |> range(start: {start_time}, stop: {stop_time}) |> filter(fn: (r) => r["scheme_Type"] == "{scheme_Type}" and r["scheme_Name"] == "{scheme_Name}" and r["date"] == "{query_date}" and r["_measurement"] == "{measurement}" and r["_field"] == "{property}") ''' # 执行查询 @@ -2652,11 +2543,9 @@ def export_SCADA_data_to_csv(start_date: str, end_date: str, bucket: str="SCADA_ :param client: 已初始化的 InfluxDBClient 实例 :return: """ - if client.ping(): - pass - else: - print("{} -- Failed to connect to InfluxDB.".format( - datetime.now().strftime('%Y-%m-%d %H:%M:%S'))) + if not client.ping(): + print("{} -- Failed to connect to InfluxDB.".format(datetime.now().strftime('%Y-%m-%d %H:%M:%S'))) + query_api = client.query_api() # 将 start_date 的北京时间转换为 UTC 时间范围 start_time = (datetime.strptime(start_date, "%Y-%m-%d") - timedelta(days=1)).replace(hour=16, minute=0, second=0, tzinfo=timezone.utc).isoformat() @@ -2703,11 +2592,9 @@ def export_realtime_simulation_result_to_csv(start_date: str, end_date: str, buc :param client: 已初始化的 InfluxDBClient 实例 :return: """ - if client.ping(): - pass - else: - print("{} -- Failed to connect to InfluxDB.".format( - datetime.now().strftime('%Y-%m-%d %H:%M:%S'))) + if not client.ping(): + print("{} -- Failed to connect to InfluxDB.".format(datetime.now().strftime('%Y-%m-%d %H:%M:%S'))) + query_api = client.query_api() # 将 start_date 的北京时间转换为 UTC 时间范围 start_time = (datetime.strptime(start_date, "%Y-%m-%d") - timedelta(days=1)).replace(hour=16, minute=0, second=0, tzinfo=timezone.utc).isoformat() @@ -2787,11 +2674,9 @@ def export_scheme_simulation_result_to_csv_time(start_date: str, end_date: str, :param client: 已初始化的 InfluxDBClient 实例 :return: """ - if client.ping(): - pass - else: - print("{} -- Failed to connect to InfluxDB.".format( - datetime.now().strftime('%Y-%m-%d %H:%M:%S'))) + if not client.ping(): + print("{} -- Failed to connect to InfluxDB.".format(datetime.now().strftime('%Y-%m-%d %H:%M:%S'))) + query_api = client.query_api() # 将 start_date 的北京时间转换为 UTC 时间范围 start_time = (datetime.strptime(start_date, "%Y-%m-%d") - timedelta(days=1)).replace(hour=16, minute=0, second=0, tzinfo=timezone.utc).isoformat() @@ -2876,17 +2761,17 @@ def export_scheme_simulation_result_to_csv_scheme(scheme_Type: str, scheme_Name: :param client: 已初始化的 InfluxDBClient 实例 :return: """ - if client.ping(): - pass - else: - print("{} -- Failed to connect to InfluxDB.".format( - datetime.now().strftime('%Y-%m-%d %H:%M:%S'))) + if not client.ping(): + print("{} -- Failed to connect to InfluxDB.".format(datetime.now().strftime('%Y-%m-%d %H:%M:%S'))) + query_api = client.query_api() + start_time = (datetime.strptime(query_date, "%Y-%m-%d") - timedelta(days=1)).replace(hour=16, minute=0, second=0, tzinfo=timezone.utc).isoformat() + stop_time = datetime.strptime(query_date, "%Y-%m-%d").replace(hour=15, minute=59, second=59, tzinfo=timezone.utc).isoformat() # 构建 Flux 查询语句,查询指定时间范围内的数据 flux_query_link = f''' from(bucket: "{bucket}") - |> range(start: 2025-01-01T00:00:00Z) - |> filter(fn: (r) => r["_measurement"] == "link" and r["scheme_Type"] == "{scheme_Type}" and r["scheme_Name"] == "{scheme_Name}" and r["date"] == "{query_date}") + |> range(start: {start_time}, stop: {stop_time}) + |> filter(fn: (r) => r["_measurement"] == "link" and r["scheme_Type"] == "{scheme_Type}" and r["scheme_Name"] == 
"{scheme_Name}") ''' # 执行查询 link_tables = query_api.query(flux_query_link) @@ -2907,7 +2792,7 @@ def export_scheme_simulation_result_to_csv_scheme(scheme_Type: str, scheme_Name: # 构建 Flux 查询语句,查询指定时间范围内的数据 flux_query_node = f''' from(bucket: "{bucket}") - |> range(start: 2025-01-01T00:00:00Z) + |> range(start: {start_time}, stop: {stop_time}) |> filter(fn: (r) => r["_measurement"] == "node" and r["scheme_Type"] == "{scheme_Type}" and r["scheme_Name"] == "{scheme_Name}") ''' # 执行查询 @@ -2977,7 +2862,7 @@ if __name__ == "__main__": # 手动执行存储测试 # 示例1:store_realtime_SCADA_data_to_influxdb - store_realtime_SCADA_data_to_influxdb(get_real_value_time='2025-03-12T23:45:00+08:00') + # store_realtime_SCADA_data_to_influxdb(get_real_value_time='2025-03-16T11:13:00+08:00') # 示例2:store_non_realtime_SCADA_data_to_influxdb # store_non_realtime_SCADA_data_to_influxdb(get_history_data_end_time='2025-03-08T12:00:00+08:00') @@ -2986,7 +2871,7 @@ if __name__ == "__main__": # download_history_data_manually(begin_time='2025-03-04T00:00:00+08:00', end_time='2025-03-10T00:00:00+08:00') # step3: 查询测试示例 - # with InfluxDBClient(url=url, token=token, org=org_name) as client: + with InfluxDBClient(url=url, token=token, org=org_name) as client: # # 示例1:query_latest_record_by_ID # bucket_name = "realtime_simulation_result" # 数据存储的 bucket 名称 @@ -3042,7 +2927,7 @@ if __name__ == "__main__": # print(curve_result) # 示例12:query_all_record_by_date - # node_records, link_records = query_all_record_by_date(query_date='2025-02-27') + node_records, link_records = query_all_record_by_date(query_date='2025-02-27') # print("Node 数据:", node_records) # print("Link 数据:", link_records) @@ -3071,5 +2956,10 @@ if __name__ == "__main__": # 示例18:fill_scheme_simulation_result_to_SCADA # fill_scheme_simulation_result_to_SCADA(scheme_Type='burst_Analysis', scheme_Name='burst_scheme', query_date='2025-03-10') + # 示例19:query_SCADA_data_by_device_ID_and_timerange + # result = query_SCADA_data_by_device_ID_and_timerange(query_ids_list=globals.fixed_pump_realtime_ids, start_time='2025-03-09T12:00:00+08:00', + # end_time='2025-03-09T12:10:00+08:00') + # print(result) +