Files
TJWaterServer/influxdb_api.py
2025-03-04 21:45:28 +08:00

2771 lines
129 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
from influxdb_client import InfluxDBClient, BucketsApi, WriteApi, OrganizationsApi, Point, QueryApi
from typing import List, Dict
from datetime import datetime, timedelta, timezone
from influxdb_client.client.write_api import SYNCHRONOUS, ASYNCHRONOUS
from dateutil import parser
import get_realValue
import get_data
import psycopg
import time
import simulation
from tjnetwork import *
import schedule
import threading
import globals
import csv
import pandas as pd
import openpyxl
import influxdb_info
import time_api
# InfluxDB connection settings, loaded from the local influxdb_info module.
url = influxdb_info.url
token = influxdb_info.token
org_name = influxdb_info.org
# Module-level client shared by all functions below (also used as a default argument).
client = InfluxDBClient(url=url, token=token, org=org_name)
# # 所有实时更新数据的SCADA设备的ID
# flow_device_ids = ['2498', '3854', '3853']
# pressure_device_ids = ['2510', '2514']
# reservoir_liquid_level_ids = ['2497', '2571']
# tank_liquid_level_ids = ['4780', '9774']
# pump_device_ids = ['2747', '2776', '2730', '2787', '2500', '2502', '2504']
#
# # 用于更改数据的SCADA设备的ID
# change_data_device_ids = ['2498', '3854', '3853', '2497', '2571', '4780', '9774',
# '2747', '2776', '2730', '2787', '2500', '2502', '2504']
# # 全局变量用于存储不同类型的realtime api_query_id
# reservoir_liquid_level_realtime_ids = []
# tank_liquid_level_realtime_ids = []
# fixed_pump_realtime_ids = []
# variable_pump_realtime_ids = []
# source_outflow_realtime_ids = []
# pipe_flow_realtime_ids = []
# pressure_realtime_ids = []
# demand_realtime_ids = []
# quality_realtime_ids = []
#
# # transmission_frequency的最大值
# transmission_frequency = None
# hydraulic_timestep = None
#
# reservoir_liquid_level_non_realtime_ids = []
# tank_liquid_level_non_realtime_ids = []
# fixed_pump_non_realtime_ids = []
# variable_pump_non_realtime_ids = []
# source_outflow_non_realtime_ids = []
# pipe_flow_non_realtime_ids = []
# pressure_non_realtime_ids = []
# demand_non_realtime_ids = []
# quality_non_realtime_ids = []
def query_pg_scada_info_realtime(name: str) -> None:
    """
    Load the api_query_id of every SCADA device whose transmission_mode is
    'realtime' from the PostgreSQL scada_info table, grouping the IDs by
    device type into the corresponding lists in the globals module.

    :param name: PostgreSQL database name
    :return: None
    """
    # Map each scada_info.type value to the global list that collects its IDs.
    id_lists_by_type = {
        "reservoir_liquid_level": globals.reservoir_liquid_level_realtime_ids,
        "tank_liquid_level": globals.tank_liquid_level_realtime_ids,
        "fixed_pump": globals.fixed_pump_realtime_ids,
        "variable_pump": globals.variable_pump_realtime_ids,
        "source_outflow": globals.source_outflow_realtime_ids,
        "pipe_flow": globals.pipe_flow_realtime_ids,
        "pressure": globals.pressure_realtime_ids,
        "demand": globals.demand_realtime_ids,
        "quality": globals.quality_realtime_ids,
    }
    conn_string = f"dbname={name} host=127.0.0.1"
    try:
        with psycopg.connect(conn_string) as conn:
            with conn.cursor() as cur:
                # Fetch every record configured for realtime transmission.
                cur.execute("""
                    SELECT type, api_query_id
                    FROM scada_info
                    WHERE transmission_mode = 'realtime';
                """)
                rows = cur.fetchall()
                # Start from a clean slate so repeated calls do not accumulate IDs.
                for id_list in id_lists_by_type.values():
                    id_list.clear()
                for record_type, api_query_id in rows:
                    # Skip rows without an api_query_id; unknown types are ignored.
                    if api_query_id is not None and record_type in id_lists_by_type:
                        id_lists_by_type[record_type].append(api_query_id)
    except Exception as e:
        print(f"查询时发生错误:{e}")
def query_pg_scada_info_non_realtime(name: str) -> None:
    """
    Load the api_query_id of every SCADA device whose transmission_mode is
    'non_realtime' from the PostgreSQL scada_info table, grouping the IDs by
    device type into the corresponding lists in the globals module, and record
    the maximum transmission_frequency among those devices. Also refreshes
    globals.hydraulic_timestep from the project's time settings.

    :param name: PostgreSQL database name (also the project name)
    :return: None
    """
    # Re-open the project so HYDRAULIC TIMESTEP is read from a fresh state.
    if is_project_open(name):
        close_project(name)
    open_project(name)
    dic_time = get_time(name)
    globals.hydraulic_timestep = dic_time['HYDRAULIC TIMESTEP']
    close_project(name)
    # Map each scada_info.type value to the global list that collects its IDs.
    # Bug fix: 'tank_liquid_level' was previously neither cleared nor collected
    # here, so tank devices configured as non_realtime were silently dropped
    # even though store_non_realtime_SCADA_data_to_influxdb consumes
    # globals.tank_liquid_level_non_realtime_ids.
    id_lists_by_type = {
        "reservoir_liquid_level": globals.reservoir_liquid_level_non_realtime_ids,
        "tank_liquid_level": globals.tank_liquid_level_non_realtime_ids,
        "fixed_pump": globals.fixed_pump_non_realtime_ids,
        "variable_pump": globals.variable_pump_non_realtime_ids,
        "source_outflow": globals.source_outflow_non_realtime_ids,
        "pipe_flow": globals.pipe_flow_non_realtime_ids,
        "pressure": globals.pressure_non_realtime_ids,
        "demand": globals.demand_non_realtime_ids,
        "quality": globals.quality_non_realtime_ids,
    }
    conn_string = f"dbname={name} host=127.0.0.1"
    try:
        with psycopg.connect(conn_string) as conn:
            with conn.cursor() as cur:
                # Fetch every record configured for non-realtime transmission.
                cur.execute("""
                    SELECT type, api_query_id, transmission_frequency
                    FROM scada_info
                    WHERE transmission_mode = 'non_realtime';
                """)
                records = cur.fetchall()
                # Start from a clean slate so repeated calls do not accumulate IDs.
                for id_list in id_lists_by_type.values():
                    id_list.clear()
                # Collect transmission frequencies to compute the maximum below.
                transmission_frequencies = []
                for record_type, api_query_id, freq in records:
                    if api_query_id is not None:  # skip rows without an api_query_id
                        if record_type in id_lists_by_type:
                            id_lists_by_type[record_type].append(api_query_id)
                        if freq is not None:
                            transmission_frequencies.append(freq)
                globals.transmission_frequency = max(transmission_frequencies) if transmission_frequencies else None
                print("Hydraulic Timestep:", globals.hydraulic_timestep)
    except Exception as e:
        print(f"查询时发生错误:{e}")
# 2025/02/01
def delete_buckets(client: InfluxDBClient, org_name: str) -> None:
    """
    Delete this project's data buckets (SCADA_data, realtime_simulation_result,
    scheme_simulation_result) from the given InfluxDB organization, leaving any
    other buckets untouched.

    :param client: an initialized InfluxDBClient instance
    :param org_name: name of the InfluxDB organization
    :return: None
    """
    # Only these buckets belong to the project and may be removed.
    deletable = {'SCADA_data', 'realtime_simulation_result', 'scheme_simulation_result'}
    buckets_api = client.buckets_api()
    found = buckets_api.find_buckets(org=org_name)
    # Guard: the API result must expose a 'buckets' attribute to iterate.
    if not hasattr(found, 'buckets'):
        print("未找到 buckets 属性,无法迭代 buckets。")
        return
    for bucket in found.buckets:
        if bucket.name not in deletable:
            print(f"Skipping bucket {bucket.name}. Not in the deletion list.")
            continue
        try:
            buckets_api.delete_bucket(bucket)
            print(f"Bucket {bucket.name} has been deleted successfully.")
        except Exception as e:
            print(f"Failed to delete bucket {bucket.name}: {e}")
# 2025/02/01
def create_and_initialize_buckets(client: InfluxDBClient, org_name: str) -> None:
    """
    Recreate the three InfluxDB buckets used by this project — SCADA_data,
    realtime_simulation_result and scheme_simulation_result — and seed each
    with one placeholder point that establishes its measurement schema.

    :param client: an initialized InfluxDBClient instance
    :param org_name: name of the InfluxDB organization
    :return: None
    :raises ValueError: if the organization cannot be found
    """
    # Drop any pre-existing project buckets so we start from a clean state.
    delete_buckets(client, org_name)
    bucket_api = BucketsApi(client)
    # Bug fix: client.write_api() defaults to a batching writer whose buffered
    # points are never flushed or closed here, so the seed points could be
    # silently lost. Use SYNCHRONOUS (as the rest of this module does for
    # one-shot writes) so each write completes before we continue.
    write_api = client.write_api(write_options=SYNCHRONOUS)
    org_api = OrganizationsApi(client)
    # Resolve the organization ID from its name.
    org = next((o for o in org_api.find_organizations() if o.name == org_name), None)
    if not org:
        raise ValueError(f"Organization '{org_name}' not found.")
    org_id = org.id
    print(f"Using Organization ID: {org_id}")
    # Buckets to create; empty retention rules keep data indefinitely.
    buckets = [
        {"name": "SCADA_data", "retention_rules": []},
        {"name": "realtime_simulation_result", "retention_rules": []},
        {"name": "scheme_simulation_result", "retention_rules": []}
    ]
    # Create each bucket, then seed it with a placeholder point.
    # NOTE(review): tags/fields passed as None are dropped by the client, so
    # the placeholders effectively only pin the float-typed fields — confirm
    # this matches the intended schema.
    for bucket in buckets:
        created_bucket = bucket_api.create_bucket(
            bucket_name=bucket["name"],
            retention_rules=bucket["retention_rules"],
            org_id=org_id
        )
        print(f"Bucket '{bucket['name']}' created with ID: {created_bucket.id}")
        if bucket["name"] == "SCADA_data":
            point = Point("SCADA") \
                .tag("date", None) \
                .tag("description", None) \
                .tag("device_ID", None) \
                .field("monitored_value", 0.0) \
                .field("datacleaning_value", 0.0) \
                .field("simulation_value", 0.0) \
                .time("2024-11-21T00:00:00Z")
            write_api.write(bucket="SCADA_data", org=org_name, record=point)
            print("Initialized SCADA_data with default structure.")
        elif bucket["name"] == "realtime_simulation_result":
            link_point = Point("link") \
                .tag("date", None) \
                .tag("ID", None) \
                .field("flow", 0.0) \
                .field("leakage", 0.0) \
                .field("velocity", 0.0) \
                .field("headloss", 0.0) \
                .field("status", None) \
                .field("setting", 0.0) \
                .field("quality", 0.0) \
                .field("reaction", 0.0) \
                .field("friction", 0.0) \
                .time("2024-11-21T00:00:00Z")
            node_point = Point("node") \
                .tag("date", None) \
                .tag("ID", None) \
                .field("head", 0.0) \
                .field("pressure", 0.0) \
                .field("actualdemand", 0.0) \
                .field("demanddeficit", 0.0) \
                .field("totalExternalOutflow", 0.0) \
                .field("quality", 0.0) \
                .time("2024-11-21T00:00:00Z")
            write_api.write(bucket="realtime_simulation_result", org=org_name, record=link_point)
            write_api.write(bucket="realtime_simulation_result", org=org_name, record=node_point)
            print("Initialized realtime_simulation_result with default structure.")
        elif bucket["name"] == "scheme_simulation_result":
            link_point = Point("link") \
                .tag("date", None) \
                .tag("ID", None) \
                .tag("scheme_Type", None) \
                .tag("scheme_Name", None) \
                .field("flow", 0.0) \
                .field("leakage", 0.0) \
                .field("velocity", 0.0) \
                .field("headloss", 0.0) \
                .field("status", None) \
                .field("setting", 0.0) \
                .field("quality", 0.0) \
                .time("2024-11-21T00:00:00Z")
            node_point = Point("node") \
                .tag("date", None) \
                .tag("ID", None) \
                .tag("scheme_Type", None) \
                .tag("scheme_Name", None) \
                .field("head", 0.0) \
                .field("pressure", 0.0) \
                .field("actualdemand", 0.0) \
                .field("demanddeficit", 0.0) \
                .field("totalExternalOutflow", 0.0) \
                .field("quality", 0.0) \
                .time("2024-11-21T00:00:00Z")
            write_api.write(bucket="scheme_simulation_result", org=org_name, record=link_point)
            write_api.write(bucket="scheme_simulation_result", org=org_name, record=node_point)
            print("Initialized scheme_simulation_result with default structure.")
    print("All buckets created and initialized successfully.")
def _write_realtime_points(write_api, bucket: str, measurement: str, data_list, get_real_value_time: str) -> None:
    """
    Write one measurement's realtime SCADA readings to InfluxDB.

    Readings whose device timestamp deviates from get_real_value_time by more
    than 60 seconds are stored with a null monitored_value, so stale data stays
    visible but is not mistaken for a fresh reading.

    :param write_api: InfluxDB write API handle
    :param bucket: destination bucket name
    :param measurement: InfluxDB measurement name, e.g. 'pressure_realtime'
    :param data_list: dicts with keys 'time', 'description', 'device_ID', 'monitored_value'
    :param get_real_value_time: collection time, e.g. '2024-11-25T09:00:00+08:00'
    :return: None
    """
    if not data_list:
        return
    date_tag = datetime.fromisoformat(get_real_value_time).strftime('%Y-%m-%d')
    # NOTE(review): the offset is stripped and the naive timestamp is converted
    # with astimezone(), which assumes the server's local timezone matches the
    # data's +08:00 zone — confirm before deploying on a non-CST host.
    get_real_value_time_dt = datetime.fromisoformat(get_real_value_time).replace(tzinfo=None)
    get_real_value_time_utc = get_real_value_time_dt.astimezone(timezone.utc)
    for data in data_list:
        data_time = datetime.fromisoformat(data['time'])
        time_difference = abs((data_time - get_real_value_time_dt).total_seconds())
        # Null out readings more than 1 minute away from the collection time.
        monitored_value = None if time_difference > 60 else data['monitored_value']
        point = (
            Point(measurement)
            .tag("date", date_tag)
            .tag("description", data['description'])
            .tag("device_ID", data['device_ID'])
            .field("monitored_value", monitored_value)
            .field("datacleaning_value", None)
            .field("simulation_value", None)
            .time(get_real_value_time_utc)
        )
        write_api.write(bucket=bucket, org=org_name, record=point)
    write_api.flush()
def store_realtime_SCADA_data_to_influxdb(get_real_value_time: str, bucket: str = "SCADA_data", client: InfluxDBClient = client) -> None:
    """
    Fetch the latest reading of every realtime SCADA device through the data
    interface and write it to InfluxDB, one measurement per device type.

    :param get_real_value_time: collection time, e.g. '2024-11-25T09:00:00+08:00'
    :param bucket: InfluxDB bucket name, defaults to "SCADA_data"
    :param client: an initialized InfluxDBClient instance
    :return: None
    """
    if client.ping():
        print("{} -- Successfully connected to InfluxDB.".format(
            datetime.now().strftime('%Y-%m-%d %H:%M:%S')))
    else:
        print("{} -- Failed to connect to InfluxDB.".format(
            datetime.now().strftime('%Y-%m-%d %H:%M:%S')))
    write_api = client.write_api(write_options=ASYNCHRONOUS)
    # (measurement name, global list of api_query_ids) pairs; fetched and
    # written in this order. Replaces nine copy-pasted fetch/write blocks.
    sources = [
        ('reservoir_liquid_level_realtime', globals.reservoir_liquid_level_realtime_ids),
        ('tank_liquid_level_realtime', globals.tank_liquid_level_realtime_ids),
        ('fixed_pump_realtime', globals.fixed_pump_realtime_ids),
        ('variable_pump_realtime', globals.variable_pump_realtime_ids),
        ('source_outflow_realtime', globals.source_outflow_realtime_ids),
        ('pipe_flow_realtime', globals.pipe_flow_realtime_ids),
        ('pressure_realtime', globals.pressure_realtime_ids),
        ('demand_realtime', globals.demand_realtime_ids),
        ('quality_realtime', globals.quality_realtime_ids),
    ]
    data_by_measurement = {measurement: [] for measurement, _ in sources}
    # Up to 6 attempts (same retry budget as before); on success we break,
    # on exhaustion we proceed with whatever partial data was fetched.
    for attempt in range(6):
        try:
            for measurement, ids in sources:
                if ids:
                    data_by_measurement[measurement] = get_realValue.get_realValue(ids=','.join(ids))
        except Exception as e:
            print(e)
            time.sleep(10)
        else:
            break
    for measurement, _ in sources:
        _write_realtime_points(write_api, bucket, measurement,
                               data_by_measurement[measurement], get_real_value_time)
def convert_time_format(original_time: str) -> str:
    """
    Convert a timestamp like '2024-04-13T08:00:00+08:00' into
    '2024-04-13 08:00:00' by dropping the 'T' separator and the
    fixed '+08:00' offset suffix.

    :param original_time: timestamp string with 'T' separator and +08:00 offset
    :return: timestamp string in 'YYYY-MM-DD HH:MM:SS' form
    """
    return original_time.replace('T', ' ').replace('+08:00', '')
# Select readings whose timestamp falls exactly on a simulation timestep.
def is_timestep_multiple(data_time, timestep):
    """
    Return True when data_time's offset from that day's midnight is an exact
    multiple of timestep (a timedelta), i.e. the reading aligns with the
    hydraulic timestep grid.
    """
    day_start = data_time.replace(hour=0, minute=0, second=0, microsecond=0)
    seconds_into_day = (data_time - day_start).total_seconds()
    return seconds_into_day % timestep.total_seconds() == 0
# 2025/01/10
def _write_non_realtime_points(write_api, bucket: str, measurement: str, data_list) -> None:
    """
    Write one measurement's historical (non-realtime) SCADA readings to InfluxDB.

    Each reading keeps its own device timestamp; data['time'] is expected to be
    a datetime (it is both strftime-formatted and passed to Point.time).

    :param write_api: InfluxDB write API handle
    :param bucket: destination bucket name
    :param measurement: InfluxDB measurement name, e.g. 'pressure_non_realtime'
    :param data_list: dicts with keys 'time', 'description', 'device_ID', 'monitored_value'
    :return: None
    """
    for data in data_list:
        point = (
            Point(measurement)
            .tag("date", data['time'].strftime('%Y-%m-%d'))
            .tag("description", data['description'])
            .tag("device_ID", data['device_ID'])
            .field("monitored_value", data['monitored_value'])
            .field("datacleaning_value", None)
            .field("simulation_value", None)
            .time(data['time'])
        )
        write_api.write(bucket=bucket, org=org_name, record=point)
def store_non_realtime_SCADA_data_to_influxdb(get_history_data_end_time: str, bucket: str = "SCADA_data", client: InfluxDBClient = client) -> None:
    """
    Fetch the SCADA readings transmitted during the last transmission window
    (ending at get_history_data_end_time and spanning
    globals.transmission_frequency) and store them in InfluxDB, one
    measurement per device type.

    :param get_history_data_end_time: window end time, e.g. '2024-11-25T09:00:00+08:00'
    :param bucket: InfluxDB bucket name, defaults to "SCADA_data"
    :param client: an initialized InfluxDBClient instance
    :return: None
    """
    if client.ping():
        print("{} -- Successfully connected to InfluxDB.".format(
            datetime.now().strftime('%Y-%m-%d %H:%M:%S')))
    else:
        print("{} -- Failed to connect to InfluxDB.".format(
            datetime.now().strftime('%Y-%m-%d %H:%M:%S')))
    write_api = client.write_api(write_options=SYNCHRONOUS)
    # Query window: [end - transmission_frequency, end].
    end_date_dt = datetime.strptime(convert_time_format(get_history_data_end_time), '%Y-%m-%d %H:%M:%S')
    end_date = end_date_dt.strftime('%Y-%m-%d %H:%M:%S')
    # transmission_frequency is an 'HH:MM:SS' string; turn it into a timedelta.
    window = datetime.strptime(globals.transmission_frequency, '%H:%M:%S') - datetime(1900, 1, 1)
    begin_date = (end_date_dt - window).strftime('%Y-%m-%d %H:%M:%S')
    # (measurement name, global list of api_query_ids) pairs; fetched and
    # written in this order. Replaces nine copy-pasted fetch/write blocks.
    sources = [
        ('reservoir_liquid_level_non_realtime', globals.reservoir_liquid_level_non_realtime_ids),
        ('tank_liquid_level_non_realtime', globals.tank_liquid_level_non_realtime_ids),
        ('fixed_pump_non_realtime', globals.fixed_pump_non_realtime_ids),
        ('variable_pump_non_realtime', globals.variable_pump_non_realtime_ids),
        ('source_outflow_non_realtime', globals.source_outflow_non_realtime_ids),
        ('pipe_flow_non_realtime', globals.pipe_flow_non_realtime_ids),
        ('pressure_non_realtime', globals.pressure_non_realtime_ids),
        ('demand_non_realtime', globals.demand_non_realtime_ids),
        ('quality_non_realtime', globals.quality_non_realtime_ids),
    ]
    data_by_measurement = {measurement: [] for measurement, _ in sources}
    # Up to 5 attempts (same retry budget as before); on success break out,
    # on exhaustion proceed with whatever partial data was fetched.
    for attempt in range(1, 6):
        try:
            for measurement, ids in sources:
                if ids:
                    data_by_measurement[measurement] = get_data.get_history_data(
                        ids=','.join(ids),
                        begin_date=begin_date, end_date=end_date,
                        downsample='1m')
        except Exception as e:
            print(f"Attempt {attempt} failed with error: {e}")
            if attempt < 5:
                print("Retrying in 10 seconds...")
                time.sleep(10)
            else:
                print("Max retries reached. Exiting.")
        else:
            print("Data fetched successfully.")
            break
    for measurement, _ in sources:
        if data_by_measurement[measurement]:
            _write_non_realtime_points(write_api, bucket, measurement, data_by_measurement[measurement])
# 2025/03/01
def download_history_data_manually(begin_time: str, end_time: str, bucket: str = "SCADA_data",
                                   client: InfluxDBClient = client) -> None:
    """
    Fetch history data of all SCADA devices inside a time span and write it to
    InfluxDB (manual back-fill version, intended for non-realtime execution).

    :param begin_time: start of the history window, e.g. '2024-11-25T09:00:00+08:00'
    :param end_time: end of the history window, e.g. '2024-11-25T09:00:00+08:00'
    :param bucket: target InfluxDB bucket, defaults to "SCADA_data"
    :param client: an initialised InfluxDBClient instance
    :return: None
    """
    if client.ping():
        print("{} -- Successfully connected to InfluxDB.".format(
            datetime.now().strftime('%Y-%m-%d %H:%M:%S')))
    else:
        print("{} -- Failed to connect to InfluxDB.".format(
            datetime.now().strftime('%Y-%m-%d %H:%M:%S')))
    write_api = client.write_api(write_options=SYNCHRONOUS)
    begin_date = convert_time_format(begin_time)
    end_date = convert_time_format(end_time)
    # Every measurement written by this function. The measurement name doubles
    # as the prefix of the matching ID list in the `globals` module, e.g.
    # 'pressure_realtime' -> globals.pressure_realtime_ids, which is what lets
    # the 18 copy-pasted fetch/write stanzas collapse into one loop.
    categories = ('reservoir_liquid_level', 'tank_liquid_level', 'fixed_pump',
                  'variable_pump', 'source_outflow', 'pipe_flow',
                  'pressure', 'demand', 'quality')
    measurements = ['{}_{}'.format(cat, kind)
                    for kind in ('realtime', 'non_realtime')
                    for cat in categories]
    # Fetched rows per measurement; pre-filled with empty lists so the write
    # phase below can run unconditionally even after a failed fetch.
    fetched: Dict[str, list] = {name: [] for name in measurements}
    # Fetch with up to 5 attempts, waiting 10 s between retries. If the final
    # attempt fails part-way, whatever it managed to fetch is still written
    # (same semantics as the original per-variable version).
    try_count = 0
    while try_count < 5:
        try:
            try_count += 1
            for name in fetched:
                device_ids = getattr(globals, name + '_ids')
                if device_ids:
                    fetched[name] = get_data.get_history_data(
                        ids=','.join(device_ids),
                        begin_date=begin_date, end_date=end_date,
                        downsample='1m')
        except Exception as e:
            print(f"Attempt {try_count} failed with error: {e}")
            if try_count < 5:
                print("Retrying in 10 seconds...")
                time.sleep(10)
            else:
                print("Max retries reached. Exiting.")
        else:
            print("Data fetched successfully.")
            break  # all fetches succeeded
    # Write phase: one batched write call per measurement (the SYNCHRONOUS
    # write_api accepts a list of points, avoiding one HTTP round-trip per row).
    for name, data_list in fetched.items():
        if not data_list:
            continue
        points = [
            Point(name)
            .tag("date", data['time'].strftime('%Y-%m-%d'))
            .tag("description", data['description'])
            .tag("device_ID", data['device_ID'])
            .field("monitored_value", data['monitored_value'])
            # Placeholder fields; None-valued fields are skipped by the client
            # during line-protocol serialisation.
            .field("datacleaning_value", None)
            .field("simulation_value", None)
            .time(data['time'])
            for data in data_list
        ]
        write_api.write(bucket=bucket, org=org_name, record=points)
def query_SCADA_data_by_device_ID_and_time(query_ids_list: List[str], query_time: str, bucket: str="SCADA_data", client: InfluxDBClient=client) -> Dict[str, float]:
    """
    Look up the monitored value of each SCADA device at a given instant.
    :param query_ids_list: list of SCADA device IDs
    :param query_time: Beijing time, e.g. '2024-11-24T17:30:00+08:00'
    :param bucket: InfluxDB bucket name, defaults to "SCADA_data"
    :param client: an initialised InfluxDBClient instance.
    :return: mapping device_ID -> monitored value (None when missing or on error)
    """
    if client.ping():
        print("{} -- Successfully connected to InfluxDB.".format(
            datetime.now().strftime('%Y-%m-%d %H:%M:%S')))
    else:
        print("{} -- Failed to connect to InfluxDB.".format(
            datetime.now().strftime('%Y-%m-%d %H:%M:%S')))
    query_api = client.query_api()
    # Convert the Beijing timestamp to UTC and widen it to a +/-5 s window.
    # DingZQ temp change delta to 5 from 1
    ts_utc = datetime.fromisoformat(query_time).astimezone(timezone.utc)
    utc_start_time = ts_utc - timedelta(seconds=5)
    utc_stop_time = ts_utc + timedelta(seconds=5)
    readings = {}
    for device_id in query_ids_list:
        # Flux query for this device's monitored_value inside the window.
        flux_query = f'''
        from(bucket: "{bucket}")
        |> range(start: {utc_start_time.isoformat()}, stop: {utc_stop_time.isoformat()})
        |> filter(fn: (r) => r["device_ID"] == "{device_id}")
        |> filter(fn: (r) => r["_field"] == "monitored_value")
        '''
        try:
            tables = query_api.query(flux_query)
            if tables:
                # If several rows match, the last one wins (as before).
                for table in tables:
                    for record in table.records:
                        readings[device_id] = record.get_value()
            else:
                # No data in the window: record the device with a None value.
                readings[device_id] = None
        except Exception as e:
            print(f"Error querying InfluxDB for device ID {device_id}: {e}")
            readings[device_id] = None
    return readings
# DingZQ, 2025-02-15
def query_SCADA_data_by_device_ID_and_time_range(query_ids_list: List[str], start_time: str, end_time: str, bucket: str="SCADA_data", client: InfluxDBClient=client) -> list[dict[str, float]]:
    """
    Look up the monitored values of SCADA devices inside a time range.
    :param query_ids_list: list of SCADA device IDs
    :param start_time: Beijing time, e.g. '2024-11-24T17:30:00+08:00'
    :param end_time: Beijing time, e.g. '2024-11-24T17:30:00+08:00'
    :param bucket: InfluxDB bucket name, defaults to "SCADA_data"
    :param client: an initialised InfluxDBClient instance.
    :return: list of {"ID", "time", "value"} dicts, time-ascending per device
    """
    if client.ping():
        print("{} -- Successfully connected to InfluxDB.".format( datetime.now().strftime('%Y-%m-%d %H:%M:%S')))
    else:
        print("{} -- Failed to connect to InfluxDB.".format( datetime.now().strftime('%Y-%m-%d %H:%M:%S')))
    query_api = client.query_api()
    # Convert the Beijing timestamps to UTC for the Flux range() clause.
    utc_start_time = time_api.parse_utc_time(start_time)
    utc_end_time = time_api.parse_utc_time(end_time)
    readings = []
    for device_id in query_ids_list:
        # Flux query for this device's monitored_value, sorted by time.
        flux_query = f'''
        from(bucket: "{bucket}")
        |> range(start: {utc_start_time.isoformat()}, stop: {utc_end_time.isoformat()})
        |> filter(fn: (r) => r["device_ID"] == "{device_id}")
        |> filter(fn: (r) => r["_field"] == "monitored_value")
        |> sort(columns: ["_time"], desc: false)
        '''
        try:
            for table in query_api.query(flux_query):
                for record in table.records:
                    readings.append({
                        "ID": device_id,
                        "time": record.get_time(),
                        "value": record.get_value()
                    })
        except Exception as e:
            print(f"Error querying InfluxDB for device ID {device_id}: {e}")
    return readings
# DingZQ, 2025-02-15
def query_SCADA_data_by_device_ID_and_date(query_ids_list: List[str], query_date: str, bucket: str="SCADA_data", client: InfluxDBClient=client) -> list[dict[str, float]]:
    """
    Look up the monitored values of SCADA devices for one whole day.
    :param query_ids_list: list of SCADA device IDs
    :param query_date: date string such as '2024-11-24' (a Beijing-time date)
    :param bucket: InfluxDB bucket name, defaults to "SCADA_data"
    :param client: an initialised InfluxDBClient instance.
    :return: list of {"ID", "time", "value"} dicts
    """
    # Expand the Beijing-time date into its start/end timestamps, format them
    # back to strings, and delegate to the range-based query.
    day_start, day_end = time_api.parse_beijing_date_range(query_date)
    start_time_str = time_api.format_beijing_time(day_start)
    end_time_str = time_api.format_beijing_time(day_end)
    print(f"start_time_str: {start_time_str}, end_time_str: {end_time_str}")
    return query_SCADA_data_by_device_ID_and_time_range(query_ids_list, start_time_str, end_time_str, bucket, client)
# 2025/02/01
def store_realtime_simulation_result_to_influxdb(node_result_list: List[Dict[str, any]], link_result_list: List[Dict[str, any]],
                                                 result_start_time: str,
                                                 bucket: str = "realtime_simulation_result", client: InfluxDBClient = client):
    """
    Store realtime simulation results in the 'realtime_simulation_result' bucket.
    :param node_result_list: (List[Dict[str, any]]): node result dicts ({'node': id, 'result': [...]}).
    :param link_result_list: (List[Dict[str, any]]): link result dicts ({'link': id, 'result': [...]}).
    :param result_start_time: (str): simulation start time of the results.
    :param bucket: (str): InfluxDB bucket name, defaults to "realtime_simulation_result".
    :param client: (InfluxDBClient): an initialised InfluxDBClient instance.
    :return:
    :raises RuntimeError: when writing to InfluxDB fails.
    """
    if client.ping():
        print("{} -- store_realtime_simulation_result_to_influxdb : Successfully connected to InfluxDB.".format(
            datetime.now().strftime('%Y-%m-%d %H:%M:%S')))
    else:
        print("{} -- store_realtime_simulation_result_to_influxdb : Failed to connect to InfluxDB.".format(
            datetime.now().strftime('%Y-%m-%d %H:%M:%S')))
    try:
        write_api = client.write_api()
        # Both tag value and point timestamp derive from result_start_time.
        day_tag = result_start_time.split('T')[0]
        point_time = datetime.strptime(result_start_time, '%Y-%m-%dT%H:%M:%S%z').isoformat()
        for entry in node_result_list:
            node_id = entry.get('node')
            for data in entry.get('result', []):
                # One point per result row; all fields live in the same point.
                node_point = (
                    Point("node")
                    .tag("date", day_tag)
                    .tag("ID", node_id)
                    .field("head", data.get('head', 0.0))
                    .field("pressure", data.get('pressure', 0.0))
                    .field("actualdemand", data.get('demand', 0.0))
                    .field("demanddeficit", None)
                    .field("totalExternalOutflow", None)
                    .field("quality", data.get('quality', 0.0))
                    .time(point_time)
                )
                write_api.write(bucket=bucket, org=org_name, record=node_point)
        write_api.flush()
        print(f"成功将 {len(node_result_list)} 条node数据写入 InfluxDB。")
        for entry in link_result_list:
            link_id = entry.get('link')
            for data in entry.get('result', []):
                link_point = (
                    Point("link")
                    .tag("date", day_tag)
                    .tag("ID", link_id)
                    .field("flow", data.get('flow', 0.0))
                    .field("velocity", data.get('velocity', 0.0))
                    .field("headloss", data.get('headloss', 0.0))
                    .field("quality", data.get('quality', 0.0))
                    .field("status", data.get('status', "UNKNOWN"))
                    .field("setting", data.get('setting', 0.0))
                    .field("reaction", data.get('reaction', 0.0))
                    .field("friction", data.get('friction', 0.0))
                    .time(point_time)
                )
                write_api.write(bucket=bucket, org=org_name, record=link_point)
        write_api.flush()
        print(f"成功将 {len(link_result_list)} 条link数据写入 InfluxDB。")
    except Exception as e:
        raise RuntimeError(f"数据写入 InfluxDB 时发生错误: {e}")
# 2025/02/01
def query_latest_record_by_ID(ID: str, type: str, bucket: str="realtime_simulation_result", client: InfluxDBClient=client) -> dict:
    """
    Query the single most recent record for a given ID.
    :param ID: (str): the ID to look up.
    :param type: (str): "node", "link" or "scada".
    :param bucket: (str): name of the bucket holding the data.
    :param client: (InfluxDBClient): an initialised InfluxDB client instance.
    :return: dict: the latest record, or None when nothing is found.
    """
    if client.ping():
        print("{} -- Successfully connected to InfluxDB.".format(
            datetime.now().strftime('%Y-%m-%d %H:%M:%S')))
    else:
        print("{} -- Failed to connect to InfluxDB.".format(
            datetime.now().strftime('%Y-%m-%d %H:%M:%S')))
    query_api = client.query_api()
    if type == "node":
        flux_query = f'''
        from(bucket: "{bucket}")
        |> range(start: -7d) // 查找最近七天的记录
        |> filter(fn: (r) => r["_measurement"] == "node")
        |> filter(fn: (r) => r["ID"] == "{ID}")
        |> pivot(
            rowKey:["_time"],
            columnKey:["_field"],
            valueColumn:"_value"
        )
        |> group() // 将所有数据聚合到同一个 group
        |> sort(columns: ["_time"], desc: true)
        |> limit(n: 1)
        '''
        # Flatten the table stream and take the first (and only) record.
        latest = next((rec for tbl in query_api.query(flux_query) for rec in tbl.records), None)
        if latest is not None:
            return {
                "time": latest["_time"],
                "nodeID": ID,
                "head": latest["head"],
                "pressure": latest["pressure"],
                "actualdemand": latest["actualdemand"],
                # "demanddeficit": latest["demanddeficit"],
                # "totalExternalOutflow": latest["totalExternalOutflow"],
                "quality": latest["quality"]
            }
    elif type == "link":
        flux_query = f'''
        from(bucket: "{bucket}")
        |> range(start: -7d) // 查找最近七天的记录
        |> filter(fn: (r) => r["_measurement"] == "link")
        |> filter(fn: (r) => r["ID"] == "{ID}")
        |> pivot(
            rowKey:["_time"],
            columnKey:["_field"],
            valueColumn:"_value"
        )
        |> group() // 将所有数据聚合到同一个 group
        |> sort(columns: ["_time"], desc: true)
        |> limit(n: 1)
        '''
        latest = next((rec for tbl in query_api.query(flux_query) for rec in tbl.records), None)
        if latest is not None:
            return {
                "time": latest["_time"],
                "linkID": ID,
                "flow": latest["flow"],
                "velocity": latest["velocity"],
                "headloss": latest["headloss"],
                "quality": latest["quality"],
                "status": latest["status"],
                "setting": latest["setting"],
                "reaction": latest["reaction"],
                "friction": latest["friction"]
            }
    elif type == "scada":
        # SCADA readings live in a fixed bucket/measurement, not in `bucket`.
        flux_query = f'''
        from(bucket: "SCADA_data")
        |> range(start: -30d) // 查找最近一月的记录
        |> filter(fn: (r) => r["_measurement"] == "pressure_realtime")
        |> filter(fn: (r) => r["device_ID"] == "{ID}")
        |> pivot(
            rowKey:["_time"],
            columnKey:["_field"],
            valueColumn:"_value"
        )
        |> sort(columns: ["_time"], desc: true)
        |> limit(n: 1)
        '''
        latest = next((rec for tbl in query_api.query(flux_query) for rec in tbl.records), None)
        if latest is not None:
            return {
                "time": latest["_time"],
                "device_ID": ID,
                "value": latest["monitored_value"],
            }
    return None  # no matching record (or unknown type)
# 2025/02/01
def query_all_record_by_time(query_time: str, bucket: str="realtime_simulation_result", client: InfluxDBClient=client) -> tuple:
    """
    Query every record ('node' and 'link' measurements) stored at a given
    Beijing timestamp and return them as two separate lists.
    :param query_time: (str): Beijing time, e.g. '2024-11-24T17:30:00+08:00'
    :param bucket: (str): name of the bucket holding the data.
    :param client: (InfluxDBClient): an initialised InfluxDBClient instance.
    :return: dict: tuple: (node_records, link_records)
    """
    if client.ping():
        print("{} -- Successfully connected to InfluxDB.".format(
            datetime.now().strftime('%Y-%m-%d %H:%M:%S')))
    else:
        print("{} -- Failed to connect to InfluxDB.".format(
            datetime.now().strftime('%Y-%m-%d %H:%M:%S')))
    query_api = client.query_api()
    # Convert the Beijing timestamp to UTC and widen it by +/-1 s.
    utc_time = datetime.fromisoformat(query_time).astimezone(timezone.utc)
    utc_start_time = utc_time - timedelta(seconds=1)
    utc_stop_time = utc_time + timedelta(seconds=1)
    # Pivot turns each field into a column so one record carries all fields.
    flux_query = f'''
    from(bucket: "{bucket}")
    |> range(start: {utc_start_time.isoformat()}, stop: {utc_stop_time.isoformat()})
    |> filter(fn: (r) => r["_measurement"] == "node" or r["_measurement"] == "link")
    |> pivot(
        rowKey:["_time"],
        columnKey:["_field"],
        valueColumn:"_value"
    )
    '''
    node_records, link_records = [], []
    for table in query_api.query(flux_query):
        for record in table.records:
            if record["_measurement"] == "node":
                node_records.append({
                    "time": record["_time"],
                    "ID": record["ID"],
                    "head": record["head"],
                    "pressure": record["pressure"],
                    "actualdemand": record["actualdemand"],
                    "quality": record["quality"]
                })
            elif record["_measurement"] == "link":
                link_records.append({
                    "time": record["_time"],
                    "linkID": record["ID"],
                    "flow": record["flow"],
                    "velocity": record["velocity"],
                    "headloss": record["headloss"],
                    "quality": record["quality"],
                    "status": record["status"],
                    "setting": record["setting"],
                    "reaction": record["reaction"],
                    "friction": record["friction"]
                })
    return node_records, link_records
# 2025/03/03 WMH
def query_all_record_by_time_property(query_time: str, type: str, property: str, bucket: str="realtime_simulation_result",
                                      client: InfluxDBClient=client) -> list:
    """
    Query one property of every 'node' or 'link' record at a given Beijing
    timestamp and return the values in a list.
    :param query_time: (str): Beijing time, e.g. '2024-11-24T17:30:00+08:00'
    :param type: (str): query type (decides the measurement)
    :param property: (str): field name to fetch
    :param bucket: (str): name of the bucket holding the data.
    :param client: (InfluxDBClient): an initialised InfluxDBClient instance.
    :return: list(dict): result_records
    """
    if client.ping():
        print("{} -- Successfully connected to InfluxDB.".format(
            datetime.now().strftime('%Y-%m-%d %H:%M:%S')))
    else:
        print("{} -- Failed to connect to InfluxDB.".format(
            datetime.now().strftime('%Y-%m-%d %H:%M:%S')))
    query_api = client.query_api()
    # Guard clause: only "node" and "link" map to measurements.
    if type not in ("node", "link"):
        raise ValueError(f"不支持的类型: {type}")
    measurement = type
    # Convert the Beijing timestamp to UTC and widen it by +/-1 s.
    utc_time = datetime.fromisoformat(query_time).astimezone(timezone.utc)
    utc_start_time = utc_time - timedelta(seconds=1)
    utc_stop_time = utc_time + timedelta(seconds=1)
    flux_query = f'''
    from(bucket: "{bucket}")
    |> range(start: {utc_start_time.isoformat()}, stop: {utc_stop_time.isoformat()})
    |> filter(fn: (r) => r["_measurement"] == "{measurement}")
    |> filter(fn: (r) => r["_field"] == "{property}")
    '''
    # One result row per record: just the element ID and the field value.
    return [
        {"ID": record["ID"], "value": record["_value"]}
        for table in query_api.query(flux_query)
        for record in table.records
    ]
# 2025/02/21
def query_all_record_by_date(query_date: str, bucket: str="realtime_simulation_result", client: InfluxDBClient=client) -> tuple:
    """
    Query every record (node and link) for a given date and return them as two
    separate lists.
    :param query_date: date string such as '2025-02-14'
    :param bucket: name of the bucket holding the data
    :param client: an initialised InfluxDBClient instance.
    :return: dict: tuple: (node_records, link_records)
    """
    if client.ping():
        print("{} -- Successfully connected to InfluxDB.".format(
            datetime.now().strftime('%Y-%m-%d %H:%M:%S')))
    else:
        print("{} -- Failed to connect to InfluxDB.".format(
            datetime.now().strftime('%Y-%m-%d %H:%M:%S')))
    query_api = client.query_api()
    # A Beijing-time calendar day corresponds to the UTC span
    # [16:00 of the previous day, 16:00 of the same day).
    day = datetime.strptime(query_date, "%Y-%m-%d")
    start_time = (day - timedelta(days=1)).replace(hour=16, minute=0, second=0, tzinfo=timezone.utc).isoformat()
    print(start_time)
    stop_time = day.replace(hour=16, minute=0, second=0, tzinfo=timezone.utc).isoformat()
    print(stop_time)
    # Pivot turns each field into a column; the extra "date" tag filter keeps
    # only points tagged with the requested day.
    flux_query = f'''
    from(bucket: "{bucket}")
    |> range(start: {start_time}, stop: {stop_time})
    |> filter(fn: (r) => r["_measurement"] == "node" or r["_measurement"] == "link")
    |> filter(fn: (r) => r["date"] == "{query_date}")
    |> pivot(
        rowKey:["_time"],
        columnKey:["_field"],
        valueColumn:"_value"
    )
    '''
    node_records, link_records = [], []
    for table in query_api.query(flux_query):
        for record in table.records:
            if record["_measurement"] == "node":
                node_records.append({
                    "time": record["_time"],
                    "ID": record["ID"],
                    "head": record["head"],
                    "pressure": record["pressure"],
                    "actualdemand": record["actualdemand"],
                    "quality": record["quality"]
                })
            elif record["_measurement"] == "link":
                link_records.append({
                    "time": record["_time"],
                    "linkID": record["ID"],
                    "flow": record["flow"],
                    "velocity": record["velocity"],
                    "headloss": record["headloss"],
                    "quality": record["quality"],
                    "status": record["status"],
                    "setting": record["setting"],
                    "reaction": record["reaction"],
                    "friction": record["friction"]
                })
    return node_records, link_records
# 2025/02/21 WMH
def query_all_record_by_date_property(query_date: str, type: str, property: str,
                                      bucket: str="realtime_simulation_result", client: InfluxDBClient=client) -> list:
    """
    Query all records of one property of nodes/links for a given date.
    :param query_date: date string such as '2025-02-14'
    :param type: (str): query type (decides the measurement, "node" or "link")
    :param property: (str): field name to fetch
    :param bucket: name of the bucket holding the data
    :param client: an initialised InfluxDBClient instance.
    :return: list(dict): result_records ({"time", "ID", "value"})
    :raises ValueError: when type is neither "node" nor "link"
    """
    if client.ping():
        print("{} -- Successfully connected to InfluxDB.".format(
            datetime.now().strftime('%Y-%m-%d %H:%M:%S')))
    else:
        print("{} -- Failed to connect to InfluxDB.".format(
            datetime.now().strftime('%Y-%m-%d %H:%M:%S')))
    query_api = client.query_api()
    # Map the requested type onto its measurement.
    if type == "node":
        measurement = "node"
    elif type == "link":
        measurement = "link"
    else:
        raise ValueError(f"不支持的类型: {type}")
    # A Beijing-time calendar day corresponds to the UTC span
    # [16:00 of the previous day, 16:00 of the same day).
    start_time = (datetime.strptime(query_date, "%Y-%m-%d") - timedelta(days=1)).replace(hour=16, minute=0, second=0, tzinfo=timezone.utc).isoformat()
    print(start_time)
    stop_time = (datetime.strptime(query_date, "%Y-%m-%d")).replace(hour=16, minute=0, second=0, tzinfo=timezone.utc).isoformat()
    print(stop_time)
    print(f"now time: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}")
    print("before query influxdb")
    # BUG FIX: "ID" must be listed in keep(); the previous query dropped it,
    # so record["ID"] below raised KeyError on every non-empty result.
    flux_query = f'''
    from(bucket: "{bucket}")
    |> range(start: {start_time}, stop: {stop_time})
    |> filter(fn: (r) =>
        r._measurement == "{measurement}" and
        r._field == "{property}"
    )
    |> group(columns: ["date"])
    |> filter(fn: (r) => r.date == "{query_date}")
    |> keep(columns: ["_time", "_value", "date", "ID"])
    '''
    tables = query_api.query(flux_query)
    print("after query influxdb")
    print(f"now time: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}")
    result_records = []
    for table in tables:
        for record in table.records:
            result_records.append({
                "time": record["_time"],
                "ID": record["ID"],
                "value": record["_value"]
            })
    return result_records
# 2025/02/01
def query_curve_by_ID_property_daterange(ID: str, type: str, property: str, start_date: str, end_date: str, bucket: str="realtime_simulation_result", client: InfluxDBClient=client) -> list:
    """
    Fetch the time series of a single field for one node/link over a date range.

    :param ID: element ID (stored as the "ID" tag)
    :param type: element kind, "node" or "link" (selects the measurement)
    :param property: field name to read
    :param start_date: first day of the range, 'YYYY-MM-DD' (Beijing local date)
    :param end_date: last day of the range, 'YYYY-MM-DD' (Beijing local date)
    :param bucket: bucket to query, defaults to "realtime_simulation_result"
    :param client: initialised InfluxDBClient instance
    :return: list of {"time": ..., "value": ...} records
    """
    now = datetime.now().strftime('%Y-%m-%d %H:%M:%S')
    if client.ping():
        print("{} -- Successfully connected to InfluxDB.".format(now))
    else:
        print("{} -- Failed to connect to InfluxDB.".format(now))
    # Only "node" and "link" map to measurements; anything else is rejected.
    if type not in ("node", "link"):
        raise ValueError(f"不支持的类型: {type}")
    measurement = type
    # Beijing midnight of start_date == 16:00 UTC of the previous day.
    range_start = (datetime.strptime(start_date, "%Y-%m-%d") - timedelta(days=1)).replace(hour=16, minute=0, second=0, tzinfo=timezone.utc).isoformat()
    range_stop = datetime.strptime(end_date, "%Y-%m-%d").replace(hour=15, minute=59, second=59, tzinfo=timezone.utc).isoformat()
    # Build the Flux query.
    flux_query = f'''
    from(bucket: "{bucket}")
    |> range(start: {range_start}, stop: {range_stop})
    |> filter(fn: (r) => r["_measurement"] == "{measurement}")
    |> filter(fn: (r) => r["ID"] == "{ID}")
    |> filter(fn: (r) => r["_field"] == "{property}")
    '''
    tables = client.query_api().query(flux_query)
    # Flatten every record into {"time", "value"} pairs.
    return [
        {"time": record["_time"], "value": record["_value"]}
        for table in tables
        for record in table.records
    ]
def query_buckets(client: InfluxDBClient=client) -> list[str]:
    """
    Return the names of all buckets visible to the client.

    :param client: initialised InfluxDBClient instance
    :return: list of bucket names
    """
    # Fetch every bucket through the Buckets API.
    buckets_api = client.buckets_api()
    buckets = buckets_api.find_buckets().buckets
    print("All Buckets:")
    # Collect just the names (was a manual append loop).
    return [bucket.name for bucket in buckets]
def query_measurements(bucket: str, client: InfluxDBClient=client) -> list[str]:
    """
    List every measurement name present in the given bucket.

    :param bucket: bucket to inspect
    :param client: initialised InfluxDBClient instance
    :return: measurement names
    """
    flux = f'''
    import "influxdata/influxdb/schema"
    schema.measurements(bucket: "{bucket}")
    '''
    tables = client.query_api().query(flux)
    # Extract the measurement names from the result records.
    names = []
    for table in tables:
        for record in table.records:
            names.append(record.values["_value"])
    return names
# 2025/02/13
def store_scheme_simulation_result_to_influxdb(node_result_list: List[Dict[str, any]], link_result_list: List[Dict[str, any]],
                                               scheme_start_time: str, num_periods: int = 1, scheme_Type: str = None, scheme_Name: str = None,
                                               bucket: str = "scheme_simulation_result", client: InfluxDBClient = client):
    """
    Store scheme-simulation results in the InfluxDB bucket "scheme_simulation_result".

    :param node_result_list: dicts with a 'node' ID and a 'result' list (one dict of values per period).
    :param link_result_list: dicts with a 'link' ID and a 'result' list (one dict of values per period).
    :param scheme_start_time: simulation start time, ISO format with offset, e.g. '2024-11-24T17:30:00+08:00'.
    :param num_periods: number of simulation periods to write.
    :param scheme_Type: scheme type, written as a tag.
    :param scheme_Name: scheme name, written as a tag.
    :param bucket: target bucket, defaults to "scheme_simulation_result".
    :param client: initialised InfluxDBClient instance.
    :return: None
    :raises RuntimeError: when any write to InfluxDB fails.
    """
    if client.ping():
        print("{} -- Successfully connected to InfluxDB.".format(
            datetime.now().strftime('%Y-%m-%d %H:%M:%S')))
    else:
        print("{} -- Failed to connect to InfluxDB.".format(
            datetime.now().strftime('%Y-%m-%d %H:%M:%S')))
    try:
        # NOTE(review): write_api() without SYNCHRONOUS returns the batching writer;
        # the explicit flush() calls below push the buffered points -- confirm intended.
        write_api = client.write_api()
        # The scheme's calendar date (text before 'T') tags every point.
        date_str = scheme_start_time.split('T')[0]
        time_beijing = datetime.strptime(scheme_start_time, '%Y-%m-%dT%H:%M:%S%z')
        # NOTE(review): assumes globals.hydraulic_timestep is an "HH:MM:SS" string -- confirm.
        timestep_parts = globals.hydraulic_timestep.split(':')
        timestep = timedelta(hours=int(timestep_parts[0]), minutes=int(timestep_parts[1]), seconds=int(timestep_parts[2]))
        for node_result in node_result_list:
            # Extract the node ID; its per-period results live under 'result'.
            node_id = node_result.get('node')
            # Periods 0 .. num_periods - 1; each period is offset by one hydraulic timestep.
            for period_index in range(num_periods):
                scheme_time = (time_beijing + (timestep * period_index)).isoformat()
                data_list = [node_result.get('result', [])[period_index]]
                for data in data_list:
                    # Build one Point carrying all node fields for this timestamp.
                    node_point = Point("node") \
                        .tag("date", date_str) \
                        .tag("ID", node_id) \
                        .tag("scheme_Type", scheme_Type) \
                        .tag("scheme_Name", scheme_Name) \
                        .field("head", data.get('head', 0.0)) \
                        .field("pressure", data.get('pressure', 0.0)) \
                        .field("actualdemand", data.get('demand', 0.0)) \
                        .field("demanddeficit", None) \
                        .field("totalExternalOutflow", None) \
                        .field("quality", data.get('quality', 0.0)) \
                        .time(scheme_time)
                    # Write the point (all fields travel in a single point).
                    write_api.write(bucket=bucket, org=org_name, record=node_point)
        write_api.flush()
        for link_result in link_result_list:
            link_id = link_result.get('link')
            for period_index in range(num_periods):
                scheme_time = (time_beijing + (timestep * period_index)).isoformat()
                data_list = [link_result.get('result', [])[period_index]]
                for data in data_list:
                    # Build one Point carrying all link fields for this timestamp.
                    link_point = Point("link") \
                        .tag("date", date_str) \
                        .tag("ID", link_id) \
                        .tag("scheme_Type", scheme_Type) \
                        .tag("scheme_Name", scheme_Name) \
                        .field("flow", data.get('flow', 0.0)) \
                        .field("velocity", data.get('velocity', 0.0)) \
                        .field("headloss", data.get('headloss', 0.0)) \
                        .field("quality", data.get('quality', 0.0)) \
                        .field("status", data.get('status', "UNKNOWN")) \
                        .field("setting", data.get('setting', 0.0)) \
                        .field("reaction", data.get('reaction', 0.0)) \
                        .field("friction", data.get('friction', 0.0)) \
                        .time(scheme_time)
                    write_api.write(bucket=bucket, org=org_name, record=link_point)
        write_api.flush()
    except Exception as e:
        raise RuntimeError(f"数据写入 InfluxDB 时发生错误: {e}")
# 2025/02/15
def query_SCADA_data_curve(api_query_id: str, start_date: str, end_date: str, bucket: str="SCADA_data", client: InfluxDBClient=client) -> list:
    """
    Fetch the raw time series for one SCADA device over a date range
    (returned timestamps are in UTC).

    :param api_query_id: the device's api_query_id (stored as the "device_ID" tag)
    :param start_date: first day of the range, 'YYYY-MM-DD' (Beijing local date)
    :param end_date: last day of the range, 'YYYY-MM-DD' (Beijing local date)
    :param bucket: bucket to query, defaults to "SCADA_data"
    :param client: initialised InfluxDBClient instance
    :return: list of {"time": ..., "value": ...} records
    """
    now = datetime.now().strftime('%Y-%m-%d %H:%M:%S')
    if client.ping():
        print("{} -- Successfully connected to InfluxDB.".format(now))
    else:
        print("{} -- Failed to connect to InfluxDB.".format(now))
    # Beijing midnight of start_date == 16:00 UTC of the previous day.
    range_start = (datetime.strptime(start_date, "%Y-%m-%d") - timedelta(days=1)).replace(hour=16, minute=0, second=0, tzinfo=timezone.utc).isoformat()
    range_stop = datetime.strptime(end_date, "%Y-%m-%d").replace(hour=15, minute=59, second=59, tzinfo=timezone.utc).isoformat()
    # Build the Flux query.
    flux_query = f'''
    from(bucket: "{bucket}")
    |> range(start: {range_start}, stop: {range_stop})
    |> filter(fn: (r) => r["device_ID"] == "{api_query_id}")
    '''
    tables = client.query_api().query(flux_query)
    # Flatten every record into {"time", "value"} pairs.
    return [
        {"time": record["_time"], "value": record["_value"]}
        for table in tables
        for record in table.records
    ]
# 2025/02/18
def query_scheme_all_record_by_time(scheme_Type: str, scheme_Name: str, query_time: str,
                                    bucket: str="scheme_simulation_result", client: InfluxDBClient=client) -> tuple:
    """
    Fetch every node and link record of one scheme at a single instant.

    :param scheme_Type: scheme type tag
    :param scheme_Name: scheme name tag
    :param query_time: Beijing time, e.g. '2024-11-24T17:30:00+08:00'
    :param bucket: bucket to query
    :param client: initialised InfluxDBClient instance
    :return: tuple (node_records, link_records)
    """
    now = datetime.now().strftime('%Y-%m-%d %H:%M:%S')
    if client.ping():
        print("{} -- Successfully connected to InfluxDB.".format(now))
    else:
        print("{} -- Failed to connect to InfluxDB.".format(now))
    # Convert the Beijing timestamp to UTC and query a +/-1 second window around it.
    moment_utc = datetime.fromisoformat(query_time).astimezone(timezone.utc)
    window_start = moment_utc - timedelta(seconds=1)
    window_stop = moment_utc + timedelta(seconds=1)
    # Pivot so each row carries all fields of one element at one timestamp.
    flux_query = f'''
    from(bucket: "{bucket}")
    |> range(start: {window_start.isoformat()}, stop: {window_stop.isoformat()})
    |> filter(fn: (r) => r["scheme_Type"] == "{scheme_Type}")
    |> filter(fn: (r) => r["scheme_Name"] == "{scheme_Name}")
    |> filter(fn: (r) => r["_measurement"] == "node" or r["_measurement"] == "link")
    |> pivot(
        rowKey:["_time"],
        columnKey:["_field"],
        valueColumn:"_value"
    )
    '''
    tables = client.query_api().query(flux_query)
    node_records = []
    link_records = []
    node_fields = ("head", "pressure", "actualdemand", "quality")
    link_fields = ("flow", "velocity", "headloss", "quality", "status", "setting", "reaction", "friction")
    # Route each pivoted row to the node or link list and copy its fields in order.
    for table in tables:
        for rec in table.records:
            if rec["_measurement"] == "node":
                entry = {"time": rec["_time"], "ID": rec["ID"]}
                for field in node_fields:
                    entry[field] = rec[field]
                node_records.append(entry)
            elif rec["_measurement"] == "link":
                entry = {"time": rec["_time"], "linkID": rec["ID"]}
                for field in link_fields:
                    entry[field] = rec[field]
                link_records.append(entry)
    return node_records, link_records
# 2025/03/04 WMH
def query_scheme_all_record_by_time_property(scheme_Type: str, scheme_Name: str, query_time: str, type: str, property: str,
                                             bucket: str="scheme_simulation_result", client: InfluxDBClient=client) -> list:
    """
    Fetch one field of every node (or link) of a scheme at a single instant.

    :param scheme_Type: scheme type tag
    :param scheme_Name: scheme name tag
    :param query_time: Beijing time, e.g. '2024-11-24T17:30:00+08:00'
    :param type: element kind, "node" or "link" (selects the measurement)
    :param property: field name to read
    :param bucket: bucket to query
    :param client: initialised InfluxDBClient instance
    :return: list of {"ID": ..., "value": ...} records
    """
    now = datetime.now().strftime('%Y-%m-%d %H:%M:%S')
    if client.ping():
        print("{} -- Successfully connected to InfluxDB.".format(now))
    else:
        print("{} -- Failed to connect to InfluxDB.".format(now))
    # Only "node" and "link" map to measurements; anything else is rejected.
    if type not in ("node", "link"):
        raise ValueError(f"不支持的类型: {type}")
    measurement = type
    # Convert the Beijing timestamp to UTC and query a +/-1 second window around it.
    moment_utc = datetime.fromisoformat(query_time).astimezone(timezone.utc)
    window_start = moment_utc - timedelta(seconds=1)
    window_stop = moment_utc + timedelta(seconds=1)
    flux_query = f'''
    from(bucket: "{bucket}")
    |> range(start: {window_start.isoformat()}, stop: {window_stop.isoformat()})
    |> filter(fn: (r) => r["scheme_Type"] == "{scheme_Type}")
    |> filter(fn: (r) => r["scheme_Name"] == "{scheme_Name}")
    |> filter(fn: (r) => r["_measurement"] == "{measurement}")
    |> filter(fn: (r) => r["_field"] == "{property}")
    '''
    tables = client.query_api().query(flux_query)
    # Flatten every record into {"ID", "value"} pairs.
    return [
        {"ID": rec["ID"], "value": rec["_value"]}
        for table in tables
        for rec in table.records
    ]
# 2025/02/19
def query_scheme_curve_by_ID_property(scheme_Type: str, scheme_Name: str, ID: str, type: str, property: str,
                                      bucket: str="scheme_simulation_result", client: InfluxDBClient=client) -> list:
    """
    Fetch the full time series of one field for one element of a scheme.

    :param scheme_Type: scheme type tag
    :param scheme_Name: scheme name tag
    :param ID: element ID (stored as the "ID" tag)
    :param type: element kind, "node" or "link" (selects the measurement)
    :param property: field name to read
    :param bucket: bucket to query, defaults to "scheme_simulation_result"
    :param client: initialised InfluxDBClient instance
    :return: list of {"time": ..., "value": ...} records
    """
    now = datetime.now().strftime('%Y-%m-%d %H:%M:%S')
    if client.ping():
        print("{} -- Successfully connected to InfluxDB.".format(now))
    else:
        print("{} -- Failed to connect to InfluxDB.".format(now))
    # Only "node" and "link" map to measurements; anything else is rejected.
    if type not in ("node", "link"):
        raise ValueError(f"不支持的类型: {type}")
    measurement = type
    # Fixed range start: scheme results are only stored from 2025 onwards.
    flux_query = f'''
    from(bucket: "{bucket}")
    |> range(start: 2025-01-01T00:00:00Z)
    |> filter(fn: (r) => r["_measurement"] == "{measurement}")
    |> filter(fn: (r) => r["scheme_Type"] == "{scheme_Type}")
    |> filter(fn: (r) => r["scheme_Name"] == "{scheme_Name}")
    |> filter(fn: (r) => r["ID"] == "{ID}")
    |> filter(fn: (r) => r["_field"] == "{property}")
    '''
    tables = client.query_api().query(flux_query)
    # Flatten every record into {"time", "value"} pairs.
    return [
        {"time": rec["_time"], "value": rec["_value"]}
        for table in tables
        for rec in table.records
    ]
# 2025/02/21
def query_scheme_all_record(scheme_Type: str, scheme_Name: str, bucket: str="scheme_simulation_result",
                            client: InfluxDBClient=client) -> tuple:
    """
    Fetch every node and link record of one scheme across all timestamps.

    :param scheme_Type: scheme type tag
    :param scheme_Name: scheme name tag
    :param bucket: bucket to query
    :param client: initialised InfluxDBClient instance
    :return: tuple (node_records, link_records)
    """
    now = datetime.now().strftime('%Y-%m-%d %H:%M:%S')
    if client.ping():
        print("{} -- Successfully connected to InfluxDB.".format(now))
    else:
        print("{} -- Failed to connect to InfluxDB.".format(now))
    # Pivot so each row carries all fields of one element at one timestamp.
    flux_query = f'''
    from(bucket: "{bucket}")
    |> range(start: 2025-01-01T00:00:00Z)
    |> filter(fn: (r) => r["scheme_Type"] == "{scheme_Type}")
    |> filter(fn: (r) => r["scheme_Name"] == "{scheme_Name}")
    |> filter(fn: (r) => r["_measurement"] == "node" or r["_measurement"] == "link")
    |> pivot(
        rowKey:["_time"],
        columnKey:["_field"],
        valueColumn:"_value"
    )
    '''
    tables = client.query_api().query(flux_query)
    node_records = []
    link_records = []
    node_fields = ("head", "pressure", "actualdemand", "quality")
    link_fields = ("flow", "velocity", "headloss", "quality", "status", "setting", "reaction", "friction")
    # Route each pivoted row to the node or link list and copy its fields in order.
    for table in tables:
        for rec in table.records:
            if rec["_measurement"] == "node":
                entry = {"time": rec["_time"], "ID": rec["ID"]}
                for field in node_fields:
                    entry[field] = rec[field]
                node_records.append(entry)
            elif rec["_measurement"] == "link":
                entry = {"time": rec["_time"], "linkID": rec["ID"]}
                for field in link_fields:
                    entry[field] = rec[field]
                link_records.append(entry)
    return node_records, link_records
# 2025/03/04 WMH
def query_scheme_all_record_property(scheme_Type: str, scheme_Name: str, type: str, property: str,
                                     bucket: str="scheme_simulation_result", client: InfluxDBClient=client) -> list:
    """
    Fetch one field of every node (or link) of a scheme across all timestamps.

    :param scheme_Type: scheme type tag
    :param scheme_Name: scheme name tag
    :param type: element kind, "node" or "link" (selects the measurement)
    :param property: field name to read
    :param bucket: bucket to query
    :param client: initialised InfluxDBClient instance
    :return: list of {"time": ..., "ID": ..., "value": ...} records
    """
    now = datetime.now().strftime('%Y-%m-%d %H:%M:%S')
    if client.ping():
        print("{} -- Successfully connected to InfluxDB.".format(now))
    else:
        print("{} -- Failed to connect to InfluxDB.".format(now))
    # Only "node" and "link" map to measurements; anything else is rejected.
    if type not in ("node", "link"):
        raise ValueError(f"不支持的类型: {type}")
    measurement = type
    # Fixed range start: scheme results are only stored from 2025 onwards.
    flux_query = f'''
    from(bucket: "{bucket}")
    |> range(start: 2025-01-01T00:00:00Z)
    |> filter(fn: (r) => r["scheme_Type"] == "{scheme_Type}")
    |> filter(fn: (r) => r["scheme_Name"] == "{scheme_Name}")
    |> filter(fn: (r) => r["_measurement"] == "{measurement}")
    |> filter(fn: (r) => r["_field"] == "{property}")
    '''
    tables = client.query_api().query(flux_query)
    # Flatten every record into {"time", "ID", "value"} triples.
    return [
        {"time": rec["_time"], "ID": rec["ID"], "value": rec["_value"]}
        for table in tables
        for rec in table.records
    ]
# 2025/02/16
def export_SCADA_data_to_csv(start_date: str, end_date: str, bucket: str="SCADA_data", client: InfluxDBClient=client) -> None:
    """
    Export the contents of the "SCADA_data" bucket for a date range to a CSV file.

    :param start_date: first day of the range, 'YYYY-MM-DD' (Beijing local date)
    :param end_date: last day of the range, 'YYYY-MM-DD' (Beijing local date)
    :param bucket: bucket to export, defaults to "SCADA_data"
    :param client: initialised InfluxDBClient instance
    :return: None
    """
    if client.ping():
        print("{} -- Successfully connected to InfluxDB.".format(
            datetime.now().strftime('%Y-%m-%d %H:%M:%S')))
    else:
        print("{} -- Failed to connect to InfluxDB.".format(
            datetime.now().strftime('%Y-%m-%d %H:%M:%S')))
    query_api = client.query_api()
    # Convert the Beijing-time date range to UTC: Beijing midnight == 16:00 UTC of the previous day.
    start_time = (datetime.strptime(start_date, "%Y-%m-%d") - timedelta(days=1)).replace(hour=16, minute=0, second=0, tzinfo=timezone.utc).isoformat()
    stop_time = datetime.strptime(end_date, "%Y-%m-%d").replace(hour=15, minute=59, second=59, tzinfo=timezone.utc).isoformat()
    # Build the Flux query (no measurement filter: export everything in the range).
    flux_query = f'''
    from(bucket: "{bucket}")
    |> range(start: {start_time}, stop: {stop_time})
    '''
    # Run the query.
    tables = query_api.query(flux_query)
    # Collect the result rows.
    # NOTE(review): each InfluxDB record carries a single field, so every output row has
    # at most one of monitored_value/datacleaning_value/simulation_value set (the others
    # are None). The realtime/scheme exporters in this file merge fields per (time, ID)
    # instead -- confirm whether this per-field layout is intentional.
    rows = []
    for table in tables:
        for record in table.records:
            row = {
                'time': record.get_time(),
                'measurement': record.get_measurement(),
                'date': record.values.get('date', None),
                'description': record.values.get('description', None),
                'device_ID': record.values.get('device_ID', None),
                'monitored_value': record.get_value() if record.get_field() == 'monitored_value' else None,
                'datacleaning_value': record.get_value() if record.get_field() == 'datacleaning_value' else None,
                'simulation_value': record.get_value() if record.get_field() == 'simulation_value' else None,
            }
            rows.append(row)
    # Derive the CSV file name from the date range.
    csv_filename = f"SCADA_data_{start_date}{end_date}.csv"
    # Write the rows to the CSV file.
    with open(csv_filename, mode='w', newline='') as file:
        writer = csv.DictWriter(file, fieldnames=['time', 'measurement', 'date', 'description', 'device_ID', 'monitored_value', 'datacleaning_value', 'simulation_value'])
        writer.writeheader()
        writer.writerows(rows)
    print(f"Data exported to {csv_filename} successfully.")
# 2025/02/17
def export_realtime_simulation_result_to_csv(start_date: str, end_date: str, bucket: str="realtime_simulation_result", client: InfluxDBClient=client) -> None:
    """
    Export the "realtime_simulation_result" bucket for a date range to two CSV
    files: one for link records, one for node records.

    :param start_date: first day of the range, 'YYYY-MM-DD' (Beijing local date)
    :param end_date: last day of the range, 'YYYY-MM-DD' (Beijing local date)
    :param bucket: bucket to export, defaults to "realtime_simulation_result"
    :param client: initialised InfluxDBClient instance
    :return: None
    """
    if client.ping():
        print("{} -- Successfully connected to InfluxDB.".format(
            datetime.now().strftime('%Y-%m-%d %H:%M:%S')))
    else:
        print("{} -- Failed to connect to InfluxDB.".format(
            datetime.now().strftime('%Y-%m-%d %H:%M:%S')))
    query_api = client.query_api()
    # Convert the Beijing-time date range to UTC: Beijing midnight == 16:00 UTC of the previous day.
    start_time = (datetime.strptime(start_date, "%Y-%m-%d") - timedelta(days=1)).replace(hour=16, minute=0, second=0, tzinfo=timezone.utc).isoformat()
    stop_time = datetime.strptime(end_date, "%Y-%m-%d").replace(hour=15, minute=59, second=59, tzinfo=timezone.utc).isoformat()
    # Flux query for all link records in the range.
    flux_query_link = f'''
    from(bucket: "{bucket}")
    |> range(start: {start_time}, stop: {stop_time})
    |> filter(fn: (r) => r["_measurement"] == "link")
    '''
    # Run the query.
    link_tables = query_api.query(flux_query_link)
    # Merge link records into one dict per (time, ID): InfluxDB returns one record per field.
    link_rows = []
    link_data = {}
    for table in link_tables:
        for record in table.records:
            key = (record.get_time(), record.values.get('ID', None))
            if key not in link_data:
                link_data[key] = {}
            field = record.get_field()
            link_data[key][field] = record.get_value()
            link_data[key]['measurement'] = record.get_measurement()
            link_data[key]['date'] = record.values.get('date', None)
    # Flux query for all node records in the range.
    flux_query_node = f'''
    from(bucket: "{bucket}")
    |> range(start: {start_time}, stop: {stop_time})
    |> filter(fn: (r) => r["_measurement"] == "node")
    '''
    # Run the query.
    node_tables = query_api.query(flux_query_node)
    # Merge node records into one dict per (time, ID), same scheme as above.
    node_rows = []
    node_data = {}
    for table in node_tables:
        for record in table.records:
            key = (record.get_time(), record.values.get('ID', None))
            if key not in node_data:
                node_data[key] = {}
            field = record.get_field()
            node_data[key][field] = record.get_value()
            node_data[key]['measurement'] = record.get_measurement()
            node_data[key]['date'] = record.values.get('date', None)
    # Flatten the merged dicts into CSV rows (iteration order over the set is arbitrary).
    for key in set(link_data.keys()):
        row = {'time': key[0], "ID": key[1]}
        row.update(link_data.get(key, {}))
        link_rows.append(row)
    for key in set(node_data.keys()):
        row = {'time': key[0], "ID": key[1]}
        row.update(node_data.get(key, {}))
        node_rows.append(row)
    # Derive the CSV file names from the date range.
    csv_filename_link = f"realtime_simulation_link_result_{start_date}{end_date}.csv"
    csv_filename_node = f"realtime_simulation_node_result_{start_date}{end_date}.csv"
    # Write both CSV files.
    with open(csv_filename_link, mode='w', newline='') as file:
        writer = csv.DictWriter(file, fieldnames=['time', 'measurement', 'date', 'ID', 'flow', 'leakage', 'velocity', 'headloss', 'status', 'setting', 'quality', 'friction', 'reaction'])
        writer.writeheader()
        writer.writerows(link_rows)
    with open(csv_filename_node, mode='w', newline='') as file:
        writer = csv.DictWriter(file, fieldnames=['time', 'measurement', 'date', 'ID', 'head', 'pressure', 'actualdemand',
                                                  'demanddeficit', 'totalExternalOutflow', 'quality'])
        writer.writeheader()
        writer.writerows(node_rows)
    print(f"Data exported to {csv_filename_link} and {csv_filename_node} successfully.")
# 2025/02/18
def export_scheme_simulation_result_to_csv_time(start_date: str, end_date: str, bucket: str="scheme_simulation_result", client: InfluxDBClient=client) -> None:
    """
    Export the "scheme_simulation_result" bucket for a date range to two CSV
    files: one for link records, one for node records.

    :param start_date: first day of the range, 'YYYY-MM-DD' (Beijing local date)
    :param end_date: last day of the range, 'YYYY-MM-DD' (Beijing local date)
    :param bucket: bucket to export, defaults to "scheme_simulation_result"
    :param client: initialised InfluxDBClient instance
    :return: None
    """
    if client.ping():
        print("{} -- Successfully connected to InfluxDB.".format(
            datetime.now().strftime('%Y-%m-%d %H:%M:%S')))
    else:
        print("{} -- Failed to connect to InfluxDB.".format(
            datetime.now().strftime('%Y-%m-%d %H:%M:%S')))
    query_api = client.query_api()
    # Convert the Beijing-time date range to UTC: Beijing midnight == 16:00 UTC of the previous day.
    start_time = (datetime.strptime(start_date, "%Y-%m-%d") - timedelta(days=1)).replace(hour=16, minute=0, second=0, tzinfo=timezone.utc).isoformat()
    stop_time = datetime.strptime(end_date, "%Y-%m-%d").replace(hour=15, minute=59, second=59, tzinfo=timezone.utc).isoformat()
    # Flux query for all link records in the range.
    flux_query_link = f'''
    from(bucket: "{bucket}")
    |> range(start: {start_time}, stop: {stop_time})
    |> filter(fn: (r) => r["_measurement"] == "link")
    '''
    # Run the query.
    link_tables = query_api.query(flux_query_link)
    # Merge link records into one dict per (time, ID): InfluxDB returns one record per field.
    link_rows = []
    link_data = {}
    for table in link_tables:
        for record in table.records:
            key = (record.get_time(), record.values.get('ID', None))
            if key not in link_data:
                link_data[key] = {}
            field = record.get_field()
            link_data[key][field] = record.get_value()
            link_data[key]['measurement'] = record.get_measurement()
            link_data[key]['date'] = record.values.get('date', None)
            link_data[key]['scheme_Type'] = record.values.get('scheme_Type', None)
            link_data[key]['scheme_Name'] = record.values.get('scheme_Name', None)
    # Flux query for all node records in the range.
    flux_query_node = f'''
    from(bucket: "{bucket}")
    |> range(start: {start_time}, stop: {stop_time})
    |> filter(fn: (r) => r["_measurement"] == "node")
    '''
    # Run the query.
    node_tables = query_api.query(flux_query_node)
    # Merge node records into one dict per (time, ID), same scheme as above.
    node_rows = []
    node_data = {}
    for table in node_tables:
        for record in table.records:
            key = (record.get_time(), record.values.get('ID', None))
            if key not in node_data:
                node_data[key] = {}
            field = record.get_field()
            node_data[key][field] = record.get_value()
            node_data[key]['measurement'] = record.get_measurement()
            node_data[key]['date'] = record.values.get('date', None)
            node_data[key]['scheme_Type'] = record.values.get('scheme_Type', None)
            node_data[key]['scheme_Name'] = record.values.get('scheme_Name', None)
    # Flatten the merged dicts into CSV rows (iteration order over the set is arbitrary).
    for key in set(link_data.keys()):
        row = {'time': key[0], "ID": key[1]}
        row.update(link_data.get(key, {}))
        link_rows.append(row)
    for key in set(node_data.keys()):
        row = {'time': key[0], "ID": key[1]}
        row.update(node_data.get(key, {}))
        node_rows.append(row)
    # Derive the CSV file names from the date range.
    csv_filename_link = f"scheme_simulation_link_result_{start_date}{end_date}.csv"
    csv_filename_node = f"scheme_simulation_node_result_{start_date}{end_date}.csv"
    # Write both CSV files.
    with open(csv_filename_link, mode='w', newline='') as file:
        writer = csv.DictWriter(file, fieldnames=['time', 'measurement', 'date', 'scheme_Type', 'scheme_Name', 'ID', 'flow', 'leakage', 'velocity', 'headloss', 'status', 'setting', 'quality', 'friction', 'reaction'])
        writer.writeheader()
        writer.writerows(link_rows)
    with open(csv_filename_node, mode='w', newline='') as file:
        writer = csv.DictWriter(file, fieldnames=['time', 'measurement', 'date', 'scheme_Type', 'scheme_Name', 'ID', 'head', 'pressure', 'actualdemand',
                                                  'demanddeficit', 'totalExternalOutflow', 'quality'])
        writer.writeheader()
        writer.writerows(node_rows)
    print(f"Data exported to {csv_filename_link} and {csv_filename_node} successfully.")
# 2025/02/18
def _query_scheme_measurement_rows(query_api, bucket: str, measurement: str,
                                   scheme_Type: str, scheme_Name: str,
                                   range_start: str) -> List[Dict]:
    """
    Query one measurement ("link" or "node") of a scheme simulation run and
    flatten the Flux records into one row dict per (time, ID) pair.

    :param query_api: InfluxDB QueryApi used to run the Flux query
    :param bucket: bucket holding the scheme simulation results
    :param measurement: measurement name, "link" or "node"
    :param scheme_Type: scheme type tag to filter on
    :param scheme_Name: scheme name tag to filter on
    :param range_start: Flux range start (RFC3339 timestamp string)
    :return: list of row dicts (field values plus time/ID/tag metadata)
    """
    flux_query = f'''
    from(bucket: "{bucket}")
      |> range(start: {range_start})
      |> filter(fn: (r) => r["_measurement"] == "{measurement}")
      |> filter(fn: (r) => r["scheme_Type"] == "{scheme_Type}")
      |> filter(fn: (r) => r["scheme_Name"] == "{scheme_Name}")
    '''
    tables = query_api.query(flux_query)
    # Flux returns one record per field; group all records sharing the same
    # (time, ID) key into a single dict so each CSV row is one snapshot.
    grouped: Dict = {}
    for table in tables:
        for record in table.records:
            key = (record.get_time(), record.values.get('ID', None))
            fields = grouped.setdefault(key, {})
            fields[record.get_field()] = record.get_value()
            fields['measurement'] = record.get_measurement()
            fields['date'] = record.values.get('date', None)
            fields['scheme_Type'] = record.values.get('scheme_Type', None)
            fields['scheme_Name'] = record.values.get('scheme_Name', None)
    # Iterate the dict directly (insertion order) instead of the former
    # set(keys()) pass: keys are already unique, and this makes row order
    # deterministic across runs.
    rows = []
    for (record_time, record_id), fields in grouped.items():
        row = {'time': record_time, 'ID': record_id}
        row.update(fields)
        rows.append(row)
    return rows


def _write_rows_to_csv(filename: str, fieldnames: List[str], rows: List[Dict]) -> None:
    """Write row dicts to *filename* with the given fixed CSV header."""
    with open(filename, mode='w', newline='') as file:
        writer = csv.DictWriter(file, fieldnames=fieldnames)
        writer.writeheader()
        writer.writerows(rows)


def export_scheme_simulation_result_to_csv_scheme(scheme_Type: str, scheme_Name: str,
                                                  bucket: str = "scheme_simulation_result",
                                                  client: InfluxDBClient = client,
                                                  range_start: str = "2025-01-01T00:00:00Z") -> None:
    """
    Export the link and node results of one scheme simulation run from the
    scheme_simulation_result bucket in InfluxDB into two CSV files.

    :param scheme_Type: scheme type tag to query (e.g. "burst_Analysis")
    :param scheme_Name: scheme name tag to query (e.g. "scheme1")
    :param bucket: bucket holding the data, default "scheme_simulation_result"
    :param client: an initialized InfluxDBClient instance
    :param range_start: Flux range start timestamp; default keeps the original
        behavior of scanning from 2025-01-01T00:00:00Z
    :return: None; writes two CSV files named after the scheme
    """
    # NOTE(review): on ping failure we only log and continue, matching the
    # original behavior; the subsequent query will surface a real error.
    if client.ping():
        print("{} -- Successfully connected to InfluxDB.".format(
            datetime.now().strftime('%Y-%m-%d %H:%M:%S')))
    else:
        print("{} -- Failed to connect to InfluxDB.".format(
            datetime.now().strftime('%Y-%m-%d %H:%M:%S')))
    query_api = client.query_api()
    # The link and node exports only differ in measurement name and header.
    link_rows = _query_scheme_measurement_rows(query_api, bucket, "link",
                                               scheme_Type, scheme_Name, range_start)
    node_rows = _query_scheme_measurement_rows(query_api, bucket, "node",
                                               scheme_Type, scheme_Name, range_start)
    # Build the CSV file names from the scheme identifiers.
    csv_filename_link = f"scheme_simulation_link_result_{scheme_Name}_of_{scheme_Type}.csv"
    csv_filename_node = f"scheme_simulation_node_result_{scheme_Name}_of_{scheme_Type}.csv"
    _write_rows_to_csv(csv_filename_link,
                       ['time', 'measurement', 'date', 'scheme_Type', 'scheme_Name', 'ID',
                        'flow', 'leakage', 'velocity', 'headloss', 'status', 'setting',
                        'quality', 'friction', 'reaction'],
                       link_rows)
    _write_rows_to_csv(csv_filename_node,
                       ['time', 'measurement', 'date', 'scheme_Type', 'scheme_Name', 'ID',
                        'head', 'pressure', 'actualdemand', 'demanddeficit',
                        'totalExternalOutflow', 'quality'],
                       node_rows)
    print(f"Data exported to {csv_filename_link} and {csv_filename_node} successfully.")
# Example invocations
if __name__ == "__main__":
    url = influxdb_info.url
    token = influxdb_info.token
    org_name = influxdb_info.org
    # Fix: pass org explicitly. org_name was read but never used, and the
    # module-level client at the top of this file is created with org as well;
    # the query/write APIs need a default organization to operate on.
    client = InfluxDBClient(url=url, token=token, org=org_name)
    # Step 1: check the connection status and initialize the InfluxDB buckets.
    # try:
    #     # delete_buckets(client, org_name)
    #     create_and_initialize_buckets(client, org_name)
    # except Exception as e:
    #     print(f"连接失败: {e}")
    # finally:
    #     client.close()
    # Step 2: query the scada_info table in the PostgreSQL database first, then
    # store the SCADA data into the "SCADA_data" bucket.
    query_pg_scada_info_realtime('bb')
    query_pg_scada_info_non_realtime('bb')
    # Manual storage tests:
    # Example 1: store_realtime_SCADA_data_to_influxdb
    # store_realtime_SCADA_data_to_influxdb(get_real_value_time='2025-02-25T23:45:00+08:00')
    # Example 2: store_non_realtime_SCADA_data_to_influxdb
    # store_non_realtime_SCADA_data_to_influxdb(get_history_data_end_time='2025-02-08T12:00:00+08:00')
    # Example 3: download_history_data_manually
    # download_history_data_manually(begin_time='2025-02-25T00:00:00+08:00', end_time='2025-02-26T00:00:00+08:00')
    # Step 3: query examples.
    # with InfluxDBClient(url=url, token=token, org=org_name) as client:
    #     # Example 1: query_latest_record_by_ID
    #     bucket_name = "realtime_simulation_result"  # bucket holding the data
    #     node_id = "ZBBDTZDP000022"  # node ID to query
    #     link_id = "ZBBGXSZW000002"
    #
    #     latest_record = query_latest_record_by_ID(ID=node_id, type="node", bucket=bucket_name, client=client)
    #     # # latest_record = query_latest_record_by_ID(ID=link_id, type="link", bucket=bucket_name, client=client)
    #     #
    #     if latest_record:
    #         print("最新记录:", latest_record)
    #     else:
    #         print("未找到符合条件的记录。")
    # Example 2: query_all_record_by_time
    # node_records, link_records = query_all_record_by_time(query_time="2025-02-14T10:30:00+08:00")
    # print("Node 数据:", node_records)
    # print("Link 数据:", link_records)
    # Example 3: query_curve_by_ID_property_daterange
    # curve_result = query_curve_by_ID_property_daterange(ID=node_id, type="node", property="head",
    #                                                     start_date="2024-11-25", end_date="2024-11-25")
    # print(curve_result)
    # Example 4: query_SCADA_data_by_device_ID_and_time
    # SCADA_result_dict = query_SCADA_data_by_device_ID_and_time(globals.variable_pump_realtime_ids, query_time='2025-02-25T23:45:00+08:00')
    # print(SCADA_result_dict)
    # Example 5: query_SCADA_data_curve
    # SCADA_result = query_SCADA_data_curve(api_query_id='3853', start_date='2025-02-14', end_date='2025-02-16')
    # print(SCADA_result)
    # Example 6: export_SCADA_data_to_csv
    # export_SCADA_data_to_csv(start_date='2025-02-13', end_date='2025-02-15')
    # Example 7: export_realtime_simulation_result_to_csv
    # export_realtime_simulation_result_to_csv(start_date='2025-02-13', end_date='2025-02-15')
    # Example 8: export_scheme_simulation_result_to_csv_time
    # export_scheme_simulation_result_to_csv_time(start_date='2025-02-13', end_date='2025-02-15')
    # Example 9: export_scheme_simulation_result_to_csv_scheme
    # export_scheme_simulation_result_to_csv_scheme(scheme_Type='burst_Analysis', scheme_Name='scheme1')
    # Example 10: query_scheme_all_record_by_time
    # node_records, link_records = query_scheme_all_record_by_time(scheme_Type='burst_Analysis', scheme_Name='scheme1', query_time="2025-02-14T10:30:00+08:00")
    # print("Node 数据:", node_records)
    # print("Link 数据:", link_records)
    # Example 11: query_scheme_curve_by_ID_property
    # curve_result = query_scheme_curve_by_ID_property(scheme_Type='burst_Analysis', scheme_Name='scheme1', ID='ZBBDTZDP000022',
    #                                                  type='node', property='head')
    # print(curve_result)
    # Example 12: query_all_record_by_date
    # node_records, link_records = query_all_record_by_date(query_date='2025-02-14')
    # print("Node 数据:", node_records)
    # print("Link 数据:", link_records)
    # Example 13: query_scheme_all_record
    # node_records, link_records = query_scheme_all_record(scheme_Type='burst_Analysis', scheme_Name='scheme1')
    # print("Node 数据:", node_records)
    # print("Link 数据:", link_records)
    # Example 14: query_all_record_by_time_property
    # result_records = query_all_record_by_time_property(query_time='2025-02-25T23:45:00+08:00', type='node', property='head')
    # print(result_records)
    # Example 15: query_all_record_by_date_property
    # result_records = query_all_record_by_date_property(query_date='2025-02-27', type='node', property='head')
    # print(result_records)
    # Example 16: query_scheme_all_record_by_time_property
    # result_records = query_scheme_all_record_by_time_property(scheme_Type='burst_Analysis', scheme_Name='scheme1',
    #                                                           query_time='2025-02-14T10:30:00+08:00', type='node', property='head')
    # print(result_records)
    # Example 17: query_scheme_all_record_property
    # result_records = query_scheme_all_record_property(scheme_Type='burst_Analysis', scheme_Name='scheme1', type='node', property='head')
    # print(result_records)