# Python module, ~1357 lines (60 KiB)
from influxdb_client import InfluxDBClient, BucketsApi, WriteApi, OrganizationsApi, Point, QueryApi
|
||
from typing import List, Dict
|
||
from datetime import datetime, timedelta, timezone
|
||
from influxdb_client.client.write_api import SYNCHRONOUS, ASYNCHRONOUS
|
||
from dateutil import parser
|
||
import get_realValue
|
||
import get_data
|
||
import psycopg
|
||
import time
|
||
import simulation
|
||
from tjnetwork import *
|
||
import schedule
|
||
import threading
|
||
import globals
|
||
import influxdb_info
|
||
|
||
# InfluxDB connection settings, read from the local influxdb_info module.
url = influxdb_info.url
token = influxdb_info.token
org_name = influxdb_info.org
# Module-level client shared by the helpers below. It is also used as a
# default argument value, so it is bound once at import time.
client = InfluxDBClient(url=url, token=token, org=org_name)
|
||
|
||
# # 所有实时更新数据的SCADA设备的ID
|
||
# flow_device_ids = ['2498', '3854', '3853']
|
||
# pressure_device_ids = ['2510', '2514']
|
||
# reservoir_liquid_level_ids = ['2497', '2571']
|
||
# tank_liquid_level_ids = ['4780', '9774']
|
||
# pump_device_ids = ['2747', '2776', '2730', '2787', '2500', '2502', '2504']
|
||
#
|
||
# # 用于更改数据的SCADA设的ID
|
||
# change_data_device_ids = ['2498', '3854', '3853', '2497', '2571', '4780', '9774',
|
||
# '2747', '2776', '2730', '2787', '2500', '2502', '2504']
|
||
|
||
# # 全局变量,用于存储不同类型的realtime api_query_id
|
||
# reservoir_liquid_level_realtime_ids = []
|
||
# tank_liquid_level_realtime_ids = []
|
||
# fixed_pump_realtime_ids = []
|
||
# variable_pump_realtime_ids = []
|
||
# source_outflow_realtime_ids = []
|
||
# pipe_flow_realtime_ids = []
|
||
# pressure_realtime_ids = []
|
||
# demand_realtime_ids = []
|
||
# quality_realtime_ids = []
|
||
#
|
||
# # transmission_frequency的最大值
|
||
# transmission_frequency = None
|
||
# hydraulic_timestep = None
|
||
#
|
||
# reservoir_liquid_level_non_realtime_ids = []
|
||
# tank_liquid_level_non_realtime_ids = []
|
||
# fixed_pump_non_realtime_ids = []
|
||
# variable_pump_non_realtime_ids = []
|
||
# source_outflow_non_realtime_ids = []
|
||
# pipe_flow_non_realtime_ids = []
|
||
# pressure_non_realtime_ids = []
|
||
# demand_non_realtime_ids = []
|
||
# quality_non_realtime_ids = []
|
||
|
||
|
||
def query_pg_scada_info_realtime(name: str) -> None:
    """
    Read the scada_info table of the given PostgreSQL database and collect
    the api_query_id of every record whose transmission_mode is 'realtime'
    into the per-type global lists in the ``globals`` module.

    :param name: database name
    :return: None
    """
    # Map each scada_info.type value to the global list that collects it.
    type_to_list = {
        "reservoir_liquid_level": globals.reservoir_liquid_level_realtime_ids,
        "tank_liquid_level": globals.tank_liquid_level_realtime_ids,
        "fixed_pump": globals.fixed_pump_realtime_ids,
        "variable_pump": globals.variable_pump_realtime_ids,
        "source_outflow": globals.source_outflow_realtime_ids,
        "pipe_flow": globals.pipe_flow_realtime_ids,
        "pressure": globals.pressure_realtime_ids,
        "demand": globals.demand_realtime_ids,
        "quality": globals.quality_realtime_ids,
    }

    conn_string = f"dbname={name} host=127.0.0.1"
    try:
        with psycopg.connect(conn_string) as conn:
            with conn.cursor() as cur:
                # Fetch all records flagged as 'realtime'.
                cur.execute("""
                    SELECT type, api_query_id
                    FROM scada_info
                    WHERE transmission_mode = 'realtime';
                """)
                rows = cur.fetchall()

                # Reset every destination list before refilling it.
                for id_list in type_to_list.values():
                    id_list.clear()

                # Sort each non-null api_query_id into its type's list;
                # unknown types are ignored, mirroring the elif chain.
                for record_type, api_query_id in rows:
                    if api_query_id is None:
                        continue
                    target = type_to_list.get(record_type)
                    if target is not None:
                        target.append(api_query_id)
    except Exception as e:
        print(f"查询时发生错误:{e}")
|
||
|
||
|
||
def query_pg_scada_info_non_realtime(name: str) -> None:
    """
    Read the scada_info table of the given PostgreSQL database and collect
    the api_query_id of every record whose transmission_mode is
    'non_realtime' into the per-type global lists, and record the maximum
    transmission_frequency among those records in
    ``globals.transmission_frequency``.

    Also refreshes ``globals.hydraulic_timestep`` by (re)opening the
    hydraulic project and reading its HYDRAULIC TIMESTEP setting.

    :param name: database name
    :return: None
    """
    # Re-open the project so get_time() reads the current settings.
    if is_project_open(name):
        close_project(name)
    open_project(name)
    dic_time = get_time(name)
    globals.hydraulic_timestep = dic_time['HYDRAULIC TIMESTEP']
    close_project(name)

    # Map each scada_info.type value to its destination global list.
    # BUGFIX: 'tank_liquid_level' was previously neither cleared nor
    # collected here (unlike query_pg_scada_info_realtime), so non-realtime
    # tank level devices were silently dropped.
    type_to_list = {
        "reservoir_liquid_level": globals.reservoir_liquid_level_non_realtime_ids,
        "tank_liquid_level": globals.tank_liquid_level_non_realtime_ids,
        "fixed_pump": globals.fixed_pump_non_realtime_ids,
        "variable_pump": globals.variable_pump_non_realtime_ids,
        "source_outflow": globals.source_outflow_non_realtime_ids,
        "pipe_flow": globals.pipe_flow_non_realtime_ids,
        "pressure": globals.pressure_non_realtime_ids,
        "demand": globals.demand_non_realtime_ids,
        "quality": globals.quality_non_realtime_ids,
    }

    conn_string = f"dbname={name} host=127.0.0.1"
    try:
        with psycopg.connect(conn_string) as conn:
            with conn.cursor() as cur:
                # Fetch all records flagged as 'non_realtime'.
                cur.execute("""
                    SELECT type, api_query_id, transmission_frequency
                    FROM scada_info
                    WHERE transmission_mode = 'non_realtime';
                """)
                records = cur.fetchall()

                # Reset every destination list before refilling it.
                for id_list in type_to_list.values():
                    id_list.clear()

                # Collected frequencies, used to compute the maximum below.
                transmission_frequencies = []

                for record_type, api_query_id, freq in records:
                    # Only records with a usable query id are dispatched;
                    # unknown types are ignored.
                    if api_query_id is not None:
                        target = type_to_list.get(record_type)
                        if target is not None:
                            target.append(api_query_id)
                    if freq is not None:
                        transmission_frequencies.append(freq)

                # Maximum transmission_frequency (None when no record had one).
                globals.transmission_frequency = (
                    max(transmission_frequencies) if transmission_frequencies else None
                )
    except Exception as e:
        print(f"查询时发生错误:{e}")
|
||
|
||
|
||
# 2025/02/01
|
||
def delete_buckets(client: InfluxDBClient, org_name: str) -> None:
    """
    Delete every bucket that belongs to the given InfluxDB organization.

    :param client: an initialized InfluxDBClient instance
    :param org_name: name of the InfluxDB organization
    :return: None
    """
    api = client.buckets_api()
    found = api.find_buckets(org=org_name)

    # Guard clause: bail out when the result has no iterable bucket list.
    if not hasattr(found, 'buckets'):
        print("未找到 buckets 属性,无法迭代 buckets。")
        return

    # Delete each bucket individually so one failure does not stop the rest.
    for bucket in found.buckets:
        try:
            api.delete_bucket(bucket)
            print(f"Bucket {bucket.name} has been deleted successfully.")
        except Exception as e:
            print(f"Failed to delete bucket {bucket.name}: {e}")
|
||
|
||
|
||
# 2025/02/01
|
||
def create_and_initialize_buckets(client: InfluxDBClient, org_name: str) -> None:
    """
    Initialize the three InfluxDB buckets used by this module:
    SCADA_data, realtime_simulation_result and scheme_simulation_result.

    All existing buckets in the organization are deleted first; each new
    bucket is then created and seeded with one placeholder point per
    measurement so its tag/field structure exists from the start.

    :param client: an initialized InfluxDBClient instance
    :param org_name: name of the InfluxDB organization
    :return: None
    """
    # Remove any pre-existing buckets, then rebuild from scratch.
    delete_buckets(client, org_name)

    bucket_api = BucketsApi(client)
    # NOTE(review): write_api() with no options uses the client's default
    # (batching) mode, so the seed writes below may be buffered — confirm
    # this is intended rather than SYNCHRONOUS.
    write_api = client.write_api()
    org_api = OrganizationsApi(client)

    # Resolve the organization ID from its name.
    org = next((o for o in org_api.find_organizations() if o.name == org_name), None)
    if not org:
        raise ValueError(f"Organization '{org_name}' not found.")
    org_id = org.id
    print(f"Using Organization ID: {org_id}")

    # Bucket definitions (empty retention rules: keep data indefinitely).
    buckets = [
        {"name": "SCADA_data", "retention_rules": []},
        {"name": "realtime_simulation_result", "retention_rules": []},
        {"name": "scheme_simulation_result", "retention_rules": []}
    ]

    # Create each bucket, then seed it with its placeholder point(s).
    for bucket in buckets:
        # Create the bucket itself.
        created_bucket = bucket_api.create_bucket(
            bucket_name=bucket["name"],
            retention_rules=bucket["retention_rules"],
            org_id=org_id
        )
        print(f"Bucket '{bucket['name']}' created with ID: {created_bucket.id}")

        # Seed the bucket according to its role.
        # NOTE(review): several tags/fields below are given None values —
        # verify the client library serializes (or drops) these as intended.
        if bucket["name"] == "SCADA_data":
            point = Point("SCADA") \
                .tag("date", None) \
                .tag("description", None) \
                .tag("device_ID", None) \
                .field("monitored_value", 0.0) \
                .field("datacleaning_value", 0.0) \
                .field("simulation_value", 0.0) \
                .time("2024-11-21T00:00:00Z")
            write_api.write(bucket="SCADA_data", org=org_name, record=point)
            print("Initialized SCADA_data with default structure.")

        elif bucket["name"] == "realtime_simulation_result":  # realtime_simulation_result
            # One point per measurement: 'link' (pipes/pumps/valves) and
            # 'node' (junctions/tanks/reservoirs).
            link_point = Point("link") \
                .tag("date", None) \
                .tag("ID", None) \
                .field("flow", 0.0) \
                .field("leakage", 0.0) \
                .field("velocity", 0.0) \
                .field("headloss", 0.0) \
                .field("status", None) \
                .field("setting", 0.0) \
                .field("quality", 0.0) \
                .time("2024-11-21T00:00:00Z")

            node_point = Point("node") \
                .tag("date", None) \
                .tag("ID", None) \
                .field("head", 0.0) \
                .field("pressure", 0.0) \
                .field("actualdemand", 0.0) \
                .field("demanddeficit", 0.0) \
                .field("totalExternalOutflow", 0.0) \
                .field("quality", 0.0) \
                .time("2024-11-21T00:00:00Z")

            write_api.write(bucket="realtime_simulation_result", org=org_name, record=link_point)
            write_api.write(bucket="realtime_simulation_result", org=org_name, record=node_point)
            print("Initialized realtime_simulation_result with default structure.")

        elif bucket["name"] == "scheme_simulation_result":
            # Same structure as realtime results, plus scheme identification
            # tags (scheme_Type / scheme_Name).
            link_point = Point("link") \
                .tag("date", None) \
                .tag("ID", None) \
                .tag("scheme_Type", None) \
                .tag("scheme_Name", None) \
                .field("flow", 0.0) \
                .field("leakage", 0.0) \
                .field("velocity", 0.0) \
                .field("headloss", 0.0) \
                .field("status", None) \
                .field("setting", 0.0) \
                .field("quality", 0.0) \
                .time("2024-11-21T00:00:00Z")

            node_point = Point("node") \
                .tag("date", None) \
                .tag("ID", None) \
                .tag("scheme_Type", None) \
                .tag("scheme_Name", None) \
                .field("head", 0.0) \
                .field("pressure", 0.0) \
                .field("actualdemand", 0.0) \
                .field("demanddeficit", 0.0) \
                .field("totalExternalOutflow", 0.0) \
                .field("quality", 0.0) \
                .time("2024-11-21T00:00:00Z")

            write_api.write(bucket="scheme_simulation_result", org=org_name, record=link_point)
            write_api.write(bucket="scheme_simulation_result", org=org_name, record=node_point)
            print("Initialized scheme_simulation_result with default structure.")

    print("All buckets created and initialized successfully.")
|
||
|
||
|
||
def _write_realtime_points(write_api, bucket: str, measurement: str,
                           data_list: List[Dict], get_real_value_time: str) -> None:
    """
    Write one measurement's worth of realtime SCADA readings to InfluxDB.

    A reading whose own timestamp differs from *get_real_value_time* by more
    than 60 seconds is stored with monitored_value=None (treated as a missed
    transmission); otherwise its monitored value is stored unchanged.

    :param write_api: InfluxDB WriteApi used for the writes
    :param bucket: destination bucket name
    :param measurement: InfluxDB measurement name, e.g. 'pressure_realtime'
    :param data_list: dicts with 'time', 'description', 'device_ID' and
        'monitored_value' keys, as returned by get_realValue.get_realValue
    :param get_real_value_time: query time, e.g. '2024-11-25T09:00:00+08:00'
    :return: None
    """
    if not data_list:
        return

    # Loop-invariant values: the query time as a naive datetime, its UTC
    # equivalent (used as the point timestamp; .astimezone on a naive value
    # interprets it in the local timezone) and the 'date' tag.
    query_dt = datetime.fromisoformat(get_real_value_time).replace(tzinfo=None)
    query_dt_utc = query_dt.astimezone(timezone.utc)
    date_tag = query_dt.strftime('%Y-%m-%d')

    for data in data_list:
        # NOTE(review): assumes data['time'] is a naive ISO timestamp — an
        # offset-aware value would raise on the subtraction below; confirm.
        data_time = datetime.fromisoformat(data['time'])
        time_difference = abs((data_time - query_dt).total_seconds())

        # Readings more than 1 minute away from the query time count as
        # missing and are stored as None.
        monitored_value = data['monitored_value'] if time_difference <= 60 else None

        point = (
            Point(measurement)
            .tag("date", date_tag)
            .tag("description", data['description'])
            .tag("device_ID", data['device_ID'])
            .field("monitored_value", monitored_value)
            .field("datacleaning_value", None)
            .field("simulation_value", None)
            .time(query_dt_utc)
        )
        write_api.write(bucket=bucket, org=org_name, record=point)
        write_api.flush()


def store_realtime_SCADA_data_to_influxdb(get_real_value_time: str, bucket: str = "SCADA_data", client: InfluxDBClient = client) -> None:
    """
    Fetch the current readings of every realtime SCADA device through the
    data interface and store them in InfluxDB, one measurement per device
    type.

    :param get_real_value_time: query time, e.g. '2024-11-25T09:00:00+08:00'
    :param bucket: InfluxDB bucket name, defaults to "SCADA_data"
    :param client: an initialized InfluxDBClient instance
    :return: None
    """
    if client.ping():
        print("{} -- Successfully connected to InfluxDB.".format(
            datetime.now().strftime('%Y-%m-%d %H:%M:%S')))
    else:
        print("{} -- Failed to connect to InfluxDB.".format(
            datetime.now().strftime('%Y-%m-%d %H:%M:%S')))
    write_api = client.write_api(write_options=ASYNCHRONOUS)

    # (measurement name, global id list) pairs handled by this function.
    sources = [
        ("reservoir_liquid_level_realtime", globals.reservoir_liquid_level_realtime_ids),
        ("tank_liquid_level_realtime", globals.tank_liquid_level_realtime_ids),
        ("fixed_pump_realtime", globals.fixed_pump_realtime_ids),
        ("variable_pump_realtime", globals.variable_pump_realtime_ids),
        ("source_outflow_realtime", globals.source_outflow_realtime_ids),
        ("pipe_flow_realtime", globals.pipe_flow_realtime_ids),
        ("pressure_realtime", globals.pressure_realtime_ids),
        ("demand_realtime", globals.demand_realtime_ids),
        ("quality_realtime", globals.quality_realtime_ids),
    ]
    # Fetched readings per measurement; stays empty for device types with no
    # configured realtime ids.
    data_lists: Dict[str, list] = {measurement: [] for measurement, _ in sources}

    # Retry the data-interface calls up to 6 times, pausing 10 s after a
    # failure; a fully successful pass leaves the loop early.
    try_count = 0
    while try_count <= 5:
        try:
            try_count += 1
            for measurement, ids in sources:
                if ids:
                    data_lists[measurement] = get_realValue.get_realValue(ids=','.join(ids))
        except Exception as e:
            print(e)
            time.sleep(10)
        else:
            break

    # Write whatever was fetched (possibly partial after failed retries).
    for measurement, _ in sources:
        _write_realtime_points(write_api, bucket, measurement,
                               data_lists[measurement], get_real_value_time)
|
||
|
||
|
||
def convert_time_format(original_time: str) -> str:
    """
    Convert a '2024-04-13T08:00:00+08:00'-style timestamp into
    '2024-04-13 08:00:00' form.

    The '+08:00' offset suffix is removed and the 'T' separator is replaced
    by a space; no other characters are touched.

    :param original_time: time string such as '2024-04-13T08:00:00+08:00'
    :return: time string such as '2024-04-13 08:00:00'
    """
    without_offset = original_time.replace('+08:00', '')
    return without_offset.replace('T', ' ')
|
||
|
||
|
||
# 筛选符合条件的数据
|
||
def is_timestep_multiple(data_time, timestep):
    """
    Return True when data_time's offset since that day's midnight is an
    exact multiple of the given timestep.

    :param data_time: datetime to test
    :param timestep: timedelta giving the step length
    :return: bool
    """
    # Offset of data_time from 00:00:00 of the same day.
    start_of_day = data_time.replace(hour=0, minute=0, second=0, microsecond=0)
    elapsed = (data_time - start_of_day).total_seconds()
    # Multiple of the step length <=> zero remainder.
    return elapsed % timestep.total_seconds() == 0
|
||
|
||
# 2025/01/10
|
||
def store_non_realtime_SCADA_data_to_influxdb(get_history_data_end_time: str, bucket: str = "SCADA_data", client: InfluxDBClient = client) -> None:
    """
    Fetch non-realtime SCADA history for one transmission window and write it to InfluxDB.

    The window ends at ``get_history_data_end_time`` and reaches back one
    ``globals.transmission_frequency``. Each device category is fetched via
    ``get_data.get_history_data`` (up to 5 attempts, 10 s apart) and every row is
    written to its category's measurement with tags date/description/device_ID and
    fields monitored_value/datacleaning_value/simulation_value.

    :param get_history_data_end_time: end of the history window, e.g. '2024-11-25T09:00:00+08:00'
    :param bucket: (str): target InfluxDB bucket, defaults to "SCADA_data"
    :param client: (InfluxDBClient): an initialized InfluxDBClient instance
    :return: None
    """
    if client.ping():
        print("{} -- Successfully connected to InfluxDB.".format(
            datetime.now().strftime('%Y-%m-%d %H:%M:%S')))
    else:
        print("{} -- Failed to connect to InfluxDB.".format(
            datetime.now().strftime('%Y-%m-%d %H:%M:%S')))
    write_api = client.write_api(write_options=SYNCHRONOUS)

    # Window bounds as 'YYYY-MM-DD HH:MM:SS' strings: [end - transmission_frequency, end].
    end_date_dt = datetime.strptime(convert_time_format(get_history_data_end_time), '%Y-%m-%d %H:%M:%S')
    end_date = end_date_dt.strftime('%Y-%m-%d %H:%M:%S')
    # globals.transmission_frequency is an 'HH:MM:SS' string; convert it to a timedelta.
    transmission_frequency_dt = datetime.strptime(globals.transmission_frequency, '%H:%M:%S') - datetime(1900, 1, 1)
    begin_date = (end_date_dt - transmission_frequency_dt).strftime('%Y-%m-%d %H:%M:%S')

    # measurement name -> list of non-realtime device IDs for that category.
    # (Replaces nine copy-pasted fetch branches and nine copy-pasted write loops.)
    id_groups = {
        'reservoir_liquid_level_non_realtime': globals.reservoir_liquid_level_non_realtime_ids,
        'tank_liquid_level_non_realtime': globals.tank_liquid_level_non_realtime_ids,
        'fixed_pump_non_realtime': globals.fixed_pump_non_realtime_ids,
        'variable_pump_non_realtime': globals.variable_pump_non_realtime_ids,
        'source_outflow_non_realtime': globals.source_outflow_non_realtime_ids,
        'pipe_flow_non_realtime': globals.pipe_flow_non_realtime_ids,
        'pressure_non_realtime': globals.pressure_non_realtime_ids,
        'demand_non_realtime': globals.demand_non_realtime_ids,
        'quality_non_realtime': globals.quality_non_realtime_ids,
    }
    # measurement name -> fetched history rows (stays empty until a fetch succeeds).
    data_by_measurement = {name: [] for name in id_groups}

    try_count = 0
    while try_count < 5:
        try:
            try_count += 1
            for name, ids in id_groups.items():
                if ids:  # skip categories with no configured devices
                    data_by_measurement[name] = get_data.get_history_data(
                        ids=','.join(ids),
                        begin_date=begin_date, end_date=end_date,
                        downsample='1m')
        except Exception as e:
            print(f"Attempt {try_count} failed with error: {e}")
            if try_count < 5:
                print("Retrying in 10 seconds...")
                time.sleep(10)
            else:
                # Keep going with whatever partial data was fetched, as before.
                print("Max retries reached. Exiting.")
        else:
            print("Data fetched successfully.")
            break  # all categories fetched without error

    # Write every fetched row as one point in its category's measurement.
    for measurement, data_list in data_by_measurement.items():
        for data in data_list:
            point = (
                Point(measurement)
                .tag("date", data['time'].strftime('%Y-%m-%d'))
                .tag("description", data['description'])
                .tag("device_ID", data['device_ID'])
                .field("monitored_value", data['monitored_value'])
                .field("datacleaning_value", None)
                .field("simulation_value", None)
                .time(data['time'])
            )
            write_api.write(bucket=bucket, org=org_name, record=point)
|
||
|
||
|
||
def query_SCADA_data_by_device_ID_and_time(query_ids_list: List[str], query_time: str, bucket: str="SCADA_data", client: InfluxDBClient=client) -> Dict[str, float]:
    """
    Look up each SCADA device's monitored_value at a given moment.

    :param query_ids_list: list of SCADA device IDs
    :param query_time: Beijing time, e.g. '2024-11-24T17:30:00+08:00'
    :param bucket: InfluxDB bucket name, defaults to "SCADA_data"
    :param client: an initialized InfluxDBClient instance
    :return: mapping device_ID -> monitored_value (None on empty result or error)
    """
    if client.ping():
        print("{} -- Successfully connected to InfluxDB.".format(
            datetime.now().strftime('%Y-%m-%d %H:%M:%S')))
    else:
        print("{} -- Failed to connect to InfluxDB.".format(
            datetime.now().strftime('%Y-%m-%d %H:%M:%S')))
    query_api = client.query_api()

    # Convert the Beijing timestamp to UTC and search a ±1 second window around it.
    utc_moment = datetime.fromisoformat(query_time).astimezone(timezone.utc)
    window_start = (utc_moment - timedelta(seconds=1)).isoformat()
    window_stop = (utc_moment + timedelta(seconds=1)).isoformat()

    SCADA_result_dict = {}
    for device_id in query_ids_list:
        flux_query = f'''
        from(bucket: "{bucket}")
        |> range(start: {window_start}, stop: {window_stop})
        |> filter(fn: (r) => r["device_ID"] == "{device_id}")
        |> filter(fn: (r) => r["_field"] == "monitored_value")
        '''
        try:
            tables = query_api.query(flux_query)
            if tables:
                # If several records land in the window, the last one wins.
                for table in tables:
                    for record in table.records:
                        SCADA_result_dict[device_id] = record.get_value()
            else:
                SCADA_result_dict[device_id] = None
        except Exception as e:
            print(f"Error querying InfluxDB for device ID {device_id}: {e}")
            SCADA_result_dict[device_id] = None

    return SCADA_result_dict
|
||
|
||
|
||
# 2025/02/01
|
||
def store_realtime_simulation_result_to_influxdb(node_result_list: List[Dict[str, any]], link_result_list: List[Dict[str, any]],
                                                 result_start_time: str,
                                                 bucket: str = "realtime_simulation_result", client: InfluxDBClient = client):
    """
    Store realtime simulation results in the "realtime_simulation_result" bucket.

    :param node_result_list: list of dicts shaped {'node': <id>, 'result': [<step dict>, ...]}
    :param link_result_list: list of dicts shaped {'link': <id>, 'result': [<step dict>, ...]}
    :param result_start_time: simulation start time, e.g. '2024-11-25T09:00:00+08:00'
    :param bucket: (str): target InfluxDB bucket, defaults to "realtime_simulation_result"
    :param client: (InfluxDBClient): an initialized InfluxDBClient instance
    :raises RuntimeError: when writing to InfluxDB fails
    """
    if client.ping():
        print("{} -- store_realtime_simulation_result_to_influxdb : Successfully connected to InfluxDB.".format(
            datetime.now().strftime('%Y-%m-%d %H:%M:%S')))
    else:
        print("{} -- store_realtime_simulation_result_to_influxdb : Failed to connect to InfluxDB.".format(
            datetime.now().strftime('%Y-%m-%d %H:%M:%S')))

    try:
        # client.write_api() without options returns a batching writer backed by
        # worker threads; the original code flushed it but never closed it, leaking
        # those threads on every call. close() below fixes that (and also flushes).
        write_api = client.write_api()
        try:
            date_str = result_start_time.split('T')[0]
            time_beijing = datetime.strptime(result_start_time, '%Y-%m-%dT%H:%M:%S%z').isoformat()

            for result in node_result_list:
                node_id = result.get('node')
                for data in result.get('result', []):
                    # All node fields share one point so they carry the same timestamp.
                    node_point = Point("node") \
                        .tag("date", date_str) \
                        .tag("ID", node_id) \
                        .field("head", data.get('head', 0.0)) \
                        .field("pressure", data.get('pressure', 0.0)) \
                        .field("actualdemand", data.get('demand', 0.0)) \
                        .field("demanddeficit", None) \
                        .field("totalExternalOutflow", None) \
                        .field("quality", data.get('quality', 0.0)) \
                        .time(time_beijing)
                    write_api.write(bucket=bucket, org=org_name, record=node_point)
            write_api.flush()
            print(f"成功将 {len(node_result_list)} 条node数据写入 InfluxDB。")

            for result in link_result_list:
                link_id = result.get('link')
                for data in result.get('result', []):
                    link_point = Point("link") \
                        .tag("date", date_str) \
                        .tag("ID", link_id) \
                        .field("flow", data.get('flow', 0.0)) \
                        .field("velocity", data.get('velocity', 0.0)) \
                        .field("headloss", data.get('headloss', 0.0)) \
                        .field("quality", data.get('quality', 0.0)) \
                        .field("status", data.get('status', "UNKNOWN")) \
                        .field("setting", data.get('setting', 0.0)) \
                        .field("reaction", data.get('reaction', 0.0)) \
                        .field("friction", data.get('friction', 0.0)) \
                        .time(time_beijing)
                    write_api.write(bucket=bucket, org=org_name, record=link_point)
            write_api.flush()
            print(f"成功将 {len(link_result_list)} 条link数据写入 InfluxDB。")
        finally:
            write_api.close()  # release the batching writer's background threads
    except Exception as e:
        raise RuntimeError(f"数据写入 InfluxDB 时发生错误: {e}")
|
||
|
||
|
||
# 2025/02/01
|
||
def query_latest_record_by_ID(ID: str, type: str, bucket: str="realtime_simulation_result", client: InfluxDBClient=client) -> dict:
    """
    Query the most recent record (within the last 30 days) for a node or link ID.

    :param ID: (str): the ID to query
    :param type: (str): "node" or "link" (any other value returns None, as before)
    :param bucket: (str): bucket holding the simulation results
    :param client: (InfluxDBClient): an initialized InfluxDB client instance
    :return: dict: the latest record's data, or None if nothing was found
    """
    if client.ping():
        print("{} -- Successfully connected to InfluxDB.".format(
            datetime.now().strftime('%Y-%m-%d %H:%M:%S')))
    else:
        print("{} -- Failed to connect to InfluxDB.".format(
            datetime.now().strftime('%Y-%m-%d %H:%M:%S')))
    query_api = client.query_api()

    # Per-type result shape: (key used for the ID, fields copied off the record).
    # "demanddeficit" / "totalExternalOutflow" are deliberately not returned for nodes.
    result_shapes = {
        "node": ("nodeID", ("head", "pressure", "actualdemand", "quality")),
        "link": ("linkID", ("flow", "velocity", "headloss", "quality",
                            "status", "setting", "reaction", "friction")),
    }
    if type not in result_shapes:
        return None  # unknown type: keep the original silent-None behavior
    id_key, fields = result_shapes[type]

    # The node and link branches previously duplicated this entire query; they
    # differed only in the _measurement filter, so one template now serves both.
    flux_query = f'''
    from(bucket: "{bucket}")
    |> range(start: -30d) // look back one month
    |> filter(fn: (r) => r["_measurement"] == "{type}")
    |> filter(fn: (r) => r["ID"] == "{ID}")
    |> pivot(
        rowKey:["_time"],
        columnKey:["_field"],
        valueColumn:"_value"
    )
    |> sort(columns: ["_time"], desc: true)
    |> limit(n: 1)
    '''
    tables = query_api.query(flux_query)
    for table in tables:
        for record in table.records:
            result = {"time": record["_time"], id_key: ID}
            for field in fields:
                result[field] = record[field]
            return result
    return None  # no record found
|
||
|
||
|
||
# 2025/02/01
|
||
def query_all_record_by_time(query_time: str, bucket: str="realtime_simulation_result", client: InfluxDBClient=client) -> tuple:
    """
    Query all 'node' and 'link' records at a given Beijing time.

    :param query_time: (str): Beijing time, e.g. '2024-11-24T17:30:00+08:00'
    :param bucket: (str): bucket holding the simulation results
    :param client: (InfluxDBClient): an initialized InfluxDBClient instance
    :return: tuple: (node_records, link_records)
    """
    if client.ping():
        print("{} -- Successfully connected to InfluxDB.".format(
            datetime.now().strftime('%Y-%m-%d %H:%M:%S')))
    else:
        print("{} -- Failed to connect to InfluxDB.".format(
            datetime.now().strftime('%Y-%m-%d %H:%M:%S')))
    query_api = client.query_api()

    # Convert the Beijing timestamp to UTC and query a ±1 second window around it.
    utc_moment = datetime.fromisoformat(query_time).astimezone(timezone.utc)
    window_start = (utc_moment - timedelta(seconds=1)).isoformat()
    window_stop = (utc_moment + timedelta(seconds=1)).isoformat()

    flux_query = f'''
    from(bucket: "{bucket}")
    |> range(start: {window_start}, stop: {window_stop})
    |> filter(fn: (r) => r["_measurement"] == "node" or r["_measurement"] == "link")
    |> pivot(
        rowKey:["_time"],
        columnKey:["_field"],
        valueColumn:"_value"
    )
    '''
    tables = query_api.query(flux_query)

    node_records = []
    link_records = []
    # Field names copied from each pivoted record, per measurement.
    node_fields = ("head", "pressure", "actualdemand", "quality")
    link_fields = ("flow", "velocity", "headloss", "quality",
                   "status", "setting", "reaction", "friction")

    for table in tables:
        for record in table.records:
            measurement = record["_measurement"]
            if measurement == "node":
                entry = {"time": record["_time"], "ID": record["ID"]}
                for field in node_fields:
                    entry[field] = record[field]
                node_records.append(entry)
            elif measurement == "link":
                # NOTE: link entries use "linkID" while node entries use "ID" —
                # this asymmetry matches the original output format.
                entry = {"time": record["_time"], "linkID": record["ID"]}
                for field in link_fields:
                    entry[field] = record[field]
                link_records.append(entry)

    return node_records, link_records
|
||
|
||
|
||
# 2025/02/01
|
||
def query_curve_by_ID_property_daterange(ID: str, type: str, property: str, start_date: str, end_date: str, bucket: str="realtime_simulation_result", client: InfluxDBClient=client) -> list:
    """
    Query one field of one node/link over a date range (Beijing dates).

    :param ID: (str): the ID (tag) to query
    :param type: (str): "node" or "link" (selects the measurement)
    :param property: (str): field name to fetch
    :param start_date: (str): start date, 'YYYY-MM-DD'
    :param end_date: (str): end date, 'YYYY-MM-DD'
    :param bucket: (str): bucket name, defaults to "realtime_simulation_result"
    :param client: (InfluxDBClient): an initialized InfluxDBClient instance
    :return: list of {"time": ..., "value": ...} dicts
    :raises ValueError: when type is neither "node" nor "link"
    """
    if client.ping():
        print("{} -- Successfully connected to InfluxDB.".format(
            datetime.now().strftime('%Y-%m-%d %H:%M:%S')))
    else:
        print("{} -- Failed to connect to InfluxDB.".format(
            datetime.now().strftime('%Y-%m-%d %H:%M:%S')))
    query_api = client.query_api()

    # The measurement name is simply the type, once validated.
    if type not in ("node", "link"):
        raise ValueError(f"不支持的类型: {type}")
    measurement = type

    # Map the Beijing-local date range onto UTC: the Beijing day [start 00:00,
    # end 24:00) corresponds to [start-1d 16:00, end 15:59:59] UTC.
    start_time = (datetime.strptime(start_date, "%Y-%m-%d") - timedelta(days=1)) \
        .replace(hour=16, minute=0, second=0, tzinfo=timezone.utc).isoformat()
    stop_time = datetime.strptime(end_date, "%Y-%m-%d") \
        .replace(hour=15, minute=59, second=59, tzinfo=timezone.utc).isoformat()

    flux_query = f'''
    from(bucket: "{bucket}")
    |> range(start: {start_time}, stop: {stop_time})
    |> filter(fn: (r) => r["_measurement"] == "{measurement}")
    |> filter(fn: (r) => r["ID"] == "{ID}")
    |> filter(fn: (r) => r["_field"] == "{property}")
    '''
    tables = query_api.query(flux_query)

    # Flatten the result into (time, value) pairs.
    return [
        {"time": record["_time"], "value": record["_value"]}
        for table in tables
        for record in table.records
    ]
|
||
|
||
|
||
# 示例调用
|
||
if __name__ == "__main__":
    # Connection settings for the InfluxDB instance.
    url = influxdb_info.url
    token = influxdb_info.token
    org_name = influxdb_info.org

    client = InfluxDBClient(url=url, token=token)

    # Step 1 (run manually): initialize the InfluxDB buckets, e.g.
    #   create_and_initialize_buckets(client, org_name)
    # Step 2 (run manually): look up scada_info in Postgres, then store SCADA data:
    #   query_pg_scada_info_realtime('bb') / query_pg_scada_info_non_realtime('bb')
    #   store_realtime_SCADA_data_to_influxdb(get_real_value_time='2025-02-07T16:52:00+08:00')
    #   store_non_realtime_SCADA_data_to_influxdb(get_history_data_end_time='2025-02-08T12:00:00+08:00')

    # Step 3: query examples.
    with InfluxDBClient(url=url, token=token, org=org_name) as client:
        # Example 1: query_latest_record_by_ID
        bucket_name = "realtime_simulation_result"  # bucket holding the results
        node_id = "ZBBDTZDP000022"  # node ID to query
        link_id = "ZBBGXSZW000002"

        latest_record = query_latest_record_by_ID(ID=node_id, type="node", bucket=bucket_name, client=client)
        # latest_record = query_latest_record_by_ID(ID=link_id, type="link", bucket=bucket_name, client=client)

        if latest_record:
            print("最新记录:", latest_record)
        else:
            print("未找到符合条件的记录。")

        # Example 2: query_all_record_by_time
        #   node_records, link_records = query_all_record_by_time(query_time="2024-11-25T06:00:00+08:00")

        # Example 3: query_curve_by_ID_property_daterange
        #   query_curve_by_ID_property_daterange(ID=node_id, type="node", property="head",
        #                                        start_date="2024-11-25", end_date="2024-11-25")

        # Example 4: query_SCADA_data_by_device_ID_and_time
        #   query_SCADA_data_by_device_ID_and_time(globals.variable_pump_realtime_ids,
        #                                          query_time='2025-02-08T10:30:00+08:00')