From 0f18355a9c82ba5c604c50f2ca75cf02254673ad Mon Sep 17 00:00:00 2001 From: DingZQ Date: Mon, 24 Mar 2025 21:33:00 +0800 Subject: [PATCH] Refine --- all_auto_task.py | 24 + auto_realtime.py | 6 - auto_store_non_realtime_SCADA_data.py | 3 - influxdb_api(2).py | 2865 +++++++++++++++++++++++++ 4 files changed, 2889 insertions(+), 9 deletions(-) create mode 100644 all_auto_task.py create mode 100644 influxdb_api(2).py diff --git a/all_auto_task.py b/all_auto_task.py new file mode 100644 index 0000000..fa4f4c4 --- /dev/null +++ b/all_auto_task.py @@ -0,0 +1,24 @@ +import auto_realtime +import auto_store_non_realtime_SCADA_data +import asyncio +import influxdb_api + + +# 为了让多个任务并发运行,我们可以用 asyncio.to_thread 分别启动它们 +async def main(): + task1 = asyncio.to_thread(auto_realtime.realtime_task) + task2 = asyncio.to_thread(auto_store_non_realtime_SCADA_data.store_non_realtime_SCADA_data_task) + await asyncio.gather(task1, task2) + + +if __name__ == "__main__": + url = influxdb_info.url + token = influxdb_info.token + org_name = influxdb_info.org + + influxdb_api.query_pg_scada_info_realtime('bb') + influxdb_api.query_pg_scada_info_non_realtime('bb') + + # 用 asyncio 并发启动两个任务 + asyncio.run(main()) + \ No newline at end of file diff --git a/auto_realtime.py b/auto_realtime.py index 830e1a9..7e05416 100644 --- a/auto_realtime.py +++ b/auto_realtime.py @@ -47,7 +47,6 @@ def get_next_15minute_time() -> str: if next_15minute == 60: next_15minute = 0 now = now + timedelta(hours=1) - next_time = now.replace(minute=next_15minute, second=0, microsecond=0) return next_time.strftime('%Y-%m-%dT%H:%M:%S+08:00') @@ -62,16 +61,13 @@ def run_simulation_job() -> None: current_time = datetime.now() if current_time.minute % 15 == 0: print(f"{current_time.strftime('%Y-%m-%d %H:%M:%S')} -- Start simulation task.") - # 计算前,获取scada_info中的信息,按照设定的方法修改pg数据库 simulation.query_corresponding_element_id_and_query_id("bb") simulation.query_corresponding_pattern_id_and_query_id('bb') region_result = simulation.query_non_realtime_region('bb') - globals.source_outflow_region_id = simulation.get_source_outflow_region_id('bb', region_result) globals.realtime_region_pipe_flow_and_demand_id = simulation.query_realtime_region_pipe_flow_and_demand_id('bb', region_result) globals.pipe_flow_region_patterns = simulation.query_pipe_flow_region_patterns('bb') - globals.non_realtime_region_patterns = simulation.query_non_realtime_region_patterns('bb', region_result) globals.source_outflow_region_patterns, realtime_region_pipe_flow_and_demand_patterns = simulation.get_realtime_region_patterns('bb', globals.source_outflow_region_id, @@ -97,7 +93,6 @@ def realtime_task() -> None: now = datetime.now() wait_seconds = 60 - now.second time.sleep(wait_seconds) - # 使用 .at(":00") 指定在每分钟的第0秒执行 schedule.every(1).minute.at(":00").do(store_realtime_SCADA_data_job) # 每15分钟执行一次run_simulation_job @@ -114,7 +109,6 @@ if __name__ == "__main__": org_name = influxdb_info.org client = InfluxDBClient(url=url, token=token) - # step2: 先查询pg数据库中scada_info的信息,然后存储SCADA数据到SCADA_data这个bucket里 influxdb_api.query_pg_scada_info_realtime('bb') # 自动执行 diff --git a/auto_store_non_realtime_SCADA_data.py b/auto_store_non_realtime_SCADA_data.py index addfd7f..81af4a0 100644 --- a/auto_store_non_realtime_SCADA_data.py +++ b/auto_store_non_realtime_SCADA_data.py @@ -61,7 +61,6 @@ def store_non_realtime_SCADA_data_job() -> None: print(f"{current_time.strftime('%Y-%m-%d %H:%M:%S')} -- Skipping store non realtime SCADA data task.") - # 2025/02/06 def store_non_realtime_SCADA_data_task() -> None: """ @@ -76,7 +75,6 @@ def store_non_realtime_SCADA_data_task() -> None: try: # 每分钟检查一次,执行store_non_realtime_SCADA_data_job schedule.every(1).minute.at(":00").do(store_non_realtime_SCADA_data_job) - # 持续执行任务,检查是否有待执行的任务 while True: schedule.run_pending() # 执行所有待处理的定时任务 @@ -92,7 +90,6 @@ if __name__ == "__main__": org_name = influxdb_info.org client = InfluxDBClient(url=url, token=token) - # step2: 先查询pg数据库中scada_info的信息,然后存储SCADA数据到SCADA_data这个bucket里 influxdb_api.query_pg_scada_info_non_realtime('bb') # 自动执行 diff --git a/influxdb_api(2).py b/influxdb_api(2).py new file mode 100644 index 0000000..e30c710 --- /dev/null +++ b/influxdb_api(2).py @@ -0,0 +1,2865 @@ +from influxdb_client import InfluxDBClient, BucketsApi, WriteApi, OrganizationsApi, Point, QueryApi, WriteOptions +from typing import List, Dict +from datetime import datetime, timedelta, timezone +from influxdb_client.client.write_api import SYNCHRONOUS, ASYNCHRONOUS +from dateutil import parser +import get_realValue +import get_data +import psycopg +import time +import simulation +from tjnetwork import * +import schedule +import threading +import globals +import csv +import pandas as pd +import openpyxl +import pytz + + +# influxdb数据库连接信息 +url = "http://localhost:8086" # 替换为你的InfluxDB实例地址 +token = "MhJDl7odKW-y6wNXXUhUMRJ9oPzOvEe52E4NYD5GXtAAMV7BoHMFdet6HqUOt4DjZ-syKjwGao_k0ZIcgrGAPA==" # 替换为你的InfluxDB Token +org_name = "beibei" # 替换为你的Organization名称 + + +def query_pg_scada_info_realtime(name: str) -> None: + """ + 查询pg数据库中,scada_info中,属于realtime的数据 + :param name: 数据库名称 + :return: + """ + # 连接数据库 + conn_string = f"dbname={name} host=127.0.0.1" + try: + with psycopg.connect(conn_string) as conn: + with conn.cursor() as cur: + # 查询 transmission_mode 为 'realtime' 的记录 + cur.execute(""" + SELECT type, api_query_id + FROM scada_info + WHERE transmission_mode = 'realtime'; + """) + records = cur.fetchall() + # 清空全局列表 + globals.reservoir_liquid_level_realtime_ids.clear() + globals.tank_liquid_level_realtime_ids.clear() + globals.fixed_pump_realtime_ids.clear() + globals.variable_pump_realtime_ids.clear() + globals.source_outflow_realtime_ids.clear() + globals.pipe_flow_realtime_ids.clear() + globals.pressure_realtime_ids.clear() + globals.demand_realtime_ids.clear() + globals.quality_realtime_ids.clear() + # 根据 type 分类存储 api_query_id + for record in records: + record_type, api_query_id = record + if api_query_id is not None: # 确保 api_query_id 不为空 + if record_type == "reservoir_liquid_level": + globals.reservoir_liquid_level_realtime_ids.append(api_query_id) + elif record_type == "tank_liquid_level": + globals.tank_liquid_level_realtime_ids.append(api_query_id) + elif record_type == "fixed_pump": + globals.fixed_pump_realtime_ids.append(api_query_id) + elif record_type == "variable_pump": + globals.variable_pump_realtime_ids.append(api_query_id) + elif record_type == "source_outflow": + globals.source_outflow_realtime_ids.append(api_query_id) + elif record_type == "pipe_flow": + globals.pipe_flow_realtime_ids.append(api_query_id) + elif record_type == "pressure": + globals.pressure_realtime_ids.append(api_query_id) + elif record_type == "demand": + globals.demand_realtime_ids.append(api_query_id) + elif record_type == "quality": + globals.quality_realtime_ids.append(api_query_id) + # 打印结果,方便调试 + # print("Query completed. Results:") + # print("Reservoir Liquid Level IDs:", globals.reservoir_liquid_level_realtime_ids) + # print("Tank Liquid Level IDs:", globals.tank_liquid_level_realtime_ids) + # print("Fixed Pump IDs:", globals.fixed_pump_realtime_ids) + # print("Variable Pump IDs:", globals.variable_pump_realtime_ids) + # print("Source Outflow IDs:", globals.source_outflow_realtime_ids) + # print("Pipe Flow IDs:", globals.pipe_flow_realtime_ids) + # print("Pressure IDs:", globals.pressure_realtime_ids) + # print("Demand IDs:", globals.demand_realtime_ids) + # print("Quality IDs:", globals.quality_realtime_ids) + except Exception as e: + print(f"查询时发生错误:{e}") + + +def query_pg_scada_info_non_realtime(name: str) -> None: + """ + 查询pg数据库中,scada_info中,属于non_realtime的数据,以及这些数据transmission_frequency的最大值 + :param name: 数据库名称 + :return: + """ + # 重新打开数据库 + if is_project_open(name): + close_project(name) + open_project(name) + dic_time = get_time(name) + globals.hydraulic_timestep = dic_time['HYDRAULIC TIMESTEP'] + close_project(name) + # 连接数据库 + conn_string = f"dbname={name} host=127.0.0.1" + try: + with psycopg.connect(conn_string) as conn: + with conn.cursor() as cur: + # 查询 transmission_mode 为 'non_realtime' 的记录 + cur.execute(""" + SELECT type, api_query_id, transmission_frequency + FROM scada_info + WHERE transmission_mode = 'non_realtime'; + """) + records = cur.fetchall() + # 清空全局列表 + globals.reservoir_liquid_level_non_realtime_ids.clear() + globals.fixed_pump_non_realtime_ids.clear() + globals.variable_pump_non_realtime_ids.clear() + globals.source_outflow_non_realtime_ids.clear() + globals.pipe_flow_non_realtime_ids.clear() + globals.pressure_non_realtime_ids.clear() + globals.demand_non_realtime_ids.clear() + globals.quality_non_realtime_ids.clear() + # 用于计算 transmission_frequency 最大值 + transmission_frequencies = [] + # 根据 type 分类存储 api_query_id + for record in records: + record_type, api_query_id, freq = record + if api_query_id is not None: # 确保 api_query_id 不为空 + if record_type == "reservoir_liquid_level": + globals.reservoir_liquid_level_non_realtime_ids.append(api_query_id) + elif record_type == "fixed_pump": + globals.fixed_pump_non_realtime_ids.append(api_query_id) + elif record_type == "variable_pump": + globals.variable_pump_non_realtime_ids.append(api_query_id) + elif record_type == "source_outflow": + globals.source_outflow_non_realtime_ids.append(api_query_id) + elif record_type == "pipe_flow": + globals.pipe_flow_non_realtime_ids.append(api_query_id) + elif record_type == "pressure": + globals.pressure_non_realtime_ids.append(api_query_id) + elif record_type == "demand": + globals.demand_non_realtime_ids.append(api_query_id) + elif record_type == "quality": + globals.quality_non_realtime_ids.append(api_query_id) + # 收集 transmission_frequency,用于计算最大值 + if freq is not None: + transmission_frequencies.append(freq) + # 计算 transmission_frequency 最大值 + globals.transmission_frequency = max(transmission_frequencies) if transmission_frequencies else None + # 打印结果,方便调试 + # print("Query completed. Results:") + # print("Reservoir Liquid Level Non-Realtime IDs:", globals.reservoir_liquid_level_non_realtime_ids) + # print("Fixed Pump Non-Realtime IDs:", globals.fixed_pump_non_realtime_ids) + # print("Variable Pump Non-Realtime IDs:", globals.variable_pump_non_realtime_ids) + # print("Source Outflow Non-Realtime IDs:", globals.source_outflow_non_realtime_ids) + # print("Pipe Flow Non-Realtime IDs:", globals.pipe_flow_non_realtime_ids) + # print("Pressure Non-Realtime IDs:", globals.pressure_non_realtime_ids) + # print("Demand Non-Realtime IDs:", globals.demand_non_realtime_ids) + # print("Quality Non-Realtime IDs:", globals.quality_non_realtime_ids) + # print("Maximum Transmission Frequency:", globals.transmission_frequency) + # print("Hydraulic Timestep:", globals.hydraulic_timestep) + except Exception as e: + print(f"查询时发生错误:{e}") + + +# 2025/03/23 +def get_new_client() -> InfluxDBClient: + """每次调用返回一个新的 InfluxDBClient 实例。""" + return InfluxDBClient(url=url, token=token, org=org_name) + + +# 2025/02/01 +def delete_buckets(org_name: str) -> None: + """ + 删除InfluxDB中指定organization下的所有buckets。 + :param org_name: InfluxDB中organization的名称。 + :return: None + """ + client = get_new_client() + # 定义需要删除的 bucket 名称列表 + buckets_to_delete = ['SCADA_data', 'realtime_simulation_result', 'scheme_simulation_result'] + buckets_api = client.buckets_api() + buckets_obj = buckets_api.find_buckets(org=org_name) + # 确保 buckets_obj 拥有 buckets 属性 + if hasattr(buckets_obj, 'buckets'): + for bucket in buckets_obj.buckets: + if bucket.name in buckets_to_delete: # 只删除特定名称的 bucket + try: + buckets_api.delete_bucket(bucket) + print(f"Bucket {bucket.name} has been deleted successfully.") + except Exception as e: + print(f"Failed to delete bucket {bucket.name}: {e}") + else: + print(f"Skipping bucket {bucket.name}. Not in the deletion list.") + else: + print("未找到 buckets 属性,无法迭代 buckets。") + client.close() + + +# 2025/02/01 +def create_and_initialize_buckets(org_name: str) -> None: + """ + 初始化influxdb的三个数据存储库,分别为SCADA_data、realtime_simulation_result、scheme_simulation_result + :param org_name: InfluxDB中organization的名称 + :return: + """ + client = get_new_client() + # 先删除原有的,然后再进行初始化 + delete_buckets(org_name) + bucket_api = BucketsApi(client) + write_api = client.write_api() + org_api = OrganizationsApi(client) + # 获取 org_id + org = next((o for o in org_api.find_organizations() if o.name == org_name), None) + if not org: + raise ValueError(f"Organization '{org_name}' not found.") + org_id = org.id + print(f"Using Organization ID: {org_id}") + # 定义 Buckets 信息 + buckets = [ + {"name": "SCADA_data", "retention_rules": []}, + {"name": "realtime_simulation_result", "retention_rules": []}, + {"name": "scheme_simulation_result", "retention_rules": []} + ] + # 创建一个临时存储点数据的列表 + points_to_write = [] + # 创建 Buckets 并初始化数据 + for bucket in buckets: + # 创建 Bucket + created_bucket = bucket_api.create_bucket( + bucket_name=bucket["name"], + retention_rules=bucket["retention_rules"], + org_id=org_id + ) + print(f"Bucket '{bucket['name']}' created with ID: {created_bucket.id}") + # 根据 Bucket 初始化数据 + if bucket["name"] == "SCADA_data": + point = Point("SCADA") \ + .tag("date", None) \ + .tag("description", None) \ + .tag("device_ID", None) \ + .field("monitored_value", 0.0) \ + .field("datacleaning_value", 0.0) \ + .field("simulation_value", None) \ + .time("2024-11-21T00:00:00Z", write_precision='s') + points_to_write.append(point) + # write_api.write(bucket="SCADA_data", org=org_name, record=point) + # print("Initialized SCADA_data with default structure.") + elif bucket["name"] == "realtime_simulation_result": # realtime_simulation_result + link_point = Point("link") \ + .tag("date", None) \ + .tag("ID", None) \ + .field("flow", 0.0) \ + .field("leakage", 0.0) \ + .field("velocity", 0.0) \ + .field("headloss", 0.0) \ + .field("status", None) \ + .field("setting", 0.0) \ + .field("quality", 0.0) \ + .field("reaction", 0.0) \ + .field("friction", 0.0) \ + .time("2024-11-21T00:00:00Z", write_precision='s') + points_to_write.append(link_point) + node_point = Point("node") \ + .tag("date", None) \ + .tag("ID", None) \ + .field("head", 0.0) \ + .field("pressure", 0.0) \ + .field("actualdemand", 0.0) \ + .field("demanddeficit", 0.0) \ + .field("totalExternalOutflow", 0.0) \ + .field("quality", 0.0) \ + .time("2024-11-21T00:00:00Z", write_precision='s') + points_to_write.append(node_point) + # write_api.write(bucket="realtime_simulation_result", org=org_name, record=link_point) + # write_api.write(bucket="realtime_simulation_result", org=org_name, record=node_point) + # print("Initialized realtime_simulation_result with default structure.") + elif bucket["name"] == "scheme_simulation_result": + link_point = Point("link") \ + .tag("date", None) \ + .tag("ID", None) \ + .tag("scheme_Type", None) \ + .tag("scheme_Name", None) \ + .field("flow", 0.0) \ + .field("leakage", 0.0) \ + .field("velocity", 0.0) \ + .field("headloss", 0.0) \ + .field("status", None) \ + .field("setting", 0.0) \ + .field("quality", 0.0) \ + .time("2024-11-21T00:00:00Z", write_precision='s') + points_to_write.append(link_point) + node_point = Point("node") \ + .tag("date", None) \ + .tag("ID", None) \ + .tag("scheme_Type", None) \ + .tag("scheme_Name", None) \ + .field("head", 0.0) \ + .field("pressure", 0.0) \ + .field("actualdemand", 0.0) \ + .field("demanddeficit", 0.0) \ + .field("totalExternalOutflow", 0.0) \ + .field("quality", 0.0) \ + .time("2024-11-21T00:00:00Z", write_precision='s') + points_to_write.append(node_point) + SCADA_point = Point("SCADA") \ + .tag("date", None) \ + .tag("description", None) \ + .tag("device_ID", None) \ + .tag("scheme_Type", None) \ + .tag("scheme_Name", None) \ + .field("monitored_value", 0.0) \ + .field("datacleaning_value", 0.0) \ + .field("scheme_simulation_value", None) \ + .time("2024-11-21T00:00:00Z", write_precision='s') + points_to_write.append(SCADA_point) + # write_api.write(bucket="scheme_simulation_result", org=org_name, record=link_point) + # write_api.write(bucket="scheme_simulation_result", org=org_name, record=node_point) + # write_api.write(bucket="scheme_simulation_result", org=org_name, record=SCADA_point) + # print("Initialized scheme_simulation_result with default structure.") + # 批量写入数据 + if points_to_write: + write_api.write(bucket=bucket, org=org_name, record=points_to_write) + write_api.flush() # 刷新缓存一次 + print("All buckets created and initialized successfully.") + client.close() + + +def store_realtime_SCADA_data_to_influxdb(get_real_value_time: str, bucket: str = "SCADA_data") -> None: + """ + 将SCADA数据通过数据接口导入数据库 + :param get_real_value_time: 获取数据的时间,格式如'2024-11-25T09:00:00+08:00' + :param bucket: (str): InfluxDB 的 bucket 名称,默认值为 "SCADA_data"。 + :return: + """ + client = get_new_client() + if not client.ping(): + print("{} -- Failed to connect to InfluxDB.".format(datetime.now().strftime('%Y-%m-%d %H:%M:%S'))) + + write_options = WriteOptions( + jitter_interval=200, # 添加抖动以避免同时写入 + max_retry_delay=30000 # 最大重试延迟(毫秒) + ) + write_api = client.write_api(write_options=write_options) + # 创建一个临时存储点数据的列表 + points_to_write = [] + + try_count = 0 + reservoir_liquid_level_realtime_data_list = [] + tank_liquid_level_realtime_data_list = [] + fixed_pump_realtime_data_list =[] + variable_pump_realtime_data_list =[] + source_outflow_realtime_data_list = [] + pipe_flow_realtime_data_list = [] + pressure_realtime_data_list =[] + demand_realtime_data_list = [] + quality_realtime_data_list = [] + while try_count <= 5: # 尝试6次 ******* + try: + try_count += 1 + if globals.reservoir_liquid_level_realtime_ids: + # print(globals.reservoir_liquid_level_realtime_ids) + reservoir_liquid_level_realtime_data_list = get_realValue.get_realValue( + ids=','.join(globals.reservoir_liquid_level_realtime_ids)) + # print(reservoir_liquid_level_realtime_data_list) + if globals.tank_liquid_level_realtime_ids: + tank_liquid_level_realtime_data_list = get_realValue.get_realValue(ids=','.join(globals.tank_liquid_level_realtime_ids)) + if globals.fixed_pump_realtime_ids: + fixed_pump_realtime_data_list = get_realValue.get_realValue(ids=','.join(globals.fixed_pump_realtime_ids)) + if globals.variable_pump_realtime_ids: + variable_pump_realtime_data_list = get_realValue.get_realValue(ids=','.join(globals.variable_pump_realtime_ids)) + if globals.source_outflow_realtime_ids: + source_outflow_realtime_data_list = get_realValue.get_realValue(ids=','.join(globals.source_outflow_realtime_ids)) + if globals.pipe_flow_realtime_ids: + pipe_flow_realtime_data_list = get_realValue.get_realValue(ids=','.join(globals.pipe_flow_realtime_ids)) + if globals.pressure_realtime_ids: + pressure_realtime_data_list = get_realValue.get_realValue(ids=','.join(globals.pressure_realtime_ids)) + if globals.demand_realtime_ids: + demand_realtime_data_list = get_realValue.get_realValue(ids=','.join(globals.demand_realtime_ids)) + if globals.quality_realtime_ids: + quality_realtime_data_list = get_realValue.get_realValue(ids=','.join(globals.quality_realtime_ids)) + except Exception as e: + print(e) + time.sleep(10) + else: + try_count = 100 + # 写入数据 + if reservoir_liquid_level_realtime_data_list: + for data in reservoir_liquid_level_realtime_data_list: + # 将 data['time'] 和 get_realValue_time 转换为 datetime 对象 + data_time = datetime.fromisoformat(data['time']) + get_real_value_time_dt = datetime.fromisoformat(get_real_value_time).replace(tzinfo=None) + # 将获取的时间转换为 UTC 时间 + get_real_value_time_utc = get_real_value_time_dt.astimezone(timezone.utc) + # 计算时间差(绝对值) + time_difference = abs((data_time - get_real_value_time_dt).total_seconds()) + # 判断时间差是否超过3分钟 + if time_difference > 60: # 超过1分钟 + monitored_value = None + else: # 小于等于3分钟 + monitored_value = data['monitored_value'] + # 创建Point对象 + point = ( + Point('reservoir_liquid_level_realtime') + .tag("date", datetime.fromisoformat(get_real_value_time).strftime('%Y-%m-%d')) + .tag("description", data['description']) + .tag("device_ID", data['device_ID']) + .field("monitored_value", monitored_value) + .field("datacleaning_value", None) + .field("simulation_value", None) + .time(get_real_value_time_utc, write_precision='s') + ) + points_to_write.append(point) + # write_api.write(bucket=bucket, org=org_name, record=point) + # write_api.flush() + if tank_liquid_level_realtime_data_list: + for data in tank_liquid_level_realtime_data_list: + # 将 data['time'] 和 get_realValue_time 转换为 datetime 对象 + data_time = datetime.fromisoformat(data['time']) + get_real_value_time_dt = datetime.fromisoformat(get_real_value_time).replace(tzinfo=None) + # 将获取的时间转换为 UTC 时间 + get_real_value_time_utc = get_real_value_time_dt.astimezone(timezone.utc) + # 计算时间差(绝对值) + time_difference = abs((data_time - get_real_value_time_dt).total_seconds()) + # 判断时间差是否超过1分钟 + if time_difference > 60: # 超过1分钟 + monitored_value = None + else: # 小于等于3分钟 + monitored_value = data['monitored_value'] + # 创建Point对象 + point = ( + Point('tank_liquid_level_realtime') + .tag("date", datetime.fromisoformat(get_real_value_time).strftime('%Y-%m-%d')) + .tag("description", data['description']) + .tag("device_ID", data['device_ID']) + .field("monitored_value", (monitored_value)) + .field("datacleaning_value", None) + .field("simulation_value", None) + .time(get_real_value_time_utc, write_precision='s') + ) + points_to_write.append(point) + # write_api.write(bucket=bucket, org=org_name, record=point) + # write_api.flush() + if fixed_pump_realtime_data_list: + for data in fixed_pump_realtime_data_list: + # 将 data['time'] 和 get_realValue_time 转换为 datetime 对象 + data_time = datetime.fromisoformat(data['time']) + get_real_value_time_dt = datetime.fromisoformat(get_real_value_time).replace(tzinfo=None) + # 将获取的时间转换为 UTC 时间 + get_real_value_time_utc = get_real_value_time_dt.astimezone(timezone.utc) + # 计算时间差(绝对值) + time_difference = abs((data_time - get_real_value_time_dt).total_seconds()) + # 判断时间差是否超过1分钟 + if time_difference > 60: # 超过1分钟 + monitored_value = None + else: # 小于等于3分钟 + monitored_value = data['monitored_value'] + # 创建Point对象 + point = ( + Point('fixed_pump_realtime') + .tag("date", datetime.fromisoformat(get_real_value_time).strftime('%Y-%m-%d')) + .tag("description", data['description']) + .tag("device_ID", data['device_ID']) + .field("monitored_value", monitored_value) + .field("datacleaning_value", None) + .field("simulation_value", None) + .time(get_real_value_time_utc, write_precision='s') + ) + points_to_write.append(point) + # write_api.write(bucket=bucket, org=org_name, record=point) + # write_api.flush() + if variable_pump_realtime_data_list: + for data in variable_pump_realtime_data_list: + # 将 data['time'] 和 get_realValue_time 转换为 datetime 对象 + data_time = datetime.fromisoformat(data['time']) + get_real_value_time_dt = datetime.fromisoformat(get_real_value_time).replace(tzinfo=None) + # 将获取的时间转换为 UTC 时间 + get_real_value_time_utc = get_real_value_time_dt.astimezone(timezone.utc) + # 计算时间差(绝对值) + time_difference = abs((data_time - get_real_value_time_dt).total_seconds()) + # 判断时间差是否超过1分钟 + if time_difference > 60: # 超过1分钟 + monitored_value = None + else: # 小于等于3分钟 + monitored_value = data['monitored_value'] + # 创建Point对象 + point = ( + Point('variable_pump_realtime') + .tag("date", datetime.fromisoformat(get_real_value_time).strftime('%Y-%m-%d')) + .tag("description", data['description']) + .tag("device_ID", data['device_ID']) + .field("monitored_value", monitored_value) + .field("datacleaning_value", None) + .field("simulation_value", None) + .time(get_real_value_time_utc, write_precision='s') + ) + points_to_write.append(point) + # write_api.write(bucket=bucket, org=org_name, record=point) + # write_api.flush() + if source_outflow_realtime_data_list: + for data in source_outflow_realtime_data_list: + # 将 data['time'] 和 get_realValue_time 转换为 datetime 对象 + data_time = datetime.fromisoformat(data['time']) + get_real_value_time_dt = datetime.fromisoformat(get_real_value_time).replace(tzinfo=None) + # 将获取的时间转换为 UTC 时间 + get_real_value_time_utc = get_real_value_time_dt.astimezone(timezone.utc) + # 计算时间差(绝对值) + time_difference = abs((data_time - get_real_value_time_dt).total_seconds()) + # 判断时间差是否超过1分钟 + if time_difference > 60: # 超过1分钟 + monitored_value = None + else: # 小于等于3分钟 + monitored_value = data['monitored_value'] + # 创建Point对象 + point = ( + Point('source_outflow_realtime') + .tag("date", datetime.fromisoformat(get_real_value_time).strftime('%Y-%m-%d')) + .tag("description", data['description']) + .tag("device_ID", data['device_ID']) + .field("monitored_value", monitored_value) + .field("datacleaning_value", None) + .field("simulation_value", None) + .time(get_real_value_time_utc, write_precision='s') + ) + points_to_write.append(point) + # write_api.write(bucket=bucket, org=org_name, record=point) + # write_api.flush() + if pipe_flow_realtime_data_list: + for data in pipe_flow_realtime_data_list: + # 将 data['time'] 和 get_realValue_time 转换为 datetime 对象 + data_time = datetime.fromisoformat(data['time']) + get_real_value_time_dt = datetime.fromisoformat(get_real_value_time).replace(tzinfo=None) + # 将获取的时间转换为 UTC 时间 + get_real_value_time_utc = get_real_value_time_dt.astimezone(timezone.utc) + # 计算时间差(绝对值) + time_difference = abs((data_time - get_real_value_time_dt).total_seconds()) + # 判断时间差是否超过1分钟 + if time_difference > 60: # 超过1分钟 + monitored_value = None + else: # 小于等于3分钟 + monitored_value = data['monitored_value'] + # 创建Point对象 + point = ( + Point('pipe_flow_realtime') + .tag("date", datetime.fromisoformat(get_real_value_time).strftime('%Y-%m-%d')) + .tag("description", data['description']) + .tag("device_ID", data['device_ID']) + .field("monitored_value", monitored_value) + .field("datacleaning_value", None) + .field("simulation_value", None) + .time(get_real_value_time_utc, write_precision='s') + ) + points_to_write.append(point) + # write_api.write(bucket=bucket, org=org_name, record=point) + # write_api.flush() + if pressure_realtime_data_list: + for data in pressure_realtime_data_list: + # 将 data['time'] 和 get_realValue_time 转换为 datetime 对象 + data_time = datetime.fromisoformat(data['time']) + get_real_value_time_dt = datetime.fromisoformat(get_real_value_time).replace(tzinfo=None) + # 将获取的时间转换为 UTC 时间 + get_real_value_time_utc = get_real_value_time_dt.astimezone(timezone.utc) + # 计算时间差(绝对值) + time_difference = abs((data_time - get_real_value_time_dt).total_seconds()) + # 判断时间差是否超过1分钟 + if time_difference > 60: # 超过1分钟 + monitored_value = None + else: # 小于等于3分钟 + monitored_value = data['monitored_value'] + # 创建Point对象 + point = ( + Point('pressure_realtime') + .tag("date", datetime.fromisoformat(get_real_value_time).strftime('%Y-%m-%d')) + .tag("description", data['description']) + .tag("device_ID", data['device_ID']) + .field("monitored_value", monitored_value) + .field("datacleaning_value", None) + .field("simulation_value", None) + .time(get_real_value_time_utc, write_precision='s') + ) + points_to_write.append(point) + # write_api.write(bucket=bucket, org=org_name, record=point) + # write_api.flush() + if demand_realtime_data_list: + for data in demand_realtime_data_list: + # 将 data['time'] 和 get_realValue_time 转换为 datetime 对象 + data_time = datetime.fromisoformat(data['time']) + get_real_value_time_dt = datetime.fromisoformat(get_real_value_time).replace(tzinfo=None) + # 将获取的时间转换为 UTC 时间 + get_real_value_time_utc = get_real_value_time_dt.astimezone(timezone.utc) + # 计算时间差(绝对值) + time_difference = abs((data_time - get_real_value_time_dt).total_seconds()) + # 判断时间差是否超过1分钟 + if time_difference > 60: # 超过1分钟 + monitored_value = None + else: # 小于等于3分钟 + monitored_value = data['monitored_value'] + # 创建Point对象 + point = ( + Point('demand_realtime') + .tag("date", datetime.fromisoformat(get_real_value_time).strftime('%Y-%m-%d')) + .tag("description", data['description']) + .tag("device_ID", data['device_ID']) + .field("monitored_value", monitored_value) + .field("datacleaning_value", None) + .field("simulation_value", None) + .time(get_real_value_time_utc, write_precision='s') + ) + points_to_write.append(point) + # write_api.write(bucket=bucket, org=org_name, record=point) + # write_api.flush() + if quality_realtime_data_list: + for data in quality_realtime_data_list: + # 将 data['time'] 和 get_realValue_time 转换为 datetime 对象 + data_time = datetime.fromisoformat(data['time']) + get_real_value_time_dt = datetime.fromisoformat(get_real_value_time).replace(tzinfo=None) + # 将获取的时间转换为 UTC 时间 + get_real_value_time_utc = get_real_value_time_dt.astimezone(timezone.utc) + # 计算时间差(绝对值) + time_difference = abs((data_time - get_real_value_time_dt).total_seconds()) + # 判断时间差是否超过1分钟 + if time_difference > 60: # 超过1分钟 + monitored_value = None + else: # 小于等于3分钟 + monitored_value = data['monitored_value'] + # 创建Point对象 + point = ( + Point('quality_realtime') + .tag("date", datetime.fromisoformat(get_real_value_time).strftime('%Y-%m-%d')) + .tag("description", data['description']) + .tag("device_ID", data['device_ID']) + .field("monitored_value", monitored_value) + .field("datacleaning_value", None) + .field("simulation_value", None) + .time(get_real_value_time_utc, write_precision='s') + ) + points_to_write.append(point) + # write_api.write(bucket=bucket, org=org_name, record=point) + # write_api.flush() + # 批量写入数据 + if points_to_write: + write_api.write(bucket=bucket, org=org_name, record=points_to_write) + write_api.flush() # 刷新缓存一次 + client.close() + + +def convert_time_format(original_time: str) -> str: + """ + 格式转换,将“2024-04-13T08:00:00+08:00"转为“2024-04-13 08:00:00” + :param original_time: str, “2024-04-13T08:00:00+08:00"格式的时间 + :return: str,“2024-04-13 08:00:00”格式的时间 + """ + new_time = original_time.replace('T', ' ') + new_time = new_time.replace('+08:00', '') + return new_time + + +# 2025/01/10 +def store_non_realtime_SCADA_data_to_influxdb(get_history_data_end_time: str, bucket: str = "SCADA_data") -> None: + """ + 获取某段时间内传回的scada数据 + :param get_history_data_end_time: 获取历史数据的终止时间时间,格式如'2024-11-25T09:00:00+08:00' + :param bucket: (str): InfluxDB 的 bucket 名称,默认值为 "SCADA_data"。 + :return: + """ + client = get_new_client() + if not client.ping(): + print("{} -- Failed to connect to InfluxDB.".format(datetime.now().strftime('%Y-%m-%d %H:%M:%S'))) + + write_options = WriteOptions( + jitter_interval=200, # 添加抖动以避免同时写入 + max_retry_delay=30000 # 最大重试延迟(毫秒) + ) + write_api = client.write_api(write_options=write_options) + # 创建一个临时存储点数据的列表 + points_to_write = [] + + # 将end_date字符串转换为datetime对象 + end_date_dt = datetime.strptime(convert_time_format(get_history_data_end_time), '%Y-%m-%d %H:%M:%S') + end_date = end_date_dt.strftime('%Y-%m-%d %H:%M:%S') + # 将transmission_frequency字符串转换为timedelta对象 + transmission_frequency_dt = datetime.strptime(globals.transmission_frequency, '%H:%M:%S') - datetime(1900, 1, 1) + get_history_data_start_time = end_date_dt - transmission_frequency_dt + begin_date = get_history_data_start_time.strftime('%Y-%m-%d %H:%M:%S') + # print(begin_date) + # print(end_date) + reservoir_liquid_level_non_realtime_data_list = [] + tank_liquid_level_non_realtime_data_list = [] + fixed_pump_non_realtime_data_list = [] + variable_pump_non_realtime_data_list = [] + source_outflow_non_realtime_data_list = [] + pipe_flow_non_realtime_data_list = [] + pressure_non_realtime_data_list = [] + demand_non_realtime_data_list = [] + quality_non_realtime_data_list = [] + try_count = 0 + while try_count < 5: + try: + try_count += 1 + # reservoir_liquid_level_non_realtime_data_list = get_data.get_history_data( + # ids=','.join(reservoir_liquid_level_non_realtime_ids), begin_date=begin_date, end_date=end_date, downsample='1m') + if globals.reservoir_liquid_level_non_realtime_ids: + reservoir_liquid_level_non_realtime_data_list = get_data.get_history_data( + ids=','.join(globals.reservoir_liquid_level_non_realtime_ids), + begin_date=begin_date, end_date=end_date, + downsample='1m') + if globals.tank_liquid_level_non_realtime_ids: + tank_liquid_level_non_realtime_data_list = get_data.get_history_data( + ids=','.join(globals.tank_liquid_level_non_realtime_ids), + begin_date=begin_date, end_date=end_date, + downsample='1m') + if globals.fixed_pump_non_realtime_ids: + fixed_pump_non_realtime_data_list = get_data.get_history_data( + ids=','.join(globals.fixed_pump_non_realtime_ids), + begin_date=begin_date, end_date=end_date, + downsample='1m') + if globals.variable_pump_non_realtime_ids: + variable_pump_non_realtime_data_list = get_data.get_history_data( + ids=','.join(globals.variable_pump_non_realtime_ids), + begin_date=begin_date, end_date=end_date, + downsample='1m') + if globals.source_outflow_non_realtime_ids: + source_outflow_non_realtime_data_list = get_data.get_history_data( + ids=','.join(globals.source_outflow_non_realtime_ids), + begin_date=begin_date, end_date=end_date, + downsample='1m') + if globals.pipe_flow_non_realtime_ids: + pipe_flow_non_realtime_data_list = get_data.get_history_data( + ids=','.join(globals.pipe_flow_non_realtime_ids), + begin_date=begin_date, end_date=end_date, + downsample='1m') + # print(pipe_flow_non_realtime_data_list) + if globals.pressure_non_realtime_ids: + pressure_non_realtime_data_list = get_data.get_history_data( + ids=','.join(globals.pressure_non_realtime_ids), + begin_date=begin_date, end_date=end_date, + downsample='1m') + # print(pressure_non_realtime_data_list) + if globals.demand_non_realtime_ids: + demand_non_realtime_data_list = get_data.get_history_data( + ids=','.join(globals.demand_non_realtime_ids), + begin_date=begin_date, end_date=end_date, + downsample='1m') + if globals.quality_non_realtime_ids: + quality_non_realtime_data_list = get_data.get_history_data( + ids=','.join(globals.quality_non_realtime_ids), + begin_date=begin_date, end_date=end_date, + downsample='1m') + except Exception as e: + print(f"Attempt {try_count} failed with error: {e}") + if try_count < 5: + print("Retrying in 10 seconds...") + time.sleep(10) + else: + print("Max retries reached. Exiting.") + else: + print("Data fetched successfully.") + break # 成功后退出循环 + if reservoir_liquid_level_non_realtime_data_list: + for data in reservoir_liquid_level_non_realtime_data_list: + # 创建Point对象 + point = ( + Point('reservoir_liquid_level_non_realtime') + .tag("date", data['time'].strftime('%Y-%m-%d')) + .tag("description", data['description']) + .tag("device_ID", data['device_ID']) + .field("monitored_value", data['monitored_value']) + .field("datacleaning_value", None) + .field("simulation_value", None) + .time(data['time'], write_precision='s') + ) + points_to_write.append(point) + # write_api.write(bucket=bucket, org=org_name, record=point) + if tank_liquid_level_non_realtime_data_list: + for data in tank_liquid_level_non_realtime_data_list: + # 创建Point对象 + point = ( + Point('tank_liquid_level_non_realtime') + .tag("date", data['time'].strftime('%Y-%m-%d')) + .tag("description", data['description']) + .tag("device_ID", data['device_ID']) + .field("monitored_value", data['monitored_value']) + .field("datacleaning_value", None) + .field("simulation_value", None) + .time(data['time'], write_precision='s') + ) + points_to_write.append(point) + # write_api.write(bucket=bucket, org=org_name, record=point) + if fixed_pump_non_realtime_data_list: + for data in fixed_pump_non_realtime_data_list: + # 创建Point对象 + point = ( + Point('fixed_pump_non_realtime') + .tag("date", data['time'].strftime('%Y-%m-%d')) + .tag("description", data['description']) + .tag("device_ID", data['device_ID']) + .field("monitored_value", data['monitored_value']) + .field("datacleaning_value", None) + .field("simulation_value", None) + .time(data['time'], write_precision='s') + ) + points_to_write.append(point) + # write_api.write(bucket=bucket, org=org_name, record=point) + if variable_pump_non_realtime_data_list: + for data in variable_pump_non_realtime_data_list: + # 创建Point对象 + point = ( + Point('variable_pump_non_realtime') + .tag("date", data['time'].strftime('%Y-%m-%d')) + .tag("description", data['description']) + .tag("device_ID", data['device_ID']) + .field("monitored_value", data['monitored_value']) + .field("datacleaning_value", None) + .field("simulation_value", None) + .time(data['time'], write_precision='s') + ) + points_to_write.append(point) + # write_api.write(bucket=bucket, org=org_name, record=point) + if source_outflow_non_realtime_data_list: + for data in source_outflow_non_realtime_data_list: + # 创建Point对象 + point = ( + Point('source_outflow_non_realtime') + .tag("date", data['time'].strftime('%Y-%m-%d')) + .tag("description", data['description']) + .tag("device_ID", data['device_ID']) + .field("monitored_value", data['monitored_value']) + .field("datacleaning_value", None) + .field("simulation_value", None) + .time(data['time'], write_precision='s') + ) + points_to_write.append(point) + # write_api.write(bucket=bucket, org=org_name, record=point) + if pipe_flow_non_realtime_data_list: + for data in pipe_flow_non_realtime_data_list: + # 创建Point对象 + point = ( + Point('pipe_flow_non_realtime') + .tag("date", data['time'].strftime('%Y-%m-%d')) + .tag("description", data['description']) + .tag("device_ID", data['device_ID']) + .field("monitored_value", data['monitored_value']) + .field("datacleaning_value", None) + .field("simulation_value", None) + .time(data['time'], write_precision='s') + ) + points_to_write.append(point) + # write_api.write(bucket=bucket, org=org_name, record=point) + if pressure_non_realtime_data_list: + for data in pressure_non_realtime_data_list: + # 创建Point对象 + point = ( + Point('pressure_non_realtime') + .tag("date", data['time'].strftime('%Y-%m-%d')) + .tag("description", data['description']) + .tag("device_ID", data['device_ID']) + .field("monitored_value", data['monitored_value']) + .field("datacleaning_value", None) + .field("simulation_value", None) + .time(data['time'], write_precision='s') + ) + points_to_write.append(point) + # write_api.write(bucket=bucket, org=org_name, record=point) + if demand_non_realtime_data_list: + for data in demand_non_realtime_data_list: + # 创建Point对象 + point = ( + Point('demand_non_realtime') + .tag("date", data['time'].strftime('%Y-%m-%d')) + .tag("description", data['description']) + .tag("device_ID", data['device_ID']) + .field("monitored_value", data['monitored_value']) + .field("datacleaning_value", None) + .field("simulation_value", None) + .time(data['time'], write_precision='s') + ) + points_to_write.append(point) + # write_api.write(bucket=bucket, org=org_name, record=point) + if quality_non_realtime_data_list: + for data in quality_non_realtime_data_list: + # 创建Point对象 + point = ( + Point('quality_non_realtime') + .tag("date", data['time'].strftime('%Y-%m-%d')) + .tag("description", data['description']) + .tag("device_ID", data['device_ID']) + .field("monitored_value", data['monitored_value']) + .field("datacleaning_value", None) + .field("simulation_value", None) + .time(data['time'], write_precision='s') + ) + points_to_write.append(point) + # write_api.write(bucket=bucket, org=org_name, record=point) + # 批量写入数据 + if points_to_write: + write_api.write(bucket=bucket, org=org_name, record=points_to_write) + write_api.flush() # 刷新缓存一次 + client.close() + + +# 2025/03/01 +def download_history_data_manually(begin_time: str, end_time: str, bucket: str = "SCADA_data") -> None: + """ + 获取某个时间段内所有SCADA设备的历史数据,非实时执行,手动补充数据版 + :param begin_time: 获取历史数据的开始时间,格式如'2024-11-25T09:00:00+08:00' + :param end_time: 获取历史数据的结束时间,格式如'2024-11-25T09:00:00+08:00' + :param bucket: InfluxDB 的 bucket 名称,默认值为 "SCADA_data" + :return: + """ + client = get_new_client() + if not client.ping(): + print("{} -- Failed to connect to InfluxDB.".format(datetime.now().strftime('%Y-%m-%d %H:%M:%S'))) + + write_options = WriteOptions( + jitter_interval=200, # 添加抖动以避免同时写入 + max_retry_delay=30000 # 最大重试延迟(毫秒) + ) + write_api = client.write_api(write_options=write_options) + # 创建一个临时存储点数据的列表 + points_to_write = [] + + begin_date = convert_time_format(begin_time) + end_date = convert_time_format(end_time) + + reservoir_liquid_level_realtime_data_list = [] + tank_liquid_level_realtime_data_list = [] + fixed_pump_realtime_data_list =[] + variable_pump_realtime_data_list =[] + source_outflow_realtime_data_list = [] + pipe_flow_realtime_data_list = [] + pressure_realtime_data_list =[] + demand_realtime_data_list = [] + quality_realtime_data_list = [] + + reservoir_liquid_level_non_realtime_data_list = [] + tank_liquid_level_non_realtime_data_list = [] + fixed_pump_non_realtime_data_list = [] + variable_pump_non_realtime_data_list = [] + source_outflow_non_realtime_data_list = [] + pipe_flow_non_realtime_data_list = [] + pressure_non_realtime_data_list = [] + demand_non_realtime_data_list = [] + quality_non_realtime_data_list = [] + + try_count = 0 + while try_count < 5: + try: + try_count += 1 + if globals.reservoir_liquid_level_realtime_ids: + reservoir_liquid_level_realtime_data_list = get_data.get_history_data( + ids=','.join(globals.reservoir_liquid_level_realtime_ids), + begin_date=begin_date, end_date=end_date, + downsample='1m') + if globals.tank_liquid_level_realtime_ids: + tank_liquid_level_realtime_data_list = get_data.get_history_data( + ids=','.join(globals.tank_liquid_level_realtime_ids), + begin_date=begin_date, end_date=end_date, + downsample='1m') + if globals.fixed_pump_realtime_ids: + fixed_pump_realtime_data_list = get_data.get_history_data( + ids=','.join(globals.fixed_pump_realtime_ids), + begin_date=begin_date, end_date=end_date, + downsample='1m') + if globals.variable_pump_realtime_ids: + variable_pump_realtime_data_list = get_data.get_history_data( + ids=','.join(globals.variable_pump_realtime_ids), + begin_date=begin_date, end_date=end_date, + downsample='1m') + if globals.source_outflow_realtime_ids: + source_outflow_realtime_data_list = get_data.get_history_data( + ids=','.join(globals.source_outflow_realtime_ids), + begin_date=begin_date, end_date=end_date, + downsample='1m') + if globals.pipe_flow_realtime_ids: + pipe_flow_realtime_data_list = get_data.get_history_data( + ids=','.join(globals.pipe_flow_realtime_ids), + begin_date=begin_date, end_date=end_date, + downsample='1m') + if globals.pressure_realtime_ids: + pressure_realtime_data_list = get_data.get_history_data( + ids=','.join(globals.pressure_realtime_ids), + begin_date=begin_date, end_date=end_date, + downsample='1m') + if globals.demand_realtime_ids: + demand_realtime_data_list = get_data.get_history_data( + ids=','.join(globals.demand_realtime_ids), + begin_date=begin_date, end_date=end_date, + downsample='1m') + if globals.quality_realtime_ids: + quality_realtime_data_list = get_data.get_history_data( + ids=','.join(globals.quality_realtime_ids), + begin_date=begin_date, end_date=end_date, + downsample='1m') + # reservoir_liquid_level_non_realtime_data_list = get_data.get_history_data( + # ids=','.join(reservoir_liquid_level_non_realtime_ids), begin_date=begin_date, end_date=end_date, downsample='1m') + if globals.reservoir_liquid_level_non_realtime_ids: + reservoir_liquid_level_non_realtime_data_list = get_data.get_history_data( + ids=','.join(globals.reservoir_liquid_level_non_realtime_ids), + begin_date=begin_date, end_date=end_date, + downsample='1m') + if globals.tank_liquid_level_non_realtime_ids: + tank_liquid_level_non_realtime_data_list = get_data.get_history_data( + ids=','.join(globals.tank_liquid_level_non_realtime_ids), + begin_date=begin_date, end_date=end_date, + downsample='1m') + if globals.fixed_pump_non_realtime_ids: + fixed_pump_non_realtime_data_list = get_data.get_history_data( + ids=','.join(globals.fixed_pump_non_realtime_ids), + begin_date=begin_date, end_date=end_date, + downsample='1m') + if globals.variable_pump_non_realtime_ids: + variable_pump_non_realtime_data_list = get_data.get_history_data( + ids=','.join(globals.variable_pump_non_realtime_ids), + begin_date=begin_date, end_date=end_date, + downsample='1m') + if globals.source_outflow_non_realtime_ids: + source_outflow_non_realtime_data_list = get_data.get_history_data( + ids=','.join(globals.source_outflow_non_realtime_ids), + begin_date=begin_date, end_date=end_date, + downsample='1m') + if globals.pipe_flow_non_realtime_ids: + pipe_flow_non_realtime_data_list = get_data.get_history_data( + ids=','.join(globals.pipe_flow_non_realtime_ids), + begin_date=begin_date, end_date=end_date, + downsample='1m') + # print(pipe_flow_non_realtime_data_list) + if globals.pressure_non_realtime_ids: + pressure_non_realtime_data_list = get_data.get_history_data( + ids=','.join(globals.pressure_non_realtime_ids), + begin_date=begin_date, end_date=end_date, + downsample='1m') + # print(pressure_non_realtime_data_list) + if globals.demand_non_realtime_ids: + demand_non_realtime_data_list = get_data.get_history_data( + ids=','.join(globals.demand_non_realtime_ids), + begin_date=begin_date, end_date=end_date, + downsample='1m') + if globals.quality_non_realtime_ids: + quality_non_realtime_data_list = get_data.get_history_data( + ids=','.join(globals.quality_non_realtime_ids), + begin_date=begin_date, end_date=end_date, + downsample='1m') + except Exception as e: + print(f"Attempt {try_count} failed with error: {e}") + if try_count < 5: + print("Retrying in 10 seconds...") + time.sleep(10) + else: + print("Max retries reached. Exiting.") + else: + print("Data fetched successfully.") + break # 成功后退出循环 + + if reservoir_liquid_level_realtime_data_list: + for data in reservoir_liquid_level_realtime_data_list: + # 创建Point对象 + point = ( + Point('reservoir_liquid_level_realtime') + .tag("date", data['time'].strftime('%Y-%m-%d')) + .tag("description", data['description']) + .tag("device_ID", data['device_ID']) + .field("monitored_value", data['monitored_value']) + .field("datacleaning_value", None) + .field("simulation_value", None) + .time(data['time'], write_precision='s') + ) + points_to_write.append(point) + # write_api.write(bucket=bucket, org=org_name, record=point) + if tank_liquid_level_realtime_data_list: + for data in tank_liquid_level_realtime_data_list: + # 创建Point对象 + point = ( + Point('tank_liquid_level_realtime') + .tag("date", data['time'].strftime('%Y-%m-%d')) + .tag("description", data['description']) + .tag("device_ID", data['device_ID']) + .field("monitored_value", data['monitored_value']) + .field("datacleaning_value", None) + .field("simulation_value", None) + .time(data['time'], write_precision='s') + ) + points_to_write.append(point) + # write_api.write(bucket=bucket, org=org_name, record=point) + if fixed_pump_realtime_data_list: + for data in fixed_pump_realtime_data_list: + # 创建Point对象 + point = ( + Point('fixed_pump_realtime') + .tag("date", data['time'].strftime('%Y-%m-%d')) + .tag("description", data['description']) + .tag("device_ID", data['device_ID']) + .field("monitored_value", data['monitored_value']) + .field("datacleaning_value", None) + .field("simulation_value", None) + .time(data['time'], write_precision='s') + ) + points_to_write.append(point) + # write_api.write(bucket=bucket, org=org_name, record=point) + if variable_pump_realtime_data_list: + for data in variable_pump_realtime_data_list: + # 创建Point对象 + point = ( + Point('variable_pump_realtime') + .tag("date", data['time'].strftime('%Y-%m-%d')) + .tag("description", data['description']) + .tag("device_ID", data['device_ID']) + .field("monitored_value", data['monitored_value']) + .field("datacleaning_value", None) + .field("simulation_value", None) + .time(data['time'], write_precision='s') + ) + points_to_write.append(point) + # write_api.write(bucket=bucket, org=org_name, record=point) + if source_outflow_realtime_data_list: + for data in source_outflow_realtime_data_list: + # 创建Point对象 + point = ( + Point('source_outflow_realtime') + .tag("date", data['time'].strftime('%Y-%m-%d')) + .tag("description", data['description']) + .tag("device_ID", data['device_ID']) + .field("monitored_value", data['monitored_value']) + .field("datacleaning_value", None) + .field("simulation_value", None) + .time(data['time'], write_precision='s') + ) + points_to_write.append(point) + # write_api.write(bucket=bucket, org=org_name, record=point) + if pipe_flow_realtime_data_list: + for data in pipe_flow_realtime_data_list: + # 创建Point对象 + point = ( + Point('pipe_flow_realtime') + .tag("date", data['time'].strftime('%Y-%m-%d')) + .tag("description", data['description']) + .tag("device_ID", data['device_ID']) + .field("monitored_value", data['monitored_value']) + .field("datacleaning_value", None) + .field("simulation_value", None) + .time(data['time'], write_precision='s') + ) + points_to_write.append(point) + # write_api.write(bucket=bucket, org=org_name, record=point) + if pressure_realtime_data_list: + for data in pressure_realtime_data_list: + # 创建Point对象 + point = ( + Point('pressure_realtime') + .tag("date", data['time'].strftime('%Y-%m-%d')) + .tag("description", data['description']) + .tag("device_ID", data['device_ID']) + .field("monitored_value", data['monitored_value']) + .field("datacleaning_value", None) + .field("simulation_value", None) + .time(data['time'], write_precision='s') + ) + points_to_write.append(point) + # write_api.write(bucket=bucket, org=org_name, record=point) + if demand_realtime_data_list: + for data in demand_realtime_data_list: + # 创建Point对象 + point = ( + Point('demand_realtime') + .tag("date", data['time'].strftime('%Y-%m-%d')) + .tag("description", data['description']) + .tag("device_ID", data['device_ID']) + .field("monitored_value", data['monitored_value']) + .field("datacleaning_value", None) + .field("simulation_value", None) + .time(data['time'], write_precision='s') + ) + points_to_write.append(point) + # write_api.write(bucket=bucket, org=org_name, record=point) + if quality_realtime_data_list: + for data in quality_realtime_data_list: + # 创建Point对象 + point = ( + Point('quality_realtime') + .tag("date", data['time'].strftime('%Y-%m-%d')) + .tag("description", data['description']) + .tag("device_ID", data['device_ID']) + .field("monitored_value", data['monitored_value']) + .field("datacleaning_value", None) + .field("simulation_value", None) + .time(data['time'], write_precision='s') + ) + points_to_write.append(point) + # write_api.write(bucket=bucket, org=org_name, record=point) + if reservoir_liquid_level_non_realtime_data_list: + for data in reservoir_liquid_level_non_realtime_data_list: + # 创建Point对象 + point = ( + Point('reservoir_liquid_level_non_realtime') + .tag("date", data['time'].strftime('%Y-%m-%d')) + .tag("description", data['description']) + .tag("device_ID", data['device_ID']) + .field("monitored_value", data['monitored_value']) + .field("datacleaning_value", None) + .field("simulation_value", None) + .time(data['time'], write_precision='s') + ) + points_to_write.append(point) + # write_api.write(bucket=bucket, org=org_name, record=point) + if tank_liquid_level_non_realtime_data_list: + for data in tank_liquid_level_non_realtime_data_list: + # 创建Point对象 + point = ( + Point('tank_liquid_level_non_realtime') + .tag("date", data['time'].strftime('%Y-%m-%d')) + .tag("description", data['description']) + .tag("device_ID", data['device_ID']) + .field("monitored_value", data['monitored_value']) + .field("datacleaning_value", None) + .field("simulation_value", None) + .time(data['time'], write_precision='s') + ) + points_to_write.append(point) + # write_api.write(bucket=bucket, org=org_name, record=point) + if fixed_pump_non_realtime_data_list: + for data in fixed_pump_non_realtime_data_list: + # 创建Point对象 + point = ( + Point('fixed_pump_non_realtime') + .tag("date", data['time'].strftime('%Y-%m-%d')) + .tag("description", data['description']) + .tag("device_ID", data['device_ID']) + .field("monitored_value", data['monitored_value']) + .field("datacleaning_value", None) + .field("simulation_value", None) + .time(data['time'], write_precision='s') + ) + points_to_write.append(point) + # write_api.write(bucket=bucket, org=org_name, record=point) + if variable_pump_non_realtime_data_list: + for data in variable_pump_non_realtime_data_list: + # 创建Point对象 + point = ( + Point('variable_pump_non_realtime') + .tag("date", data['time'].strftime('%Y-%m-%d')) + .tag("description", data['description']) + .tag("device_ID", data['device_ID']) + .field("monitored_value", data['monitored_value']) + .field("datacleaning_value", None) + .field("simulation_value", None) + .time(data['time'], write_precision='s') + ) + points_to_write.append(point) + # write_api.write(bucket=bucket, org=org_name, record=point) + if source_outflow_non_realtime_data_list: + for data in source_outflow_non_realtime_data_list: + # 创建Point对象 + point = ( + Point('source_outflow_non_realtime') + .tag("date", data['time'].strftime('%Y-%m-%d')) + .tag("description", data['description']) + .tag("device_ID", data['device_ID']) + .field("monitored_value", data['monitored_value']) + .field("datacleaning_value", None) + .field("simulation_value", None) + .time(data['time'], write_precision='s') + ) + points_to_write.append(point) + # write_api.write(bucket=bucket, org=org_name, record=point) + if pipe_flow_non_realtime_data_list: + for data in pipe_flow_non_realtime_data_list: + # 创建Point对象 + point = ( + Point('pipe_flow_non_realtime') + .tag("date", data['time'].strftime('%Y-%m-%d')) + .tag("description", data['description']) + .tag("device_ID", data['device_ID']) + .field("monitored_value", data['monitored_value']) + .field("datacleaning_value", None) + .field("simulation_value", None) + .time(data['time'], write_precision='s') + ) + points_to_write.append(point) + # write_api.write(bucket=bucket, org=org_name, record=point) + if pressure_non_realtime_data_list: + for data in pressure_non_realtime_data_list: + # 创建Point对象 + point = ( + Point('pressure_non_realtime') + .tag("date", data['time'].strftime('%Y-%m-%d')) + .tag("description", data['description']) + .tag("device_ID", data['device_ID']) + .field("monitored_value", data['monitored_value']) + .field("datacleaning_value", None) + .field("simulation_value", None) + .time(data['time'], write_precision='s') + ) + points_to_write.append(point) + # write_api.write(bucket=bucket, org=org_name, record=point) + if demand_non_realtime_data_list: + for data in demand_non_realtime_data_list: + # 创建Point对象 + point = ( + Point('demand_non_realtime') + .tag("date", data['time'].strftime('%Y-%m-%d')) + .tag("description", data['description']) + .tag("device_ID", data['device_ID']) + .field("monitored_value", data['monitored_value']) + .field("datacleaning_value", None) + .field("simulation_value", None) + .time(data['time'], write_precision='s') + ) + points_to_write.append(point) + # write_api.write(bucket=bucket, org=org_name, record=point) + if quality_non_realtime_data_list: + for data in quality_non_realtime_data_list: + # 创建Point对象 + point = ( + Point('quality_non_realtime') + .tag("date", data['time'].strftime('%Y-%m-%d')) + .tag("description", data['description']) + .tag("device_ID", data['device_ID']) + .field("monitored_value", data['monitored_value']) + .field("datacleaning_value", None) + .field("simulation_value", None) + .time(data['time'], write_precision='s') + ) + points_to_write.append(point) + # write_api.write(bucket=bucket, org=org_name, record=point) + # 批量写入数据 + if points_to_write: + write_api.write(bucket=bucket, org=org_name, record=points_to_write) + write_api.flush() # 刷新缓存一次 + client.close() + + +def query_SCADA_data_by_device_ID_and_time(query_ids_list: List[str], query_time: str, bucket: str="SCADA_data") -> Dict[str, float]: + """ + 根据SCADA设备的ID和时间查询值 + :param query_ids_list: SCADA设备ID的列表 + :param query_time: 输入的北京时间,格式为 '2024-11-24T17:30:00+08:00'。 + :param bucket: InfluxDB 的 bucket 名称,默认值为 "SCADA_data"。 + :return: + """ + client = get_new_client() + if not client.ping(): + print("{} -- Failed to connect to InfluxDB.".format(datetime.now().strftime('%Y-%m-%d %H:%M:%S'))) + + query_api = client.query_api() + # 将北京时间转换为 UTC 时间 + beijing_time = datetime.fromisoformat(query_time) + utc_time = beijing_time.astimezone(timezone.utc) + utc_start_time = utc_time - timedelta(seconds=1) + utc_stop_time = utc_time + timedelta(seconds=1) + # 构建查询字典 + SCADA_result_dict = {} + for device_id in query_ids_list: + # 构建 Flux 查询语句 + flux_query = f''' + from(bucket: "{bucket}") + |> range(start: {utc_start_time.isoformat()}, stop: {utc_stop_time.isoformat()}) + |> filter(fn: (r) => r["device_ID"] == "{device_id}" and r["_field"] == "monitored_value") + ''' + # 执行查询 + try: + result = query_api.query(flux_query) + # 从查询结果中提取 monitored_value + if result: + # 假设返回的结果为一行数据 + for table in result: + for record in table.records: + # 获取字段 "_value" 即为 monitored_value + monitored_value = record.get_value() + SCADA_result_dict[device_id] = monitored_value + else: + # 如果没有结果,默认设置为 None 或其他值 + SCADA_result_dict[device_id] = None + except Exception as e: + print(f"Error querying InfluxDB for device ID {device_id}: {e}") + SCADA_result_dict[device_id] = None + client.close() + return SCADA_result_dict + +# 2025/03/14 +def query_SCADA_data_by_device_ID_and_timerange(query_ids_list: List[str], start_time: str, end_time: str, bucket: str="SCADA_data"): + """ + 查询指定时间范围内,多个SCADA设备的数据,用于漏损定位 + :param query_ids_list: SCADA设备ID的列表 + :param start_time: 输入的北京时间,格式为 '2024-11-24T17:30:00+08:00'。 + :param end_time: 输入的北京时间,格式为 '2024-11-24T17:30:00+08:00'。 + :param bucket: InfluxDB 的 bucket 名称,默认值为 "SCADA_data"。 + :return: + """ + client = get_new_client() + if not client.ping(): + print("{} -- Failed to connect to InfluxDB.".format(datetime.now().strftime('%Y-%m-%d %H:%M:%S'))) + + query_api = client.query_api() + # 将北京时间转换为 UTC 时间 + beijing_start_time = datetime.fromisoformat(start_time) + utc_start_time = beijing_start_time.astimezone(timezone.utc) - timedelta(seconds=1) + print(utc_start_time) + beijing_end_time = datetime.fromisoformat(end_time) + utc_end_time = beijing_end_time.astimezone(timezone.utc) + timedelta(seconds=1) + print(utc_end_time) + SCADA_dict = {} + for device_id in query_ids_list: + flux_query = f''' + from(bucket: "{bucket}") + |> range(start: {utc_start_time.isoformat()}, stop: {utc_end_time.isoformat()}) + |> filter(fn: (r) => r["_measurement"] == "SCADA_data" and r["device_ID"] = {device_id} and r["_field"] == "monitored_value") + |> pivot(rowKey: ["_time"], columnKey: ["device_ID"], valueColumn: "_value") + |> sort(columns: ["_time"]) + ''' + # 执行查询,返回一个 FluxTable 列表 + tables = query_api.query(flux_query) + records_list = [] + for table in tables: + for record in table.records: + # 获取记录的时间和监测值 + records_list.append({ + "time": record["_time"], + "value": record["_value"] + }) + SCADA_dict[device_id] = records_list + client.close() + return SCADA_dict + + +# 2025/02/01 +def store_realtime_simulation_result_to_influxdb(node_result_list: List[Dict[str, any]], link_result_list: List[Dict[str, any]], + result_start_time: str, bucket: str = "realtime_simulation_result"): + """ + 将实时模拟计算结果数据存储到 InfluxDB 的realtime_simulation_result这个bucket中。 + :param node_result_list: (List[Dict[str, any]]): 包含节点和结果数据的字典列表。 + :param link_result_list: (List[Dict[str, any]]): 包含连接和结果数据的字典列表。 + :param result_start_time: (str): 计算结果的模拟开始时间。 + :param bucket: (str): InfluxDB 的 bucket 名称,默认值为 "realtime_simulation_result"。 + :return: + """ + client = get_new_client() + if not client.ping(): + print("{} -- Failed to connect to InfluxDB.".format(datetime.now().strftime('%Y-%m-%d %H:%M:%S'))) + + # 开始写入数据 + try: + write_options = WriteOptions( + jitter_interval=200, # 添加抖动以避免同时写入 + max_retry_delay=30000 # 最大重试延迟(毫秒) + ) + write_api = client.write_api(write_options=write_options) + # 创建一个临时存储点数据的列表 + points_to_write = [] + date_str = result_start_time.split('T')[0] + time_beijing = datetime.strptime(result_start_time, '%Y-%m-%dT%H:%M:%S%z').isoformat() + for result in node_result_list: + # 提取节点信息和结果数据 + node_id = result.get('node') + data_list = result.get('result', []) + for data in data_list: + # 构建 Point 数据,多个 field 存在于一个数据点中 + node_point = Point("node") \ + .tag("date", date_str) \ + .tag("ID", node_id) \ + .field("head", data.get('head', 0.0)) \ + .field("pressure", data.get('pressure', 0.0)) \ + .field("actualdemand", data.get('demand', 0.0)) \ + .field("demanddeficit", None) \ + .field("totalExternalOutflow", None) \ + .field("quality", data.get('quality', 0.0)) \ + .time(time_beijing, write_precision='s') + points_to_write.append(node_point) + # 写入数据到 InfluxDB,多个 field 在同一个 point 中 + # write_api.write(bucket=bucket, org=org_name, record=node_point) + # write_api.flush() + # print(f"成功将 {len(node_result_list)} 条node数据写入 InfluxDB。") + for result in link_result_list: + link_id = result.get('link') + data_list = result.get('result', []) + for data in data_list: + link_point = Point("link") \ + .tag("date", date_str) \ + .tag("ID", link_id) \ + .field("flow", data.get('flow', 0.0)) \ + .field("velocity", data.get('velocity', 0.0)) \ + .field("headloss", data.get('headloss', 0.0)) \ + .field("quality", data.get('quality', 0.0)) \ + .field("status", data.get('status', "UNKNOWN")) \ + .field("setting", data.get('setting', 0.0)) \ + .field("reaction", data.get('reaction', 0.0)) \ + .field("friction", data.get('friction', 0.0)) \ + .time(time_beijing, write_precision='s') + points_to_write.append(link_point) + # write_api.write(bucket=bucket, org=org_name, record=link_point) + # write_api.flush() + # print(f"成功将 {len(link_result_list)} 条link数据写入 InfluxDB。") + # 批量写入数据 + if points_to_write: + write_api.write(bucket=bucket, org=org_name, record=points_to_write) + write_api.flush() # 刷新缓存一次 + except Exception as e: + raise RuntimeError(f"数据写入 InfluxDB 时发生错误: {e}") + client.close() + + +# 2025/02/01 +def query_latest_record_by_ID(ID: str, type: str, bucket: str="realtime_simulation_result") -> dict: + """ + 查询指定ID的最新的一条记录 + :param ID: (str): 要查询的 ID。 + :param type: (str): "node"或“link” + :param bucket: (str): 数据存储的 bucket 名称。 + :return: dict: 最新记录的数据,如果没有找到则返回 None。 + """ + client = get_new_client() + if not client.ping(): + print("{} -- Failed to connect to InfluxDB.".format(datetime.now().strftime('%Y-%m-%d %H:%M:%S'))) + + query_api = client.query_api() + if type == "node": + flux_query = f''' + from(bucket: "{bucket}") + |> range(start: -1d, stop: now()) // 查找最近七天的记录 + |> filter(fn: (r) => r["_measurement"] == "node" and r["ID"] == "{ID}") + |> pivot( + rowKey:["_time"], + columnKey:["_field"], + valueColumn:"_value" + ) + |> group() // 将所有数据聚合到同一个 group + |> sort(columns: ["_time"], desc: true) + |> limit(n: 1) + ''' + tables = query_api.query(flux_query) + # 解析查询结果 + for table in tables: + for record in table.records: + return { + "time": record["_time"], + "ID": ID, + "head": record["head"], + "pressure": record["pressure"], + "actualdemand": record["actualdemand"], + # "demanddeficit": record["demanddeficit"], + # "totalExternalOutflow": record["totalExternalOutflow"], + "quality": record["quality"] + } + elif type == "link": + flux_query = f''' + from(bucket: "{bucket}") + |> range(start: -1d, stop: now()) // 查找最近七天的记录 + |> filter(fn: (r) => r["_measurement"] == "link" and r["ID"] == "{ID}") + |> pivot( + rowKey:["_time"], + columnKey:["_field"], + valueColumn:"_value" + ) + |> group() // 将所有数据聚合到同一个 group + |> sort(columns: ["_time"], desc: true) + |> limit(n: 1) + ''' + tables = query_api.query(flux_query) + # 解析查询结果 + for table in tables: + for record in table.records: + return { + "time": record["_time"], + "ID": ID, + "flow": record["flow"], + "velocity": record["velocity"], + "headloss": record["headloss"], + "quality": record["quality"], + "status": record["status"], + "setting": record["setting"], + "reaction": record["reaction"], + "friction": record["friction"] + } + client.close() + return None # 如果没有找到记录 + + +# 2025/02/01 +def query_all_record_by_time(query_time: str, bucket: str="realtime_simulation_result") -> tuple: + """ + 查询指定北京时间的所有记录,包括 'node' 和 'link' measurement,分别以指定格式返回。 + :param query_time: (str): 输入的北京时间,格式为 '2024-11-24T17:30:00+08:00'。 + :param bucket: (str): 数据存储的 bucket 名称。 + :return: dict: tuple: (node_records, link_records) + """ + client = get_new_client() + if not client.ping(): + print("{} -- Failed to connect to InfluxDB.".format(datetime.now().strftime('%Y-%m-%d %H:%M:%S'))) + + query_api = client.query_api() + # 将北京时间转换为 UTC 时间 + beijing_time = datetime.fromisoformat(query_time) + utc_time = beijing_time.astimezone(timezone.utc) + utc_start_time = utc_time - timedelta(seconds=1) + utc_stop_time = utc_time + timedelta(seconds=1) + # 构建 Flux 查询语句 + flux_query = f''' + from(bucket: "{bucket}") + |> range(start: {utc_start_time.isoformat()}, stop: {utc_stop_time.isoformat()}) + |> filter(fn: (r) => r["_measurement"] == "node" or r["_measurement"] == "link") + |> pivot( + rowKey:["_time"], + columnKey:["_field"], + valueColumn:"_value" + ) + ''' + # 执行查询 + tables = query_api.query(flux_query) + node_records = [] + link_records = [] + # 解析查询结果 + for table in tables: + for record in table.records: + # print(record.values) # 打印完整记录内容 + measurement = record["_measurement"] + # 处理 node 数据 + if measurement == "node": + node_records.append({ + "time": record["_time"], + "ID": record["ID"], + "head": record["head"], + "pressure": record["pressure"], + "actualdemand": record["actualdemand"], + "quality": record["quality"] + }) + # 处理 link 数据 + elif measurement == "link": + link_records.append({ + "time": record["_time"], + "ID": record["ID"], + "flow": record["flow"], + "velocity": record["velocity"], + "headloss": record["headloss"], + "quality": record["quality"], + "status": record["status"], + "setting": record["setting"], + "reaction": record["reaction"], + "friction": record["friction"] + }) + client.close() + return node_records, link_records + + +# 2025/03/03 +def query_all_record_by_time_property(query_time: str, type: str, property: str, bucket: str="realtime_simulation_result") -> list: + """ + 查询指定北京时间的所有记录,查询 'node' 或 'link' 的某一属性值,以指定格式返回。 + :param query_time: (str): 输入的北京时间,格式为 '2024-11-24T17:30:00+08:00'。 + :param type: (str): 查询的类型(决定 measurement) + :param property: (str): 查询的字段名称(field) + :param bucket: (str): 数据存储的 bucket 名称。 + :return: list(dict): result_records + """ + client = get_new_client() + if not client.ping(): + print("{} -- Failed to connect to InfluxDB.".format(datetime.now().strftime('%Y-%m-%d %H:%M:%S'))) + + query_api = client.query_api() + # 确定 measurement + if type == "node": + measurement = "node" + elif type == "link": + measurement = "link" + else: + raise ValueError(f"不支持的类型: {type}") + # 将北京时间转换为 UTC 时间 + beijing_time = datetime.fromisoformat(query_time) + utc_time = beijing_time.astimezone(timezone.utc) + utc_start_time = utc_time - timedelta(seconds=1) + utc_stop_time = utc_time + timedelta(seconds=1) + # 构建 Flux 查询语句 + flux_query = f''' + from(bucket: "{bucket}") + |> range(start: {utc_start_time.isoformat()}, stop: {utc_stop_time.isoformat()}) + |> filter(fn: (r) => r["_measurement"] == "{measurement}" and r["_field"] == "{property}") + ''' + # 执行查询 + tables = query_api.query(flux_query) + result_records = [] + # 解析查询结果 + for table in tables: + for record in table.records: + # print(record.values) # 打印完整记录内容 + result_records.append({ + "ID": record["ID"], + "value": record["_value"] + }) + client.close() + return result_records + + +# 2025/02/21 +def query_all_record_by_date(query_date: str, bucket: str="realtime_simulation_result") -> tuple: + """ + 查询指定日期的所有记录,包括‘node’和‘link’,分别以指定的格式返回 + :param query_date: 输入的日期,格式为‘2025-02-14’ + :param bucket: 数据存储的bucket名称 + :return: dict: tuple: (node_records, link_records) + """ + client = get_new_client() + # 记录开始时间 + time_cost_start = time.perf_counter() + print('{} -- Hydraulic simulation started.'.format( + datetime.now(pytz.timezone('Asia/Shanghai')).strftime('%Y-%m-%d %H:%M:%S'))) + + if not client.ping(): + print("{} -- Failed to connect to InfluxDB.".format(datetime.now().strftime('%Y-%m-%d %H:%M:%S'))) + + query_api = client.query_api() + # 将 start_date 的北京时间转换为 UTC 时间 + start_time = (datetime.strptime(query_date, "%Y-%m-%d") - timedelta(days=1)).replace(hour=16, minute=0, second=0, tzinfo=timezone.utc).isoformat() + stop_time = datetime.strptime(query_date, "%Y-%m-%d").replace(hour=15, minute=59, second=59, tzinfo=timezone.utc).isoformat() + # 构建 Flux 查询语句 + flux_query = f''' + from(bucket: "{bucket}") + |> range(start: {start_time}, stop: {stop_time}) + |> filter(fn: (r) => r["_measurement"] == "node" or r["_measurement"] == "link" and r["date"] == "{query_date}") + |> pivot( + rowKey:["_time"], + columnKey:["_field"], + valueColumn:"_value" + ) + ''' + # 执行查询 + tables = query_api.query(flux_query) + node_records = [] + link_records = [] + # 解析查询结果 + for table in tables: + for record in table.records: + # print(record.values) # 打印完整记录内容 + measurement = record["_measurement"] + # 处理 node 数据 + if measurement == "node": + node_records.append({ + "time": record["_time"], + "ID": record["ID"], + "head": record["head"], + "pressure": record["pressure"], + "actualdemand": record["actualdemand"], + "quality": record["quality"] + }) + # 处理 link 数据 + elif measurement == "link": + link_records.append({ + "time": record["_time"], + "ID": record["ID"], + "flow": record["flow"], + "velocity": record["velocity"], + "headloss": record["headloss"], + "quality": record["quality"], + "status": record["status"], + "setting": record["setting"], + "reaction": record["reaction"], + "friction": record["friction"] + }) + time_cost_end = time.perf_counter() + print('{} -- Hydraulic simulation finished, cost time: {:.2f} s.'.format( + datetime.now(pytz.timezone('Asia/Shanghai')).strftime('%Y-%m-%d %H:%M:%S'), + time_cost_end - time_cost_start)) + client.close() + return node_records, link_records + + +# 2025/02/21 +def query_all_record_by_date_property(query_date: str, type: str, property: str, bucket: str="realtime_simulation_result") -> list: + """ + 查询指定日期的‘node’或‘link’的某一属性值的所有记录,以指定的格式返回 + :param query_date: 输入的日期,格式为‘2025-02-14’ + :param type: (str): 查询的类型(决定 measurement) + :param property: (str): 查询的字段名称(field) + :param bucket: 数据存储的bucket名称 + :return: list(dict): result_records + """ + client = get_new_client() + # 记录开始时间 + time_cost_start = time.perf_counter() + print('{} -- Hydraulic simulation started.'.format( + datetime.now(pytz.timezone('Asia/Shanghai')).strftime('%Y-%m-%d %H:%M:%S'))) + if not client.ping(): + print("{} -- Failed to connect to InfluxDB.".format(datetime.now().strftime('%Y-%m-%d %H:%M:%S'))) + + query_api = client.query_api() + # 确定 measurement + if type == "node": + measurement = "node" + elif type == "link": + measurement = "link" + else: + raise ValueError(f"不支持的类型: {type}") + # 将 start_date 的北京时间转换为 UTC 时间 + start_time = (datetime.strptime(query_date, "%Y-%m-%d") - timedelta(days=1)).replace(hour=16, minute=0, second=0, tzinfo=timezone.utc).isoformat() + stop_time = datetime.strptime(query_date, "%Y-%m-%d").replace(hour=15, minute=59, second=59, tzinfo=timezone.utc).isoformat() + # 构建 Flux 查询语句 + flux_query = f''' + from(bucket: "{bucket}") + |> range(start: {start_time}, stop: {stop_time}) + |> filter(fn: (r) => r["_measurement"] == "{measurement}" and r["date"] == "{query_date}" and r["_field"] == "{property}") + ''' + # 执行查询 + tables = query_api.query(flux_query) + result_records = [] + # 解析查询结果 + for table in tables: + for record in table.records: + # print(record.values) # 打印完整记录内容 + result_records.append({ + "ID": record["ID"], + "time": record["_time"], + "value": record["_value"] + }) + time_cost_end = time.perf_counter() + print('{} -- Hydraulic simulation finished, cost time: {:.2f} s.'.format( + datetime.now(pytz.timezone('Asia/Shanghai')).strftime('%Y-%m-%d %H:%M:%S'), + time_cost_end - time_cost_start)) + client.close() + return result_records + + +# 2025/02/01 +def query_curve_by_ID_property_daterange(ID: str, type: str, property: str, start_date: str, end_date: str, bucket: str="realtime_simulation_result") -> list: + """ + 根据 type 查询对应的 measurement,根据 ID 和 date 查询对应的 tag,根据 property 查询对应的 field。 + :param ID: (str): 要查询的 ID(tag) + :param type: (str): 查询的类型(决定 measurement) + :param property: (str): 查询的字段名称(field) + :param start_date: (str): 查询的开始日期,格式为 'YYYY-MM-DD' + :param end_date: (str): 查询的结束日期,格式为 'YYYY-MM-DD' + :param bucket: (str): 数据存储的 bucket 名称,默认值为 "realtime_simulation_result" + :return: 查询结果的列表 + """ + client = get_new_client() + if not client.ping(): + print("{} -- Failed to connect to InfluxDB.".format(datetime.now().strftime('%Y-%m-%d %H:%M:%S'))) + + query_api = client.query_api() + # 确定 measurement + if type == "node": + measurement = "node" + elif type == "link": + measurement = "link" + else: + raise ValueError(f"不支持的类型: {type}") + # 解析日期范围(当天的 UTC 开始和结束时间) + # previous_day = datetime.strptime(start_date, "%Y-%m-%d") - timedelta(days=1) + # start_time = previous_day.isoformat() + "T16:00:00Z" + # stop_time = datetime.strptime(end_date, "%Y-%m-%d").isoformat() + "T15:59:59Z" + # 将 start_date 的北京时间转换为 UTC 时间范围 + start_time = (datetime.strptime(start_date, "%Y-%m-%d") - timedelta(days=1)).replace(hour=16, minute=0, second=0, tzinfo=timezone.utc).isoformat() + stop_time = datetime.strptime(end_date, "%Y-%m-%d").replace(hour=15, minute=59, second=59, tzinfo=timezone.utc).isoformat() + # 构建 Flux 查询语句 + flux_query = f''' + from(bucket: "{bucket}") + |> range(start: {start_time}, stop: {stop_time}) + |> filter(fn: (r) => r["_measurement"] == "{measurement}" and r["ID"] == "{ID}" and r["_field"] == "{property}") + ''' + # 执行查询 + tables = query_api.query(flux_query) + # 解析查询结果 + results = [] + for table in tables: + for record in table.records: + results.append({ + "time": record["_time"], + "value": record["_value"] + }) + client.close() + return results + + +# 2025/02/13 +def store_scheme_simulation_result_to_influxdb(node_result_list: List[Dict[str, any]], link_result_list: List[Dict[str, any]], + scheme_start_time: str, num_periods: int = 1, scheme_Type: str = None, scheme_Name: str = None, + bucket: str = "scheme_simulation_result"): + """ + 将方案模拟计算结果存入 InfluxuDb 的scheme_simulation_result这个bucket中。 + :param node_result_list: (List[Dict[str, any]]): 包含节点和结果数据的字典列表。 + :param link_result_list: (List[Dict[str, any]]): 包含连接和结果数据的字典列表。 + :param scheme_start_time: (str): 方案模拟开始时间。 + :param num_periods: (int): 方案模拟的周期数 + :param scheme_Type: (str): 方案类型 + :param scheme_Name: (str): 方案名称 + :param bucket: (str): InfluxDB 的 bucket 名称,默认值为 "scheme_simulation_result"。 + :return: + """ + client = get_new_client() + if not client.ping(): + print("{} -- Failed to connect to InfluxDB.".format(datetime.now().strftime('%Y-%m-%d %H:%M:%S'))) + + try: + write_options = WriteOptions( + jitter_interval=200, # 添加抖动以避免同时写入 + max_retry_delay=30000 # 最大重试延迟(毫秒) + ) + write_api = client.write_api(write_options=write_options) + # 创建一个临时存储点数据的列表 + points_to_write = [] + date_str = scheme_start_time.split('T')[0] + time_beijing = datetime.strptime(scheme_start_time, '%Y-%m-%dT%H:%M:%S%z') + timestep_parts = globals.hydraulic_timestep.split(':') + timestep = timedelta(hours=int(timestep_parts[0]), minutes=int(timestep_parts[1]), seconds=int(timestep_parts[2])) + for node_result in node_result_list: + # 提取节点信息和数据结果 + node_id = node_result.get('node') + # 从period 0 到 period num_period - 1 + for period_index in range(num_periods): + scheme_time = (time_beijing + (timestep * period_index)).isoformat() + data_list = [node_result.get('result', [])[period_index]] + for data in data_list: + # 构建 Point 数据,多个 field 存在于一个数据点中 + node_point = Point("node") \ + .tag("date", date_str) \ + .tag("ID", node_id) \ + .tag("scheme_Type", scheme_Type) \ + .tag("scheme_Name", scheme_Name) \ + .field("head", data.get('head', 0.0)) \ + .field("pressure", data.get('pressure', 0.0)) \ + .field("actualdemand", data.get('demand', 0.0)) \ + .field("demanddeficit", None) \ + .field("totalExternalOutflow", None) \ + .field("quality", data.get('quality', 0.0)) \ + .time(scheme_time, write_precision='s') + points_to_write.append(node_point) + # 写入数据到 InfluxDB,多个 field 在同一个 point 中 + # write_api.write(bucket=bucket, org=org_name, record=node_point) + # write_api.flush() + for link_result in link_result_list: + link_id = link_result.get('link') + for period_index in range(num_periods): + scheme_time = (time_beijing + (timestep * period_index)).isoformat() + data_list = [link_result.get('result', [])[period_index]] + for data in data_list: + link_point = Point("link") \ + .tag("date", date_str) \ + .tag("ID", link_id) \ + .tag("scheme_Type", scheme_Type) \ + .tag("scheme_Name", scheme_Name) \ + .field("flow", data.get('flow', 0.0)) \ + .field("velocity", data.get('velocity', 0.0)) \ + .field("headloss", data.get('headloss', 0.0)) \ + .field("quality", data.get('quality', 0.0)) \ + .field("status", data.get('status', "UNKNOWN")) \ + .field("setting", data.get('setting', 0.0)) \ + .field("reaction", data.get('reaction', 0.0)) \ + .field("friction", data.get('friction', 0.0)) \ + .time(scheme_time, write_precision='s') + points_to_write.append(link_point) + # write_api.write(bucket=bucket, org=org_name, record=link_point) + # write_api.flush() + # 批量写入数据 + if points_to_write: + write_api.write(bucket=bucket, org=org_name, record=points_to_write) + write_api.flush() # 刷新缓存一次 + except Exception as e: + raise RuntimeError(f"数据写入 InfluxDB 时发生错误: {e}") + client.close() + + +# 2025/03/12 +def query_corresponding_query_id_and_element_id(name: str) -> None: + """ + 查询scada_info这张表中,api_query_id与associated_element_id的对应关系,用于下一步fill_scheme_simulation_result_to_SCADA + :param name: 数据库名称 + :return: + """ + # 连接数据库 + conn_string = f"dbname={name} host=127.0.0.1" + try: + with psycopg.connect(conn_string) as conn: + with conn.cursor() as cur: + # 查询 transmission_mode 为 'realtime' 的记录 + cur.execute(""" + SELECT type, associated_element_id, api_query_id + FROM scada_info + WHERE type IN ('source_outflow', 'pipe_flow', 'demand', 'pressure', 'quality'); + """) + records = cur.fetchall() + # 遍历查询结果,根据 type 分类存入对应的字典 + for record in records: + record_type, associated_element_id, api_query_id = record + if record_type == "source_outflow": + globals.scheme_source_outflow_ids[api_query_id] = associated_element_id + elif record_type == "pipe_flow": + globals.scheme_pipe_flow_ids[api_query_id] = associated_element_id + elif record_type == "pressure": + globals.scheme_pressure_ids[api_query_id] = associated_element_id + elif record_type == "demand": + globals.scheme_demand_ids[api_query_id] = associated_element_id + elif record_type == "quality": + globals.scheme_quality_ids[api_query_id] = associated_element_id + # 如果需要调试,可以打印该字典 + # print("scheme_source_outflow_ids:", globals.scheme_source_outflow_ids) + # print("scheme_pipe_flow_ids:", globals.scheme_pipe_flow_ids) + # print("scheme_pressure_ids:", globals.scheme_pressure_ids) + # print("scheme_demand_ids:", globals.scheme_demand_ids) + # print("scheme_quality_ids:", globals.scheme_quality_ids) + except psycopg.Error as e: + print(f"数据库连接或查询出错: {e}") + + +# 2025/03/22 +# def auto_get_burst_flow(): + + +# 2025/03/22 +# def manually_get_burst_flow(): + + +# 2025/03/11 +def fill_scheme_simulation_result_to_SCADA(scheme_Type: str = None, scheme_Name: str = None, query_date: str = None, + bucket: str = "scheme_simulation_result"): + """ + :param scheme_Type: 方案类型 + :param scheme_Name: 方案名称 + :param query_date: 查询日期,格式为 'YYYY-MM-DD' + :param bucket: InfluxDB 的 bucket 名称,默认值为 "scheme_simulation_result" + :return: + """ + client = get_new_client() + if not client.ping(): + print("{} -- Failed to connect to InfluxDB.".format(datetime.now().strftime('%Y-%m-%d %H:%M:%S'))) + + write_options = WriteOptions( + jitter_interval=200, # 添加抖动以避免同时写入 + max_retry_delay=30000 # 最大重试延迟(毫秒) + ) + write_api = client.write_api(write_options=write_options) + # 创建一个临时存储点数据的列表 + points_to_write = [] + # 查找associated_element_id的对应值 + for key, value in globals.scheme_source_outflow_ids.items(): + scheme_source_outflow_result = (query_scheme_curve_by_ID_property(scheme_Type=scheme_Type, scheme_Name=scheme_Name, + query_date=query_date, ID=value, type='link', property='flow')) + # print(f"Key: {key}, Query result: {scheme_source_outflow_result}") # 调试输出 + for data in scheme_source_outflow_result: + point = ( + Point('scheme_source_outflow') + .tag("date", query_date) + .tag("device_ID", key) + .tag("scheme_Type", scheme_Type) + .tag("scheme_Name", scheme_Name) + .field("monitored_value", data['value']) + .time(data['time'], write_precision='s') + ) + points_to_write.append(point) + # write_api.write(bucket=bucket, org=org_name, record=point) + + for key, value in globals.scheme_pipe_flow_ids.items(): + scheme_pipe_flow_result = (query_scheme_curve_by_ID_property(scheme_Type=scheme_Type, scheme_Name=scheme_Name, + query_date=query_date, ID=value, type='link', property='flow')) + for data in scheme_pipe_flow_result: + point = ( + Point('scheme_pipe_flow') + .tag("date", query_date) + .tag("device_ID", key) + .tag("scheme_Type", scheme_Type) + .tag("scheme_Name", scheme_Name) + .field("monitored_value", data['value']) + .time(data['time'], write_precision='s') + ) + points_to_write.append(point) + # write_api.write(bucket=bucket, org=org_name, record=point) + + for key, value in globals.scheme_pressure_ids.items(): + scheme_pressure_result = (query_scheme_curve_by_ID_property(scheme_Type=scheme_Type, scheme_Name=scheme_Name, + query_date=query_date, ID=value, type='node', property='pressure')) + for data in scheme_pressure_result: + point = ( + Point('scheme_pressure') + .tag("date", query_date) + .tag("device_ID", key) + .tag("scheme_Type", scheme_Type) + .tag("scheme_Name", scheme_Name) + .field("monitored_value", data['value']) + .time(data['time'], write_precision='s') + ) + points_to_write.append(point) + # write_api.write(bucket=bucket, org=org_name, record=point) + + for key, value in globals.scheme_demand_ids.items(): + scheme_demand_result = (query_scheme_curve_by_ID_property(scheme_Type=scheme_Type, scheme_Name=scheme_Name, + query_date=query_date, ID=value, type='node', property='actualdemand')) + for data in scheme_demand_result: + point = ( + Point('scheme_demand') + .tag("date", query_date) + .tag("device_ID", key) + .tag("scheme_Type", scheme_Type) + .tag("scheme_Name", scheme_Name) + .field("monitored_value", data['value']) + .time(data['time'], write_precision='s') + ) + points_to_write.append(point) + # write_api.write(bucket=bucket, org=org_name, record=point) + + for key, value in globals.scheme_quality_ids.items(): + scheme_quality_result = (query_scheme_curve_by_ID_property(scheme_Type=scheme_Type, scheme_Name=scheme_Name, + query_date=query_date, ID=value, type='node', property='quality')) + for data in scheme_quality_result: + point = ( + Point('scheme_quality') + .tag("date", query_date) + .tag("device_ID", key) + .tag("scheme_Type", scheme_Type) + .tag("scheme_Name", scheme_Name) + .field("monitored_value", data['value']) + .time(data['time'], write_precision='s') + ) + points_to_write.append(point) + # write_api.write(bucket=bucket, org=org_name, record=point) + # 批量写入数据 + if points_to_write: + write_api.write(bucket=bucket, org=org_name, record=points_to_write) + write_api.flush() # 刷新缓存一次 + client.close() + + +# 2025/02/15 +def query_SCADA_data_curve(api_query_id: str, start_date: str, end_date: str, bucket: str="SCADA_data") -> list: + """ + 根据SCADA设备的api_query_id和时间范围,查询得到曲线,查到的数据为0时区时间 + :param api_query_id: SCADA设备的api_query_id + :param start_date: 查询开始的时间,格式为 'YYYY-MM-DD' + :param end_date: 查询结束的时间,格式为 'YYYY-MM-DD' + :param bucket: 数据存储的 bucket 名称,默认值为 "SCADA_data" + :return: 查询结果的列表 + """ + client = get_new_client() + if not client.ping(): + print("{} -- Failed to connect to InfluxDB.".format(datetime.now().strftime('%Y-%m-%d %H:%M:%S'))) + + query_api = client.query_api() + # 将 start_date 的北京时间转换为 UTC 时间范围 + start_time = (datetime.strptime(start_date, "%Y-%m-%d") - timedelta(days=1)).replace(hour=16, minute=0, second=0, tzinfo=timezone.utc).isoformat() + stop_time = datetime.strptime(end_date, "%Y-%m-%d").replace(hour=15, minute=59, second=59, tzinfo=timezone.utc).isoformat() + # 构建 Flux 查询语句 + flux_query = f''' + from(bucket: "{bucket}") + |> range(start: {start_time}, stop: {stop_time}) + |> filter(fn: (r) => r["device_ID"] == "{api_query_id}") + ''' + # 执行查询 + tables = query_api.query(flux_query) + # 解析查询结果 + results = [] + for table in tables: + for record in table.records: + results.append({ + "time": record["_time"], + "value": record["_value"] + }) + client.close() + return results + + +# 2025/02/18 +def query_scheme_all_record_by_time(scheme_Type: str, scheme_Name: str, query_time: str, bucket: str="scheme_simulation_result") -> tuple: + """ + 查询指定方案某一时刻的所有记录,包括‘node'和‘link’,分别以指定格式返回。 + :param scheme_Type: 方案类型 + :param scheme_Name: 方案名称 + :param query_time: 输入的北京时间,格式为 '2024-11-24T17:30:00+08:00'。 + :param bucket: 数据存储的 bucket 名称。 + :return: dict: tuple: (node_records, link_records) + """ + client = get_new_client() + if not client.ping(): + print("{} -- Failed to connect to InfluxDB.".format(datetime.now().strftime('%Y-%m-%d %H:%M:%S'))) + + query_api = client.query_api() + # 将北京时间转换为 UTC 时间 + beijing_time = datetime.fromisoformat(query_time) + utc_time = beijing_time.astimezone(timezone.utc) + utc_start_time = utc_time - timedelta(seconds=1) + utc_stop_time = utc_time + timedelta(seconds=1) + # 构建 Flux 查询语句 + flux_query = f''' + from(bucket: "{bucket}") + |> range(start: {utc_start_time.isoformat()}, stop: {utc_stop_time.isoformat()}) + |> filter(fn: (r) => r["scheme_Type"] == "{scheme_Type}" and r["scheme_Name"] == "{scheme_Name}" and r["_measurement"] == "node" or r["_measurement"] == "link") + |> pivot( + rowKey:["_time"], + columnKey:["_field"], + valueColumn:"_value" + ) + ''' + # 执行查询 + tables = query_api.query(flux_query) + node_records = [] + link_records = [] + # 解析查询结果 + for table in tables: + for record in table.records: + # print(record.values) # 打印完整记录内容 + measurement = record["_measurement"] + # 处理 node 数据 + if measurement == "node": + node_records.append({ + "time": record["_time"], + "ID": record["ID"], + "head": record["head"], + "pressure": record["pressure"], + "actualdemand": record["actualdemand"], + "quality": record["quality"] + }) + # 处理 link 数据 + elif measurement == "link": + link_records.append({ + "time": record["_time"], + "ID": record["ID"], + "flow": record["flow"], + "velocity": record["velocity"], + "headloss": record["headloss"], + "quality": record["quality"], + "status": record["status"], + "setting": record["setting"], + "reaction": record["reaction"], + "friction": record["friction"] + }) + client.close() + return node_records, link_records + + +# 2025/03/04 +def query_scheme_all_record_by_time_property(scheme_Type: str, scheme_Name: str, query_time: str, type: str, property: str, + bucket: str="scheme_simulation_result") -> list: + """ + 查询指定方案某一时刻‘node'或‘link’某一属性值,以指定格式返回。 + :param scheme_Type: 方案类型 + :param scheme_Name: 方案名称 + :param query_time: 输入的北京时间,格式为 '2024-11-24T17:30:00+08:00'。 + :param type: 查询的类型(决定 measurement) + :param property: 查询的字段名称(field) + :param bucket: 数据存储的 bucket 名称。 + :return: dict: tuple: (node_records, link_records) + """ + client = get_new_client() + if not client.ping(): + print("{} -- Failed to connect to InfluxDB.".format(datetime.now().strftime('%Y-%m-%d %H:%M:%S'))) + + query_api = client.query_api() + # 确定 measurement + if type == "node": + measurement = "node" + elif type == "link": + measurement = "link" + else: + raise ValueError(f"不支持的类型: {type}") + # 将北京时间转换为 UTC 时间 + beijing_time = datetime.fromisoformat(query_time) + utc_time = beijing_time.astimezone(timezone.utc) + utc_start_time = utc_time - timedelta(seconds=1) + utc_stop_time = utc_time + timedelta(seconds=1) + # 构建 Flux 查询语句 + flux_query = f''' + from(bucket: "{bucket}") + |> range(start: {utc_start_time.isoformat()}, stop: {utc_stop_time.isoformat()}) + |> filter(fn: (r) => r["scheme_Type"] == "{scheme_Type}" and r["scheme_Name"] == "{scheme_Name}" and r["_measurement"] == "{measurement}" and r["_field"] == "{property}") + ''' + # 执行查询 + tables = query_api.query(flux_query) + result_records = [] + # 解析查询结果 + for table in tables: + for record in table.records: + result_records.append({ + "ID": record["ID"], + "value": record["_value"] + }) + client.close() + return result_records + + +# 2025/02/19 +def query_scheme_curve_by_ID_property(scheme_Type: str, scheme_Name: str, query_date: str, ID: str, type: str, property: str, + bucket: str="scheme_simulation_result") -> list: + """ + 根据scheme_Type和scheme_Name,查询该模拟方案中,某一node或link的某一属性值的所有时间的结果 + :param scheme_Type: 方案类型 + :param scheme_Name: 方案名称 + :param query_date: 查询日期,格式为 'YYYY-MM-DD' + :param ID: 元素的ID + :param type: 元素的类型,node或link + :param property: 元素的属性值 + :param bucket: 数据存储的 bucket 名称,默认值为 "scheme_simulation_result" + :return: 查询结果的列表 + """ + client = get_new_client() + if not client.ping(): + print("{} -- Failed to connect to InfluxDB.".format(datetime.now().strftime('%Y-%m-%d %H:%M:%S'))) + + query_api = client.query_api() + # 确定 measurement + if type == "node": + measurement = "node" + elif type == "link": + measurement = "link" + else: + raise ValueError(f"不支持的类型: {type}") + start_time = (datetime.strptime(query_date, "%Y-%m-%d") - timedelta(days=1)).replace(hour=16, minute=0, second=0, tzinfo=timezone.utc).isoformat() + stop_time = datetime.strptime(query_date, "%Y-%m-%d").replace(hour=15, minute=59, second=59, tzinfo=timezone.utc).isoformat() + + # 构建 Flux 查询语句 + flux_query = f''' + from(bucket: "{bucket}") + |> range(start: {start_time}, stop: {stop_time}) + |> filter(fn: (r) => r["_measurement"] == "{measurement}" and r["scheme_Type"] == "{scheme_Type}" and r["scheme_Name"] == "{scheme_Name}" and r["ID"] == "{ID}" and r["_field"] == "{property}") + ''' + # 执行查询 + tables = query_api.query(flux_query) + # 解析查询结果 + results = [] + for table in tables: + for record in table.records: + results.append({ + "time": record["_time"], + "value": record["_value"] + }) + client.close() + return results + + +# 2025/02/21 +def query_scheme_all_record(scheme_Type: str, scheme_Name: str, query_date: str, bucket: str="scheme_simulation_result") -> tuple: + """ + 查询指定方案的所有记录,包括‘node'和‘link’,分别以指定格式返回。 + :param scheme_Type: 方案类型 + :param scheme_Name: 方案名称 + :param query_date: 查询日期,格式为 'YYYY-MM-DD' + :param bucket: 数据存储的 bucket 名称。 + :return: dict: tuple: (node_records, link_records) + """ + client = get_new_client() + if not client.ping(): + print("{} -- Failed to connect to InfluxDB.".format(datetime.now().strftime('%Y-%m-%d %H:%M:%S'))) + + query_api = client.query_api() + start_time = (datetime.strptime(query_date, "%Y-%m-%d") - timedelta(days=1)).replace(hour=16, minute=0, second=0, tzinfo=timezone.utc).isoformat() + stop_time = datetime.strptime(query_date, "%Y-%m-%d").replace(hour=15, minute=59, second=59, tzinfo=timezone.utc).isoformat() + # 构建 Flux 查询语句 + flux_query = f''' + from(bucket: "{bucket}") + |> range(start: {start_time}, stop: {stop_time}) + |> filter(fn: (r) => r["scheme_Type"] == "{scheme_Type}" and r["scheme_Name"] == "{scheme_Name}" and r["_measurement"] == "node" or r["_measurement"] == "link") + |> pivot( + rowKey:["_time"], + columnKey:["_field"], + valueColumn:"_value" + ) + ''' + # 执行查询 + tables = query_api.query(flux_query) + node_records = [] + link_records = [] + # 解析查询结果 + for table in tables: + for record in table.records: + # print(record.values) # 打印完整记录内容 + measurement = record["_measurement"] + # 处理 node 数据 + if measurement == "node": + node_records.append({ + "time": record["_time"], + "ID": record["ID"], + "head": record["head"], + "pressure": record["pressure"], + "actualdemand": record["actualdemand"], + "quality": record["quality"] + }) + # 处理 link 数据 + elif measurement == "link": + link_records.append({ + "time": record["_time"], + "ID": record["ID"], + "flow": record["flow"], + "velocity": record["velocity"], + "headloss": record["headloss"], + "quality": record["quality"], + "status": record["status"], + "setting": record["setting"], + "reaction": record["reaction"], + "friction": record["friction"] + }) + client.close() + return node_records, link_records + + +# 2025/03/04 +def query_scheme_all_record_property(scheme_Type: str, scheme_Name: str, query_date: str, type: str, property: str, + bucket: str="scheme_simulation_result") -> list: + """ + 查询指定方案的‘node'或‘link’的某一属性值,以指定格式返回。 + :param scheme_Type: 方案类型 + :param scheme_Name: 方案名称 + :param query_date: 查询日期,格式为 'YYYY-MM-DD' + :param type: 查询的类型(决定 measurement) + :param property: 查询的字段名称(field) + :param bucket: 数据存储的 bucket 名称。 + :return: dict: tuple: (node_records, link_records) + """ + client = get_new_client() + if not client.ping(): + print("{} -- Failed to connect to InfluxDB.".format(datetime.now().strftime('%Y-%m-%d %H:%M:%S'))) + + query_api = client.query_api() + # 确定 measurement + if type == "node": + measurement = "node" + elif type == "link": + measurement = "link" + else: + raise ValueError(f"不支持的类型: {type}") + start_time = (datetime.strptime(query_date, "%Y-%m-%d") - timedelta(days=1)).replace(hour=16, minute=0, second=0, tzinfo=timezone.utc).isoformat() + stop_time = datetime.strptime(query_date, "%Y-%m-%d").replace(hour=15, minute=59, second=59, tzinfo=timezone.utc).isoformat() + # 构建 Flux 查询语句 + flux_query = f''' + from(bucket: "{bucket}") + |> range(start: {start_time}, stop: {stop_time}) + |> filter(fn: (r) => r["scheme_Type"] == "{scheme_Type}" and r["scheme_Name"] == "{scheme_Name}" and r["date"] == "{query_date}" and r["_measurement"] == "{measurement}" and r["_field"] == "{property}") + ''' + # 执行查询 + tables = query_api.query(flux_query) + result_records = [] + # 解析查询结果 + for table in tables: + for record in table.records: + result_records.append({ + "time": record["_time"], + "ID": record["ID"], + "value": record["_value"] + }) + client.close() + return result_records + + +# 2025/02/16 +def export_SCADA_data_to_csv(start_date: str, end_date: str, bucket: str="SCADA_data") -> None: + """ + 导出influxdb中SCADA_data这个bucket的数据到csv中 + :param start_date: 查询开始的时间,格式为 'YYYY-MM-DD' + :param end_date: 查询结束的时间,格式为 'YYYY-MM-DD' + :param bucket: 数据存储的 bucket 名称,默认值为 "SCADA_data" + :return: + """ + client = get_new_client() + if not client.ping(): + print("{} -- Failed to connect to InfluxDB.".format(datetime.now().strftime('%Y-%m-%d %H:%M:%S'))) + + query_api = client.query_api() + # 将 start_date 的北京时间转换为 UTC 时间范围 + start_time = (datetime.strptime(start_date, "%Y-%m-%d") - timedelta(days=1)).replace(hour=16, minute=0, second=0, tzinfo=timezone.utc).isoformat() + stop_time = datetime.strptime(end_date, "%Y-%m-%d").replace(hour=15, minute=59, second=59, tzinfo=timezone.utc).isoformat() + # 构建 Flux 查询语句 + flux_query = f''' + from(bucket: "{bucket}") + |> range(start: {start_time}, stop: {stop_time}) + ''' + # 执行查询 + tables = query_api.query(flux_query) + # 存储查询结果 + rows = [] + for table in tables: + for record in table.records: + row = { + 'time': record.get_time(), + 'measurement': record.get_measurement(), + 'date': record.values.get('date', None), + 'description': record.values.get('description', None), + 'device_ID': record.values.get('device_ID', None), + 'monitored_value': record.get_value() if record.get_field() == 'monitored_value' else None, + 'datacleaning_value': record.get_value() if record.get_field() == 'datacleaning_value' else None, + 'simulation_value': record.get_value() if record.get_field() == 'simulation_value' else None, + } + rows.append(row) + # 动态生成 CSV 文件名 + csv_filename = f"SCADA_data_{start_date}至{end_date}.csv" + # 写入到 CSV 文件 + with open(csv_filename, mode='w', newline='') as file: + writer = csv.DictWriter(file, fieldnames=['time', 'measurement', 'date', 'description', 'device_ID', 'monitored_value', 'datacleaning_value', 'simulation_value']) + writer.writeheader() + writer.writerows(rows) + print(f"Data exported to {csv_filename} successfully.") + client.close() + + +# 2025/02/17 +def export_realtime_simulation_result_to_csv(start_date: str, end_date: str, bucket: str="realtime_simulation_result") -> None: + """ + 导出influxdb中realtime_simulation_result这个bucket的数据到csv中 + :param start_date: 查询开始的时间,格式为 'YYYY-MM-DD' + :param end_date: 查询结束的时间,格式为 'YYYY-MM-DD' + :param bucket: 数据存储的 bucket 名称,默认值为 "SCADA_data" + :return: + """ + client = get_new_client() + if not client.ping(): + print("{} -- Failed to connect to InfluxDB.".format(datetime.now().strftime('%Y-%m-%d %H:%M:%S'))) + + query_api = client.query_api() + # 将 start_date 的北京时间转换为 UTC 时间范围 + start_time = (datetime.strptime(start_date, "%Y-%m-%d") - timedelta(days=1)).replace(hour=16, minute=0, second=0, tzinfo=timezone.utc).isoformat() + stop_time = datetime.strptime(end_date, "%Y-%m-%d").replace(hour=15, minute=59, second=59, tzinfo=timezone.utc).isoformat() + # 构建 Flux 查询语句,查询指定时间范围内的数据 + flux_query_link = f''' + from(bucket: "{bucket}") + |> range(start: {start_time}, stop: {stop_time}) + |> filter(fn: (r) => r["_measurement"] == "link") + ''' + # 执行查询 + link_tables = query_api.query(flux_query_link) + # 存储link类的数据 + link_rows = [] + link_data = {} + for table in link_tables: + for record in table.records: + key = (record.get_time(), record.values.get('ID', None)) + if key not in link_data: + link_data[key] = {} + field = record.get_field() + link_data[key][field] = record.get_value() + link_data[key]['measurement'] = record.get_measurement() + link_data[key]['date'] = record.values.get('date', None) + # 构建 Flux 查询语句,查询指定时间范围内的数据 + flux_query_node = f''' + from(bucket: "{bucket}") + |> range(start: {start_time}, stop: {stop_time}) + |> filter(fn: (r) => r["_measurement"] == "node") + ''' + # 执行查询 + node_tables = query_api.query(flux_query_node) + # 存储node类的数据 + node_rows = [] + node_data = {} + for table in node_tables: + for record in table.records: + key = (record.get_time(), record.values.get('ID', None)) + if key not in node_data: + node_data[key] = {} + field = record.get_field() + node_data[key][field] = record.get_value() + node_data[key]['measurement'] = record.get_measurement() + node_data[key]['date'] = record.values.get('date', None) + + for key in set(link_data.keys()): + row = {'time': key[0], "ID": key[1]} + row.update(link_data.get(key, {})) + link_rows.append(row) + for key in set(node_data.keys()): + row = {'time': key[0], "ID": key[1]} + row.update(node_data.get(key, {})) + node_rows.append(row) + # 动态生成 CSV 文件名 + csv_filename_link = f"realtime_simulation_link_result_{start_date}至{end_date}.csv" + csv_filename_node = f"realtime_simulation_node_result_{start_date}至{end_date}.csv" + # 写入到 CSV 文件 + with open(csv_filename_link, mode='w', newline='') as file: + writer = csv.DictWriter(file, fieldnames=['time', 'measurement', 'date', 'ID', 'flow', 'leakage', 'velocity', 'headloss', 'status', 'setting', 'quality', 'friction', 'reaction']) + writer.writeheader() + writer.writerows(link_rows) + with open(csv_filename_node, mode='w', newline='') as file: + writer = csv.DictWriter(file, fieldnames=['time', 'measurement', 'date', 'ID', 'head', 'pressure', 'actualdemand', + 'demanddeficit', 'totalExternalOutflow', 'quality']) + writer.writeheader() + writer.writerows(node_rows) + print(f"Data exported to {csv_filename_link} and {csv_filename_node} successfully.") + client.close() + + +# 2025/02/18 +def export_scheme_simulation_result_to_csv_time(start_date: str, end_date: str, bucket: str="scheme_simulation_result") -> None: + """ + 导出influxdb中scheme_simulation_result这个bucket的数据到csv中 + :param start_date: 查询开始的时间,格式为 'YYYY-MM-DD' + :param end_date: 查询结束的时间,格式为 'YYYY-MM-DD' + :param bucket: 数据存储的 bucket 名称,默认值为 "SCADA_data" + :return: + """ + client = get_new_client() + if not client.ping(): + print("{} -- Failed to connect to InfluxDB.".format(datetime.now().strftime('%Y-%m-%d %H:%M:%S'))) + + query_api = client.query_api() + # 将 start_date 的北京时间转换为 UTC 时间范围 + start_time = (datetime.strptime(start_date, "%Y-%m-%d") - timedelta(days=1)).replace(hour=16, minute=0, second=0, tzinfo=timezone.utc).isoformat() + stop_time = datetime.strptime(end_date, "%Y-%m-%d").replace(hour=15, minute=59, second=59, tzinfo=timezone.utc).isoformat() + # 构建 Flux 查询语句,查询指定时间范围内的数据 + flux_query_link = f''' + from(bucket: "{bucket}") + |> range(start: {start_time}, stop: {stop_time}) + |> filter(fn: (r) => r["_measurement"] == "link") + ''' + # 执行查询 + link_tables = query_api.query(flux_query_link) + # 存储link类的数据 + link_rows = [] + link_data = {} + for table in link_tables: + for record in table.records: + key = (record.get_time(), record.values.get('ID', None)) + if key not in link_data: + link_data[key] = {} + field = record.get_field() + link_data[key][field] = record.get_value() + link_data[key]['measurement'] = record.get_measurement() + link_data[key]['date'] = record.values.get('date', None) + link_data[key]['scheme_Type'] = record.values.get('scheme_Type', None) + link_data[key]['scheme_Name'] = record.values.get('scheme_Name', None) + # 构建 Flux 查询语句,查询指定时间范围内的数据 + flux_query_node = f''' + from(bucket: "{bucket}") + |> range(start: {start_time}, stop: {stop_time}) + |> filter(fn: (r) => r["_measurement"] == "node") + ''' + # 执行查询 + node_tables = query_api.query(flux_query_node) + # 存储node类的数据 + node_rows = [] + node_data = {} + for table in node_tables: + for record in table.records: + key = (record.get_time(), record.values.get('ID', None)) + if key not in node_data: + node_data[key] = {} + field = record.get_field() + node_data[key][field] = record.get_value() + node_data[key]['measurement'] = record.get_measurement() + node_data[key]['date'] = record.values.get('date', None) + node_data[key]['scheme_Type'] = record.values.get('scheme_Type', None) + node_data[key]['scheme_Name'] = record.values.get('scheme_Name', None) + for key in set(link_data.keys()): + row = {'time': key[0], "ID": key[1]} + row.update(link_data.get(key, {})) + link_rows.append(row) + for key in set(node_data.keys()): + row = {'time': key[0], "ID": key[1]} + row.update(node_data.get(key, {})) + node_rows.append(row) + # 动态生成 CSV 文件名 + csv_filename_link = f"scheme_simulation_link_result_{start_date}至{end_date}.csv" + csv_filename_node = f"scheme_simulation_node_result_{start_date}至{end_date}.csv" + # 写入到 CSV 文件 + with open(csv_filename_link, mode='w', newline='') as file: + writer = csv.DictWriter(file, fieldnames=['time', 'measurement', 'date', 'scheme_Type', 'scheme_Name', 'ID', 'flow', 'leakage', 'velocity', 'headloss', 'status', 'setting', 'quality', 'friction', 'reaction']) + writer.writeheader() + writer.writerows(link_rows) + with open(csv_filename_node, mode='w', newline='') as file: + writer = csv.DictWriter(file, fieldnames=['time', 'measurement', 'date', 'scheme_Type', 'scheme_Name', 'ID', 'head', 'pressure', 'actualdemand', + 'demanddeficit', 'totalExternalOutflow', 'quality']) + writer.writeheader() + writer.writerows(node_rows) + print(f"Data exported to {csv_filename_link} and {csv_filename_node} successfully.") + client.close() + + +# 2025/02/18 +def export_scheme_simulation_result_to_csv_scheme(scheme_Type: str, scheme_Name: str, query_date: str, bucket: str="scheme_simulation_result") -> None: + """ + 导出influxdb中scheme_simulation_result这个bucket的数据到csv中 + :param scheme_Type: 查询的方案类型 + :param scheme_Name: 查询的方案名 + :param query_date: 查询日期,格式为 'YYYY-MM-DD' + :param bucket: 数据存储的 bucket 名称,默认值为 "SCADA_data" + :return: + """ + client = get_new_client() + if not client.ping(): + print("{} -- Failed to connect to InfluxDB.".format(datetime.now().strftime('%Y-%m-%d %H:%M:%S'))) + + query_api = client.query_api() + start_time = (datetime.strptime(query_date, "%Y-%m-%d") - timedelta(days=1)).replace(hour=16, minute=0, second=0, tzinfo=timezone.utc).isoformat() + stop_time = datetime.strptime(query_date, "%Y-%m-%d").replace(hour=15, minute=59, second=59, tzinfo=timezone.utc).isoformat() + # 构建 Flux 查询语句,查询指定时间范围内的数据 + flux_query_link = f''' + from(bucket: "{bucket}") + |> range(start: {start_time}, stop: {stop_time}) + |> filter(fn: (r) => r["_measurement"] == "link" and r["scheme_Type"] == "{scheme_Type}" and r["scheme_Name"] == "{scheme_Name}") + ''' + # 执行查询 + link_tables = query_api.query(flux_query_link) + # 存储link类的数据 + link_rows = [] + link_data = {} + for table in link_tables: + for record in table.records: + key = (record.get_time(), record.values.get('ID', None)) + if key not in link_data: + link_data[key] = {} + field = record.get_field() + link_data[key][field] = record.get_value() + link_data[key]['measurement'] = record.get_measurement() + link_data[key]['date'] = record.values.get('date', None) + link_data[key]['scheme_Type'] = record.values.get('scheme_Type', None) + link_data[key]['scheme_Name'] = record.values.get('scheme_Name', None) + # 构建 Flux 查询语句,查询指定时间范围内的数据 + flux_query_node = f''' + from(bucket: "{bucket}") + |> range(start: {start_time}, stop: {stop_time}) + |> filter(fn: (r) => r["_measurement"] == "node" and r["scheme_Type"] == "{scheme_Type}" and r["scheme_Name"] == "{scheme_Name}") + ''' + # 执行查询 + node_tables = query_api.query(flux_query_node) + # 存储node类的数据 + node_rows = [] + node_data = {} + for table in node_tables: + for record in table.records: + key = (record.get_time(), record.values.get('ID', None)) + if key not in node_data: + node_data[key] = {} + field = record.get_field() + node_data[key][field] = record.get_value() + node_data[key]['measurement'] = record.get_measurement() + node_data[key]['date'] = record.values.get('date', None) + node_data[key]['scheme_Type'] = record.values.get('scheme_Type', None) + node_data[key]['scheme_Name'] = record.values.get('scheme_Name', None) + for key in set(link_data.keys()): + row = {'time': key[0], "ID": key[1]} + row.update(link_data.get(key, {})) + link_rows.append(row) + for key in set(node_data.keys()): + row = {'time': key[0], "ID": key[1]} + row.update(node_data.get(key, {})) + node_rows.append(row) + # 动态生成 CSV 文件名 + csv_filename_link = f"scheme_simulation_link_result_{scheme_Name}_of_{scheme_Type}.csv" + csv_filename_node = f"scheme_simulation_node_result_{scheme_Name}_of_{scheme_Type}.csv" + # 写入到 CSV 文件 + with open(csv_filename_link, mode='w', newline='') as file: + writer = csv.DictWriter(file, fieldnames=['time', 'measurement', 'date', 'scheme_Type', 'scheme_Name', 'ID', 'flow', 'leakage', 'velocity', 'headloss', 'status', 'setting', 'quality', 'friction', 'reaction']) + writer.writeheader() + writer.writerows(link_rows) + with open(csv_filename_node, mode='w', newline='') as file: + writer = csv.DictWriter(file, fieldnames=['time', 'measurement', 'date', 'scheme_Type', 'scheme_Name', 'ID', 'head', 'pressure', 'actualdemand', + 'demanddeficit', 'totalExternalOutflow', 'quality']) + writer.writeheader() + writer.writerows(node_rows) + print(f"Data exported to {csv_filename_link} and {csv_filename_node} successfully.") + client.close() + + +# 示例调用 +if __name__ == "__main__": + url = "http://localhost:8086" # 替换为你的InfluxDB实例地址 + token = "MhJDl7odKW-y6wNXXUhUMRJ9oPzOvEe52E4NYD5GXtAAMV7BoHMFdet6HqUOt4DjZ-syKjwGao_k0ZIcgrGAPA==" # 替换为你的InfluxDB Token + org_name = "beibei" # 替换为你的Organization名称 + + # step1: 检查连接状态,初始化influxdb的buckets + # try: + # # delete_buckets(org_name) + # create_and_initialize_buckets(org_name) + # except Exception as e: + # print(f"连接失败: {e}") + + + # step2: 先查询pg数据库中scada_info的信息,然后存储SCADA数据到SCADA_data这个bucket里 + query_pg_scada_info_realtime('bb') + query_pg_scada_info_non_realtime('bb') + + query_corresponding_query_id_and_element_id('bb') + + + + # 手动执行存储测试 + # 示例1:store_realtime_SCADA_data_to_influxdb + # store_realtime_SCADA_data_to_influxdb(get_real_value_time='2025-03-16T11:13:00+08:00') + + # 示例2:store_non_realtime_SCADA_data_to_influxdb + # store_non_realtime_SCADA_data_to_influxdb(get_history_data_end_time='2025-03-08T12:00:00+08:00') + + # 示例3:download_history_data_manually + # download_history_data_manually(begin_time='2025-03-21T00:00:00+08:00', end_time='2025-03-22T00:00:00+08:00') + + # step3: 查询测试示例 + + # 示例1:query_latest_record_by_ID + # bucket_name = "realtime_simulation_result" # 数据存储的 bucket 名称 + # node_id = "ZBBDTZDP000022" # 查询的节点 ID + # link_id = "ZBBGXSZW000002" + # + # latest_record = query_latest_record_by_ID(ID=node_id, type="node", bucket=bucket_name) + # # # latest_record = query_latest_record_by_ID(ID=link_id, type="link", bucket=bucket_name) + # # + # if latest_record: + # print("最新记录:", latest_record) + # else: + # print("未找到符合条件的记录。") + + # 示例2:query_all_record_by_time + # node_records, link_records = query_all_record_by_time(query_time="2025-02-14T10:30:00+08:00") + # print("Node 数据:", node_records) + # print("Link 数据:", link_records) + + # 示例3:query_curve_by_ID_property_daterange + # curve_result = query_curve_by_ID_property_daterange(ID=node_id, type="node", property="head", + # start_date="2024-11-25", end_date="2024-11-25") + # print(curve_result) + + # 示例4:query_SCADA_data_by_device_ID_and_time + # SCADA_result_dict = query_SCADA_data_by_device_ID_and_time(globals.fixed_pump_realtime_ids, query_time='2025-03-09T23:45:00+08:00') + # print(SCADA_result_dict) + + # 示例5:query_SCADA_data_curve + # SCADA_result = query_SCADA_data_curve(api_query_id='9519', start_date='2025-03-08', end_date='2025-03-08') + # print(SCADA_result) + + # 示例6:export_SCADA_data_to_csv + # export_SCADA_data_to_csv(start_date='2025-02-13', end_date='2025-02-15') + + # 示例7:export_realtime_simulation_result_to_csv + # export_realtime_simulation_result_to_csv(start_date='2025-02-13', end_date='2025-02-15') + + # 示例8:export_scheme_simulation_result_to_csv_time + # export_scheme_simulation_result_to_csv_time(start_date='2025-02-13', end_date='2025-02-15') + + # 示例9:export_scheme_simulation_result_to_csv_scheme + # export_scheme_simulation_result_to_csv_scheme(scheme_Type='burst_Analysis', scheme_Name='scheme1', query_date='2025-03-10') + + # 示例10:query_scheme_all_record_by_time + # node_records, link_records = query_scheme_all_record_by_time(scheme_Type='burst_Analysis', scheme_Name='scheme1', query_time="2025-02-14T10:30:00+08:00") + # print("Node 数据:", node_records) + # print("Link 数据:", link_records) + + # 示例11:query_scheme_curve_by_ID_property + # curve_result = query_scheme_curve_by_ID_property(scheme_Type='burst_Analysis', scheme_Name='scheme1', ID='ZBBDTZDP000022', + # type='node', property='head') + # print(curve_result) + + # 示例12:query_all_record_by_date + # node_records, link_records = query_all_record_by_date(query_date='2025-02-27') + # print("Node 数据:", node_records) + # print("Link 数据:", link_records) + + # 示例13:query_scheme_all_record + # node_records, link_records = query_scheme_all_record(scheme_Type='burst_Analysis', scheme_Name='scheme1', query_date='2025-03-10') + # print("Node 数据:", node_records) + # print("Link 数据:", link_records) + + # 示例14:query_all_record_by_time_property + # result_records = query_all_record_by_time_property(query_time='2025-02-25T23:45:00+08:00', type='node', property='head') + # print(result_records) + + # 示例15:query_all_record_by_date_property + # result_records = query_all_record_by_date_property(query_date='2025-02-14', type='node', property='head') + # print(result_records) + + # 示例16:query_scheme_all_record_by_time_property + # result_records = query_scheme_all_record_by_time_property(scheme_Type='burst_Analysis', scheme_Name='scheme1', + # query_time='2025-02-14T10:30:00+08:00', type='node', property='head') + # print(result_records) + + # 示例17:query_scheme_all_record_property + # result_records = query_scheme_all_record_property(scheme_Type='burst_Analysis', scheme_Name='scheme1', query_date='2025-03-10', type='node', property='head') + # print(result_records) + + # 示例18:fill_scheme_simulation_result_to_SCADA + # fill_scheme_simulation_result_to_SCADA(scheme_Type='burst_Analysis', scheme_Name='burst_scheme', query_date='2025-03-10') + + # 示例19:query_SCADA_data_by_device_ID_and_timerange + # result = query_SCADA_data_by_device_ID_and_timerange(query_ids_list=globals.fixed_pump_realtime_ids, start_time='2025-03-09T12:00:00+08:00', + # end_time='2025-03-09T12:10:00+08:00') + # print(result) + + +