from influxdb_client import InfluxDBClient, BucketsApi, WriteApi, OrganizationsApi, Point, QueryApi from typing import List, Dict from datetime import datetime, timedelta, timezone from influxdb_client.client.write_api import SYNCHRONOUS, ASYNCHRONOUS from dateutil import parser import get_realValue import get_data import psycopg import time import simulation from tjnetwork import * import schedule import threading import globals import csv import pandas as pd import openpyxl import influxdb_info import time_api # influxdb数据库连接信息 url = influxdb_info.url token = influxdb_info.token org_name = influxdb_info.org client = InfluxDBClient(url=url, token=token, org=org_name) # # 所有实时更新数据的SCADA设备的ID # flow_device_ids = ['2498', '3854', '3853'] # pressure_device_ids = ['2510', '2514'] # reservoir_liquid_level_ids = ['2497', '2571'] # tank_liquid_level_ids = ['4780', '9774'] # pump_device_ids = ['2747', '2776', '2730', '2787', '2500', '2502', '2504'] # # # 用于更改数据的SCADA设的ID # change_data_device_ids = ['2498', '3854', '3853', '2497', '2571', '4780', '9774', # '2747', '2776', '2730', '2787', '2500', '2502', '2504'] # # 全局变量,用于存储不同类型的realtime api_query_id # reservoir_liquid_level_realtime_ids = [] # tank_liquid_level_realtime_ids = [] # fixed_pump_realtime_ids = [] # variable_pump_realtime_ids = [] # source_outflow_realtime_ids = [] # pipe_flow_realtime_ids = [] # pressure_realtime_ids = [] # demand_realtime_ids = [] # quality_realtime_ids = [] # # # transmission_frequency的最大值 # transmission_frequency = None # hydraulic_timestep = None # # reservoir_liquid_level_non_realtime_ids = [] # tank_liquid_level_non_realtime_ids = [] # fixed_pump_non_realtime_ids = [] # variable_pump_non_realtime_ids = [] # source_outflow_non_realtime_ids = [] # pipe_flow_non_realtime_ids = [] # pressure_non_realtime_ids = [] # demand_non_realtime_ids = [] # quality_non_realtime_ids = [] def query_pg_scada_info_realtime(name: str) -> None: """ 查询pg数据库中,scada_info中,属于realtime的数据 :param name: 数据库名称 :return: """ # 连接数据库 conn_string = f"dbname={name} host=127.0.0.1" try: with psycopg.connect(conn_string) as conn: with conn.cursor() as cur: # 查询 transmission_mode 为 'realtime' 的记录 cur.execute(""" SELECT type, api_query_id FROM scada_info WHERE transmission_mode = 'realtime'; """) records = cur.fetchall() # 清空全局列表 globals.reservoir_liquid_level_realtime_ids.clear() globals.tank_liquid_level_realtime_ids.clear() globals.fixed_pump_realtime_ids.clear() globals.variable_pump_realtime_ids.clear() globals.source_outflow_realtime_ids.clear() globals.pipe_flow_realtime_ids.clear() globals.pressure_realtime_ids.clear() globals.demand_realtime_ids.clear() globals.quality_realtime_ids.clear() # 根据 type 分类存储 api_query_id for record in records: record_type, api_query_id = record if api_query_id is not None: # 确保 api_query_id 不为空 if record_type == "reservoir_liquid_level": globals.reservoir_liquid_level_realtime_ids.append(api_query_id) elif record_type == "tank_liquid_level": globals.tank_liquid_level_realtime_ids.append(api_query_id) elif record_type == "fixed_pump": globals.fixed_pump_realtime_ids.append(api_query_id) elif record_type == "variable_pump": globals.variable_pump_realtime_ids.append(api_query_id) elif record_type == "source_outflow": globals.source_outflow_realtime_ids.append(api_query_id) elif record_type == "pipe_flow": globals.pipe_flow_realtime_ids.append(api_query_id) elif record_type == "pressure": globals.pressure_realtime_ids.append(api_query_id) elif record_type == "demand": globals.demand_realtime_ids.append(api_query_id) elif record_type == "quality": globals.quality_realtime_ids.append(api_query_id) # 打印结果,方便调试 # print("Query completed. Results:") # print("Reservoir Liquid Level IDs:", globals.reservoir_liquid_level_realtime_ids) # print("Tank Liquid Level IDs:", globals.tank_liquid_level_realtime_ids) # print("Fixed Pump IDs:", globals.fixed_pump_realtime_ids) # print("Variable Pump IDs:", globals.variable_pump_realtime_ids) # print("Source Outflow IDs:", globals.source_outflow_realtime_ids) # print("Pipe Flow IDs:", globals.pipe_flow_realtime_ids) # print("Pressure IDs:", globals.pressure_realtime_ids) # print("Demand IDs:", globals.demand_realtime_ids) # print("Quality IDs:", globals.quality_realtime_ids) except Exception as e: print(f"查询时发生错误:{e}") def query_pg_scada_info_non_realtime(name: str) -> None: """ 查询pg数据库中,scada_info中,属于non_realtime的数据,以及这些数据transmission_frequency的最大值 :param name: 数据库名称 :return: """ # 重新打开数据库 if is_project_open(name): close_project(name) open_project(name) dic_time = get_time(name) globals.hydraulic_timestep = dic_time['HYDRAULIC TIMESTEP'] close_project(name) # 连接数据库 conn_string = f"dbname={name} host=127.0.0.1" try: with psycopg.connect(conn_string) as conn: with conn.cursor() as cur: # 查询 transmission_mode 为 'non_realtime' 的记录 cur.execute(""" SELECT type, api_query_id, transmission_frequency FROM scada_info WHERE transmission_mode = 'non_realtime'; """) records = cur.fetchall() # 清空全局列表 globals.reservoir_liquid_level_non_realtime_ids.clear() globals.fixed_pump_non_realtime_ids.clear() globals.variable_pump_non_realtime_ids.clear() globals.source_outflow_non_realtime_ids.clear() globals.pipe_flow_non_realtime_ids.clear() globals.pressure_non_realtime_ids.clear() globals.demand_non_realtime_ids.clear() globals.quality_non_realtime_ids.clear() # 用于计算 transmission_frequency 最大值 transmission_frequencies = [] # 根据 type 分类存储 api_query_id for record in records: record_type, api_query_id, freq = record if api_query_id is not None: # 确保 api_query_id 不为空 if record_type == "reservoir_liquid_level": globals.reservoir_liquid_level_non_realtime_ids.append(api_query_id) elif record_type == "fixed_pump": globals.fixed_pump_non_realtime_ids.append(api_query_id) elif record_type == "variable_pump": globals.variable_pump_non_realtime_ids.append(api_query_id) elif record_type == "source_outflow": globals.source_outflow_non_realtime_ids.append(api_query_id) elif record_type == "pipe_flow": globals.pipe_flow_non_realtime_ids.append(api_query_id) elif record_type == "pressure": globals.pressure_non_realtime_ids.append(api_query_id) elif record_type == "demand": globals.demand_non_realtime_ids.append(api_query_id) elif record_type == "quality": globals.quality_non_realtime_ids.append(api_query_id) # 收集 transmission_frequency,用于计算最大值 if freq is not None: transmission_frequencies.append(freq) # 计算 transmission_frequency 最大值 globals.transmission_frequency = max(transmission_frequencies) if transmission_frequencies else None # 打印结果,方便调试 # print("Query completed. Results:") # print("Reservoir Liquid Level Non-Realtime IDs:", globals.reservoir_liquid_level_non_realtime_ids) # print("Fixed Pump Non-Realtime IDs:", globals.fixed_pump_non_realtime_ids) # print("Variable Pump Non-Realtime IDs:", globals.variable_pump_non_realtime_ids) # print("Source Outflow Non-Realtime IDs:", globals.source_outflow_non_realtime_ids) # print("Pipe Flow Non-Realtime IDs:", globals.pipe_flow_non_realtime_ids) # print("Pressure Non-Realtime IDs:", globals.pressure_non_realtime_ids) # print("Demand Non-Realtime IDs:", globals.demand_non_realtime_ids) # print("Quality Non-Realtime IDs:", globals.quality_non_realtime_ids) # print("Maximum Transmission Frequency:", globals.transmission_frequency) print("Hydraulic Timestep:", globals.hydraulic_timestep) except Exception as e: print(f"查询时发生错误:{e}") # 2025/02/01 def delete_buckets(client: InfluxDBClient, org_name: str) -> None: """ 删除InfluxDB中指定organization下的所有buckets。 :param client: (InfluxDBClient): 已初始化的 InfluxDBClient 实例。 :param org_name: InfluxDB中organization的名称。 :return: None """ # 定义需要删除的 bucket 名称列表 buckets_to_delete = ['SCADA_data', 'realtime_simulation_result', 'scheme_simulation_result'] buckets_api = client.buckets_api() buckets_obj = buckets_api.find_buckets(org=org_name) # 确保 buckets_obj 拥有 buckets 属性 if hasattr(buckets_obj, 'buckets'): for bucket in buckets_obj.buckets: if bucket.name in buckets_to_delete: # 只删除特定名称的 bucket try: buckets_api.delete_bucket(bucket) print(f"Bucket {bucket.name} has been deleted successfully.") except Exception as e: print(f"Failed to delete bucket {bucket.name}: {e}") else: print(f"Skipping bucket {bucket.name}. Not in the deletion list.") else: print("未找到 buckets 属性,无法迭代 buckets。") # 2025/02/01 def create_and_initialize_buckets(client: InfluxDBClient, org_name: str) -> None: """ 初始化influxdb的三个数据存储库,分别为SCADA_data、realtime_simulation_result、scheme_simulation_result :param client: (InfluxDBClient): 已初始化的 InfluxDBClient 实例。 :param org_name: InfluxDB中organization的名称 :return: """ # 先删除原有的,然后再进行初始化 delete_buckets(client, org_name) bucket_api = BucketsApi(client) write_api = client.write_api() org_api = OrganizationsApi(client) # 获取 org_id org = next((o for o in org_api.find_organizations() if o.name == org_name), None) if not org: raise ValueError(f"Organization '{org_name}' not found.") org_id = org.id print(f"Using Organization ID: {org_id}") # 定义 Buckets 信息 buckets = [ {"name": "SCADA_data", "retention_rules": []}, {"name": "realtime_simulation_result", "retention_rules": []}, {"name": "scheme_simulation_result", "retention_rules": []} ] # 创建 Buckets 并初始化数据 for bucket in buckets: # 创建 Bucket created_bucket = bucket_api.create_bucket( bucket_name=bucket["name"], retention_rules=bucket["retention_rules"], org_id=org_id ) print(f"Bucket '{bucket['name']}' created with ID: {created_bucket.id}") # 根据 Bucket 初始化数据 if bucket["name"] == "SCADA_data": point = Point("SCADA") \ .tag("date", None) \ .tag("description", None) \ .tag("device_ID", None) \ .field("monitored_value", 0.0) \ .field("datacleaning_value", 0.0) \ .field("simulation_value", 0.0) \ .time("2024-11-21T00:00:00Z") write_api.write(bucket="SCADA_data", org=org_name, record=point) print("Initialized SCADA_data with default structure.") elif bucket["name"] == "realtime_simulation_result": # realtime_simulation_result link_point = Point("link") \ .tag("date", None) \ .tag("ID", None) \ .field("flow", 0.0) \ .field("leakage", 0.0) \ .field("velocity", 0.0) \ .field("headloss", 0.0) \ .field("status", None) \ .field("setting", 0.0) \ .field("quality", 0.0) \ .field("reaction", 0.0) \ .field("friction", 0.0) \ .time("2024-11-21T00:00:00Z") node_point = Point("node") \ .tag("date", None) \ .tag("ID", None) \ .field("head", 0.0) \ .field("pressure", 0.0) \ .field("actualdemand", 0.0) \ .field("demanddeficit", 0.0) \ .field("totalExternalOutflow", 0.0) \ .field("quality", 0.0) \ .time("2024-11-21T00:00:00Z") write_api.write(bucket="realtime_simulation_result", org=org_name, record=link_point) write_api.write(bucket="realtime_simulation_result", org=org_name, record=node_point) print("Initialized realtime_simulation_result with default structure.") elif bucket["name"] == "scheme_simulation_result": link_point = Point("link") \ .tag("date", None) \ .tag("ID", None) \ .tag("scheme_Type", None) \ .tag("scheme_Name", None) \ .field("flow", 0.0) \ .field("leakage", 0.0) \ .field("velocity", 0.0) \ .field("headloss", 0.0) \ .field("status", None) \ .field("setting", 0.0) \ .field("quality", 0.0) \ .time("2024-11-21T00:00:00Z") node_point = Point("node") \ .tag("date", None) \ .tag("ID", None) \ .tag("scheme_Type", None) \ .tag("scheme_Name", None) \ .field("head", 0.0) \ .field("pressure", 0.0) \ .field("actualdemand", 0.0) \ .field("demanddeficit", 0.0) \ .field("totalExternalOutflow", 0.0) \ .field("quality", 0.0) \ .time("2024-11-21T00:00:00Z") write_api.write(bucket="scheme_simulation_result", org=org_name, record=link_point) write_api.write(bucket="scheme_simulation_result", org=org_name, record=node_point) print("Initialized scheme_simulation_result with default structure.") print("All buckets created and initialized successfully.") def store_realtime_SCADA_data_to_influxdb(get_real_value_time: str, bucket: str = "SCADA_data", client: InfluxDBClient = client) -> None: """ 将SCADA数据通过数据接口导入数据库 :param get_real_value_time: 获取数据的时间,格式如'2024-11-25T09:00:00+08:00' :param bucket: (str): InfluxDB 的 bucket 名称,默认值为 "SCADA_data"。 :param client: (InfluxDBClient): 已初始化的 InfluxDBClient 实例。 :return: """ if client.ping(): print("{} -- Successfully connected to InfluxDB.".format( datetime.now().strftime('%Y-%m-%d %H:%M:%S'))) else: print("{} -- Failed to connect to InfluxDB.".format( datetime.now().strftime('%Y-%m-%d %H:%M:%S'))) write_api = client.write_api(write_options=ASYNCHRONOUS) try_count = 0 reservoir_liquid_level_realtime_data_list = [] tank_liquid_level_realtime_data_list = [] fixed_pump_realtime_data_list =[] variable_pump_realtime_data_list =[] source_outflow_realtime_data_list = [] pipe_flow_realtime_data_list = [] pressure_realtime_data_list =[] demand_realtime_data_list = [] quality_realtime_data_list = [] while try_count <= 5: # 尝试6次 ******* try: try_count += 1 if globals.reservoir_liquid_level_realtime_ids: reservoir_liquid_level_realtime_data_list = get_realValue.get_realValue( ids=','.join(globals.reservoir_liquid_level_realtime_ids)) if globals.tank_liquid_level_realtime_ids: tank_liquid_level_realtime_data_list = get_realValue.get_realValue(ids=','.join(globals.tank_liquid_level_realtime_ids)) if globals.fixed_pump_realtime_ids: fixed_pump_realtime_data_list = get_realValue.get_realValue(ids=','.join(globals.fixed_pump_realtime_ids)) if globals.variable_pump_realtime_ids: variable_pump_realtime_data_list = get_realValue.get_realValue(ids=','.join(globals.variable_pump_realtime_ids)) if globals.source_outflow_realtime_ids: source_outflow_realtime_data_list = get_realValue.get_realValue(ids=','.join(globals.source_outflow_realtime_ids)) if globals.pipe_flow_realtime_ids: pipe_flow_realtime_data_list = get_realValue.get_realValue(ids=','.join(globals.pipe_flow_realtime_ids)) if globals.pressure_realtime_ids: pressure_realtime_data_list = get_realValue.get_realValue(ids=','.join(globals.pressure_realtime_ids)) if globals.demand_realtime_ids: demand_realtime_data_list = get_realValue.get_realValue(ids=','.join(globals.demand_realtime_ids)) if globals.quality_realtime_ids: quality_realtime_data_list = get_realValue.get_realValue(ids=','.join(globals.quality_realtime_ids)) except Exception as e: print(e) time.sleep(10) else: try_count = 100 # 写入数据 if reservoir_liquid_level_realtime_data_list: for data in reservoir_liquid_level_realtime_data_list: # 将 data['time'] 和 get_realValue_time 转换为 datetime 对象 data_time = datetime.fromisoformat(data['time']) get_real_value_time_dt = datetime.fromisoformat(get_real_value_time).replace(tzinfo=None) # 将获取的时间转换为 UTC 时间 get_real_value_time_utc = get_real_value_time_dt.astimezone(timezone.utc) # 计算时间差(绝对值) time_difference = abs((data_time - get_real_value_time_dt).total_seconds()) # 判断时间差是否超过1分钟 if time_difference > 60: # 超过1分钟 monitored_value = None else: # 小于等于3分钟 monitored_value = data['monitored_value'] # 创建Point对象 point = ( Point('reservoir_liquid_level_realtime') .tag("date", datetime.fromisoformat(get_real_value_time).strftime('%Y-%m-%d')) .tag("description", data['description']) .tag("device_ID", data['device_ID']) .field("monitored_value", monitored_value) .field("datacleaning_value", None) .field("simulation_value", None) .time(get_real_value_time_utc) ) write_api.write(bucket=bucket, org=org_name, record=point) write_api.flush() if tank_liquid_level_realtime_data_list: for data in tank_liquid_level_realtime_data_list: # 将 data['time'] 和 get_realValue_time 转换为 datetime 对象 data_time = datetime.fromisoformat(data['time']) get_real_value_time_dt = datetime.fromisoformat(get_real_value_time).replace(tzinfo=None) # 将获取的时间转换为 UTC 时间 get_real_value_time_utc = get_real_value_time_dt.astimezone(timezone.utc) # 计算时间差(绝对值) time_difference = abs((data_time - get_real_value_time_dt).total_seconds()) # 判断时间差是否超过1分钟 if time_difference > 60: # 超过1分钟 monitored_value = None else: # 小于等于3分钟 monitored_value = data['monitored_value'] # 创建Point对象 point = ( Point('tank_liquid_level_realtime') .tag("date", datetime.fromisoformat(get_real_value_time).strftime('%Y-%m-%d')) .tag("description", data['description']) .tag("device_ID", data['device_ID']) .field("monitored_value", (monitored_value)) .field("datacleaning_value", None) .field("simulation_value", None) .time(get_real_value_time_utc) ) write_api.write(bucket=bucket, org=org_name, record=point) write_api.flush() if fixed_pump_realtime_data_list: for data in fixed_pump_realtime_data_list: # 将 data['time'] 和 get_realValue_time 转换为 datetime 对象 data_time = datetime.fromisoformat(data['time']) get_real_value_time_dt = datetime.fromisoformat(get_real_value_time).replace(tzinfo=None) # 将获取的时间转换为 UTC 时间 get_real_value_time_utc = get_real_value_time_dt.astimezone(timezone.utc) # 计算时间差(绝对值) time_difference = abs((data_time - get_real_value_time_dt).total_seconds()) # 判断时间差是否超过1分钟 if time_difference > 60: # 超过1分钟 monitored_value = None else: # 小于等于3分钟 monitored_value = data['monitored_value'] # 创建Point对象 point = ( Point('fixed_pump_realtime') .tag("date", datetime.fromisoformat(get_real_value_time).strftime('%Y-%m-%d')) .tag("description", data['description']) .tag("device_ID", data['device_ID']) .field("monitored_value", monitored_value) .field("datacleaning_value", None) .field("simulation_value", None) .time(get_real_value_time_utc) ) write_api.write(bucket=bucket, org=org_name, record=point) write_api.flush() if variable_pump_realtime_data_list: for data in variable_pump_realtime_data_list: # 将 data['time'] 和 get_realValue_time 转换为 datetime 对象 data_time = datetime.fromisoformat(data['time']) get_real_value_time_dt = datetime.fromisoformat(get_real_value_time).replace(tzinfo=None) # 将获取的时间转换为 UTC 时间 get_real_value_time_utc = get_real_value_time_dt.astimezone(timezone.utc) # 计算时间差(绝对值) time_difference = abs((data_time - get_real_value_time_dt).total_seconds()) # 判断时间差是否超过1分钟 if time_difference > 60: # 超过1分钟 monitored_value = None else: # 小于等于3分钟 monitored_value = data['monitored_value'] # 创建Point对象 point = ( Point('variable_pump_realtime') .tag("date", datetime.fromisoformat(get_real_value_time).strftime('%Y-%m-%d')) .tag("description", data['description']) .tag("device_ID", data['device_ID']) .field("monitored_value", monitored_value) .field("datacleaning_value", None) .field("simulation_value", None) .time(get_real_value_time_utc) ) write_api.write(bucket=bucket, org=org_name, record=point) write_api.flush() if source_outflow_realtime_data_list: for data in source_outflow_realtime_data_list: # 将 data['time'] 和 get_realValue_time 转换为 datetime 对象 data_time = datetime.fromisoformat(data['time']) get_real_value_time_dt = datetime.fromisoformat(get_real_value_time).replace(tzinfo=None) # 将获取的时间转换为 UTC 时间 get_real_value_time_utc = get_real_value_time_dt.astimezone(timezone.utc) # 计算时间差(绝对值) time_difference = abs((data_time - get_real_value_time_dt).total_seconds()) # 判断时间差是否超过1分钟 if time_difference > 60: # 超过1分钟 monitored_value = None else: # 小于等于3分钟 monitored_value = data['monitored_value'] # 创建Point对象 point = ( Point('source_outflow_realtime') .tag("date", datetime.fromisoformat(get_real_value_time).strftime('%Y-%m-%d')) .tag("description", data['description']) .tag("device_ID", data['device_ID']) .field("monitored_value", monitored_value) .field("datacleaning_value", None) .field("simulation_value", None) .time(get_real_value_time_utc) ) write_api.write(bucket=bucket, org=org_name, record=point) write_api.flush() if pipe_flow_realtime_data_list: for data in pipe_flow_realtime_data_list: # 将 data['time'] 和 get_realValue_time 转换为 datetime 对象 data_time = datetime.fromisoformat(data['time']) get_real_value_time_dt = datetime.fromisoformat(get_real_value_time).replace(tzinfo=None) # 将获取的时间转换为 UTC 时间 get_real_value_time_utc = get_real_value_time_dt.astimezone(timezone.utc) # 计算时间差(绝对值) time_difference = abs((data_time - get_real_value_time_dt).total_seconds()) # 判断时间差是否超过1分钟 if time_difference > 60: # 超过1分钟 monitored_value = None else: # 小于等于3分钟 monitored_value = data['monitored_value'] # 创建Point对象 point = ( Point('pipe_flow_realtime') .tag("date", datetime.fromisoformat(get_real_value_time).strftime('%Y-%m-%d')) .tag("description", data['description']) .tag("device_ID", data['device_ID']) .field("monitored_value", monitored_value) .field("datacleaning_value", None) .field("simulation_value", None) .time(get_real_value_time_utc) ) write_api.write(bucket=bucket, org=org_name, record=point) write_api.flush() if pressure_realtime_data_list: for data in pressure_realtime_data_list: # 将 data['time'] 和 get_realValue_time 转换为 datetime 对象 data_time = datetime.fromisoformat(data['time']) get_real_value_time_dt = datetime.fromisoformat(get_real_value_time).replace(tzinfo=None) # 将获取的时间转换为 UTC 时间 get_real_value_time_utc = get_real_value_time_dt.astimezone(timezone.utc) # 计算时间差(绝对值) time_difference = abs((data_time - get_real_value_time_dt).total_seconds()) # 判断时间差是否超过1分钟 if time_difference > 60: # 超过1分钟 monitored_value = None else: # 小于等于3分钟 monitored_value = data['monitored_value'] # 创建Point对象 point = ( Point('pressure_realtime') .tag("date", datetime.fromisoformat(get_real_value_time).strftime('%Y-%m-%d')) .tag("description", data['description']) .tag("device_ID", data['device_ID']) .field("monitored_value", monitored_value) .field("datacleaning_value", None) .field("simulation_value", None) .time(get_real_value_time_utc) ) write_api.write(bucket=bucket, org=org_name, record=point) write_api.flush() if demand_realtime_data_list: for data in demand_realtime_data_list: # 将 data['time'] 和 get_realValue_time 转换为 datetime 对象 data_time = datetime.fromisoformat(data['time']) get_real_value_time_dt = datetime.fromisoformat(get_real_value_time).replace(tzinfo=None) # 将获取的时间转换为 UTC 时间 get_real_value_time_utc = get_real_value_time_dt.astimezone(timezone.utc) # 计算时间差(绝对值) time_difference = abs((data_time - get_real_value_time_dt).total_seconds()) # 判断时间差是否超过1分钟 if time_difference > 60: # 超过1分钟 monitored_value = None else: # 小于等于3分钟 monitored_value = data['monitored_value'] # 创建Point对象 point = ( Point('demand_realtime') .tag("date", datetime.fromisoformat(get_real_value_time).strftime('%Y-%m-%d')) .tag("description", data['description']) .tag("device_ID", data['device_ID']) .field("monitored_value", monitored_value) .field("datacleaning_value", None) .field("simulation_value", None) .time(get_real_value_time_utc) ) write_api.write(bucket=bucket, org=org_name, record=point) write_api.flush() if quality_realtime_data_list: for data in quality_realtime_data_list: # 将 data['time'] 和 get_realValue_time 转换为 datetime 对象 data_time = datetime.fromisoformat(data['time']) get_real_value_time_dt = datetime.fromisoformat(get_real_value_time).replace(tzinfo=None) # 将获取的时间转换为 UTC 时间 get_real_value_time_utc = get_real_value_time_dt.astimezone(timezone.utc) # 计算时间差(绝对值) time_difference = abs((data_time - get_real_value_time_dt).total_seconds()) # 判断时间差是否超过1分钟 if time_difference > 60: # 超过1分钟 monitored_value = None else: # 小于等于3分钟 monitored_value = data['monitored_value'] # 创建Point对象 point = ( Point('quality_realtime') .tag("date", datetime.fromisoformat(get_real_value_time).strftime('%Y-%m-%d')) .tag("description", data['description']) .tag("device_ID", data['device_ID']) .field("monitored_value", monitored_value) .field("datacleaning_value", None) .field("simulation_value", None) .time(get_real_value_time_utc) ) write_api.write(bucket=bucket, org=org_name, record=point) write_api.flush() def convert_time_format(original_time: str) -> str: """ 格式转换,将“2024-04-13T08:00:00+08:00"转为“2024-04-13 08:00:00” :param original_time: str, “2024-04-13T08:00:00+08:00"格式的时间 :return: str,“2024-04-13 08:00:00”格式的时间 """ new_time = original_time.replace('T', ' ') new_time = new_time.replace('+08:00', '') return new_time # 筛选符合条件的数据 def is_timestep_multiple(data_time, timestep): # 获取时间点距离当天0点的时间差 midnight = data_time.replace(hour=0, minute=0, second=0, microsecond=0) delta_since_midnight = data_time - midnight # 检查时间差是否为时间步长的整数倍 return delta_since_midnight.total_seconds() % timestep.total_seconds() == 0 # 2025/01/10 def store_non_realtime_SCADA_data_to_influxdb(get_history_data_end_time: str, bucket: str = "SCADA_data", client: InfluxDBClient = client) -> None: """ 获取某段时间内传回的scada数据 :param get_history_data_end_time: 获取历史数据的终止时间时间,格式如'2024-11-25T09:00:00+08:00' :param bucket: (str): InfluxDB 的 bucket 名称,默认值为 "SCADA_data"。 :param client: (InfluxDBClient): 已初始化的 InfluxDBClient 实例。 :return: """ if client.ping(): print("{} -- Successfully connected to InfluxDB.".format( datetime.now().strftime('%Y-%m-%d %H:%M:%S'))) else: print("{} -- Failed to connect to InfluxDB.".format( datetime.now().strftime('%Y-%m-%d %H:%M:%S'))) write_api = client.write_api(write_options=SYNCHRONOUS) # 将end_date字符串转换为datetime对象 end_date_dt = datetime.strptime(convert_time_format(get_history_data_end_time), '%Y-%m-%d %H:%M:%S') end_date = end_date_dt.strftime('%Y-%m-%d %H:%M:%S') # 将transmission_frequency字符串转换为timedelta对象 transmission_frequency_dt = datetime.strptime(globals.transmission_frequency, '%H:%M:%S') - datetime(1900, 1, 1) get_history_data_start_time = end_date_dt - transmission_frequency_dt begin_date = get_history_data_start_time.strftime('%Y-%m-%d %H:%M:%S') # print(begin_date) # print(end_date) reservoir_liquid_level_non_realtime_data_list = [] tank_liquid_level_non_realtime_data_list = [] fixed_pump_non_realtime_data_list = [] variable_pump_non_realtime_data_list = [] source_outflow_non_realtime_data_list = [] pipe_flow_non_realtime_data_list = [] pressure_non_realtime_data_list = [] demand_non_realtime_data_list = [] quality_non_realtime_data_list = [] try_count = 0 while try_count < 5: try: try_count += 1 # reservoir_liquid_level_non_realtime_data_list = get_data.get_history_data( # ids=','.join(reservoir_liquid_level_non_realtime_ids), begin_date=begin_date, end_date=end_date, downsample='1m') if globals.reservoir_liquid_level_non_realtime_ids: reservoir_liquid_level_non_realtime_data_list = get_data.get_history_data( ids=','.join(globals.reservoir_liquid_level_non_realtime_ids), begin_date=begin_date, end_date=end_date, downsample='1m') if globals.tank_liquid_level_non_realtime_ids: tank_liquid_level_non_realtime_data_list = get_data.get_history_data( ids=','.join(globals.tank_liquid_level_non_realtime_ids), begin_date=begin_date, end_date=end_date, downsample='1m') if globals.fixed_pump_non_realtime_ids: fixed_pump_non_realtime_data_list = get_data.get_history_data( ids=','.join(globals.fixed_pump_non_realtime_ids), begin_date=begin_date, end_date=end_date, downsample='1m') if globals.variable_pump_non_realtime_ids: variable_pump_non_realtime_data_list = get_data.get_history_data( ids=','.join(globals.variable_pump_non_realtime_ids), begin_date=begin_date, end_date=end_date, downsample='1m') if globals.source_outflow_non_realtime_ids: source_outflow_non_realtime_data_list = get_data.get_history_data( ids=','.join(globals.source_outflow_non_realtime_ids), begin_date=begin_date, end_date=end_date, downsample='1m') if globals.pipe_flow_non_realtime_ids: pipe_flow_non_realtime_data_list = get_data.get_history_data( ids=','.join(globals.pipe_flow_non_realtime_ids), begin_date=begin_date, end_date=end_date, downsample='1m') # print(pipe_flow_non_realtime_data_list) if globals.pressure_non_realtime_ids: pressure_non_realtime_data_list = get_data.get_history_data( ids=','.join(globals.pressure_non_realtime_ids), begin_date=begin_date, end_date=end_date, downsample='1m') # print(pressure_non_realtime_data_list) if globals.demand_non_realtime_ids: demand_non_realtime_data_list = get_data.get_history_data( ids=','.join(globals.demand_non_realtime_ids), begin_date=begin_date, end_date=end_date, downsample='1m') if globals.quality_non_realtime_ids: quality_non_realtime_data_list = get_data.get_history_data( ids=','.join(globals.quality_non_realtime_ids), begin_date=begin_date, end_date=end_date, downsample='1m') except Exception as e: print(f"Attempt {try_count} failed with error: {e}") if try_count < 5: print("Retrying in 10 seconds...") time.sleep(10) else: print("Max retries reached. Exiting.") else: print("Data fetched successfully.") break # 成功后退出循环 if reservoir_liquid_level_non_realtime_data_list: for data in reservoir_liquid_level_non_realtime_data_list: # 创建Point对象 point = ( Point('reservoir_liquid_level_non_realtime') .tag("date", data['time'].strftime('%Y-%m-%d')) .tag("description", data['description']) .tag("device_ID", data['device_ID']) .field("monitored_value", data['monitored_value']) .field("datacleaning_value", None) .field("simulation_value", None) .time(data['time']) ) write_api.write(bucket=bucket, org=org_name, record=point) if tank_liquid_level_non_realtime_data_list: for data in tank_liquid_level_non_realtime_data_list: # 创建Point对象 point = ( Point('tank_liquid_level_non_realtime') .tag("date", data['time'].strftime('%Y-%m-%d')) .tag("description", data['description']) .tag("device_ID", data['device_ID']) .field("monitored_value", data['monitored_value']) .field("datacleaning_value", None) .field("simulation_value", None) .time(data['time']) ) write_api.write(bucket=bucket, org=org_name, record=point) if fixed_pump_non_realtime_data_list: for data in fixed_pump_non_realtime_data_list: # 创建Point对象 point = ( Point('fixed_pump_non_realtime') .tag("date", data['time'].strftime('%Y-%m-%d')) .tag("description", data['description']) .tag("device_ID", data['device_ID']) .field("monitored_value", data['monitored_value']) .field("datacleaning_value", None) .field("simulation_value", None) .time(data['time']) ) write_api.write(bucket=bucket, org=org_name, record=point) if variable_pump_non_realtime_data_list: for data in variable_pump_non_realtime_data_list: # 创建Point对象 point = ( Point('variable_pump_non_realtime') .tag("date", data['time'].strftime('%Y-%m-%d')) .tag("description", data['description']) .tag("device_ID", data['device_ID']) .field("monitored_value", data['monitored_value']) .field("datacleaning_value", None) .field("simulation_value", None) .time(data['time']) ) write_api.write(bucket=bucket, org=org_name, record=point) if source_outflow_non_realtime_data_list: for data in source_outflow_non_realtime_data_list: # 创建Point对象 point = ( Point('source_outflow_non_realtime') .tag("date", data['time'].strftime('%Y-%m-%d')) .tag("description", data['description']) .tag("device_ID", data['device_ID']) .field("monitored_value", data['monitored_value']) .field("datacleaning_value", None) .field("simulation_value", None) .time(data['time']) ) write_api.write(bucket=bucket, org=org_name, record=point) # if pipe_flow_non_realtime_data_list: for data in pipe_flow_non_realtime_data_list: # 创建Point对象 point = ( Point('pipe_flow_non_realtime') .tag("date", data['time'].strftime('%Y-%m-%d')) .tag("description", data['description']) .tag("device_ID", data['device_ID']) .field("monitored_value", data['monitored_value']) .field("datacleaning_value", None) .field("simulation_value", None) .time(data['time']) ) write_api.write(bucket=bucket, org=org_name, record=point) if pressure_non_realtime_data_list: for data in pressure_non_realtime_data_list: # 创建Point对象 point = ( Point('pressure_non_realtime') .tag("date", data['time'].strftime('%Y-%m-%d')) .tag("description", data['description']) .tag("device_ID", data['device_ID']) .field("monitored_value", data['monitored_value']) .field("datacleaning_value", None) .field("simulation_value", None) .time(data['time']) ) write_api.write(bucket=bucket, org=org_name, record=point) if demand_non_realtime_data_list: for data in demand_non_realtime_data_list: # 创建Point对象 point = ( Point('demand_non_realtime') .tag("date", data['time'].strftime('%Y-%m-%d')) .tag("description", data['description']) .tag("device_ID", data['device_ID']) .field("monitored_value", data['monitored_value']) .field("datacleaning_value", None) .field("simulation_value", None) .time(data['time']) ) write_api.write(bucket=bucket, org=org_name, record=point) if quality_non_realtime_data_list: for data in quality_non_realtime_data_list: # 创建Point对象 point = ( Point('quality_non_realtime') .tag("date", data['time'].strftime('%Y-%m-%d')) .tag("description", data['description']) .tag("device_ID", data['device_ID']) .field("monitored_value", data['monitored_value']) .field("datacleaning_value", None) .field("simulation_value", None) .time(data['time']) ) write_api.write(bucket=bucket, org=org_name, record=point) def query_SCADA_data_by_device_ID_and_time(query_ids_list: List[str], query_time: str, bucket: str="SCADA_data", client: InfluxDBClient=client) -> Dict[str, float]: """ 根据SCADA设备的ID和时间查询值 :param query_ids_list: SCADA设备ID的列表 :param query_time: 输入的北京时间,格式为 '2024-11-24T17:30:00+08:00'。 :param bucket: InfluxDB 的 bucket 名称,默认值为 "SCADA_data"。 :param client: 已初始化的 InfluxDBClient 实例。 :return: """ if client.ping(): print("{} -- Successfully connected to InfluxDB.".format( datetime.now().strftime('%Y-%m-%d %H:%M:%S'))) else: print("{} -- Failed to connect to InfluxDB.".format( datetime.now().strftime('%Y-%m-%d %H:%M:%S'))) query_api = client.query_api() # 将北京时间转换为 UTC 时间 beijing_time = datetime.fromisoformat(query_time) utc_time = beijing_time.astimezone(timezone.utc) # DingZQ temp change delta to 5 from 1 utc_start_time = utc_time - timedelta(seconds=5) utc_stop_time = utc_time + timedelta(seconds=5) # 构建查询字典 SCADA_result_dict = {} for device_id in query_ids_list: # 构建 Flux 查询语句 flux_query = f''' from(bucket: "{bucket}") |> range(start: {utc_start_time.isoformat()}, stop: {utc_stop_time.isoformat()}) |> filter(fn: (r) => r["device_ID"] == "{device_id}") |> filter(fn: (r) => r["_field"] == "monitored_value") ''' # 执行查询 try: result = query_api.query(flux_query) # 从查询结果中提取 monitored_value if result: # 假设返回的结果为一行数据 for table in result: for record in table.records: # 获取字段 "_value" 即为 monitored_value monitored_value = record.get_value() SCADA_result_dict[device_id] = monitored_value else: # 如果没有结果,默认设置为 None 或其他值 SCADA_result_dict[device_id] = None except Exception as e: print(f"Error querying InfluxDB for device ID {device_id}: {e}") SCADA_result_dict[device_id] = None return SCADA_result_dict # DingZQ, 2025-02-15 def query_SCADA_data_by_device_ID_and_time_range(query_ids_list: List[str], start_time: str, end_time: str, bucket: str="SCADA_data", client: InfluxDBClient=client) -> list[dict[str, float]]: """ 根据SCADA设备的ID和时间查询值 :param query_ids_list: SCADA设备ID的列表 :param start_time: 输入的北京时间,格式为 '2024-11-24T17:30:00+08:00'。 :param end_time: 输入的北京时间,格式为 '2024-11-24T17:30:00+08:00'。 :param bucket: InfluxDB 的 bucket 名称,默认值为 "SCADA_data"。 :param client: 已初始化的 InfluxDBClient 实例。 :return: """ if client.ping(): print("{} -- Successfully connected to InfluxDB.".format( datetime.now().strftime('%Y-%m-%d %H:%M:%S'))) else: print("{} -- Failed to connect to InfluxDB.".format( datetime.now().strftime('%Y-%m-%d %H:%M:%S'))) query_api = client.query_api() # 将北京时间转换为 UTC 时间 utc_start_time = time_api.to_utc_time(start_time) utc_end_time = time_api.to_utc_time(end_time) # 构建查询字典 SCADA_results = [] for device_id in query_ids_list: # 构建 Flux 查询语句 flux_query = f''' from(bucket: "{bucket}") |> range(start: {utc_start_time.isoformat()}, stop: {utc_end_time.isoformat()}) |> filter(fn: (r) => r["device_ID"] == "{device_id}") |> filter(fn: (r) => r["_field"] == "monitored_value") |> sort(columns: ["_time"], desc: false) ''' # 执行查询 try: result = query_api.query(flux_query) # 从查询结果中提取 monitored_value if result: # 假设返回的结果为一行数据 for table in result: for record in table.records: # 获取字段 "_value" 即为 monitored_value monitored_value = record.get_value() rec = { "ID": device_id, "time": record.get_time(), "value": monitored_value } SCADA_results.append(rec) except Exception as e: print(f"Error querying InfluxDB for device ID {device_id}: {e}") return SCADA_results # 2025/02/01 def store_realtime_simulation_result_to_influxdb(node_result_list: List[Dict[str, any]], link_result_list: List[Dict[str, any]], result_start_time: str, bucket: str = "realtime_simulation_result", client: InfluxDBClient = client): """ 将实时模拟计算结果数据存储到 InfluxDB 的realtime_simulation_result这个bucket中。 :param node_result_list: (List[Dict[str, any]]): 包含节点和结果数据的字典列表。 :param link_result_list: (List[Dict[str, any]]): 包含连接和结果数据的字典列表。 :param result_start_time: (str): 计算结果的模拟开始时间。 :param bucket: (str): InfluxDB 的 bucket 名称,默认值为 "realtime_simulation_result"。 :param client: (InfluxDBClient): 已初始化的 InfluxDBClient 实例。 :return: """ if client.ping(): print("{} -- store_realtime_simulation_result_to_influxdb : Successfully connected to InfluxDB.".format( datetime.now().strftime('%Y-%m-%d %H:%M:%S'))) else: print("{} -- store_realtime_simulation_result_to_influxdb : Failed to connect to InfluxDB.".format( datetime.now().strftime('%Y-%m-%d %H:%M:%S'))) # 开始写入数据 try: write_api = client.write_api() date_str = result_start_time.split('T')[0] time_beijing = datetime.strptime(result_start_time, '%Y-%m-%dT%H:%M:%S%z').isoformat() for result in node_result_list: # 提取节点信息和结果数据 node_id = result.get('node') data_list = result.get('result', []) for data in data_list: # 构建 Point 数据,多个 field 存在于一个数据点中 node_point = Point("node") \ .tag("date", date_str) \ .tag("ID", node_id) \ .field("head", data.get('head', 0.0)) \ .field("pressure", data.get('pressure', 0.0)) \ .field("actualdemand", data.get('demand', 0.0)) \ .field("demanddeficit", None) \ .field("totalExternalOutflow", None) \ .field("quality", data.get('quality', 0.0)) \ .time(time_beijing) # 写入数据到 InfluxDB,多个 field 在同一个 point 中 write_api.write(bucket=bucket, org=org_name, record=node_point) write_api.flush() print(f"成功将 {len(node_result_list)} 条node数据写入 InfluxDB。") for result in link_result_list: link_id = result.get('link') data_list = result.get('result', []) for data in data_list: link_point = Point("link") \ .tag("date", date_str) \ .tag("ID", link_id) \ .field("flow", data.get('flow', 0.0)) \ .field("velocity", data.get('velocity', 0.0)) \ .field("headloss", data.get('headloss', 0.0)) \ .field("quality", data.get('quality', 0.0)) \ .field("status", data.get('status', "UNKNOWN")) \ .field("setting", data.get('setting', 0.0)) \ .field("reaction", data.get('reaction', 0.0)) \ .field("friction", data.get('friction', 0.0)) \ .time(time_beijing) write_api.write(bucket=bucket, org=org_name, record=link_point) write_api.flush() print(f"成功将 {len(link_result_list)} 条link数据写入 InfluxDB。") except Exception as e: raise RuntimeError(f"数据写入 InfluxDB 时发生错误: {e}") # 2025/02/01 def query_latest_record_by_ID(ID: str, type: str, bucket: str="realtime_simulation_result", client: InfluxDBClient=client) -> dict: """ 查询指定ID的最新的一条记录 :param ID: (str): 要查询的 ID。 :param type: (str): "node"或“link”或'scada' :param bucket: (str): 数据存储的 bucket 名称。 :param client: (InfluxDBClient): 已初始化的 InfluxDB 客户端实例。 :return: dict: 最新记录的数据,如果没有找到则返回 None。 """ if client.ping(): print("{} -- Successfully connected to InfluxDB.".format( datetime.now().strftime('%Y-%m-%d %H:%M:%S'))) else: print("{} -- Failed to connect to InfluxDB.".format( datetime.now().strftime('%Y-%m-%d %H:%M:%S'))) query_api = client.query_api() if type == "node": flux_query = f''' from(bucket: "{bucket}") |> range(start: -7d) // 查找最近七天的记录 |> filter(fn: (r) => r["_measurement"] == "node") |> filter(fn: (r) => r["ID"] == "{ID}") |> pivot( rowKey:["_time"], columnKey:["_field"], valueColumn:"_value" ) |> group() // 将所有数据聚合到同一个 group |> sort(columns: ["_time"], desc: true) |> limit(n: 1) ''' tables = query_api.query(flux_query) # 解析查询结果 for table in tables: for record in table.records: return { "time": record["_time"], "nodeID": ID, "head": record["head"], "pressure": record["pressure"], "actualdemand": record["actualdemand"], # "demanddeficit": record["demanddeficit"], # "totalExternalOutflow": record["totalExternalOutflow"], "quality": record["quality"] } elif type == "link": flux_query = f''' from(bucket: "{bucket}") |> range(start: -7d) // 查找最近七天的记录 |> filter(fn: (r) => r["_measurement"] == "link") |> filter(fn: (r) => r["ID"] == "{ID}") |> pivot( rowKey:["_time"], columnKey:["_field"], valueColumn:"_value" ) |> group() // 将所有数据聚合到同一个 group |> sort(columns: ["_time"], desc: true) |> limit(n: 1) ''' tables = query_api.query(flux_query) # 解析查询结果 for table in tables: for record in table.records: return { "time": record["_time"], "linkID": ID, "flow": record["flow"], "velocity": record["velocity"], "headloss": record["headloss"], "quality": record["quality"], "status": record["status"], "setting": record["setting"], "reaction": record["reaction"], "friction": record["friction"] } elif type == "scada": flux_query = f''' from(bucket: "SCADA_data") |> range(start: -30d) // 查找最近一月的记录 |> filter(fn: (r) => r["_measurement"] == "pressure_realtime") |> filter(fn: (r) => r["device_ID"] == "{ID}") |> pivot( rowKey:["_time"], columnKey:["_field"], valueColumn:"_value" ) |> sort(columns: ["_time"], desc: true) |> limit(n: 1) ''' tables = query_api.query(flux_query) # 解析查询结果 for table in tables: for record in table.records: return { "time": record["_time"], "device_ID": ID, "value": record["monitored_value"], } return None # 如果没有找到记录 # 2025/02/01 def query_all_record_by_time(query_time: str, bucket: str="realtime_simulation_result", client: InfluxDBClient=client) -> tuple: """ 查询指定北京时间的所有记录,包括 'node' 和 'link' measurement,分别以指定格式返回。 :param query_time: (str): 输入的北京时间,格式为 '2024-11-24T17:30:00+08:00'。 :param bucket: (str): 数据存储的 bucket 名称。 :param client: (InfluxDBClient): 已初始化的 InfluxDBClient 实例。 :return: dict: tuple: (node_records, link_records) """ if client.ping(): print("{} -- Successfully connected to InfluxDB.".format( datetime.now().strftime('%Y-%m-%d %H:%M:%S'))) else: print("{} -- Failed to connect to InfluxDB.".format( datetime.now().strftime('%Y-%m-%d %H:%M:%S'))) query_api = client.query_api() # 将北京时间转换为 UTC 时间 beijing_time = datetime.fromisoformat(query_time) utc_time = beijing_time.astimezone(timezone.utc) utc_start_time = utc_time - timedelta(seconds=1) utc_stop_time = utc_time + timedelta(seconds=1) # 构建 Flux 查询语句 flux_query = f''' from(bucket: "{bucket}") |> range(start: {utc_start_time.isoformat()}, stop: {utc_stop_time.isoformat()}) |> filter(fn: (r) => r["_measurement"] == "node" or r["_measurement"] == "link") |> pivot( rowKey:["_time"], columnKey:["_field"], valueColumn:"_value" ) ''' # 执行查询 tables = query_api.query(flux_query) node_records = [] link_records = [] # 解析查询结果 for table in tables: for record in table.records: # print(record.values) # 打印完整记录内容 measurement = record["_measurement"] # 处理 node 数据 if measurement == "node": node_records.append({ "time": record["_time"], "ID": record["ID"], "head": record["head"], "pressure": record["pressure"], "actualdemand": record["actualdemand"], "quality": record["quality"] }) # 处理 link 数据 elif measurement == "link": link_records.append({ "time": record["_time"], "linkID": record["ID"], "flow": record["flow"], "velocity": record["velocity"], "headloss": record["headloss"], "quality": record["quality"], "status": record["status"], "setting": record["setting"], "reaction": record["reaction"], "friction": record["friction"] }) return node_records, link_records # 2025/02/21 def query_all_record_by_date(query_date: str, bucket: str="realtime_simulation_result", client: InfluxDBClient=client) -> tuple: """ 查询指定日期的所有记录,包括‘node’和‘link’,分别以指定的格式返回 :param query_date: 输入的日期,格式为‘2025-02-14’ :param bucket: 数据存储的bucket名称 :param client: 已初始化的InfluxDBClient 实例。 :return: dict: tuple: (node_records, link_records) """ if client.ping(): print("{} -- Successfully connected to InfluxDB.".format( datetime.now().strftime('%Y-%m-%d %H:%M:%S'))) else: print("{} -- Failed to connect to InfluxDB.".format( datetime.now().strftime('%Y-%m-%d %H:%M:%S'))) query_api = client.query_api() start_time = (datetime.strptime(query_date, "%Y-%m-%d") - timedelta(days=1)).replace(hour=16, minute=0, second=0, tzinfo=timezone.utc).isoformat() print(start_time) stop_time = (datetime.strptime(query_date, "%Y-%m-%d") - timedelta(days=1)).replace(hour=17, minute=0, second=0, tzinfo=timezone.utc).isoformat() print(stop_time) # 构建 Flux 查询语句 flux_query = f''' from(bucket: "{bucket}") |> range(start: {start_time}, stop: {stop_time}) |> filter(fn: (r) => r["_measurement"] == "node" or r["_measurement"] == "link") |> filter(fn: (r) => r["date"] == "{query_date}") |> pivot( rowKey:["_time"], columnKey:["_field"], valueColumn:"_value" ) ''' # 执行查询 tables = query_api.query(flux_query) node_records = [] link_records = [] # 解析查询结果 for table in tables: for record in table.records: # print(record.values) # 打印完整记录内容 measurement = record["_measurement"] print(measurement) # 处理 node 数据 if measurement == "node": node_records.append({ "time": record["_time"], "ID": record["ID"], "head": record["head"], "pressure": record["pressure"], "actualdemand": record["actualdemand"], "quality": record["quality"] }) # 处理 link 数据 elif measurement == "link": link_records.append({ "time": record["_time"], "linkID": record["ID"], "flow": record["flow"], "velocity": record["velocity"], "headloss": record["headloss"], "quality": record["quality"], "status": record["status"], "setting": record["setting"], "reaction": record["reaction"], "friction": record["friction"] }) return node_records, link_records # 2025/02/01 def query_curve_by_ID_property_daterange(ID: str, type: str, property: str, start_date: str, end_date: str, bucket: str="realtime_simulation_result", client: InfluxDBClient=client) -> list: """ 根据 type 查询对应的 measurement,根据 ID 和 date 查询对应的 tag,根据 property 查询对应的 field。 :param ID: (str): 要查询的 ID(tag) :param type: (str): 查询的类型(决定 measurement) :param property: (str): 查询的字段名称(field) :param start_date: (str): 查询的开始日期,格式为 'YYYY-MM-DD' :param end_date: (str): 查询的结束日期,格式为 'YYYY-MM-DD' :param bucket: (str): 数据存储的 bucket 名称,默认值为 "realtime_simulation_result" :param client: (InfluxDBClient): 已初始化的 InfluxDBClient 实例 :return: 查询结果的列表 """ if client.ping(): print("{} -- Successfully connected to InfluxDB.".format( datetime.now().strftime('%Y-%m-%d %H:%M:%S'))) else: print("{} -- Failed to connect to InfluxDB.".format( datetime.now().strftime('%Y-%m-%d %H:%M:%S'))) query_api = client.query_api() # 确定 measurement if type == "node": measurement = "node" elif type == "link": measurement = "link" else: raise ValueError(f"不支持的类型: {type}") # 解析日期范围(当天的 UTC 开始和结束时间) # previous_day = datetime.strptime(start_date, "%Y-%m-%d") - timedelta(days=1) # start_time = previous_day.isoformat() + "T16:00:00Z" # stop_time = datetime.strptime(end_date, "%Y-%m-%d").isoformat() + "T15:59:59Z" # 将 start_date 的北京时间转换为 UTC 时间范围 start_time = (datetime.strptime(start_date, "%Y-%m-%d") - timedelta(days=1)).replace(hour=16, minute=0, second=0, tzinfo=timezone.utc).isoformat() stop_time = datetime.strptime(end_date, "%Y-%m-%d").replace(hour=15, minute=59, second=59, tzinfo=timezone.utc).isoformat() # 构建 Flux 查询语句 flux_query = f''' from(bucket: "{bucket}") |> range(start: {start_time}, stop: {stop_time}) |> filter(fn: (r) => r["_measurement"] == "{measurement}") |> filter(fn: (r) => r["ID"] == "{ID}") |> filter(fn: (r) => r["_field"] == "{property}") ''' # 执行查询 tables = query_api.query(flux_query) # 解析查询结果 results = [] for table in tables: for record in table.records: results.append({ "time": record["_time"], "value": record["_value"] }) return results def query_buckets(client: InfluxDBClient=client) -> list[str]: # 获取 Buckets API 实例 buckets_api = client.buckets_api() # 查询所有 Buckets buckets = buckets_api.find_buckets().buckets print("All Buckets:") buckets_list = [] for bucket in buckets: buckets_list.append(bucket.name) return buckets_list def query_measurements(bucket: str, client: InfluxDBClient=client) -> list[str]: query = f''' import "influxdata/influxdb/schema" schema.measurements(bucket: "{bucket}") ''' result = client.query_api().query(query) # 提取测量名称 measurements = [row.values["_value"] for table in result for row in table.records] return measurements # WMH 2025/02/13 def store_scheme_simulation_result_to_influxdb(node_result_list: List[Dict[str, any]], link_result_list: List[Dict[str, any]], scheme_start_time: str, num_periods: int = 1, scheme_Type: str = None, scheme_Name: str = None, bucket: str = "scheme_simulation_result", client: InfluxDBClient = client): """ 将方案模拟计算结果存入 InfluxuDb 的scheme_simulation_result这个bucket中。 :param node_result_list: (List[Dict[str, any]]): 包含节点和结果数据的字典列表。 :param link_result_list: (List[Dict[str, any]]): 包含连接和结果数据的字典列表。 :param scheme_start_time: (str): 方案模拟开始时间。 :param num_periods: (int): 方案模拟的周期数 :param scheme_Type: (str): 方案类型 :param scheme_Name: (str): 方案名称 :param bucket: (str): InfluxDB 的 bucket 名称,默认值为 "scheme_simulation_result"。 :param client: (InfluxDBClient): 已初始化的 InfluxDBClient 实例。 :return: """ if client.ping(): print("{} -- Successfully connected to InfluxDB.".format( datetime.now().strftime('%Y-%m-%d %H:%M:%S'))) else: print("{} -- Failed to connect to InfluxDB.".format( datetime.now().strftime('%Y-%m-%d %H:%M:%S'))) try: write_api = client.write_api() date_str = scheme_start_time.split('T')[0] time_beijing = datetime.strptime(scheme_start_time, '%Y-%m-%dT%H:%M:%S%z') timestep_parts = globals.hydraulic_timestep.split(':') timestep = timedelta(hours=int(timestep_parts[0]), minutes=int(timestep_parts[1]), seconds=int(timestep_parts[2])) for node_result in node_result_list: # 提取节点信息和数据结果 node_id = node_result.get('node') # 从period 0 到 period num_period - 1 for period_index in range(num_periods): scheme_time = (time_beijing + (timestep * period_index)).isoformat() data_list = [node_result.get('result', [])[period_index]] for data in data_list: # 构建 Point 数据,多个 field 存在于一个数据点中 node_point = Point("node") \ .tag("date", date_str) \ .tag("ID", node_id) \ .tag("scheme_Type", scheme_Type) \ .tag("scheme_Name", scheme_Name) \ .field("head", data.get('head', 0.0)) \ .field("pressure", data.get('pressure', 0.0)) \ .field("actualdemand", data.get('demand', 0.0)) \ .field("demanddeficit", None) \ .field("totalExternalOutflow", None) \ .field("quality", data.get('quality', 0.0)) \ .time(scheme_time) # 写入数据到 InfluxDB,多个 field 在同一个 point 中 write_api.write(bucket=bucket, org=org_name, record=node_point) write_api.flush() for link_result in link_result_list: link_id = link_result.get('link') for period_index in range(num_periods): scheme_time = (time_beijing + (timestep * period_index)).isoformat() data_list = [link_result.get('result', [])[period_index]] for data in data_list: link_point = Point("link") \ .tag("date", date_str) \ .tag("ID", link_id) \ .tag("scheme_Type", scheme_Type) \ .tag("scheme_Name", scheme_Name) \ .field("flow", data.get('flow', 0.0)) \ .field("velocity", data.get('velocity', 0.0)) \ .field("headloss", data.get('headloss', 0.0)) \ .field("quality", data.get('quality', 0.0)) \ .field("status", data.get('status', "UNKNOWN")) \ .field("setting", data.get('setting', 0.0)) \ .field("reaction", data.get('reaction', 0.0)) \ .field("friction", data.get('friction', 0.0)) \ .time(scheme_time) write_api.write(bucket=bucket, org=org_name, record=link_point) write_api.flush() except Exception as e: raise RuntimeError(f"数据写入 InfluxDB 时发生错误: {e}") # 2025/02/15 def query_SCADA_data_curve(api_query_id: str, start_date: str, end_date: str, bucket: str="SCADA_data", client: InfluxDBClient=client) -> list: """ 根据SCADA设备的api_query_id和时间范围,查询得到曲线,查到的数据为0时区时间 :param api_query_id: SCADA设备的api_query_id :param start_date: 查询开始的时间,格式为 'YYYY-MM-DD' :param end_date: 查询结束的时间,格式为 'YYYY-MM-DD' :param bucket: 数据存储的 bucket 名称,默认值为 "SCADA_data" :param client: 已初始化的 InfluxDBClient 实例 :return: 查询结果的列表 """ if client.ping(): print("{} -- Successfully connected to InfluxDB.".format( datetime.now().strftime('%Y-%m-%d %H:%M:%S'))) else: print("{} -- Failed to connect to InfluxDB.".format( datetime.now().strftime('%Y-%m-%d %H:%M:%S'))) query_api = client.query_api() # 将 start_date 的北京时间转换为 UTC 时间范围 start_time = (datetime.strptime(start_date, "%Y-%m-%d") - timedelta(days=1)).replace(hour=16, minute=0, second=0, tzinfo=timezone.utc).isoformat() stop_time = datetime.strptime(end_date, "%Y-%m-%d").replace(hour=15, minute=59, second=59, tzinfo=timezone.utc).isoformat() # 构建 Flux 查询语句 flux_query = f''' from(bucket: "{bucket}") |> range(start: {start_time}, stop: {stop_time}) |> filter(fn: (r) => r["device_ID"] == "{api_query_id}") ''' # 执行查询 tables = query_api.query(flux_query) # 解析查询结果 results = [] for table in tables: for record in table.records: results.append({ "time": record["_time"], "value": record["_value"] }) return results # 2025/02/18 def query_scheme_all_record_by_time(scheme_Type: str, scheme_Name: str, query_time: str, bucket: str="scheme_simulation_result", client: InfluxDBClient=client) -> tuple: """ 查询指定方案某一时刻的所有记录,包括‘node'和‘link’,分别以指定格式返回。 :param scheme_Type: 方案类型 :param scheme_Name: 方案名称 :param query_time: 输入的北京时间,格式为 '2024-11-24T17:30:00+08:00'。 :param bucket: 数据存储的 bucket 名称。 :param client: 已初始化的 InfluxDBClient 实例。 :return: dict: tuple: (node_records, link_records) """ if client.ping(): print("{} -- Successfully connected to InfluxDB.".format( datetime.now().strftime('%Y-%m-%d %H:%M:%S'))) else: print("{} -- Failed to connect to InfluxDB.".format( datetime.now().strftime('%Y-%m-%d %H:%M:%S'))) query_api = client.query_api() # 将北京时间转换为 UTC 时间 beijing_time = datetime.fromisoformat(query_time) utc_time = beijing_time.astimezone(timezone.utc) utc_start_time = utc_time - timedelta(seconds=1) utc_stop_time = utc_time + timedelta(seconds=1) # 构建 Flux 查询语句 flux_query = f''' from(bucket: "{bucket}") |> range(start: {utc_start_time.isoformat()}, stop: {utc_stop_time.isoformat()}) |> filter(fn: (r) => r["scheme_Type"] == "{scheme_Type}") |> filter(fn: (r) => r["scheme_Name"] == "{scheme_Name}") |> filter(fn: (r) => r["_measurement"] == "node" or r["_measurement"] == "link") |> pivot( rowKey:["_time"], columnKey:["_field"], valueColumn:"_value" ) ''' # 执行查询 tables = query_api.query(flux_query) node_records = [] link_records = [] # 解析查询结果 for table in tables: for record in table.records: # print(record.values) # 打印完整记录内容 measurement = record["_measurement"] # 处理 node 数据 if measurement == "node": node_records.append({ "time": record["_time"], "ID": record["ID"], "head": record["head"], "pressure": record["pressure"], "actualdemand": record["actualdemand"], "quality": record["quality"] }) # 处理 link 数据 elif measurement == "link": link_records.append({ "time": record["_time"], "linkID": record["ID"], "flow": record["flow"], "velocity": record["velocity"], "headloss": record["headloss"], "quality": record["quality"], "status": record["status"], "setting": record["setting"], "reaction": record["reaction"], "friction": record["friction"] }) return node_records, link_records # 2025/02/19 def query_scheme_curve_by_ID_property(scheme_Type: str, scheme_Name: str, ID: str, type: str, property: str, bucket: str="scheme_simulation_result", client: InfluxDBClient=client) -> list: """ 根据scheme_Type和scheme_Name,查询该模拟方案中,某一node或link的某一属性值的所有时间的结果 :param scheme_Type: 方案类型 :param scheme_Name: 方案名称 :param ID: 元素的ID :param type: 元素的类型,node或link :param property: 元素的属性值 :param bucket: 数据存储的 bucket 名称,默认值为 "scheme_simulation_result" :param client: 已初始化的 InfluxDBClient 实例 :return: 查询结果的列表 """ if client.ping(): print("{} -- Successfully connected to InfluxDB.".format( datetime.now().strftime('%Y-%m-%d %H:%M:%S'))) else: print("{} -- Failed to connect to InfluxDB.".format( datetime.now().strftime('%Y-%m-%d %H:%M:%S'))) query_api = client.query_api() # 确定 measurement if type == "node": measurement = "node" elif type == "link": measurement = "link" else: raise ValueError(f"不支持的类型: {type}") # 构建 Flux 查询语句 flux_query = f''' from(bucket: "{bucket}") |> range(start: 2025-01-01T00:00:00Z) |> filter(fn: (r) => r["_measurement"] == "{measurement}") |> filter(fn: (r) => r["scheme_Type"] == "{scheme_Type}") |> filter(fn: (r) => r["scheme_Name"] == "{scheme_Name}") |> filter(fn: (r) => r["ID"] == "{ID}") |> filter(fn: (r) => r["_field"] == "{property}") ''' # 执行查询 tables = query_api.query(flux_query) # 解析查询结果 results = [] for table in tables: for record in table.records: results.append({ "time": record["_time"], "value": record["_value"] }) return results # 2025/02/21 def query_scheme_all_record(scheme_Type: str, scheme_Name: str, bucket: str="scheme_simulation_result", client: InfluxDBClient=client) -> tuple: """ 查询指定方案的所有记录,包括‘node'和‘link’,分别以指定格式返回。 :param scheme_Type: 方案类型 :param scheme_Name: 方案名称 :param bucket: 数据存储的 bucket 名称。 :param client: 已初始化的 InfluxDBClient 实例。 :return: dict: tuple: (node_records, link_records) """ if client.ping(): print("{} -- Successfully connected to InfluxDB.".format( datetime.now().strftime('%Y-%m-%d %H:%M:%S'))) else: print("{} -- Failed to connect to InfluxDB.".format( datetime.now().strftime('%Y-%m-%d %H:%M:%S'))) query_api = client.query_api() # 构建 Flux 查询语句 flux_query = f''' from(bucket: "{bucket}") |> range(start: 2025-01-01T00:00:00Z) |> filter(fn: (r) => r["scheme_Type"] == "{scheme_Type}") |> filter(fn: (r) => r["scheme_Name"] == "{scheme_Name}") |> filter(fn: (r) => r["_measurement"] == "node" or r["_measurement"] == "link") |> pivot( rowKey:["_time"], columnKey:["_field"], valueColumn:"_value" ) ''' # 执行查询 tables = query_api.query(flux_query) node_records = [] link_records = [] # 解析查询结果 for table in tables: for record in table.records: # print(record.values) # 打印完整记录内容 measurement = record["_measurement"] # 处理 node 数据 if measurement == "node": node_records.append({ "time": record["_time"], "ID": record["ID"], "head": record["head"], "pressure": record["pressure"], "actualdemand": record["actualdemand"], "quality": record["quality"] }) # 处理 link 数据 elif measurement == "link": link_records.append({ "time": record["_time"], "linkID": record["ID"], "flow": record["flow"], "velocity": record["velocity"], "headloss": record["headloss"], "quality": record["quality"], "status": record["status"], "setting": record["setting"], "reaction": record["reaction"], "friction": record["friction"] }) return node_records, link_records # 2025/02/16 def export_SCADA_data_to_csv(start_date: str, end_date: str, bucket: str="SCADA_data", client: InfluxDBClient=client) -> None: """ 导出influxdb中SCADA_data这个bucket的数据到csv中 :param start_date: 查询开始的时间,格式为 'YYYY-MM-DD' :param end_date: 查询结束的时间,格式为 'YYYY-MM-DD' :param bucket: 数据存储的 bucket 名称,默认值为 "SCADA_data" :param client: 已初始化的 InfluxDBClient 实例 :return: """ if client.ping(): print("{} -- Successfully connected to InfluxDB.".format( datetime.now().strftime('%Y-%m-%d %H:%M:%S'))) else: print("{} -- Failed to connect to InfluxDB.".format( datetime.now().strftime('%Y-%m-%d %H:%M:%S'))) query_api = client.query_api() # 将 start_date 的北京时间转换为 UTC 时间范围 start_time = (datetime.strptime(start_date, "%Y-%m-%d") - timedelta(days=1)).replace(hour=16, minute=0, second=0, tzinfo=timezone.utc).isoformat() stop_time = datetime.strptime(end_date, "%Y-%m-%d").replace(hour=15, minute=59, second=59, tzinfo=timezone.utc).isoformat() # 构建 Flux 查询语句 flux_query = f''' from(bucket: "{bucket}") |> range(start: {start_time}, stop: {stop_time}) ''' # 执行查询 tables = query_api.query(flux_query) # 存储查询结果 rows = [] for table in tables: for record in table.records: row = { 'time': record.get_time(), 'measurement': record.get_measurement(), 'date': record.values.get('date', None), 'description': record.values.get('description', None), 'device_ID': record.values.get('device_ID', None), 'monitored_value': record.get_value() if record.get_field() == 'monitored_value' else None, 'datacleaning_value': record.get_value() if record.get_field() == 'datacleaning_value' else None, 'simulation_value': record.get_value() if record.get_field() == 'simulation_value' else None, } rows.append(row) # 动态生成 CSV 文件名 csv_filename = f"SCADA_data_{start_date}至{end_date}.csv" # 写入到 CSV 文件 with open(csv_filename, mode='w', newline='') as file: writer = csv.DictWriter(file, fieldnames=['time', 'measurement', 'date', 'description', 'device_ID', 'monitored_value', 'datacleaning_value', 'simulation_value']) writer.writeheader() writer.writerows(rows) print(f"Data exported to {csv_filename} successfully.") # 2025/02/17 def export_realtime_simulation_result_to_csv(start_date: str, end_date: str, bucket: str="realtime_simulation_result", client: InfluxDBClient=client) -> None: """ 导出influxdb中realtime_simulation_result这个bucket的数据到csv中 :param start_date: 查询开始的时间,格式为 'YYYY-MM-DD' :param end_date: 查询结束的时间,格式为 'YYYY-MM-DD' :param bucket: 数据存储的 bucket 名称,默认值为 "SCADA_data" :param client: 已初始化的 InfluxDBClient 实例 :return: """ if client.ping(): print("{} -- Successfully connected to InfluxDB.".format( datetime.now().strftime('%Y-%m-%d %H:%M:%S'))) else: print("{} -- Failed to connect to InfluxDB.".format( datetime.now().strftime('%Y-%m-%d %H:%M:%S'))) query_api = client.query_api() # 将 start_date 的北京时间转换为 UTC 时间范围 start_time = (datetime.strptime(start_date, "%Y-%m-%d") - timedelta(days=1)).replace(hour=16, minute=0, second=0, tzinfo=timezone.utc).isoformat() stop_time = datetime.strptime(end_date, "%Y-%m-%d").replace(hour=15, minute=59, second=59, tzinfo=timezone.utc).isoformat() # 构建 Flux 查询语句,查询指定时间范围内的数据 flux_query_link = f''' from(bucket: "{bucket}") |> range(start: {start_time}, stop: {stop_time}) |> filter(fn: (r) => r["_measurement"] == "link") ''' # 执行查询 link_tables = query_api.query(flux_query_link) # 存储link类的数据 link_rows = [] link_data = {} for table in link_tables: for record in table.records: key = (record.get_time(), record.values.get('ID', None)) if key not in link_data: link_data[key] = {} field = record.get_field() link_data[key][field] = record.get_value() link_data[key]['measurement'] = record.get_measurement() link_data[key]['date'] = record.values.get('date', None) # 构建 Flux 查询语句,查询指定时间范围内的数据 flux_query_node = f''' from(bucket: "{bucket}") |> range(start: {start_time}, stop: {stop_time}) |> filter(fn: (r) => r["_measurement"] == "node") ''' # 执行查询 node_tables = query_api.query(flux_query_node) # 存储node类的数据 node_rows = [] node_data = {} for table in node_tables: for record in table.records: key = (record.get_time(), record.values.get('ID', None)) if key not in node_data: node_data[key] = {} field = record.get_field() node_data[key][field] = record.get_value() node_data[key]['measurement'] = record.get_measurement() node_data[key]['date'] = record.values.get('date', None) for key in set(link_data.keys()): row = {'time': key[0], "ID": key[1]} row.update(link_data.get(key, {})) link_rows.append(row) for key in set(node_data.keys()): row = {'time': key[0], "ID": key[1]} row.update(node_data.get(key, {})) node_rows.append(row) # 动态生成 CSV 文件名 csv_filename_link = f"realtime_simulation_link_result_{start_date}至{end_date}.csv" csv_filename_node = f"realtime_simulation_node_result_{start_date}至{end_date}.csv" # 写入到 CSV 文件 with open(csv_filename_link, mode='w', newline='') as file: writer = csv.DictWriter(file, fieldnames=['time', 'measurement', 'date', 'ID', 'flow', 'leakage', 'velocity', 'headloss', 'status', 'setting', 'quality', 'friction', 'reaction']) writer.writeheader() writer.writerows(link_rows) with open(csv_filename_node, mode='w', newline='') as file: writer = csv.DictWriter(file, fieldnames=['time', 'measurement', 'date', 'ID', 'head', 'pressure', 'actualdemand', 'demanddeficit', 'totalExternalOutflow', 'quality']) writer.writeheader() writer.writerows(node_rows) print(f"Data exported to {csv_filename_link} and {csv_filename_node} successfully.") # 2025/02/18 def export_scheme_simulation_result_to_csv_time(start_date: str, end_date: str, bucket: str="scheme_simulation_result", client: InfluxDBClient=client) -> None: """ 导出influxdb中scheme_simulation_result这个bucket的数据到csv中 :param start_date: 查询开始的时间,格式为 'YYYY-MM-DD' :param end_date: 查询结束的时间,格式为 'YYYY-MM-DD' :param bucket: 数据存储的 bucket 名称,默认值为 "SCADA_data" :param client: 已初始化的 InfluxDBClient 实例 :return: """ if client.ping(): print("{} -- Successfully connected to InfluxDB.".format( datetime.now().strftime('%Y-%m-%d %H:%M:%S'))) else: print("{} -- Failed to connect to InfluxDB.".format( datetime.now().strftime('%Y-%m-%d %H:%M:%S'))) query_api = client.query_api() # 将 start_date 的北京时间转换为 UTC 时间范围 start_time = (datetime.strptime(start_date, "%Y-%m-%d") - timedelta(days=1)).replace(hour=16, minute=0, second=0, tzinfo=timezone.utc).isoformat() stop_time = datetime.strptime(end_date, "%Y-%m-%d").replace(hour=15, minute=59, second=59, tzinfo=timezone.utc).isoformat() # 构建 Flux 查询语句,查询指定时间范围内的数据 flux_query_link = f''' from(bucket: "{bucket}") |> range(start: {start_time}, stop: {stop_time}) |> filter(fn: (r) => r["_measurement"] == "link") ''' # 执行查询 link_tables = query_api.query(flux_query_link) # 存储link类的数据 link_rows = [] link_data = {} for table in link_tables: for record in table.records: key = (record.get_time(), record.values.get('ID', None)) if key not in link_data: link_data[key] = {} field = record.get_field() link_data[key][field] = record.get_value() link_data[key]['measurement'] = record.get_measurement() link_data[key]['date'] = record.values.get('date', None) link_data[key]['scheme_Type'] = record.values.get('scheme_Type', None) link_data[key]['scheme_Name'] = record.values.get('scheme_Name', None) # 构建 Flux 查询语句,查询指定时间范围内的数据 flux_query_node = f''' from(bucket: "{bucket}") |> range(start: {start_time}, stop: {stop_time}) |> filter(fn: (r) => r["_measurement"] == "node") ''' # 执行查询 node_tables = query_api.query(flux_query_node) # 存储node类的数据 node_rows = [] node_data = {} for table in node_tables: for record in table.records: key = (record.get_time(), record.values.get('ID', None)) if key not in node_data: node_data[key] = {} field = record.get_field() node_data[key][field] = record.get_value() node_data[key]['measurement'] = record.get_measurement() node_data[key]['date'] = record.values.get('date', None) node_data[key]['scheme_Type'] = record.values.get('scheme_Type', None) node_data[key]['scheme_Name'] = record.values.get('scheme_Name', None) for key in set(link_data.keys()): row = {'time': key[0], "ID": key[1]} row.update(link_data.get(key, {})) link_rows.append(row) for key in set(node_data.keys()): row = {'time': key[0], "ID": key[1]} row.update(node_data.get(key, {})) node_rows.append(row) # 动态生成 CSV 文件名 csv_filename_link = f"scheme_simulation_link_result_{start_date}至{end_date}.csv" csv_filename_node = f"scheme_simulation_node_result_{start_date}至{end_date}.csv" # 写入到 CSV 文件 with open(csv_filename_link, mode='w', newline='') as file: writer = csv.DictWriter(file, fieldnames=['time', 'measurement', 'date', 'scheme_Type', 'scheme_Name', 'ID', 'flow', 'leakage', 'velocity', 'headloss', 'status', 'setting', 'quality', 'friction', 'reaction']) writer.writeheader() writer.writerows(link_rows) with open(csv_filename_node, mode='w', newline='') as file: writer = csv.DictWriter(file, fieldnames=['time', 'measurement', 'date', 'scheme_Type', 'scheme_Name', 'ID', 'head', 'pressure', 'actualdemand', 'demanddeficit', 'totalExternalOutflow', 'quality']) writer.writeheader() writer.writerows(node_rows) print(f"Data exported to {csv_filename_link} and {csv_filename_node} successfully.") # 2025/02/18 def export_scheme_simulation_result_to_csv_scheme(scheme_Type: str, scheme_Name: str, bucket: str="scheme_simulation_result", client: InfluxDBClient=client) -> None: """ 导出influxdb中scheme_simulation_result这个bucket的数据到csv中 :param scheme_Type: 查询的方案类型 :param scheme_Name: 查询的方案名 :param bucket: 数据存储的 bucket 名称,默认值为 "SCADA_data" :param client: 已初始化的 InfluxDBClient 实例 :return: """ if client.ping(): print("{} -- Successfully connected to InfluxDB.".format( datetime.now().strftime('%Y-%m-%d %H:%M:%S'))) else: print("{} -- Failed to connect to InfluxDB.".format( datetime.now().strftime('%Y-%m-%d %H:%M:%S'))) query_api = client.query_api() # 构建 Flux 查询语句,查询指定时间范围内的数据 flux_query_link = f''' from(bucket: "{bucket}") |> range(start: 2025-01-01T00:00:00Z) |> filter(fn: (r) => r["_measurement"] == "link") |> filter(fn: (r) => r["scheme_Type"] == "{scheme_Type}") |> filter(fn: (r) => r["scheme_Name"] == "{scheme_Name}") ''' # 执行查询 link_tables = query_api.query(flux_query_link) # 存储link类的数据 link_rows = [] link_data = {} for table in link_tables: for record in table.records: key = (record.get_time(), record.values.get('ID', None)) if key not in link_data: link_data[key] = {} field = record.get_field() link_data[key][field] = record.get_value() link_data[key]['measurement'] = record.get_measurement() link_data[key]['date'] = record.values.get('date', None) link_data[key]['scheme_Type'] = record.values.get('scheme_Type', None) link_data[key]['scheme_Name'] = record.values.get('scheme_Name', None) # 构建 Flux 查询语句,查询指定时间范围内的数据 flux_query_node = f''' from(bucket: "{bucket}") |> range(start: 2025-01-01T00:00:00Z) |> filter(fn: (r) => r["_measurement"] == "node") |> filter(fn: (r) => r["scheme_Type"] == "{scheme_Type}") |> filter(fn: (r) => r["scheme_Name"] == "{scheme_Name}") ''' # 执行查询 node_tables = query_api.query(flux_query_node) # 存储node类的数据 node_rows = [] node_data = {} for table in node_tables: for record in table.records: key = (record.get_time(), record.values.get('ID', None)) if key not in node_data: node_data[key] = {} field = record.get_field() node_data[key][field] = record.get_value() node_data[key]['measurement'] = record.get_measurement() node_data[key]['date'] = record.values.get('date', None) node_data[key]['scheme_Type'] = record.values.get('scheme_Type', None) node_data[key]['scheme_Name'] = record.values.get('scheme_Name', None) for key in set(link_data.keys()): row = {'time': key[0], "ID": key[1]} row.update(link_data.get(key, {})) link_rows.append(row) for key in set(node_data.keys()): row = {'time': key[0], "ID": key[1]} row.update(node_data.get(key, {})) node_rows.append(row) # 动态生成 CSV 文件名 csv_filename_link = f"scheme_simulation_link_result_{scheme_Name}_of_{scheme_Type}.csv" csv_filename_node = f"scheme_simulation_node_result_{scheme_Name}_of_{scheme_Type}.csv" # 写入到 CSV 文件 with open(csv_filename_link, mode='w', newline='') as file: writer = csv.DictWriter(file, fieldnames=['time', 'measurement', 'date', 'scheme_Type', 'scheme_Name', 'ID', 'flow', 'leakage', 'velocity', 'headloss', 'status', 'setting', 'quality', 'friction', 'reaction']) writer.writeheader() writer.writerows(link_rows) with open(csv_filename_node, mode='w', newline='') as file: writer = csv.DictWriter(file, fieldnames=['time', 'measurement', 'date', 'scheme_Type', 'scheme_Name', 'ID', 'head', 'pressure', 'actualdemand', 'demanddeficit', 'totalExternalOutflow', 'quality']) writer.writeheader() writer.writerows(node_rows) print(f"Data exported to {csv_filename_link} and {csv_filename_node} successfully.") # 示例调用 if __name__ == "__main__": url = influxdb_info.url token = influxdb_info.token org_name = influxdb_info.org # client = InfluxDBClient(url=url, token=token) # step1: 检查连接状态,初始化influxdb的buckets # try: # # delete_buckets(client, org_name) # create_and_initialize_buckets(client, org_name) # except Exception as e: # print(f"连接失败: {e}") # finally: # client.close() # step2: 先查询pg数据库中scada_info的信息,然后存储SCADA数据到SCADA_data这个bucket里 # query_pg_scada_info_realtime('bb') # query_pg_scada_info_non_realtime('bb') # 手动执行 # store_realtime_SCADA_data_to_influxdb(get_real_value_time='2025-02-07T16:52:00+08:00') # store_non_realtime_SCADA_data_to_influxdb(get_history_data_end_time='2025-02-08T12:00:00+08:00') # step3: 查询测试示例 with InfluxDBClient(url=url, token=token, org=org_name, timeout=1000*10000) as client: # create_and_initialize_buckets(client=client, org_name=org_name) # # 示例1:query_latest_record_by_ID # bucket_name = "realtime_simulation_result" # 数据存储的 bucket 名称 # node_id = "ZBBDTZDP000022" # 查询的节点 ID # link_id = "ZBBGXSZW000002" # # latest_record = query_latest_record_by_ID(ID=node_id, type="node", bucket=bucket_name, client=client) # # # latest_record = query_latest_record_by_ID(ID=link_id, type="link", bucket=bucket_name, client=client) # # # if latest_record: # print("最新记录:", latest_record) # else: # print("未找到符合条件的记录。") # 示例2:query_all_record_by_time # node_records, link_records = query_all_record_by_time(query_time="2024-11-25T06:00:00+08:00") # print("Node 数据:", node_records) # print("Link 数据:", link_records) # 示例3:query_curve_by_ID_property_daterange # curve_result = query_curve_by_ID_property_daterange(ID=node_id, type="node", property="head", # start_date="2024-11-25", end_date="2024-11-25") # print(curve_result) # 示例4:query_SCADA_data_by_device_ID_and_time # SCADA_result_dict = query_SCADA_data_by_device_ID_and_time(globals.variable_pump_realtime_ids, query_time='2025-02-14T23:58:00+08:00') # print(SCADA_result_dict) # 示例5:query_SCADA_data_curve # SCADA_result = query_SCADA_data_curve(api_query_id='3853', start_date='2025-02-14', end_date='2025-02-16') # print(SCADA_result) # 示例6:export_SCADA_data_to_csv # export_SCADA_data_to_csv(start_date='2025-02-13', end_date='2025-02-15') # 示例7:export_realtime_simulation_result_to_csv # export_realtime_simulation_result_to_csv(start_date='2025-02-13', end_date='2025-02-15') # 示例8:export_scheme_simulation_result_to_csv_time # export_scheme_simulation_result_to_csv_time(start_date='2025-02-13', end_date='2025-02-15') # 示例9:export_scheme_simulation_result_to_csv_scheme # export_scheme_simulation_result_to_csv_scheme(scheme_Type='burst_Analysis', scheme_Name='scheme1') # 示例10:query_scheme_all_record_by_time # node_records, link_records = query_scheme_all_record_by_time(scheme_Type='burst_Analysis', scheme_Name='scheme1', query_time="2025-02-14T10:30:00+08:00") # print("Node 数据:", node_records) # print("Link 数据:", link_records) # 示例11:query_scheme_curve_by_ID_property # curve_result = query_scheme_curve_by_ID_property(scheme_Type='burst_Analysis', scheme_Name='scheme1', ID='ZBBDTZDP000022', # type='node', property='head') # print(curve_result) # 示例12:query_all_record_by_date # node_records, link_records = query_all_record_by_date(query_date='2025-02-14') # print("Node 数据:", node_records) # print("Link 数据:", link_records) # 示例13:query_scheme_all_record # node_records, link_records = query_scheme_all_record(scheme_Type='burst_Analysis', scheme_Name='scheme1') # print("Node 数据:", node_records) # print("Link 数据:", link_records) results = query_all_record_by_date('2025-02-23', client=client) # results = query_all_record_by_time('2025-02-12T14:15:00+08:00', client=client) print(results)