From fa435b105f142b463a1b7f614ec781797ccd2226 Mon Sep 17 00:00:00 2001 From: DingZQ Date: Sat, 22 Feb 2025 17:15:47 +0800 Subject: [PATCH] Add influxdb_api --- influxdb_api.py | 821 +++++++++++++++++++++++++++++++++++++++++++++--- 1 file changed, 784 insertions(+), 37 deletions(-) diff --git a/influxdb_api.py b/influxdb_api.py index b859b4e..e4dc153 100644 --- a/influxdb_api.py +++ b/influxdb_api.py @@ -12,6 +12,9 @@ from tjnetwork import * import schedule import threading import globals +import csv +import pandas as pd +import openpyxl import influxdb_info import time_api @@ -139,6 +142,7 @@ def query_pg_scada_info_non_realtime(name: str) -> None: open_project(name) dic_time = get_time(name) globals.hydraulic_timestep = dic_time['HYDRAULIC TIMESTEP'] + close_project(name) # 连接数据库 conn_string = f"dbname={name} host=127.0.0.1" @@ -205,7 +209,7 @@ def query_pg_scada_info_non_realtime(name: str) -> None: # print("Demand Non-Realtime IDs:", globals.demand_non_realtime_ids) # print("Quality Non-Realtime IDs:", globals.quality_non_realtime_ids) # print("Maximum Transmission Frequency:", globals.transmission_frequency) - # print("Hydraulic Timestep:", globals.hydraulic_timestep) + print("Hydraulic Timestep:", globals.hydraulic_timestep) except Exception as e: @@ -220,17 +224,23 @@ def delete_buckets(client: InfluxDBClient, org_name: str) -> None: :param org_name: InfluxDB中organization的名称。 :return: None """ + # 定义需要删除的 bucket 名称列表 + buckets_to_delete = ['SCADA_data', 'realtime_simulation_result', 'scheme_simulation_result'] + buckets_api = client.buckets_api() buckets_obj = buckets_api.find_buckets(org=org_name) # 确保 buckets_obj 拥有 buckets 属性 if hasattr(buckets_obj, 'buckets'): for bucket in buckets_obj.buckets: - try: - buckets_api.delete_bucket(bucket) - print(f"Bucket {bucket.name} has been deleted successfully.") - except Exception as e: - print(f"Failed to delete bucket {bucket.name}: {e}") + if bucket.name in buckets_to_delete: # 只删除特定名称的 bucket + try: + buckets_api.delete_bucket(bucket) + print(f"Bucket {bucket.name} has been deleted successfully.") + except Exception as e: + print(f"Failed to delete bucket {bucket.name}: {e}") + else: + print(f"Skipping bucket {bucket.name}. Not in the deletion list.") else: print("未找到 buckets 属性,无法迭代 buckets。") @@ -298,6 +308,8 @@ def create_and_initialize_buckets(client: InfluxDBClient, org_name: str) -> None .field("status", None) \ .field("setting", 0.0) \ .field("quality", 0.0) \ + .field("reaction", 0.0) \ + .field("friction", 0.0) \ .time("2024-11-21T00:00:00Z") node_point = Point("node") \ @@ -380,7 +392,8 @@ def store_realtime_SCADA_data_to_influxdb(get_real_value_time: str, bucket: str try: try_count += 1 if globals.reservoir_liquid_level_realtime_ids: - reservoir_liquid_level_realtime_data_list = get_realValue.get_realValue(ids=','.join(globals.reservoir_liquid_level_realtime_ids)) + reservoir_liquid_level_realtime_data_list = get_realValue.get_realValue( + ids=','.join(globals.reservoir_liquid_level_realtime_ids)) if globals.tank_liquid_level_realtime_ids: tank_liquid_level_realtime_data_list = get_realValue.get_realValue(ids=','.join(globals.tank_liquid_level_realtime_ids)) if globals.fixed_pump_realtime_ids: @@ -424,7 +437,7 @@ def store_realtime_SCADA_data_to_influxdb(get_real_value_time: str, bucket: str # 创建Point对象 point = ( - Point('reservoir_liquid_level_realtime') # measurement name + Point('reservoir_liquid_level_realtime') .tag("date", datetime.fromisoformat(get_real_value_time).strftime('%Y-%m-%d')) .tag("description", data['description']) .tag("device_ID", data['device_ID']) @@ -1158,22 +1171,24 @@ def query_latest_record_by_ID(ID: str, type: str, bucket: str="realtime_simulati :return: dict: 最新记录的数据,如果没有找到则返回 None。 """ if client.ping(): - print("{} -- Successfully connected to InfluxDB.".format(datetime.now().strftime('%Y-%m-%d %H:%M:%S'))) + print("{} -- Successfully connected to InfluxDB.".format( + datetime.now().strftime('%Y-%m-%d %H:%M:%S'))) else: - print("{} -- Failed to connect to InfluxDB.".format(datetime.now().strftime('%Y-%m-%d %H:%M:%S'))) - + print("{} -- Failed to connect to InfluxDB.".format( + datetime.now().strftime('%Y-%m-%d %H:%M:%S'))) query_api = client.query_api() if type == "node": flux_query = f''' from(bucket: "{bucket}") - |> range(start: -30d) // 查找最近一月的记录 + |> range(start: -7d) // 查找最近七天的记录 |> filter(fn: (r) => r["_measurement"] == "node") |> filter(fn: (r) => r["ID"] == "{ID}") |> pivot( rowKey:["_time"], columnKey:["_field"], valueColumn:"_value" - ) + ) + |> group() // 将所有数据聚合到同一个 group |> sort(columns: ["_time"], desc: true) |> limit(n: 1) ''' @@ -1195,14 +1210,15 @@ def query_latest_record_by_ID(ID: str, type: str, bucket: str="realtime_simulati elif type == "link": flux_query = f''' from(bucket: "{bucket}") - |> range(start: -30d) // 查找最近一月的记录 + |> range(start: -7d) // 查找最近七天的记录 |> filter(fn: (r) => r["_measurement"] == "link") |> filter(fn: (r) => r["ID"] == "{ID}") |> pivot( rowKey:["_time"], columnKey:["_field"], valueColumn:"_value" - ) + ) + |> group() // 将所有数据聚合到同一个 group |> sort(columns: ["_time"], desc: true) |> limit(n: 1) ''' @@ -1268,10 +1284,8 @@ def query_all_record_by_time(query_time: str, bucket: str="realtime_simulation_r # 将北京时间转换为 UTC 时间 beijing_time = datetime.fromisoformat(query_time) utc_time = beijing_time.astimezone(timezone.utc) - # DingZQ, 2025-02-10, set delta time to 15, original is 1 - utc_start_time = utc_time - timedelta(seconds=15) - utc_stop_time = utc_time + timedelta(seconds=15) - + utc_start_time = utc_time - timedelta(seconds=1) + utc_stop_time = utc_time + timedelta(seconds=1) # 构建 Flux 查询语句 flux_query = f''' from(bucket: "{bucket}") @@ -1283,13 +1297,11 @@ def query_all_record_by_time(query_time: str, bucket: str="realtime_simulation_r valueColumn:"_value" ) ''' - # 执行查询 tables = query_api.query(flux_query) node_records = [] link_records = [] # 解析查询结果 - for table in tables: for record in table.records: # print(record.values) # 打印完整记录内容 @@ -1322,6 +1334,69 @@ def query_all_record_by_time(query_time: str, bucket: str="realtime_simulation_r return node_records, link_records +# 2025/02/21 +def query_all_record_by_date(query_date: str, bucket: str="realtime_simulation_result", client: InfluxDBClient=client) -> tuple: + """ + 查询指定日期的所有记录,包括‘node’和‘link’,分别以指定的格式返回 + :param query_date: 输入的日期,格式为‘2025-02-14’ + :param bucket: 数据存储的bucket名称 + :param client: 已初始化的InfluxDBClient 实例。 + :return: dict: tuple: (node_records, link_records) + """ + if client.ping(): + print("{} -- Successfully connected to InfluxDB.".format( + datetime.now().strftime('%Y-%m-%d %H:%M:%S'))) + else: + print("{} -- Failed to connect to InfluxDB.".format( + datetime.now().strftime('%Y-%m-%d %H:%M:%S'))) + query_api = client.query_api() + # 构建 Flux 查询语句 + flux_query = f''' + from(bucket: "{bucket}") + |> range(start: 2025-01-01T00:00:00Z) + |> filter(fn: (r) => r["_measurement"] == "node" or r["_measurement"] == "link") + |> filter(fn: (r) => r["date"] == "{query_date}") + |> pivot( + rowKey:["_time"], + columnKey:["_field"], + valueColumn:"_value" + ) + ''' + # 执行查询 + tables = query_api.query(flux_query) + node_records = [] + link_records = [] + # 解析查询结果 + for table in tables: + for record in table.records: + # print(record.values) # 打印完整记录内容 + measurement = record["_measurement"] + # 处理 node 数据 + if measurement == "node": + node_records.append({ + "time": record["_time"], + "ID": record["ID"], + "head": record["head"], + "pressure": record["pressure"], + "actualdemand": record["actualdemand"], + "quality": record["quality"] + }) + # 处理 link 数据 + elif measurement == "link": + link_records.append({ + "time": record["_time"], + "linkID": record["ID"], + "flow": record["flow"], + "velocity": record["velocity"], + "headloss": record["headloss"], + "quality": record["quality"], + "status": record["status"], + "setting": record["setting"], + "reaction": record["reaction"], + "friction": record["friction"] + }) + + return node_records, link_records # 2025/02/01 def query_curve_by_ID_property_daterange(ID: str, type: str, property: str, start_date: str, end_date: str, bucket: str="realtime_simulation_result", client: InfluxDBClient=client) -> list: @@ -1383,7 +1458,6 @@ def query_curve_by_ID_property_daterange(ID: str, type: str, property: str, star return results - def query_buckets(client: InfluxDBClient=client) -> list[str]: # 获取 Buckets API 实例 buckets_api = client.buckets_api() @@ -1411,6 +1485,642 @@ def query_measurements(bucket: str, client: InfluxDBClient=client) -> list[str]: return measurements + + +# WMH 2025/02/13 +def store_scheme_simulation_result_to_influxdb(node_result_list: List[Dict[str, any]], link_result_list: List[Dict[str, any]], + scheme_start_time: str, num_periods: int = 1, scheme_Type: str = None, scheme_Name: str = None, + bucket: str = "scheme_simulation_result", client: InfluxDBClient = client): + """ + 将方案模拟计算结果存入 InfluxuDb 的scheme_simulation_result这个bucket中。 + :param node_result_list: (List[Dict[str, any]]): 包含节点和结果数据的字典列表。 + :param link_result_list: (List[Dict[str, any]]): 包含连接和结果数据的字典列表。 + :param scheme_start_time: (str): 方案模拟开始时间。 + :param num_periods: (int): 方案模拟的周期数 + :param scheme_Type: (str): 方案类型 + :param scheme_Name: (str): 方案名称 + :param bucket: (str): InfluxDB 的 bucket 名称,默认值为 "scheme_simulation_result"。 + :param client: (InfluxDBClient): 已初始化的 InfluxDBClient 实例。 + :return: + """ + if client.ping(): + print("{} -- Successfully connected to InfluxDB.".format( + datetime.now().strftime('%Y-%m-%d %H:%M:%S'))) + else: + print("{} -- Failed to connect to InfluxDB.".format( + datetime.now().strftime('%Y-%m-%d %H:%M:%S'))) + + try: + write_api = client.write_api() + + date_str = scheme_start_time.split('T')[0] + time_beijing = datetime.strptime(scheme_start_time, '%Y-%m-%dT%H:%M:%S%z') + + timestep_parts = globals.hydraulic_timestep.split(':') + timestep = timedelta(hours=int(timestep_parts[0]), minutes=int(timestep_parts[1]), seconds=int(timestep_parts[2])) + + for node_result in node_result_list: + # 提取节点信息和数据结果 + node_id = node_result.get('node') + # 从period 0 到 period num_period - 1 + for period_index in range(num_periods): + scheme_time = (time_beijing + (timestep * period_index)).isoformat() + data_list = [node_result.get('result', [])[period_index]] + for data in data_list: + # 构建 Point 数据,多个 field 存在于一个数据点中 + node_point = Point("node") \ + .tag("date", date_str) \ + .tag("ID", node_id) \ + .tag("scheme_Type", scheme_Type) \ + .tag("scheme_Name", scheme_Name) \ + .field("head", data.get('head', 0.0)) \ + .field("pressure", data.get('pressure', 0.0)) \ + .field("actualdemand", data.get('demand', 0.0)) \ + .field("demanddeficit", None) \ + .field("totalExternalOutflow", None) \ + .field("quality", data.get('quality', 0.0)) \ + .time(scheme_time) + # 写入数据到 InfluxDB,多个 field 在同一个 point 中 + write_api.write(bucket=bucket, org=org_name, record=node_point) + write_api.flush() + + for link_result in link_result_list: + link_id = link_result.get('link') + for period_index in range(num_periods): + scheme_time = (time_beijing + (timestep * period_index)).isoformat() + data_list = [link_result.get('result', [])[period_index]] + for data in data_list: + link_point = Point("link") \ + .tag("date", date_str) \ + .tag("ID", link_id) \ + .tag("scheme_Type", scheme_Type) \ + .tag("scheme_Name", scheme_Name) \ + .field("flow", data.get('flow', 0.0)) \ + .field("velocity", data.get('velocity', 0.0)) \ + .field("headloss", data.get('headloss', 0.0)) \ + .field("quality", data.get('quality', 0.0)) \ + .field("status", data.get('status', "UNKNOWN")) \ + .field("setting", data.get('setting', 0.0)) \ + .field("reaction", data.get('reaction', 0.0)) \ + .field("friction", data.get('friction', 0.0)) \ + .time(scheme_time) + write_api.write(bucket=bucket, org=org_name, record=link_point) + write_api.flush() + + except Exception as e: + raise RuntimeError(f"数据写入 InfluxDB 时发生错误: {e}") + + +# 2025/02/15 +def query_SCADA_data_curve(api_query_id: str, start_date: str, end_date: str, bucket: str="SCADA_data", client: InfluxDBClient=client) -> list: + """ + 根据SCADA设备的api_query_id和时间范围,查询得到曲线,查到的数据为0时区时间 + :param api_query_id: SCADA设备的api_query_id + :param start_date: 查询开始的时间,格式为 'YYYY-MM-DD' + :param end_date: 查询结束的时间,格式为 'YYYY-MM-DD' + :param bucket: 数据存储的 bucket 名称,默认值为 "SCADA_data" + :param client: 已初始化的 InfluxDBClient 实例 + :return: 查询结果的列表 + """ + if client.ping(): + print("{} -- Successfully connected to InfluxDB.".format( + datetime.now().strftime('%Y-%m-%d %H:%M:%S'))) + else: + print("{} -- Failed to connect to InfluxDB.".format( + datetime.now().strftime('%Y-%m-%d %H:%M:%S'))) + query_api = client.query_api() + # 将 start_date 的北京时间转换为 UTC 时间范围 + start_time = (datetime.strptime(start_date, "%Y-%m-%d") - timedelta(days=1)).replace(hour=16, minute=0, second=0, tzinfo=timezone.utc).isoformat() + stop_time = datetime.strptime(end_date, "%Y-%m-%d").replace(hour=15, minute=59, second=59, tzinfo=timezone.utc).isoformat() + # 构建 Flux 查询语句 + flux_query = f''' + from(bucket: "{bucket}") + |> range(start: {start_time}, stop: {stop_time}) + |> filter(fn: (r) => r["device_ID"] == "{api_query_id}") + ''' + # 执行查询 + tables = query_api.query(flux_query) + # 解析查询结果 + results = [] + for table in tables: + for record in table.records: + results.append({ + "time": record["_time"], + "value": record["_value"] + }) + return results + + +# 2025/02/18 +def query_scheme_all_record_by_time(scheme_Type: str, scheme_Name: str, query_time: str, + bucket: str="scheme_simulation_result", client: InfluxDBClient=client) -> tuple: + """ + 查询指定方案某一时刻的所有记录,包括‘node'和‘link’,分别以指定格式返回。 + :param scheme_Type: 方案类型 + :param scheme_Name: 方案名称 + :param query_time: 输入的北京时间,格式为 '2024-11-24T17:30:00+08:00'。 + :param bucket: 数据存储的 bucket 名称。 + :param client: 已初始化的 InfluxDBClient 实例。 + :return: dict: tuple: (node_records, link_records) + """ + if client.ping(): + print("{} -- Successfully connected to InfluxDB.".format( + datetime.now().strftime('%Y-%m-%d %H:%M:%S'))) + else: + print("{} -- Failed to connect to InfluxDB.".format( + datetime.now().strftime('%Y-%m-%d %H:%M:%S'))) + query_api = client.query_api() + # 将北京时间转换为 UTC 时间 + beijing_time = datetime.fromisoformat(query_time) + utc_time = beijing_time.astimezone(timezone.utc) + utc_start_time = utc_time - timedelta(seconds=1) + utc_stop_time = utc_time + timedelta(seconds=1) + # 构建 Flux 查询语句 + flux_query = f''' + from(bucket: "{bucket}") + |> range(start: {utc_start_time.isoformat()}, stop: {utc_stop_time.isoformat()}) + |> filter(fn: (r) => r["scheme_Type"] == "{scheme_Type}") + |> filter(fn: (r) => r["scheme_Name"] == "{scheme_Name}") + |> filter(fn: (r) => r["_measurement"] == "node" or r["_measurement"] == "link") + |> pivot( + rowKey:["_time"], + columnKey:["_field"], + valueColumn:"_value" + ) + ''' + # 执行查询 + tables = query_api.query(flux_query) + node_records = [] + link_records = [] + # 解析查询结果 + for table in tables: + for record in table.records: + # print(record.values) # 打印完整记录内容 + measurement = record["_measurement"] + # 处理 node 数据 + if measurement == "node": + node_records.append({ + "time": record["_time"], + "ID": record["ID"], + "head": record["head"], + "pressure": record["pressure"], + "actualdemand": record["actualdemand"], + "quality": record["quality"] + }) + # 处理 link 数据 + elif measurement == "link": + link_records.append({ + "time": record["_time"], + "linkID": record["ID"], + "flow": record["flow"], + "velocity": record["velocity"], + "headloss": record["headloss"], + "quality": record["quality"], + "status": record["status"], + "setting": record["setting"], + "reaction": record["reaction"], + "friction": record["friction"] + }) + + return node_records, link_records + +# 2025/02/19 +def query_scheme_curve_by_ID_property(scheme_Type: str, scheme_Name: str, ID: str, type: str, property: str, + bucket: str="scheme_simulation_result", client: InfluxDBClient=client) -> list: + """ + 根据scheme_Type和scheme_Name,查询该模拟方案中,某一node或link的某一属性值的所有时间的结果 + :param scheme_Type: 方案类型 + :param scheme_Name: 方案名称 + :param ID: 元素的ID + :param type: 元素的类型,node或link + :param property: 元素的属性值 + :param bucket: 数据存储的 bucket 名称,默认值为 "scheme_simulation_result" + :param client: 已初始化的 InfluxDBClient 实例 + :return: 查询结果的列表 + """ + if client.ping(): + print("{} -- Successfully connected to InfluxDB.".format( + datetime.now().strftime('%Y-%m-%d %H:%M:%S'))) + else: + print("{} -- Failed to connect to InfluxDB.".format( + datetime.now().strftime('%Y-%m-%d %H:%M:%S'))) + query_api = client.query_api() + # 确定 measurement + if type == "node": + measurement = "node" + elif type == "link": + measurement = "link" + else: + raise ValueError(f"不支持的类型: {type}") + # 构建 Flux 查询语句 + flux_query = f''' + from(bucket: "{bucket}") + |> range(start: 2025-01-01T00:00:00Z) + |> filter(fn: (r) => r["_measurement"] == "{measurement}") + |> filter(fn: (r) => r["scheme_Type"] == "{scheme_Type}") + |> filter(fn: (r) => r["scheme_Name"] == "{scheme_Name}") + |> filter(fn: (r) => r["ID"] == "{ID}") + |> filter(fn: (r) => r["_field"] == "{property}") + ''' + # 执行查询 + tables = query_api.query(flux_query) + # 解析查询结果 + results = [] + for table in tables: + for record in table.records: + results.append({ + "time": record["_time"], + "value": record["_value"] + }) + return results + + +# 2025/02/21 +def query_scheme_all_record(scheme_Type: str, scheme_Name: str, bucket: str="scheme_simulation_result", + client: InfluxDBClient=client) -> tuple: + """ + 查询指定方案的所有记录,包括‘node'和‘link’,分别以指定格式返回。 + :param scheme_Type: 方案类型 + :param scheme_Name: 方案名称 + :param bucket: 数据存储的 bucket 名称。 + :param client: 已初始化的 InfluxDBClient 实例。 + :return: dict: tuple: (node_records, link_records) + """ + if client.ping(): + print("{} -- Successfully connected to InfluxDB.".format( + datetime.now().strftime('%Y-%m-%d %H:%M:%S'))) + else: + print("{} -- Failed to connect to InfluxDB.".format( + datetime.now().strftime('%Y-%m-%d %H:%M:%S'))) + query_api = client.query_api() + # 构建 Flux 查询语句 + flux_query = f''' + from(bucket: "{bucket}") + |> range(start: 2025-01-01T00:00:00Z) + |> filter(fn: (r) => r["scheme_Type"] == "{scheme_Type}") + |> filter(fn: (r) => r["scheme_Name"] == "{scheme_Name}") + |> filter(fn: (r) => r["_measurement"] == "node" or r["_measurement"] == "link") + |> pivot( + rowKey:["_time"], + columnKey:["_field"], + valueColumn:"_value" + ) + ''' + # 执行查询 + tables = query_api.query(flux_query) + node_records = [] + link_records = [] + # 解析查询结果 + for table in tables: + for record in table.records: + # print(record.values) # 打印完整记录内容 + measurement = record["_measurement"] + # 处理 node 数据 + if measurement == "node": + node_records.append({ + "time": record["_time"], + "ID": record["ID"], + "head": record["head"], + "pressure": record["pressure"], + "actualdemand": record["actualdemand"], + "quality": record["quality"] + }) + # 处理 link 数据 + elif measurement == "link": + link_records.append({ + "time": record["_time"], + "linkID": record["ID"], + "flow": record["flow"], + "velocity": record["velocity"], + "headloss": record["headloss"], + "quality": record["quality"], + "status": record["status"], + "setting": record["setting"], + "reaction": record["reaction"], + "friction": record["friction"] + }) + + return node_records, link_records + +# 2025/02/16 +def export_SCADA_data_to_csv(start_date: str, end_date: str, bucket: str="SCADA_data", client: InfluxDBClient=client) -> None: + """ + 导出influxdb中SCADA_data这个bucket的数据到csv中 + :param start_date: 查询开始的时间,格式为 'YYYY-MM-DD' + :param end_date: 查询结束的时间,格式为 'YYYY-MM-DD' + :param bucket: 数据存储的 bucket 名称,默认值为 "SCADA_data" + :param client: 已初始化的 InfluxDBClient 实例 + :return: + """ + if client.ping(): + print("{} -- Successfully connected to InfluxDB.".format( + datetime.now().strftime('%Y-%m-%d %H:%M:%S'))) + else: + print("{} -- Failed to connect to InfluxDB.".format( + datetime.now().strftime('%Y-%m-%d %H:%M:%S'))) + query_api = client.query_api() + # 将 start_date 的北京时间转换为 UTC 时间范围 + start_time = (datetime.strptime(start_date, "%Y-%m-%d") - timedelta(days=1)).replace(hour=16, minute=0, second=0, tzinfo=timezone.utc).isoformat() + stop_time = datetime.strptime(end_date, "%Y-%m-%d").replace(hour=15, minute=59, second=59, tzinfo=timezone.utc).isoformat() + # 构建 Flux 查询语句 + flux_query = f''' + from(bucket: "{bucket}") + |> range(start: {start_time}, stop: {stop_time}) + ''' + # 执行查询 + tables = query_api.query(flux_query) + # 存储查询结果 + rows = [] + for table in tables: + for record in table.records: + row = { + 'time': record.get_time(), + 'measurement': record.get_measurement(), + 'date': record.values.get('date', None), + 'description': record.values.get('description', None), + 'device_ID': record.values.get('device_ID', None), + 'monitored_value': record.get_value() if record.get_field() == 'monitored_value' else None, + 'datacleaning_value': record.get_value() if record.get_field() == 'datacleaning_value' else None, + 'simulation_value': record.get_value() if record.get_field() == 'simulation_value' else None, + } + rows.append(row) + # 动态生成 CSV 文件名 + csv_filename = f"SCADA_data_{start_date}至{end_date}.csv" + # 写入到 CSV 文件 + with open(csv_filename, mode='w', newline='') as file: + writer = csv.DictWriter(file, fieldnames=['time', 'measurement', 'date', 'description', 'device_ID', 'monitored_value', 'datacleaning_value', 'simulation_value']) + writer.writeheader() + writer.writerows(rows) + + print(f"Data exported to {csv_filename} successfully.") + +# 2025/02/17 +def export_realtime_simulation_result_to_csv(start_date: str, end_date: str, bucket: str="realtime_simulation_result", client: InfluxDBClient=client) -> None: + """ + 导出influxdb中realtime_simulation_result这个bucket的数据到csv中 + :param start_date: 查询开始的时间,格式为 'YYYY-MM-DD' + :param end_date: 查询结束的时间,格式为 'YYYY-MM-DD' + :param bucket: 数据存储的 bucket 名称,默认值为 "SCADA_data" + :param client: 已初始化的 InfluxDBClient 实例 + :return: + """ + if client.ping(): + print("{} -- Successfully connected to InfluxDB.".format( + datetime.now().strftime('%Y-%m-%d %H:%M:%S'))) + else: + print("{} -- Failed to connect to InfluxDB.".format( + datetime.now().strftime('%Y-%m-%d %H:%M:%S'))) + query_api = client.query_api() + # 将 start_date 的北京时间转换为 UTC 时间范围 + start_time = (datetime.strptime(start_date, "%Y-%m-%d") - timedelta(days=1)).replace(hour=16, minute=0, second=0, tzinfo=timezone.utc).isoformat() + stop_time = datetime.strptime(end_date, "%Y-%m-%d").replace(hour=15, minute=59, second=59, tzinfo=timezone.utc).isoformat() + # 构建 Flux 查询语句,查询指定时间范围内的数据 + flux_query_link = f''' + from(bucket: "{bucket}") + |> range(start: {start_time}, stop: {stop_time}) + |> filter(fn: (r) => r["_measurement"] == "link") + ''' + # 执行查询 + link_tables = query_api.query(flux_query_link) + # 存储link类的数据 + link_rows = [] + link_data = {} + for table in link_tables: + for record in table.records: + key = (record.get_time(), record.values.get('ID', None)) + if key not in link_data: + link_data[key] = {} + field = record.get_field() + link_data[key][field] = record.get_value() + link_data[key]['measurement'] = record.get_measurement() + link_data[key]['date'] = record.values.get('date', None) + # 构建 Flux 查询语句,查询指定时间范围内的数据 + flux_query_node = f''' + from(bucket: "{bucket}") + |> range(start: {start_time}, stop: {stop_time}) + |> filter(fn: (r) => r["_measurement"] == "node") + ''' + # 执行查询 + node_tables = query_api.query(flux_query_node) + # 存储node类的数据 + node_rows = [] + node_data = {} + for table in node_tables: + for record in table.records: + key = (record.get_time(), record.values.get('ID', None)) + if key not in node_data: + node_data[key] = {} + field = record.get_field() + node_data[key][field] = record.get_value() + node_data[key]['measurement'] = record.get_measurement() + node_data[key]['date'] = record.values.get('date', None) + + for key in set(link_data.keys()): + row = {'time': key[0], "ID": key[1]} + row.update(link_data.get(key, {})) + link_rows.append(row) + for key in set(node_data.keys()): + row = {'time': key[0], "ID": key[1]} + row.update(node_data.get(key, {})) + node_rows.append(row) + + # 动态生成 CSV 文件名 + csv_filename_link = f"realtime_simulation_link_result_{start_date}至{end_date}.csv" + csv_filename_node = f"realtime_simulation_node_result_{start_date}至{end_date}.csv" + # 写入到 CSV 文件 + with open(csv_filename_link, mode='w', newline='') as file: + writer = csv.DictWriter(file, fieldnames=['time', 'measurement', 'date', 'ID', 'flow', 'leakage', 'velocity', 'headloss', 'status', 'setting', 'quality', 'friction', 'reaction']) + writer.writeheader() + writer.writerows(link_rows) + with open(csv_filename_node, mode='w', newline='') as file: + writer = csv.DictWriter(file, fieldnames=['time', 'measurement', 'date', 'ID', 'head', 'pressure', 'actualdemand', + 'demanddeficit', 'totalExternalOutflow', 'quality']) + writer.writeheader() + writer.writerows(node_rows) + + print(f"Data exported to {csv_filename_link} and {csv_filename_node} successfully.") + +# 2025/02/18 +def export_scheme_simulation_result_to_csv_time(start_date: str, end_date: str, bucket: str="scheme_simulation_result", client: InfluxDBClient=client) -> None: + """ + 导出influxdb中scheme_simulation_result这个bucket的数据到csv中 + :param start_date: 查询开始的时间,格式为 'YYYY-MM-DD' + :param end_date: 查询结束的时间,格式为 'YYYY-MM-DD' + :param bucket: 数据存储的 bucket 名称,默认值为 "SCADA_data" + :param client: 已初始化的 InfluxDBClient 实例 + :return: + """ + if client.ping(): + print("{} -- Successfully connected to InfluxDB.".format( + datetime.now().strftime('%Y-%m-%d %H:%M:%S'))) + else: + print("{} -- Failed to connect to InfluxDB.".format( + datetime.now().strftime('%Y-%m-%d %H:%M:%S'))) + query_api = client.query_api() + # 将 start_date 的北京时间转换为 UTC 时间范围 + start_time = (datetime.strptime(start_date, "%Y-%m-%d") - timedelta(days=1)).replace(hour=16, minute=0, second=0, tzinfo=timezone.utc).isoformat() + stop_time = datetime.strptime(end_date, "%Y-%m-%d").replace(hour=15, minute=59, second=59, tzinfo=timezone.utc).isoformat() + # 构建 Flux 查询语句,查询指定时间范围内的数据 + flux_query_link = f''' + from(bucket: "{bucket}") + |> range(start: {start_time}, stop: {stop_time}) + |> filter(fn: (r) => r["_measurement"] == "link") + ''' + # 执行查询 + link_tables = query_api.query(flux_query_link) + # 存储link类的数据 + link_rows = [] + link_data = {} + for table in link_tables: + for record in table.records: + key = (record.get_time(), record.values.get('ID', None)) + if key not in link_data: + link_data[key] = {} + field = record.get_field() + link_data[key][field] = record.get_value() + link_data[key]['measurement'] = record.get_measurement() + link_data[key]['date'] = record.values.get('date', None) + link_data[key]['scheme_Type'] = record.values.get('scheme_Type', None) + link_data[key]['scheme_Name'] = record.values.get('scheme_Name', None) + # 构建 Flux 查询语句,查询指定时间范围内的数据 + flux_query_node = f''' + from(bucket: "{bucket}") + |> range(start: {start_time}, stop: {stop_time}) + |> filter(fn: (r) => r["_measurement"] == "node") + ''' + # 执行查询 + node_tables = query_api.query(flux_query_node) + # 存储node类的数据 + node_rows = [] + node_data = {} + for table in node_tables: + for record in table.records: + key = (record.get_time(), record.values.get('ID', None)) + if key not in node_data: + node_data[key] = {} + field = record.get_field() + node_data[key][field] = record.get_value() + node_data[key]['measurement'] = record.get_measurement() + node_data[key]['date'] = record.values.get('date', None) + node_data[key]['scheme_Type'] = record.values.get('scheme_Type', None) + node_data[key]['scheme_Name'] = record.values.get('scheme_Name', None) + + for key in set(link_data.keys()): + row = {'time': key[0], "ID": key[1]} + row.update(link_data.get(key, {})) + link_rows.append(row) + for key in set(node_data.keys()): + row = {'time': key[0], "ID": key[1]} + row.update(node_data.get(key, {})) + node_rows.append(row) + + # 动态生成 CSV 文件名 + csv_filename_link = f"scheme_simulation_link_result_{start_date}至{end_date}.csv" + csv_filename_node = f"scheme_simulation_node_result_{start_date}至{end_date}.csv" + # 写入到 CSV 文件 + with open(csv_filename_link, mode='w', newline='') as file: + writer = csv.DictWriter(file, fieldnames=['time', 'measurement', 'date', 'scheme_Type', 'scheme_Name', 'ID', 'flow', 'leakage', 'velocity', 'headloss', 'status', 'setting', 'quality', 'friction', 'reaction']) + writer.writeheader() + writer.writerows(link_rows) + with open(csv_filename_node, mode='w', newline='') as file: + writer = csv.DictWriter(file, fieldnames=['time', 'measurement', 'date', 'scheme_Type', 'scheme_Name', 'ID', 'head', 'pressure', 'actualdemand', + 'demanddeficit', 'totalExternalOutflow', 'quality']) + writer.writeheader() + writer.writerows(node_rows) + + print(f"Data exported to {csv_filename_link} and {csv_filename_node} successfully.") + +# 2025/02/18 +def export_scheme_simulation_result_to_csv_scheme(scheme_Type: str, scheme_Name: str, bucket: str="scheme_simulation_result", client: InfluxDBClient=client) -> None: + """ + 导出influxdb中scheme_simulation_result这个bucket的数据到csv中 + :param scheme_Type: 查询的方案类型 + :param scheme_Name: 查询的方案名 + :param bucket: 数据存储的 bucket 名称,默认值为 "SCADA_data" + :param client: 已初始化的 InfluxDBClient 实例 + :return: + """ + if client.ping(): + print("{} -- Successfully connected to InfluxDB.".format( + datetime.now().strftime('%Y-%m-%d %H:%M:%S'))) + else: + print("{} -- Failed to connect to InfluxDB.".format( + datetime.now().strftime('%Y-%m-%d %H:%M:%S'))) + query_api = client.query_api() + # 构建 Flux 查询语句,查询指定时间范围内的数据 + flux_query_link = f''' + from(bucket: "{bucket}") + |> range(start: 2025-01-01T00:00:00Z) + |> filter(fn: (r) => r["_measurement"] == "link") + |> filter(fn: (r) => r["scheme_Type"] == "{scheme_Type}") + |> filter(fn: (r) => r["scheme_Name"] == "{scheme_Name}") + ''' + # 执行查询 + link_tables = query_api.query(flux_query_link) + # 存储link类的数据 + link_rows = [] + link_data = {} + for table in link_tables: + for record in table.records: + key = (record.get_time(), record.values.get('ID', None)) + if key not in link_data: + link_data[key] = {} + field = record.get_field() + link_data[key][field] = record.get_value() + link_data[key]['measurement'] = record.get_measurement() + link_data[key]['date'] = record.values.get('date', None) + link_data[key]['scheme_Type'] = record.values.get('scheme_Type', None) + link_data[key]['scheme_Name'] = record.values.get('scheme_Name', None) + # 构建 Flux 查询语句,查询指定时间范围内的数据 + flux_query_node = f''' + from(bucket: "{bucket}") + |> range(start: 2025-01-01T00:00:00Z) + |> filter(fn: (r) => r["_measurement"] == "node") + |> filter(fn: (r) => r["scheme_Type"] == "{scheme_Type}") + |> filter(fn: (r) => r["scheme_Name"] == "{scheme_Name}") + ''' + # 执行查询 + node_tables = query_api.query(flux_query_node) + # 存储node类的数据 + node_rows = [] + node_data = {} + for table in node_tables: + for record in table.records: + key = (record.get_time(), record.values.get('ID', None)) + if key not in node_data: + node_data[key] = {} + field = record.get_field() + node_data[key][field] = record.get_value() + node_data[key]['measurement'] = record.get_measurement() + node_data[key]['date'] = record.values.get('date', None) + node_data[key]['scheme_Type'] = record.values.get('scheme_Type', None) + node_data[key]['scheme_Name'] = record.values.get('scheme_Name', None) + + for key in set(link_data.keys()): + row = {'time': key[0], "ID": key[1]} + row.update(link_data.get(key, {})) + link_rows.append(row) + for key in set(node_data.keys()): + row = {'time': key[0], "ID": key[1]} + row.update(node_data.get(key, {})) + node_rows.append(row) + + # 动态生成 CSV 文件名 + csv_filename_link = f"scheme_simulation_link_result_{scheme_Name}_of_{scheme_Type}.csv" + csv_filename_node = f"scheme_simulation_node_result_{scheme_Name}_of_{scheme_Type}.csv" + # 写入到 CSV 文件 + with open(csv_filename_link, mode='w', newline='') as file: + writer = csv.DictWriter(file, fieldnames=['time', 'measurement', 'date', 'scheme_Type', 'scheme_Name', 'ID', 'flow', 'leakage', 'velocity', 'headloss', 'status', 'setting', 'quality', 'friction', 'reaction']) + writer.writeheader() + writer.writerows(link_rows) + with open(csv_filename_node, mode='w', newline='') as file: + writer = csv.DictWriter(file, fieldnames=['time', 'measurement', 'date', 'scheme_Type', 'scheme_Name', 'ID', 'head', 'pressure', 'actualdemand', + 'demanddeficit', 'totalExternalOutflow', 'quality']) + writer.writeheader() + writer.writerows(node_rows) + + print(f"Data exported to {csv_filename_link} and {csv_filename_node} successfully.") + # 示例调用 if __name__ == "__main__": url = influxdb_info.url @@ -1438,20 +2148,19 @@ if __name__ == "__main__": # step3: 查询测试示例 with InfluxDBClient(url=url, token=token, org=org_name) as client: - # 示例1:query_latest_record_by_ID - bucket_name = "realtime_simulation_result" # 数据存储的 bucket 名称 - node_id = "ZBBDTZDP000022" # 查询的节点 ID - link_id = "ZBBGXSZW000002" - - # latest_record = query_latest_record_by_ID(ID=node_id, type="node", bucket=bucket_name, client=client) - # latest_record = query_latest_record_by_ID(ID=link_id, type="link", bucket=bucket_name, client=client) - - all_records = query_all_record_by_time('2025-02-09T17:30:00+08:00') - - if latest_record: - print("最新记录:", latest_record) - else: - print("未找到符合条件的记录。") + + # # 示例1:query_latest_record_by_ID + # bucket_name = "realtime_simulation_result" # 数据存储的 bucket 名称 + # node_id = "ZBBDTZDP000022" # 查询的节点 ID + # link_id = "ZBBGXSZW000002" + # + # latest_record = query_latest_record_by_ID(ID=node_id, type="node", bucket=bucket_name, client=client) + # # # latest_record = query_latest_record_by_ID(ID=link_id, type="link", bucket=bucket_name, client=client) + # # + # if latest_record: + # print("最新记录:", latest_record) + # else: + # print("未找到符合条件的记录。") # 示例2:query_all_record_by_time # node_records, link_records = query_all_record_by_time(query_time="2024-11-25T06:00:00+08:00") @@ -1464,5 +2173,43 @@ if __name__ == "__main__": # print(curve_result) # 示例4:query_SCADA_data_by_device_ID_and_time - # SCADA_result_dict = query_SCADA_data_by_device_ID_and_time(globals.variable_pump_realtime_ids, query_time='2025-02-08T10:30:00+08:00') + # SCADA_result_dict = query_SCADA_data_by_device_ID_and_time(globals.variable_pump_realtime_ids, query_time='2025-02-14T23:58:00+08:00') # print(SCADA_result_dict) + + # 示例5:query_SCADA_data_curve + # SCADA_result = query_SCADA_data_curve(api_query_id='3853', start_date='2025-02-14', end_date='2025-02-16') + # print(SCADA_result) + + # 示例6:export_SCADA_data_to_csv + # export_SCADA_data_to_csv(start_date='2025-02-13', end_date='2025-02-15') + + # 示例7:export_realtime_simulation_result_to_csv + # export_realtime_simulation_result_to_csv(start_date='2025-02-13', end_date='2025-02-15') + + # 示例8:export_scheme_simulation_result_to_csv_time + # export_scheme_simulation_result_to_csv_time(start_date='2025-02-13', end_date='2025-02-15') + + # 示例9:export_scheme_simulation_result_to_csv_scheme + # export_scheme_simulation_result_to_csv_scheme(scheme_Type='burst_Analysis', scheme_Name='scheme1') + + # 示例10:query_scheme_all_record_by_time + # node_records, link_records = query_scheme_all_record_by_time(scheme_Type='burst_Analysis', scheme_Name='scheme1', query_time="2025-02-14T10:30:00+08:00") + # print("Node 数据:", node_records) + # print("Link 数据:", link_records) + + # 示例11:query_scheme_curve_by_ID_property + # curve_result = query_scheme_curve_by_ID_property(scheme_Type='burst_Analysis', scheme_Name='scheme1', ID='ZBBDTZDP000022', + # type='node', property='head') + # print(curve_result) + + # 示例12:query_all_record_by_date + # node_records, link_records = query_all_record_by_date(query_date='2025-02-14') + # print("Node 数据:", node_records) + # print("Link 数据:", link_records) + + # 示例13:query_scheme_all_record + node_records, link_records = query_scheme_all_record(scheme_Type='burst_Analysis', scheme_Name='scheme1') + print("Node 数据:", node_records) + print("Link 数据:", link_records) + +