Files
TJWaterServer/influxdb_api.py
2025-02-08 21:19:14 +08:00

1358 lines
60 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
from influxdb_client import InfluxDBClient, BucketsApi, WriteApi, OrganizationsApi, Point, QueryApi
from typing import List, Dict
from datetime import datetime, timedelta, timezone
from influxdb_client.client.write_api import SYNCHRONOUS, ASYNCHRONOUS
from dateutil import parser
import get_realValue
import get_data
import psycopg
import time
import simulation
from tjnetwork import *
import schedule
import threading
import globals
import influxdb_info
# InfluxDB connection settings, read from the project-local influxdb_info module
url = influxdb_info.url
token = influxdb_info.token
org_name = influxdb_info.org
# Module-level client; functions below use it as the default `client` argument
client = InfluxDBClient(url=url, token=token, org=org_name)
# # 所有实时更新数据的SCADA设备的ID
# flow_device_ids = ['2498', '3854', '3853']
# pressure_device_ids = ['2510', '2514']
# reservoir_liquid_level_ids = ['2497', '2571']
# tank_liquid_level_ids = ['4780', '9774']
# pump_device_ids = ['2747', '2776', '2730', '2787', '2500', '2502', '2504']
#
# # 用于更改数据的SCADA设的ID
# change_data_device_ids = ['2498', '3854', '3853', '2497', '2571', '4780', '9774',
# '2747', '2776', '2730', '2787', '2500', '2502', '2504']
# # 全局变量用于存储不同类型的realtime api_query_id
# reservoir_liquid_level_realtime_ids = []
# tank_liquid_level_realtime_ids = []
# fixed_pump_realtime_ids = []
# variable_pump_realtime_ids = []
# source_outflow_realtime_ids = []
# pipe_flow_realtime_ids = []
# pressure_realtime_ids = []
# demand_realtime_ids = []
# quality_realtime_ids = []
#
# # transmission_frequency的最大值
# transmission_frequency = None
# hydraulic_timestep = None
#
# reservoir_liquid_level_non_realtime_ids = []
# tank_liquid_level_non_realtime_ids = []
# fixed_pump_non_realtime_ids = []
# variable_pump_non_realtime_ids = []
# source_outflow_non_realtime_ids = []
# pipe_flow_non_realtime_ids = []
# pressure_non_realtime_ids = []
# demand_non_realtime_ids = []
# quality_non_realtime_ids = []
def query_pg_scada_info_realtime(name: str) -> None:
    """
    Load the realtime SCADA device IDs from the ``scada_info`` table.

    Rows whose ``transmission_mode`` is ``'realtime'`` are read from the
    PostgreSQL database ``name`` on 127.0.0.1, and every non-NULL
    ``api_query_id`` is appended to the ``globals.*_realtime_ids`` list that
    matches the row's ``type``. The lists are emptied only after the query
    has returned. Rows with an unrecognised ``type`` are ignored. Any
    exception is caught and printed.

    :param name: name of the PostgreSQL database
    :return: None
    """
    dsn = f"dbname={name} host=127.0.0.1"
    try:
        with psycopg.connect(dsn) as conn:
            with conn.cursor() as cur:
                # Fetch every record whose transmission_mode is 'realtime'
                cur.execute("""
                    SELECT type, api_query_id
                    FROM scada_info
                    WHERE transmission_mode = 'realtime';
                """)
                rows = cur.fetchall()
                # SCADA type -> global list that collects its api_query_id values
                id_lists_by_type = {
                    "reservoir_liquid_level": globals.reservoir_liquid_level_realtime_ids,
                    "tank_liquid_level": globals.tank_liquid_level_realtime_ids,
                    "fixed_pump": globals.fixed_pump_realtime_ids,
                    "variable_pump": globals.variable_pump_realtime_ids,
                    "source_outflow": globals.source_outflow_realtime_ids,
                    "pipe_flow": globals.pipe_flow_realtime_ids,
                    "pressure": globals.pressure_realtime_ids,
                    "demand": globals.demand_realtime_ids,
                    "quality": globals.quality_realtime_ids,
                }
                # Reset the global lists in place before refilling them
                for id_list in id_lists_by_type.values():
                    id_list.clear()
                for scada_type, query_id in rows:
                    if query_id is None:  # skip rows without an api_query_id
                        continue
                    target = id_lists_by_type.get(scada_type)
                    if target is not None:
                        target.append(query_id)
    except Exception as e:
        print(f"查询时发生错误:{e}")
def query_pg_scada_info_non_realtime(name: str) -> None:
    """
    Load the non-realtime SCADA device IDs and related timing settings.

    Two things happen:

    1. The project ``name`` is (re)opened, ``globals.hydraulic_timestep`` is
       set from ``get_time(name)['HYDRAULIC TIMESTEP']`` and the project is
       closed again, even if reading the time settings fails.
    2. Rows of ``scada_info`` whose ``transmission_mode`` is
       ``'non_realtime'`` are read from the PostgreSQL database ``name`` on
       127.0.0.1. Every non-NULL ``api_query_id`` is appended to the
       ``globals.*_non_realtime_ids`` list matching the row's ``type``
       (including ``tank_liquid_level``), and
       ``globals.transmission_frequency`` is set to the maximum collected
       ``transmission_frequency`` or ``None`` when there is none. Any
       exception raised by the database part is caught and printed.

    :param name: name of the project / PostgreSQL database
    :return: None
    """
    # Re-open the project so the time settings are read from a fresh state
    if is_project_open(name):
        close_project(name)
    open_project(name)
    try:
        dic_time = get_time(name)
        globals.hydraulic_timestep = dic_time['HYDRAULIC TIMESTEP']
    finally:
        # Do not leave the project open when get_time() fails
        close_project(name)

    conn_string = f"dbname={name} host=127.0.0.1"
    try:
        with psycopg.connect(conn_string) as conn:
            with conn.cursor() as cur:
                # Fetch every record whose transmission_mode is 'non_realtime'
                cur.execute("""
                    SELECT type, api_query_id, transmission_frequency
                    FROM scada_info
                    WHERE transmission_mode = 'non_realtime';
                """)
                records = cur.fetchall()
                # SCADA type -> global list that collects its api_query_id values.
                # tank_liquid_level is included because the non-realtime import
                # reads globals.tank_liquid_level_non_realtime_ids as well.
                id_lists_by_type = {
                    "reservoir_liquid_level": globals.reservoir_liquid_level_non_realtime_ids,
                    "tank_liquid_level": globals.tank_liquid_level_non_realtime_ids,
                    "fixed_pump": globals.fixed_pump_non_realtime_ids,
                    "variable_pump": globals.variable_pump_non_realtime_ids,
                    "source_outflow": globals.source_outflow_non_realtime_ids,
                    "pipe_flow": globals.pipe_flow_non_realtime_ids,
                    "pressure": globals.pressure_non_realtime_ids,
                    "demand": globals.demand_non_realtime_ids,
                    "quality": globals.quality_non_realtime_ids,
                }
                # Reset the global lists in place before refilling them
                for id_list in id_lists_by_type.values():
                    id_list.clear()
                # Collected to compute the maximum transmission_frequency
                transmission_frequencies = []
                for record_type, api_query_id, freq in records:
                    if api_query_id is None:  # skip rows without an api_query_id
                        continue
                    target = id_lists_by_type.get(record_type)
                    if target is not None:
                        target.append(api_query_id)
                    # NOTE(review): frequencies are collected only for rows that
                    # have an api_query_id — confirm this matches the intent.
                    if freq is not None:
                        transmission_frequencies.append(freq)
                globals.transmission_frequency = (
                    max(transmission_frequencies) if transmission_frequencies else None
                )
    except Exception as e:
        print(f"查询时发生错误:{e}")
# 2025/02/01
def delete_buckets(client: InfluxDBClient, org_name: str) -> None:
    """
    Delete every bucket returned for the given InfluxDB organization.

    Each deletion is attempted independently; a failure is printed and the
    remaining buckets are still processed.

    :param client: an initialised InfluxDBClient instance
    :param org_name: name of the InfluxDB organization
    :return: None
    """
    api = client.buckets_api()
    found = api.find_buckets(org=org_name)
    # Guard: the response must expose its items through a `buckets` attribute
    if not hasattr(found, 'buckets'):
        print("未找到 buckets 属性,无法迭代 buckets。")
        return
    for bucket in found.buckets:
        try:
            api.delete_bucket(bucket)
        except Exception as e:
            print(f"Failed to delete bucket {bucket.name}: {e}")
        else:
            print(f"Bucket {bucket.name} has been deleted successfully.")
# 2025/02/01
def create_and_initialize_buckets(client: InfluxDBClient, org_name: str) -> None:
    """
    Recreate the three InfluxDB buckets and seed each with placeholder points.

    All existing buckets of the organization are deleted first (via
    ``delete_buckets``). Then ``SCADA_data``, ``realtime_simulation_result``
    and ``scheme_simulation_result`` are created with no retention rules and
    one placeholder point per measurement is written, timestamped
    ``2024-11-21T00:00:00Z``.

    The write API is used as a context manager so that the client's default
    batching writer is closed, and its pending points sent, before the final
    success message is printed.

    :param client: an initialised InfluxDBClient instance
    :param org_name: name of the InfluxDB organization
    :return: None
    :raises ValueError: if no organization named ``org_name`` is found
    """
    seed_time = "2024-11-21T00:00:00Z"

    # (field name, placeholder value) pairs, in the order they are added
    scada_fields = (
        ("monitored_value", 0.0),
        ("datacleaning_value", 0.0),
        ("simulation_value", 0.0),
    )
    link_fields = (
        ("flow", 0.0),
        ("leakage", 0.0),
        ("velocity", 0.0),
        ("headloss", 0.0),
        ("status", None),
        ("setting", 0.0),
        ("quality", 0.0),
    )
    node_fields = (
        ("head", 0.0),
        ("pressure", 0.0),
        ("actualdemand", 0.0),
        ("demanddeficit", 0.0),
        ("totalExternalOutflow", 0.0),
        ("quality", 0.0),
    )
    result_tags = ("date", "ID")
    scheme_tags = ("date", "ID", "scheme_Type", "scheme_Name")

    # bucket name -> [(measurement, tag names, fields), ...]
    seed_layout = {
        "SCADA_data": [
            ("SCADA", ("date", "description", "device_ID"), scada_fields),
        ],
        "realtime_simulation_result": [
            ("link", result_tags, link_fields),
            ("node", result_tags, node_fields),
        ],
        "scheme_simulation_result": [
            ("link", scheme_tags, link_fields),
            ("node", scheme_tags, node_fields),
        ],
    }

    def _seed_point(measurement, tag_names, fields):
        """Build one placeholder point; every tag value is None."""
        # NOTE(review): None-valued tags/fields are presumably omitted by the
        # client when the point is serialised — confirm.
        point = Point(measurement)
        for tag_name in tag_names:
            point = point.tag(tag_name, None)
        for field_name, value in fields:
            point = point.field(field_name, value)
        return point.time(seed_time)

    # Remove the existing buckets first, then initialise from scratch
    delete_buckets(client, org_name)
    bucket_api = BucketsApi(client)
    org_api = OrganizationsApi(client)

    # Resolve the organization ID from its name
    org = next((o for o in org_api.find_organizations() if o.name == org_name), None)
    if not org:
        raise ValueError(f"Organization '{org_name}' not found.")
    org_id = org.id
    print(f"Using Organization ID: {org_id}")

    with client.write_api() as write_api:
        for bucket_name, seeds in seed_layout.items():
            created_bucket = bucket_api.create_bucket(
                bucket_name=bucket_name,
                retention_rules=[],
                org_id=org_id
            )
            print(f"Bucket '{bucket_name}' created with ID: {created_bucket.id}")
            for measurement, tag_names, fields in seeds:
                write_api.write(
                    bucket=bucket_name,
                    org=org_name,
                    record=_seed_point(measurement, tag_names, fields),
                )
            print(f"Initialized {bucket_name} with default structure.")
    print("All buckets created and initialized successfully.")
def _write_realtime_scada_points(write_api, bucket: str, measurement: str,
                                 data_list, get_real_value_time: str) -> None:
    """Write one category of realtime SCADA samples as ``measurement`` points."""
    if not data_list:
        return
    request_time = datetime.fromisoformat(get_real_value_time)
    # Wall-clock value without offset; used to compare against data['time'].
    # NOTE(review): this assumes data['time'] is a naive ISO string in the same
    # local time as the request — confirm against get_realValue.
    request_time_naive = request_time.replace(tzinfo=None)
    # Convert with the offset carried by the input string. Calling
    # astimezone() on the tz-stripped value would interpret it in the host's
    # local zone instead, which is wrong whenever the host is not in that zone.
    request_time_utc = request_time.astimezone(timezone.utc)
    date_tag = request_time.strftime('%Y-%m-%d')
    for data in data_list:
        data_time = datetime.fromisoformat(data['time'])
        time_difference = abs((data_time - request_time_naive).total_seconds())
        # A sample more than 60 seconds away from the requested time is stale:
        # its value is replaced by None
        monitored_value = data['monitored_value'] if time_difference <= 60 else None
        point = (
            Point(measurement)
            .tag("date", date_tag)
            .tag("description", data['description'])
            .tag("device_ID", data['device_ID'])
            .field("monitored_value", monitored_value)
            .field("datacleaning_value", None)
            .field("simulation_value", None)
            .time(request_time_utc)
        )
        write_api.write(bucket=bucket, org=org_name, record=point)
    write_api.flush()


def store_realtime_SCADA_data_to_influxdb(get_real_value_time: str, bucket: str = "SCADA_data", client: InfluxDBClient = client) -> None:
    """
    Fetch the current realtime SCADA values and store them in InfluxDB.

    For every SCADA category with IDs in ``globals.<category>_realtime_ids``
    the values are requested through ``get_realValue.get_realValue``. The
    whole fetch is attempted up to 6 times, waiting 10 seconds between
    attempts. Each returned sample is written to the measurement
    ``<category>_realtime`` with the request time (converted to UTC) as its
    timestamp. Samples whose own time differs from the request time by more
    than 60 seconds get ``None`` as ``monitored_value``.

    :param get_real_value_time: time of the request, e.g.
        '2024-11-25T09:00:00+08:00'
    :param bucket: InfluxDB bucket name, defaults to "SCADA_data"
    :param client: an initialised InfluxDBClient instance
    :return: None
    """
    now_text = datetime.now().strftime('%Y-%m-%d %H:%M:%S')
    if client.ping():
        print("{} -- Successfully connected to InfluxDB.".format(now_text))
    else:
        print("{} -- Failed to connect to InfluxDB.".format(now_text))
    write_api = client.write_api(write_options=ASYNCHRONOUS)

    # Category name; globals list is "<name>_realtime_ids", measurement is
    # "<name>_realtime"
    categories = (
        "reservoir_liquid_level",
        "tank_liquid_level",
        "fixed_pump",
        "variable_pump",
        "source_outflow",
        "pipe_flow",
        "pressure",
        "demand",
        "quality",
    )
    max_attempts = 6
    fetched = {}
    for attempt in range(1, max_attempts + 1):
        try:
            for category in categories:
                ids = getattr(globals, f"{category}_realtime_ids")
                if ids:
                    fetched[category] = get_realValue.get_realValue(ids=','.join(ids))
        except Exception as e:
            print(e)
            # No point in waiting after the final attempt
            if attempt < max_attempts:
                time.sleep(10)
        else:
            break

    # Write whatever was fetched, category by category
    for category in categories:
        _write_realtime_scada_points(
            write_api, bucket, f"{category}_realtime",
            fetched.get(category), get_real_value_time)
def convert_time_format(original_time: str) -> str:
    """
    Convert '2024-04-13T08:00:00+08:00' into '2024-04-13 08:00:00'.

    Every 'T' is replaced by a space and every '+08:00' is removed; any other
    text, including other UTC offsets, is left as it is.

    :param original_time: time string such as '2024-04-13T08:00:00+08:00'
    :return: time string such as '2024-04-13 08:00:00'
    """
    return original_time.replace('T', ' ').replace('+08:00', '')
# Filter predicate: keep only the samples that fall on the timestep grid
def is_timestep_multiple(data_time: datetime, timestep: timedelta) -> bool:
    """
    Tell whether ``data_time`` lies a whole number of timesteps after midnight.

    Midnight is 00:00:00 of ``data_time``'s own day. The check uses exact
    ``timedelta`` arithmetic, so sub-second timesteps are not affected by
    floating-point rounding.

    :param data_time: the point in time to test
    :param timestep: length of one timestep
    :return: True if the time elapsed since midnight is an integer multiple
        of ``timestep``
    :raises ZeroDivisionError: if ``timestep`` is zero
    """
    midnight = data_time.replace(hour=0, minute=0, second=0, microsecond=0)
    return (data_time - midnight) % timestep == timedelta(0)
def _write_non_realtime_scada_points(write_api, bucket: str, measurement: str, data_list) -> None:
    """Write one category of historical SCADA samples as ``measurement`` points."""
    if not data_list:
        return
    for data in data_list:
        # NOTE(review): data['time'] is used as a datetime here (strftime);
        # whether it is timezone-aware is not visible — confirm, since it is
        # passed to InfluxDB unchanged.
        point = (
            Point(measurement)
            .tag("date", data['time'].strftime('%Y-%m-%d'))
            .tag("description", data['description'])
            .tag("device_ID", data['device_ID'])
            .field("monitored_value", data['monitored_value'])
            .field("datacleaning_value", None)
            .field("simulation_value", None)
            .time(data['time'])
        )
        write_api.write(bucket=bucket, org=org_name, record=point)


# 2025/01/10
def store_non_realtime_SCADA_data_to_influxdb(get_history_data_end_time: str, bucket: str = "SCADA_data", client: InfluxDBClient = client) -> None:
    """
    Fetch the SCADA history of the last transmission period and store it.

    The queried window ends at ``get_history_data_end_time`` and starts
    ``globals.transmission_frequency`` (an 'HH:MM:SS' string) earlier. For
    every SCADA category with IDs in ``globals.<category>_non_realtime_ids``
    the history is requested through ``get_data.get_history_data`` with a
    '1m' downsample. The whole fetch is attempted up to 5 times, waiting 10
    seconds between attempts. Each returned sample is written to the
    measurement ``<category>_non_realtime``.

    If ``globals.transmission_frequency`` is ``None`` the window cannot be
    computed; a message is printed and nothing is fetched or written.

    :param get_history_data_end_time: end of the history window, e.g.
        '2024-11-25T09:00:00+08:00'
    :param bucket: InfluxDB bucket name, defaults to "SCADA_data"
    :param client: an initialised InfluxDBClient instance
    :return: None
    """
    now_text = datetime.now().strftime('%Y-%m-%d %H:%M:%S')
    if client.ping():
        print("{} -- Successfully connected to InfluxDB.".format(now_text))
    else:
        print("{} -- Failed to connect to InfluxDB.".format(now_text))

    if globals.transmission_frequency is None:
        print("transmission_frequency is not set; skipping non-realtime SCADA data import.")
        return

    write_api = client.write_api(write_options=SYNCHRONOUS)
    # Parse the end time (offset removed by convert_time_format)
    end_date_dt = datetime.strptime(convert_time_format(get_history_data_end_time), '%Y-%m-%d %H:%M:%S')
    end_date = end_date_dt.strftime('%Y-%m-%d %H:%M:%S')
    # 'HH:MM:SS' -> timedelta: strptime yields a datetime on 1900-01-01, so
    # subtracting that date leaves just the duration
    transmission_frequency_dt = datetime.strptime(globals.transmission_frequency, '%H:%M:%S') - datetime(1900, 1, 1)
    begin_date = (end_date_dt - transmission_frequency_dt).strftime('%Y-%m-%d %H:%M:%S')

    # Category name; globals list is "<name>_non_realtime_ids", measurement is
    # "<name>_non_realtime"
    categories = (
        "reservoir_liquid_level",
        "tank_liquid_level",
        "fixed_pump",
        "variable_pump",
        "source_outflow",
        "pipe_flow",
        "pressure",
        "demand",
        "quality",
    )
    max_attempts = 5
    fetched = {}
    for try_count in range(1, max_attempts + 1):
        try:
            for category in categories:
                ids = getattr(globals, f"{category}_non_realtime_ids")
                if ids:
                    fetched[category] = get_data.get_history_data(
                        ids=','.join(ids),
                        begin_date=begin_date, end_date=end_date,
                        downsample='1m')
        except Exception as e:
            print(f"Attempt {try_count} failed with error: {e}")
            if try_count < max_attempts:
                print("Retrying in 10 seconds...")
                time.sleep(10)
            else:
                print("Max retries reached. Exiting.")
        else:
            print("Data fetched successfully.")
            break  # stop retrying after a successful fetch

    # Write whatever was fetched, category by category
    for category in categories:
        _write_non_realtime_scada_points(
            write_api, bucket, f"{category}_non_realtime", fetched.get(category))
def query_SCADA_data_by_device_ID_and_time(query_ids_list: List[str], query_time: str, bucket: str="SCADA_data", client: InfluxDBClient=client) -> Dict[str, float]:
    """
    Look up the monitored value of each SCADA device around a given instant.

    One Flux query is issued per device ID. Each query covers the window from
    one second before to one second after ``query_time`` (converted to UTC)
    and keeps only the ``monitored_value`` field.

    :param query_ids_list: SCADA device IDs to look up.
    :param query_time: ISO-8601 timestamp with offset (Beijing time), e.g.
        '2024-11-24T17:30:00+08:00'.
    :param bucket: InfluxDB bucket to read from; defaults to "SCADA_data".
    :param client: An initialised InfluxDBClient instance.
    :return: Mapping of device ID to monitored value. The value is None when
        the query returns nothing or raises. If several records fall inside
        the window, the last one iterated wins. A device whose query returns
        tables without any record gets no entry at all.
    """
    # Report connectivity; the lookup is attempted regardless of the outcome.
    connected = client.ping()
    stamp = datetime.now().strftime('%Y-%m-%d %H:%M:%S')
    if connected:
        print(f"{stamp} -- Successfully connected to InfluxDB.")
    else:
        print(f"{stamp} -- Failed to connect to InfluxDB.")

    query_api = client.query_api()

    # Convert the requested instant to UTC and widen it by one second each way.
    utc_time = datetime.fromisoformat(query_time).astimezone(timezone.utc)
    one_second = timedelta(seconds=1)
    range_start = (utc_time - one_second).isoformat()
    range_stop = (utc_time + one_second).isoformat()

    readings = {}
    for device_id in query_ids_list:
        flux_query = (
            '\n'
            f'from(bucket: "{bucket}")\n'
            f'|> range(start: {range_start}, stop: {range_stop})\n'
            f'|> filter(fn: (r) => r["device_ID"] == "{device_id}")\n'
            '|> filter(fn: (r) => r["_field"] == "monitored_value")\n'
        )
        try:
            tables = query_api.query(flux_query)
            if not tables:
                # Nothing stored for this device in the window.
                readings[device_id] = None
                continue
            for table in tables:
                for record in table.records:
                    # "_value" holds the monitored_value selected above.
                    readings[device_id] = record.get_value()
        except Exception as e:
            # Best effort: a failing device must not abort the remaining lookups.
            print(f"Error querying InfluxDB for device ID {device_id}: {e}")
            readings[device_id] = None
    return readings
# 2025/02/01
def store_realtime_simulation_result_to_influxdb(node_result_list: List[Dict[str, any]], link_result_list: List[Dict[str, any]],
                                                 result_start_time: str,
                                                 bucket: str = "realtime_simulation_result", client: InfluxDBClient = client):
    """
    Store real-time simulation results in InfluxDB.

    Every entry of a node's or link's ``result`` list becomes one point in the
    "node" or "link" measurement, tagged with the result date and the element
    ID and stamped with ``result_start_time``.

    :param node_result_list: Dicts with a 'node' ID and a 'result' list of
        per-node value dicts (head, pressure, demand, quality).
    :param link_result_list: Dicts with a 'link' ID and a 'result' list of
        per-link value dicts (flow, velocity, headloss, quality, status,
        setting, reaction, friction).
    :param result_start_time: Simulation start time of the results, formatted
        like '2024-11-24T17:30:00+08:00'.
    :param bucket: Target InfluxDB bucket; defaults to
        "realtime_simulation_result".
    :param client: An initialised InfluxDBClient instance.
    :return: None
    :raises RuntimeError: If parsing the timestamp or writing the data fails;
        the original exception is chained as the cause.
    """
    # Report connectivity; the write is attempted regardless of the outcome.
    if client.ping():
        print("{} -- Successfully connected to InfluxDB.".format(
            datetime.now().strftime('%Y-%m-%d %H:%M:%S')))
    else:
        print("{} -- Failed to connect to InfluxDB.".format(
            datetime.now().strftime('%Y-%m-%d %H:%M:%S')))

    try:
        # Parse the inputs before opening the writer so that a malformed
        # timestamp does not leave a write API behind.
        date_str = result_start_time.split('T')[0]
        time_beijing = datetime.strptime(result_start_time, '%Y-%m-%dT%H:%M:%S%z').isoformat()

        # The context manager closes the write API even when a write raises;
        # previously the writer was never closed.
        with client.write_api() as write_api:
            for result in node_result_list:
                node_id = result.get('node')
                for data in result.get('result', []):
                    # All fields of one node share a single point.
                    node_point = (
                        Point("node")
                        .tag("date", date_str)
                        .tag("ID", node_id)
                        .field("head", data.get('head', 0.0))
                        .field("pressure", data.get('pressure', 0.0))
                        .field("actualdemand", data.get('demand', 0.0))
                        .field("demanddeficit", None)
                        .field("totalExternalOutflow", None)
                        .field("quality", data.get('quality', 0.0))
                        .time(time_beijing)
                    )
                    write_api.write(bucket=bucket, org=org_name, record=node_point)
            write_api.flush()
            print(f"成功将 {len(node_result_list)} 条node数据写入 InfluxDB。")

            for result in link_result_list:
                link_id = result.get('link')
                for data in result.get('result', []):
                    link_point = (
                        Point("link")
                        .tag("date", date_str)
                        .tag("ID", link_id)
                        .field("flow", data.get('flow', 0.0))
                        .field("velocity", data.get('velocity', 0.0))
                        .field("headloss", data.get('headloss', 0.0))
                        .field("quality", data.get('quality', 0.0))
                        .field("status", data.get('status', "UNKNOWN"))
                        .field("setting", data.get('setting', 0.0))
                        .field("reaction", data.get('reaction', 0.0))
                        .field("friction", data.get('friction', 0.0))
                        .time(time_beijing)
                    )
                    write_api.write(bucket=bucket, org=org_name, record=link_point)
            write_api.flush()
            print(f"成功将 {len(link_result_list)} 条link数据写入 InfluxDB。")
    except Exception as e:
        # Chain the cause so the original traceback stays available to callers.
        raise RuntimeError(f"数据写入 InfluxDB 时发生错误: {e}") from e
# 2025/02/01
def query_latest_record_by_ID(ID: str, type: str, bucket: str="realtime_simulation_result", client: InfluxDBClient=client) -> dict:
    """
    Return the most recent record of a node or link from the last 30 days.

    :param ID: ID of the node or link to look up.
    :param type: "node" or "link"; selects the measurement and the fields
        returned.
    :param bucket: Bucket holding the simulation results.
    :param client: An initialised InfluxDBClient instance.
    :return: Dict with "time", "nodeID" or "linkID", and the measurement's
        fields; None when no record is found or ``type`` is neither "node"
        nor "link".
    """
    # Report connectivity; the query is attempted regardless of the outcome.
    if client.ping():
        print("{} -- Successfully connected to InfluxDB.".format(
            datetime.now().strftime('%Y-%m-%d %H:%M:%S')))
    else:
        print("{} -- Failed to connect to InfluxDB.".format(
            datetime.now().strftime('%Y-%m-%d %H:%M:%S')))
    query_api = client.query_api()

    # Per measurement: the key under which the ID is returned, and the fields
    # copied from the pivoted record (in output order).
    layouts = {
        "node": ("nodeID", ("head", "pressure", "actualdemand", "quality")),
        "link": ("linkID", ("flow", "velocity", "headloss", "quality",
                            "status", "setting", "reaction", "friction")),
    }
    if type not in layouts:
        return None  # unsupported type: nothing to query
    id_key, field_names = layouts[type]

    flux_query = (
        '\n'
        f'from(bucket: "{bucket}")\n'
        '|> range(start: -30d) // 查找最近一月的记录\n'
        f'|> filter(fn: (r) => r["_measurement"] == "{type}")\n'
        f'|> filter(fn: (r) => r["ID"] == "{ID}")\n'
        '|> pivot(\n'
        'rowKey:["_time"],\n'
        'columnKey:["_field"],\n'
        'valueColumn:"_value"\n'
        ')\n'
        '|> sort(columns: ["_time"], desc: true)\n'
        '|> limit(n: 1)\n'
    )
    tables = query_api.query(flux_query)

    # Flux sort/limit act on each table separately, and points written on
    # different days carry different "date" tags, so the result can contain
    # one row per day. Pick the newest row explicitly instead of returning
    # whichever table happens to come first.
    candidates = [record for table in tables for record in table.records]
    if not candidates:
        return None  # no record found
    latest = max(candidates, key=lambda record: record["_time"])

    row = {"time": latest["_time"], id_key: ID}
    for name in field_names:
        row[name] = latest[name]
    return row
# 2025/02/01
def query_all_record_by_time(query_time: str, bucket: str="realtime_simulation_result", client: InfluxDBClient=client) -> tuple:
    """
    Fetch every 'node' and 'link' record stored around a given instant.

    The query covers the window from one second before to one second after
    ``query_time`` (converted to UTC) and pivots the fields of each point
    into a single row.

    :param query_time: ISO-8601 timestamp with offset (Beijing time), e.g.
        '2024-11-24T17:30:00+08:00'.
    :param bucket: Bucket holding the simulation results.
    :param client: An initialised InfluxDBClient instance.
    :return: Tuple ``(node_records, link_records)``; each element is a list
        of dicts, one per record.
    """
    # Report connectivity; the query is attempted regardless of the outcome.
    connected = client.ping()
    stamp = datetime.now().strftime('%Y-%m-%d %H:%M:%S')
    if connected:
        print(f"{stamp} -- Successfully connected to InfluxDB.")
    else:
        print(f"{stamp} -- Failed to connect to InfluxDB.")

    query_api = client.query_api()

    # Convert the requested instant to UTC and widen it by one second each way.
    utc_time = datetime.fromisoformat(query_time).astimezone(timezone.utc)
    one_second = timedelta(seconds=1)
    range_start = (utc_time - one_second).isoformat()
    range_stop = (utc_time + one_second).isoformat()

    flux_query = (
        '\n'
        f'from(bucket: "{bucket}")\n'
        f'|> range(start: {range_start}, stop: {range_stop})\n'
        '|> filter(fn: (r) => r["_measurement"] == "node" or r["_measurement"] == "link")\n'
        '|> pivot(\n'
        'rowKey:["_time"],\n'
        'columnKey:["_field"],\n'
        'valueColumn:"_value"\n'
        ')\n'
    )
    tables = query_api.query(flux_query)

    # Fields copied verbatim from a pivoted record, in output order.
    node_fields = ("head", "pressure", "actualdemand", "quality")
    link_fields = ("flow", "velocity", "headloss", "quality",
                   "status", "setting", "reaction", "friction")

    node_records = []
    link_records = []
    for table in tables:
        for record in table.records:
            measurement = record["_measurement"]
            if measurement == "node":
                row = {"time": record["_time"], "ID": record["ID"]}
                for name in node_fields:
                    row[name] = record[name]
                node_records.append(row)
            elif measurement == "link":
                row = {"time": record["_time"], "linkID": record["ID"]}
                for name in link_fields:
                    row[name] = record[name]
                link_records.append(row)
    return node_records, link_records
# 2025/02/01
def query_curve_by_ID_property_daterange(ID: str, type: str, property: str, start_date: str, end_date: str, bucket: str="realtime_simulation_result", client: InfluxDBClient=client) -> list:
    """
    Return the time series of one property of a node or link over a date range.

    ``type`` selects the measurement, ``ID`` the tag and ``property`` the
    field to read.

    :param ID: ID of the node or link (tag value).
    :param type: "node" or "link" (measurement).
    :param property: Name of the field to read.
    :param start_date: First day of the range, formatted 'YYYY-MM-DD'.
    :param end_date: Last day of the range, formatted 'YYYY-MM-DD'.
    :param bucket: Bucket holding the simulation results; defaults to
        "realtime_simulation_result".
    :param client: An initialised InfluxDBClient instance.
    :return: List of ``{"time": ..., "value": ...}`` dicts, one per record.
    :raises ValueError: If ``type`` is neither "node" nor "link".
    """
    # Report connectivity; the query is attempted regardless of the outcome.
    connected = client.ping()
    stamp = datetime.now().strftime('%Y-%m-%d %H:%M:%S')
    if connected:
        print(f"{stamp} -- Successfully connected to InfluxDB.")
    else:
        print(f"{stamp} -- Failed to connect to InfluxDB.")

    query_api = client.query_api()

    # The measurement name is the type itself, once validated.
    if type not in ("node", "link"):
        raise ValueError(f"不支持的类型: {type}")
    measurement = "node" if type == "node" else "link"

    # The dates are Beijing days (UTC+8): a day starts at 16:00:00 UTC of the
    # previous date and its last second is 15:59:59 UTC of the same date.
    range_begin = datetime.strptime(start_date, "%Y-%m-%d").replace(tzinfo=timezone.utc) - timedelta(hours=8)
    range_end = datetime.strptime(end_date, "%Y-%m-%d").replace(tzinfo=timezone.utc) + timedelta(hours=15, minutes=59, seconds=59)
    start_time = range_begin.isoformat()
    stop_time = range_end.isoformat()

    flux_query = (
        '\n'
        f'from(bucket: "{bucket}")\n'
        f'|> range(start: {start_time}, stop: {stop_time})\n'
        f'|> filter(fn: (r) => r["_measurement"] == "{measurement}")\n'
        f'|> filter(fn: (r) => r["ID"] == "{ID}")\n'
        f'|> filter(fn: (r) => r["_field"] == "{property}")\n'
    )
    tables = query_api.query(flux_query)

    # Flatten all tables into one list of time/value pairs.
    return [
        {"time": record["_time"], "value": record["_value"]}
        for table in tables
        for record in table.records
    ]
# 示例调用
if __name__ == "__main__":
    url = influxdb_info.url
    token = influxdb_info.token
    org_name = influxdb_info.org
    # Pass the organisation explicitly, as the module-level client does, so
    # that helpers given this client can query without naming an org.
    client = InfluxDBClient(url=url, token=token, org=org_name)
    # Step 1: check the connection and initialise the InfluxDB buckets.
    # try:
    #     # delete_buckets(client, org_name)
    #     create_and_initialize_buckets(client, org_name)
    # except Exception as e:
    #     print(f"连接失败: {e}")
    # finally:
    #     client.close()
    # Step 2: read scada_info from the pg database first, then store the
    # SCADA data in the SCADA_data bucket.
    query_pg_scada_info_realtime('bb')
    query_pg_scada_info_non_realtime('bb')
    # Manual run
    # store_realtime_SCADA_data_to_influxdb(get_real_value_time='2025-02-07T16:52:00+08:00')
    store_non_realtime_SCADA_data_to_influxdb(get_history_data_end_time='2025-02-08T12:00:00+08:00')
    # Step 3: query examples
    # with InfluxDBClient(url=url, token=token, org=org_name) as client:
    # Example 1: query_latest_record_by_ID
    # bucket_name = "realtime_simulation_result"  # bucket holding the data
    # node_id = "ZBBDTZDP000022"  # node ID to query
    # link_id = "ZBBGXSZW000002"
    #
    # latest_record = query_latest_record_by_ID(ID=node_id, type="node", bucket=bucket_name, client=client)
    # # latest_record = query_latest_record_by_ID(ID=link_id, type="link", bucket=bucket_name, client=client)
    #
    # if latest_record:
    #     print("最新记录:", latest_record)
    # else:
    #     print("未找到符合条件的记录。")
    # Example 2: query_all_record_by_time
    # node_records, link_records = query_all_record_by_time(query_time="2024-11-25T06:00:00+08:00")
    # print("Node 数据:", node_records)
    # print("Link 数据:", link_records)
    # Example 3: query_curve_by_ID_property_daterange
    # curve_result = query_curve_by_ID_property_daterange(ID=node_id, type="node", property="head",
    #                                                     start_date="2024-11-25", end_date="2024-11-25")
    # print(curve_result)
    # Example 4: query_SCADA_data_by_device_ID_and_time
    # SCADA_result_dict = query_SCADA_data_by_device_ID_and_time(globals.variable_pump_realtime_ids, query_time='2025-02-08T10:30:00+08:00')
    # print(SCADA_result_dict)