Add influxdb_api
This commit is contained in:
803
influxdb_api.py
803
influxdb_api.py
@@ -12,6 +12,9 @@ from tjnetwork import *
|
|||||||
import schedule
|
import schedule
|
||||||
import threading
|
import threading
|
||||||
import globals
|
import globals
|
||||||
|
import csv
|
||||||
|
import pandas as pd
|
||||||
|
import openpyxl
|
||||||
import influxdb_info
|
import influxdb_info
|
||||||
import time_api
|
import time_api
|
||||||
|
|
||||||
@@ -139,6 +142,7 @@ def query_pg_scada_info_non_realtime(name: str) -> None:
|
|||||||
open_project(name)
|
open_project(name)
|
||||||
dic_time = get_time(name)
|
dic_time = get_time(name)
|
||||||
globals.hydraulic_timestep = dic_time['HYDRAULIC TIMESTEP']
|
globals.hydraulic_timestep = dic_time['HYDRAULIC TIMESTEP']
|
||||||
|
|
||||||
close_project(name)
|
close_project(name)
|
||||||
# 连接数据库
|
# 连接数据库
|
||||||
conn_string = f"dbname={name} host=127.0.0.1"
|
conn_string = f"dbname={name} host=127.0.0.1"
|
||||||
@@ -205,7 +209,7 @@ def query_pg_scada_info_non_realtime(name: str) -> None:
|
|||||||
# print("Demand Non-Realtime IDs:", globals.demand_non_realtime_ids)
|
# print("Demand Non-Realtime IDs:", globals.demand_non_realtime_ids)
|
||||||
# print("Quality Non-Realtime IDs:", globals.quality_non_realtime_ids)
|
# print("Quality Non-Realtime IDs:", globals.quality_non_realtime_ids)
|
||||||
# print("Maximum Transmission Frequency:", globals.transmission_frequency)
|
# print("Maximum Transmission Frequency:", globals.transmission_frequency)
|
||||||
# print("Hydraulic Timestep:", globals.hydraulic_timestep)
|
print("Hydraulic Timestep:", globals.hydraulic_timestep)
|
||||||
|
|
||||||
|
|
||||||
except Exception as e:
|
except Exception as e:
|
||||||
@@ -220,17 +224,23 @@ def delete_buckets(client: InfluxDBClient, org_name: str) -> None:
|
|||||||
:param org_name: InfluxDB中organization的名称。
|
:param org_name: InfluxDB中organization的名称。
|
||||||
:return: None
|
:return: None
|
||||||
"""
|
"""
|
||||||
|
# 定义需要删除的 bucket 名称列表
|
||||||
|
buckets_to_delete = ['SCADA_data', 'realtime_simulation_result', 'scheme_simulation_result']
|
||||||
|
|
||||||
buckets_api = client.buckets_api()
|
buckets_api = client.buckets_api()
|
||||||
buckets_obj = buckets_api.find_buckets(org=org_name)
|
buckets_obj = buckets_api.find_buckets(org=org_name)
|
||||||
|
|
||||||
# 确保 buckets_obj 拥有 buckets 属性
|
# 确保 buckets_obj 拥有 buckets 属性
|
||||||
if hasattr(buckets_obj, 'buckets'):
|
if hasattr(buckets_obj, 'buckets'):
|
||||||
for bucket in buckets_obj.buckets:
|
for bucket in buckets_obj.buckets:
|
||||||
|
if bucket.name in buckets_to_delete: # 只删除特定名称的 bucket
|
||||||
try:
|
try:
|
||||||
buckets_api.delete_bucket(bucket)
|
buckets_api.delete_bucket(bucket)
|
||||||
print(f"Bucket {bucket.name} has been deleted successfully.")
|
print(f"Bucket {bucket.name} has been deleted successfully.")
|
||||||
except Exception as e:
|
except Exception as e:
|
||||||
print(f"Failed to delete bucket {bucket.name}: {e}")
|
print(f"Failed to delete bucket {bucket.name}: {e}")
|
||||||
|
else:
|
||||||
|
print(f"Skipping bucket {bucket.name}. Not in the deletion list.")
|
||||||
else:
|
else:
|
||||||
print("未找到 buckets 属性,无法迭代 buckets。")
|
print("未找到 buckets 属性,无法迭代 buckets。")
|
||||||
|
|
||||||
@@ -298,6 +308,8 @@ def create_and_initialize_buckets(client: InfluxDBClient, org_name: str) -> None
|
|||||||
.field("status", None) \
|
.field("status", None) \
|
||||||
.field("setting", 0.0) \
|
.field("setting", 0.0) \
|
||||||
.field("quality", 0.0) \
|
.field("quality", 0.0) \
|
||||||
|
.field("reaction", 0.0) \
|
||||||
|
.field("friction", 0.0) \
|
||||||
.time("2024-11-21T00:00:00Z")
|
.time("2024-11-21T00:00:00Z")
|
||||||
|
|
||||||
node_point = Point("node") \
|
node_point = Point("node") \
|
||||||
@@ -380,7 +392,8 @@ def store_realtime_SCADA_data_to_influxdb(get_real_value_time: str, bucket: str
|
|||||||
try:
|
try:
|
||||||
try_count += 1
|
try_count += 1
|
||||||
if globals.reservoir_liquid_level_realtime_ids:
|
if globals.reservoir_liquid_level_realtime_ids:
|
||||||
reservoir_liquid_level_realtime_data_list = get_realValue.get_realValue(ids=','.join(globals.reservoir_liquid_level_realtime_ids))
|
reservoir_liquid_level_realtime_data_list = get_realValue.get_realValue(
|
||||||
|
ids=','.join(globals.reservoir_liquid_level_realtime_ids))
|
||||||
if globals.tank_liquid_level_realtime_ids:
|
if globals.tank_liquid_level_realtime_ids:
|
||||||
tank_liquid_level_realtime_data_list = get_realValue.get_realValue(ids=','.join(globals.tank_liquid_level_realtime_ids))
|
tank_liquid_level_realtime_data_list = get_realValue.get_realValue(ids=','.join(globals.tank_liquid_level_realtime_ids))
|
||||||
if globals.fixed_pump_realtime_ids:
|
if globals.fixed_pump_realtime_ids:
|
||||||
@@ -424,7 +437,7 @@ def store_realtime_SCADA_data_to_influxdb(get_real_value_time: str, bucket: str
|
|||||||
|
|
||||||
# 创建Point对象
|
# 创建Point对象
|
||||||
point = (
|
point = (
|
||||||
Point('reservoir_liquid_level_realtime') # measurement name
|
Point('reservoir_liquid_level_realtime')
|
||||||
.tag("date", datetime.fromisoformat(get_real_value_time).strftime('%Y-%m-%d'))
|
.tag("date", datetime.fromisoformat(get_real_value_time).strftime('%Y-%m-%d'))
|
||||||
.tag("description", data['description'])
|
.tag("description", data['description'])
|
||||||
.tag("device_ID", data['device_ID'])
|
.tag("device_ID", data['device_ID'])
|
||||||
@@ -1158,15 +1171,16 @@ def query_latest_record_by_ID(ID: str, type: str, bucket: str="realtime_simulati
|
|||||||
:return: dict: 最新记录的数据,如果没有找到则返回 None。
|
:return: dict: 最新记录的数据,如果没有找到则返回 None。
|
||||||
"""
|
"""
|
||||||
if client.ping():
|
if client.ping():
|
||||||
print("{} -- Successfully connected to InfluxDB.".format(datetime.now().strftime('%Y-%m-%d %H:%M:%S')))
|
print("{} -- Successfully connected to InfluxDB.".format(
|
||||||
|
datetime.now().strftime('%Y-%m-%d %H:%M:%S')))
|
||||||
else:
|
else:
|
||||||
print("{} -- Failed to connect to InfluxDB.".format(datetime.now().strftime('%Y-%m-%d %H:%M:%S')))
|
print("{} -- Failed to connect to InfluxDB.".format(
|
||||||
|
datetime.now().strftime('%Y-%m-%d %H:%M:%S')))
|
||||||
query_api = client.query_api()
|
query_api = client.query_api()
|
||||||
if type == "node":
|
if type == "node":
|
||||||
flux_query = f'''
|
flux_query = f'''
|
||||||
from(bucket: "{bucket}")
|
from(bucket: "{bucket}")
|
||||||
|> range(start: -30d) // 查找最近一月的记录
|
|> range(start: -7d) // 查找最近七天的记录
|
||||||
|> filter(fn: (r) => r["_measurement"] == "node")
|
|> filter(fn: (r) => r["_measurement"] == "node")
|
||||||
|> filter(fn: (r) => r["ID"] == "{ID}")
|
|> filter(fn: (r) => r["ID"] == "{ID}")
|
||||||
|> pivot(
|
|> pivot(
|
||||||
@@ -1174,6 +1188,7 @@ def query_latest_record_by_ID(ID: str, type: str, bucket: str="realtime_simulati
|
|||||||
columnKey:["_field"],
|
columnKey:["_field"],
|
||||||
valueColumn:"_value"
|
valueColumn:"_value"
|
||||||
)
|
)
|
||||||
|
|> group() // 将所有数据聚合到同一个 group
|
||||||
|> sort(columns: ["_time"], desc: true)
|
|> sort(columns: ["_time"], desc: true)
|
||||||
|> limit(n: 1)
|
|> limit(n: 1)
|
||||||
'''
|
'''
|
||||||
@@ -1195,7 +1210,7 @@ def query_latest_record_by_ID(ID: str, type: str, bucket: str="realtime_simulati
|
|||||||
elif type == "link":
|
elif type == "link":
|
||||||
flux_query = f'''
|
flux_query = f'''
|
||||||
from(bucket: "{bucket}")
|
from(bucket: "{bucket}")
|
||||||
|> range(start: -30d) // 查找最近一月的记录
|
|> range(start: -7d) // 查找最近七天的记录
|
||||||
|> filter(fn: (r) => r["_measurement"] == "link")
|
|> filter(fn: (r) => r["_measurement"] == "link")
|
||||||
|> filter(fn: (r) => r["ID"] == "{ID}")
|
|> filter(fn: (r) => r["ID"] == "{ID}")
|
||||||
|> pivot(
|
|> pivot(
|
||||||
@@ -1203,6 +1218,7 @@ def query_latest_record_by_ID(ID: str, type: str, bucket: str="realtime_simulati
|
|||||||
columnKey:["_field"],
|
columnKey:["_field"],
|
||||||
valueColumn:"_value"
|
valueColumn:"_value"
|
||||||
)
|
)
|
||||||
|
|> group() // 将所有数据聚合到同一个 group
|
||||||
|> sort(columns: ["_time"], desc: true)
|
|> sort(columns: ["_time"], desc: true)
|
||||||
|> limit(n: 1)
|
|> limit(n: 1)
|
||||||
'''
|
'''
|
||||||
@@ -1268,10 +1284,8 @@ def query_all_record_by_time(query_time: str, bucket: str="realtime_simulation_r
|
|||||||
# 将北京时间转换为 UTC 时间
|
# 将北京时间转换为 UTC 时间
|
||||||
beijing_time = datetime.fromisoformat(query_time)
|
beijing_time = datetime.fromisoformat(query_time)
|
||||||
utc_time = beijing_time.astimezone(timezone.utc)
|
utc_time = beijing_time.astimezone(timezone.utc)
|
||||||
# DingZQ, 2025-02-10, set delta time to 15, original is 1
|
utc_start_time = utc_time - timedelta(seconds=1)
|
||||||
utc_start_time = utc_time - timedelta(seconds=15)
|
utc_stop_time = utc_time + timedelta(seconds=1)
|
||||||
utc_stop_time = utc_time + timedelta(seconds=15)
|
|
||||||
|
|
||||||
# 构建 Flux 查询语句
|
# 构建 Flux 查询语句
|
||||||
flux_query = f'''
|
flux_query = f'''
|
||||||
from(bucket: "{bucket}")
|
from(bucket: "{bucket}")
|
||||||
@@ -1283,13 +1297,11 @@ def query_all_record_by_time(query_time: str, bucket: str="realtime_simulation_r
|
|||||||
valueColumn:"_value"
|
valueColumn:"_value"
|
||||||
)
|
)
|
||||||
'''
|
'''
|
||||||
|
|
||||||
# 执行查询
|
# 执行查询
|
||||||
tables = query_api.query(flux_query)
|
tables = query_api.query(flux_query)
|
||||||
node_records = []
|
node_records = []
|
||||||
link_records = []
|
link_records = []
|
||||||
# 解析查询结果
|
# 解析查询结果
|
||||||
|
|
||||||
for table in tables:
|
for table in tables:
|
||||||
for record in table.records:
|
for record in table.records:
|
||||||
# print(record.values) # 打印完整记录内容
|
# print(record.values) # 打印完整记录内容
|
||||||
@@ -1322,6 +1334,69 @@ def query_all_record_by_time(query_time: str, bucket: str="realtime_simulation_r
|
|||||||
|
|
||||||
return node_records, link_records
|
return node_records, link_records
|
||||||
|
|
||||||
|
# 2025/02/21
|
||||||
|
def query_all_record_by_date(query_date: str, bucket: str="realtime_simulation_result", client: InfluxDBClient=client) -> tuple:
|
||||||
|
"""
|
||||||
|
查询指定日期的所有记录,包括‘node’和‘link’,分别以指定的格式返回
|
||||||
|
:param query_date: 输入的日期,格式为‘2025-02-14’
|
||||||
|
:param bucket: 数据存储的bucket名称
|
||||||
|
:param client: 已初始化的InfluxDBClient 实例。
|
||||||
|
:return: dict: tuple: (node_records, link_records)
|
||||||
|
"""
|
||||||
|
if client.ping():
|
||||||
|
print("{} -- Successfully connected to InfluxDB.".format(
|
||||||
|
datetime.now().strftime('%Y-%m-%d %H:%M:%S')))
|
||||||
|
else:
|
||||||
|
print("{} -- Failed to connect to InfluxDB.".format(
|
||||||
|
datetime.now().strftime('%Y-%m-%d %H:%M:%S')))
|
||||||
|
query_api = client.query_api()
|
||||||
|
# 构建 Flux 查询语句
|
||||||
|
flux_query = f'''
|
||||||
|
from(bucket: "{bucket}")
|
||||||
|
|> range(start: 2025-01-01T00:00:00Z)
|
||||||
|
|> filter(fn: (r) => r["_measurement"] == "node" or r["_measurement"] == "link")
|
||||||
|
|> filter(fn: (r) => r["date"] == "{query_date}")
|
||||||
|
|> pivot(
|
||||||
|
rowKey:["_time"],
|
||||||
|
columnKey:["_field"],
|
||||||
|
valueColumn:"_value"
|
||||||
|
)
|
||||||
|
'''
|
||||||
|
# 执行查询
|
||||||
|
tables = query_api.query(flux_query)
|
||||||
|
node_records = []
|
||||||
|
link_records = []
|
||||||
|
# 解析查询结果
|
||||||
|
for table in tables:
|
||||||
|
for record in table.records:
|
||||||
|
# print(record.values) # 打印完整记录内容
|
||||||
|
measurement = record["_measurement"]
|
||||||
|
# 处理 node 数据
|
||||||
|
if measurement == "node":
|
||||||
|
node_records.append({
|
||||||
|
"time": record["_time"],
|
||||||
|
"ID": record["ID"],
|
||||||
|
"head": record["head"],
|
||||||
|
"pressure": record["pressure"],
|
||||||
|
"actualdemand": record["actualdemand"],
|
||||||
|
"quality": record["quality"]
|
||||||
|
})
|
||||||
|
# 处理 link 数据
|
||||||
|
elif measurement == "link":
|
||||||
|
link_records.append({
|
||||||
|
"time": record["_time"],
|
||||||
|
"linkID": record["ID"],
|
||||||
|
"flow": record["flow"],
|
||||||
|
"velocity": record["velocity"],
|
||||||
|
"headloss": record["headloss"],
|
||||||
|
"quality": record["quality"],
|
||||||
|
"status": record["status"],
|
||||||
|
"setting": record["setting"],
|
||||||
|
"reaction": record["reaction"],
|
||||||
|
"friction": record["friction"]
|
||||||
|
})
|
||||||
|
|
||||||
|
return node_records, link_records
|
||||||
|
|
||||||
# 2025/02/01
|
# 2025/02/01
|
||||||
def query_curve_by_ID_property_daterange(ID: str, type: str, property: str, start_date: str, end_date: str, bucket: str="realtime_simulation_result", client: InfluxDBClient=client) -> list:
|
def query_curve_by_ID_property_daterange(ID: str, type: str, property: str, start_date: str, end_date: str, bucket: str="realtime_simulation_result", client: InfluxDBClient=client) -> list:
|
||||||
@@ -1383,7 +1458,6 @@ def query_curve_by_ID_property_daterange(ID: str, type: str, property: str, star
|
|||||||
|
|
||||||
return results
|
return results
|
||||||
|
|
||||||
|
|
||||||
def query_buckets(client: InfluxDBClient=client) -> list[str]:
|
def query_buckets(client: InfluxDBClient=client) -> list[str]:
|
||||||
# 获取 Buckets API 实例
|
# 获取 Buckets API 实例
|
||||||
buckets_api = client.buckets_api()
|
buckets_api = client.buckets_api()
|
||||||
@@ -1411,6 +1485,642 @@ def query_measurements(bucket: str, client: InfluxDBClient=client) -> list[str]:
|
|||||||
return measurements
|
return measurements
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
# WMH 2025/02/13
|
||||||
|
def store_scheme_simulation_result_to_influxdb(node_result_list: List[Dict[str, any]], link_result_list: List[Dict[str, any]],
|
||||||
|
scheme_start_time: str, num_periods: int = 1, scheme_Type: str = None, scheme_Name: str = None,
|
||||||
|
bucket: str = "scheme_simulation_result", client: InfluxDBClient = client):
|
||||||
|
"""
|
||||||
|
将方案模拟计算结果存入 InfluxuDb 的scheme_simulation_result这个bucket中。
|
||||||
|
:param node_result_list: (List[Dict[str, any]]): 包含节点和结果数据的字典列表。
|
||||||
|
:param link_result_list: (List[Dict[str, any]]): 包含连接和结果数据的字典列表。
|
||||||
|
:param scheme_start_time: (str): 方案模拟开始时间。
|
||||||
|
:param num_periods: (int): 方案模拟的周期数
|
||||||
|
:param scheme_Type: (str): 方案类型
|
||||||
|
:param scheme_Name: (str): 方案名称
|
||||||
|
:param bucket: (str): InfluxDB 的 bucket 名称,默认值为 "scheme_simulation_result"。
|
||||||
|
:param client: (InfluxDBClient): 已初始化的 InfluxDBClient 实例。
|
||||||
|
:return:
|
||||||
|
"""
|
||||||
|
if client.ping():
|
||||||
|
print("{} -- Successfully connected to InfluxDB.".format(
|
||||||
|
datetime.now().strftime('%Y-%m-%d %H:%M:%S')))
|
||||||
|
else:
|
||||||
|
print("{} -- Failed to connect to InfluxDB.".format(
|
||||||
|
datetime.now().strftime('%Y-%m-%d %H:%M:%S')))
|
||||||
|
|
||||||
|
try:
|
||||||
|
write_api = client.write_api()
|
||||||
|
|
||||||
|
date_str = scheme_start_time.split('T')[0]
|
||||||
|
time_beijing = datetime.strptime(scheme_start_time, '%Y-%m-%dT%H:%M:%S%z')
|
||||||
|
|
||||||
|
timestep_parts = globals.hydraulic_timestep.split(':')
|
||||||
|
timestep = timedelta(hours=int(timestep_parts[0]), minutes=int(timestep_parts[1]), seconds=int(timestep_parts[2]))
|
||||||
|
|
||||||
|
for node_result in node_result_list:
|
||||||
|
# 提取节点信息和数据结果
|
||||||
|
node_id = node_result.get('node')
|
||||||
|
# 从period 0 到 period num_period - 1
|
||||||
|
for period_index in range(num_periods):
|
||||||
|
scheme_time = (time_beijing + (timestep * period_index)).isoformat()
|
||||||
|
data_list = [node_result.get('result', [])[period_index]]
|
||||||
|
for data in data_list:
|
||||||
|
# 构建 Point 数据,多个 field 存在于一个数据点中
|
||||||
|
node_point = Point("node") \
|
||||||
|
.tag("date", date_str) \
|
||||||
|
.tag("ID", node_id) \
|
||||||
|
.tag("scheme_Type", scheme_Type) \
|
||||||
|
.tag("scheme_Name", scheme_Name) \
|
||||||
|
.field("head", data.get('head', 0.0)) \
|
||||||
|
.field("pressure", data.get('pressure', 0.0)) \
|
||||||
|
.field("actualdemand", data.get('demand', 0.0)) \
|
||||||
|
.field("demanddeficit", None) \
|
||||||
|
.field("totalExternalOutflow", None) \
|
||||||
|
.field("quality", data.get('quality', 0.0)) \
|
||||||
|
.time(scheme_time)
|
||||||
|
# 写入数据到 InfluxDB,多个 field 在同一个 point 中
|
||||||
|
write_api.write(bucket=bucket, org=org_name, record=node_point)
|
||||||
|
write_api.flush()
|
||||||
|
|
||||||
|
for link_result in link_result_list:
|
||||||
|
link_id = link_result.get('link')
|
||||||
|
for period_index in range(num_periods):
|
||||||
|
scheme_time = (time_beijing + (timestep * period_index)).isoformat()
|
||||||
|
data_list = [link_result.get('result', [])[period_index]]
|
||||||
|
for data in data_list:
|
||||||
|
link_point = Point("link") \
|
||||||
|
.tag("date", date_str) \
|
||||||
|
.tag("ID", link_id) \
|
||||||
|
.tag("scheme_Type", scheme_Type) \
|
||||||
|
.tag("scheme_Name", scheme_Name) \
|
||||||
|
.field("flow", data.get('flow', 0.0)) \
|
||||||
|
.field("velocity", data.get('velocity', 0.0)) \
|
||||||
|
.field("headloss", data.get('headloss', 0.0)) \
|
||||||
|
.field("quality", data.get('quality', 0.0)) \
|
||||||
|
.field("status", data.get('status', "UNKNOWN")) \
|
||||||
|
.field("setting", data.get('setting', 0.0)) \
|
||||||
|
.field("reaction", data.get('reaction', 0.0)) \
|
||||||
|
.field("friction", data.get('friction', 0.0)) \
|
||||||
|
.time(scheme_time)
|
||||||
|
write_api.write(bucket=bucket, org=org_name, record=link_point)
|
||||||
|
write_api.flush()
|
||||||
|
|
||||||
|
except Exception as e:
|
||||||
|
raise RuntimeError(f"数据写入 InfluxDB 时发生错误: {e}")
|
||||||
|
|
||||||
|
|
||||||
|
# 2025/02/15
|
||||||
|
def query_SCADA_data_curve(api_query_id: str, start_date: str, end_date: str, bucket: str="SCADA_data", client: InfluxDBClient=client) -> list:
|
||||||
|
"""
|
||||||
|
根据SCADA设备的api_query_id和时间范围,查询得到曲线,查到的数据为0时区时间
|
||||||
|
:param api_query_id: SCADA设备的api_query_id
|
||||||
|
:param start_date: 查询开始的时间,格式为 'YYYY-MM-DD'
|
||||||
|
:param end_date: 查询结束的时间,格式为 'YYYY-MM-DD'
|
||||||
|
:param bucket: 数据存储的 bucket 名称,默认值为 "SCADA_data"
|
||||||
|
:param client: 已初始化的 InfluxDBClient 实例
|
||||||
|
:return: 查询结果的列表
|
||||||
|
"""
|
||||||
|
if client.ping():
|
||||||
|
print("{} -- Successfully connected to InfluxDB.".format(
|
||||||
|
datetime.now().strftime('%Y-%m-%d %H:%M:%S')))
|
||||||
|
else:
|
||||||
|
print("{} -- Failed to connect to InfluxDB.".format(
|
||||||
|
datetime.now().strftime('%Y-%m-%d %H:%M:%S')))
|
||||||
|
query_api = client.query_api()
|
||||||
|
# 将 start_date 的北京时间转换为 UTC 时间范围
|
||||||
|
start_time = (datetime.strptime(start_date, "%Y-%m-%d") - timedelta(days=1)).replace(hour=16, minute=0, second=0, tzinfo=timezone.utc).isoformat()
|
||||||
|
stop_time = datetime.strptime(end_date, "%Y-%m-%d").replace(hour=15, minute=59, second=59, tzinfo=timezone.utc).isoformat()
|
||||||
|
# 构建 Flux 查询语句
|
||||||
|
flux_query = f'''
|
||||||
|
from(bucket: "{bucket}")
|
||||||
|
|> range(start: {start_time}, stop: {stop_time})
|
||||||
|
|> filter(fn: (r) => r["device_ID"] == "{api_query_id}")
|
||||||
|
'''
|
||||||
|
# 执行查询
|
||||||
|
tables = query_api.query(flux_query)
|
||||||
|
# 解析查询结果
|
||||||
|
results = []
|
||||||
|
for table in tables:
|
||||||
|
for record in table.records:
|
||||||
|
results.append({
|
||||||
|
"time": record["_time"],
|
||||||
|
"value": record["_value"]
|
||||||
|
})
|
||||||
|
return results
|
||||||
|
|
||||||
|
|
||||||
|
# 2025/02/18
|
||||||
|
def query_scheme_all_record_by_time(scheme_Type: str, scheme_Name: str, query_time: str,
|
||||||
|
bucket: str="scheme_simulation_result", client: InfluxDBClient=client) -> tuple:
|
||||||
|
"""
|
||||||
|
查询指定方案某一时刻的所有记录,包括‘node'和‘link’,分别以指定格式返回。
|
||||||
|
:param scheme_Type: 方案类型
|
||||||
|
:param scheme_Name: 方案名称
|
||||||
|
:param query_time: 输入的北京时间,格式为 '2024-11-24T17:30:00+08:00'。
|
||||||
|
:param bucket: 数据存储的 bucket 名称。
|
||||||
|
:param client: 已初始化的 InfluxDBClient 实例。
|
||||||
|
:return: dict: tuple: (node_records, link_records)
|
||||||
|
"""
|
||||||
|
if client.ping():
|
||||||
|
print("{} -- Successfully connected to InfluxDB.".format(
|
||||||
|
datetime.now().strftime('%Y-%m-%d %H:%M:%S')))
|
||||||
|
else:
|
||||||
|
print("{} -- Failed to connect to InfluxDB.".format(
|
||||||
|
datetime.now().strftime('%Y-%m-%d %H:%M:%S')))
|
||||||
|
query_api = client.query_api()
|
||||||
|
# 将北京时间转换为 UTC 时间
|
||||||
|
beijing_time = datetime.fromisoformat(query_time)
|
||||||
|
utc_time = beijing_time.astimezone(timezone.utc)
|
||||||
|
utc_start_time = utc_time - timedelta(seconds=1)
|
||||||
|
utc_stop_time = utc_time + timedelta(seconds=1)
|
||||||
|
# 构建 Flux 查询语句
|
||||||
|
flux_query = f'''
|
||||||
|
from(bucket: "{bucket}")
|
||||||
|
|> range(start: {utc_start_time.isoformat()}, stop: {utc_stop_time.isoformat()})
|
||||||
|
|> filter(fn: (r) => r["scheme_Type"] == "{scheme_Type}")
|
||||||
|
|> filter(fn: (r) => r["scheme_Name"] == "{scheme_Name}")
|
||||||
|
|> filter(fn: (r) => r["_measurement"] == "node" or r["_measurement"] == "link")
|
||||||
|
|> pivot(
|
||||||
|
rowKey:["_time"],
|
||||||
|
columnKey:["_field"],
|
||||||
|
valueColumn:"_value"
|
||||||
|
)
|
||||||
|
'''
|
||||||
|
# 执行查询
|
||||||
|
tables = query_api.query(flux_query)
|
||||||
|
node_records = []
|
||||||
|
link_records = []
|
||||||
|
# 解析查询结果
|
||||||
|
for table in tables:
|
||||||
|
for record in table.records:
|
||||||
|
# print(record.values) # 打印完整记录内容
|
||||||
|
measurement = record["_measurement"]
|
||||||
|
# 处理 node 数据
|
||||||
|
if measurement == "node":
|
||||||
|
node_records.append({
|
||||||
|
"time": record["_time"],
|
||||||
|
"ID": record["ID"],
|
||||||
|
"head": record["head"],
|
||||||
|
"pressure": record["pressure"],
|
||||||
|
"actualdemand": record["actualdemand"],
|
||||||
|
"quality": record["quality"]
|
||||||
|
})
|
||||||
|
# 处理 link 数据
|
||||||
|
elif measurement == "link":
|
||||||
|
link_records.append({
|
||||||
|
"time": record["_time"],
|
||||||
|
"linkID": record["ID"],
|
||||||
|
"flow": record["flow"],
|
||||||
|
"velocity": record["velocity"],
|
||||||
|
"headloss": record["headloss"],
|
||||||
|
"quality": record["quality"],
|
||||||
|
"status": record["status"],
|
||||||
|
"setting": record["setting"],
|
||||||
|
"reaction": record["reaction"],
|
||||||
|
"friction": record["friction"]
|
||||||
|
})
|
||||||
|
|
||||||
|
return node_records, link_records
|
||||||
|
|
||||||
|
# 2025/02/19
|
||||||
|
def query_scheme_curve_by_ID_property(scheme_Type: str, scheme_Name: str, ID: str, type: str, property: str,
|
||||||
|
bucket: str="scheme_simulation_result", client: InfluxDBClient=client) -> list:
|
||||||
|
"""
|
||||||
|
根据scheme_Type和scheme_Name,查询该模拟方案中,某一node或link的某一属性值的所有时间的结果
|
||||||
|
:param scheme_Type: 方案类型
|
||||||
|
:param scheme_Name: 方案名称
|
||||||
|
:param ID: 元素的ID
|
||||||
|
:param type: 元素的类型,node或link
|
||||||
|
:param property: 元素的属性值
|
||||||
|
:param bucket: 数据存储的 bucket 名称,默认值为 "scheme_simulation_result"
|
||||||
|
:param client: 已初始化的 InfluxDBClient 实例
|
||||||
|
:return: 查询结果的列表
|
||||||
|
"""
|
||||||
|
if client.ping():
|
||||||
|
print("{} -- Successfully connected to InfluxDB.".format(
|
||||||
|
datetime.now().strftime('%Y-%m-%d %H:%M:%S')))
|
||||||
|
else:
|
||||||
|
print("{} -- Failed to connect to InfluxDB.".format(
|
||||||
|
datetime.now().strftime('%Y-%m-%d %H:%M:%S')))
|
||||||
|
query_api = client.query_api()
|
||||||
|
# 确定 measurement
|
||||||
|
if type == "node":
|
||||||
|
measurement = "node"
|
||||||
|
elif type == "link":
|
||||||
|
measurement = "link"
|
||||||
|
else:
|
||||||
|
raise ValueError(f"不支持的类型: {type}")
|
||||||
|
# 构建 Flux 查询语句
|
||||||
|
flux_query = f'''
|
||||||
|
from(bucket: "{bucket}")
|
||||||
|
|> range(start: 2025-01-01T00:00:00Z)
|
||||||
|
|> filter(fn: (r) => r["_measurement"] == "{measurement}")
|
||||||
|
|> filter(fn: (r) => r["scheme_Type"] == "{scheme_Type}")
|
||||||
|
|> filter(fn: (r) => r["scheme_Name"] == "{scheme_Name}")
|
||||||
|
|> filter(fn: (r) => r["ID"] == "{ID}")
|
||||||
|
|> filter(fn: (r) => r["_field"] == "{property}")
|
||||||
|
'''
|
||||||
|
# 执行查询
|
||||||
|
tables = query_api.query(flux_query)
|
||||||
|
# 解析查询结果
|
||||||
|
results = []
|
||||||
|
for table in tables:
|
||||||
|
for record in table.records:
|
||||||
|
results.append({
|
||||||
|
"time": record["_time"],
|
||||||
|
"value": record["_value"]
|
||||||
|
})
|
||||||
|
return results
|
||||||
|
|
||||||
|
|
||||||
|
# 2025/02/21
|
||||||
|
def query_scheme_all_record(scheme_Type: str, scheme_Name: str, bucket: str="scheme_simulation_result",
|
||||||
|
client: InfluxDBClient=client) -> tuple:
|
||||||
|
"""
|
||||||
|
查询指定方案的所有记录,包括‘node'和‘link’,分别以指定格式返回。
|
||||||
|
:param scheme_Type: 方案类型
|
||||||
|
:param scheme_Name: 方案名称
|
||||||
|
:param bucket: 数据存储的 bucket 名称。
|
||||||
|
:param client: 已初始化的 InfluxDBClient 实例。
|
||||||
|
:return: dict: tuple: (node_records, link_records)
|
||||||
|
"""
|
||||||
|
if client.ping():
|
||||||
|
print("{} -- Successfully connected to InfluxDB.".format(
|
||||||
|
datetime.now().strftime('%Y-%m-%d %H:%M:%S')))
|
||||||
|
else:
|
||||||
|
print("{} -- Failed to connect to InfluxDB.".format(
|
||||||
|
datetime.now().strftime('%Y-%m-%d %H:%M:%S')))
|
||||||
|
query_api = client.query_api()
|
||||||
|
# 构建 Flux 查询语句
|
||||||
|
flux_query = f'''
|
||||||
|
from(bucket: "{bucket}")
|
||||||
|
|> range(start: 2025-01-01T00:00:00Z)
|
||||||
|
|> filter(fn: (r) => r["scheme_Type"] == "{scheme_Type}")
|
||||||
|
|> filter(fn: (r) => r["scheme_Name"] == "{scheme_Name}")
|
||||||
|
|> filter(fn: (r) => r["_measurement"] == "node" or r["_measurement"] == "link")
|
||||||
|
|> pivot(
|
||||||
|
rowKey:["_time"],
|
||||||
|
columnKey:["_field"],
|
||||||
|
valueColumn:"_value"
|
||||||
|
)
|
||||||
|
'''
|
||||||
|
# 执行查询
|
||||||
|
tables = query_api.query(flux_query)
|
||||||
|
node_records = []
|
||||||
|
link_records = []
|
||||||
|
# 解析查询结果
|
||||||
|
for table in tables:
|
||||||
|
for record in table.records:
|
||||||
|
# print(record.values) # 打印完整记录内容
|
||||||
|
measurement = record["_measurement"]
|
||||||
|
# 处理 node 数据
|
||||||
|
if measurement == "node":
|
||||||
|
node_records.append({
|
||||||
|
"time": record["_time"],
|
||||||
|
"ID": record["ID"],
|
||||||
|
"head": record["head"],
|
||||||
|
"pressure": record["pressure"],
|
||||||
|
"actualdemand": record["actualdemand"],
|
||||||
|
"quality": record["quality"]
|
||||||
|
})
|
||||||
|
# 处理 link 数据
|
||||||
|
elif measurement == "link":
|
||||||
|
link_records.append({
|
||||||
|
"time": record["_time"],
|
||||||
|
"linkID": record["ID"],
|
||||||
|
"flow": record["flow"],
|
||||||
|
"velocity": record["velocity"],
|
||||||
|
"headloss": record["headloss"],
|
||||||
|
"quality": record["quality"],
|
||||||
|
"status": record["status"],
|
||||||
|
"setting": record["setting"],
|
||||||
|
"reaction": record["reaction"],
|
||||||
|
"friction": record["friction"]
|
||||||
|
})
|
||||||
|
|
||||||
|
return node_records, link_records
|
||||||
|
|
||||||
|
# 2025/02/16
|
||||||
|
def export_SCADA_data_to_csv(start_date: str, end_date: str, bucket: str="SCADA_data", client: InfluxDBClient=client) -> None:
|
||||||
|
"""
|
||||||
|
导出influxdb中SCADA_data这个bucket的数据到csv中
|
||||||
|
:param start_date: 查询开始的时间,格式为 'YYYY-MM-DD'
|
||||||
|
:param end_date: 查询结束的时间,格式为 'YYYY-MM-DD'
|
||||||
|
:param bucket: 数据存储的 bucket 名称,默认值为 "SCADA_data"
|
||||||
|
:param client: 已初始化的 InfluxDBClient 实例
|
||||||
|
:return:
|
||||||
|
"""
|
||||||
|
if client.ping():
|
||||||
|
print("{} -- Successfully connected to InfluxDB.".format(
|
||||||
|
datetime.now().strftime('%Y-%m-%d %H:%M:%S')))
|
||||||
|
else:
|
||||||
|
print("{} -- Failed to connect to InfluxDB.".format(
|
||||||
|
datetime.now().strftime('%Y-%m-%d %H:%M:%S')))
|
||||||
|
query_api = client.query_api()
|
||||||
|
# 将 start_date 的北京时间转换为 UTC 时间范围
|
||||||
|
start_time = (datetime.strptime(start_date, "%Y-%m-%d") - timedelta(days=1)).replace(hour=16, minute=0, second=0, tzinfo=timezone.utc).isoformat()
|
||||||
|
stop_time = datetime.strptime(end_date, "%Y-%m-%d").replace(hour=15, minute=59, second=59, tzinfo=timezone.utc).isoformat()
|
||||||
|
# 构建 Flux 查询语句
|
||||||
|
flux_query = f'''
|
||||||
|
from(bucket: "{bucket}")
|
||||||
|
|> range(start: {start_time}, stop: {stop_time})
|
||||||
|
'''
|
||||||
|
# 执行查询
|
||||||
|
tables = query_api.query(flux_query)
|
||||||
|
# 存储查询结果
|
||||||
|
rows = []
|
||||||
|
for table in tables:
|
||||||
|
for record in table.records:
|
||||||
|
row = {
|
||||||
|
'time': record.get_time(),
|
||||||
|
'measurement': record.get_measurement(),
|
||||||
|
'date': record.values.get('date', None),
|
||||||
|
'description': record.values.get('description', None),
|
||||||
|
'device_ID': record.values.get('device_ID', None),
|
||||||
|
'monitored_value': record.get_value() if record.get_field() == 'monitored_value' else None,
|
||||||
|
'datacleaning_value': record.get_value() if record.get_field() == 'datacleaning_value' else None,
|
||||||
|
'simulation_value': record.get_value() if record.get_field() == 'simulation_value' else None,
|
||||||
|
}
|
||||||
|
rows.append(row)
|
||||||
|
# 动态生成 CSV 文件名
|
||||||
|
csv_filename = f"SCADA_data_{start_date}至{end_date}.csv"
|
||||||
|
# 写入到 CSV 文件
|
||||||
|
with open(csv_filename, mode='w', newline='') as file:
|
||||||
|
writer = csv.DictWriter(file, fieldnames=['time', 'measurement', 'date', 'description', 'device_ID', 'monitored_value', 'datacleaning_value', 'simulation_value'])
|
||||||
|
writer.writeheader()
|
||||||
|
writer.writerows(rows)
|
||||||
|
|
||||||
|
print(f"Data exported to {csv_filename} successfully.")
|
||||||
|
|
||||||
|
# 2025/02/17
|
||||||
|
def export_realtime_simulation_result_to_csv(start_date: str, end_date: str, bucket: str="realtime_simulation_result", client: InfluxDBClient=client) -> None:
|
||||||
|
"""
|
||||||
|
导出influxdb中realtime_simulation_result这个bucket的数据到csv中
|
||||||
|
:param start_date: 查询开始的时间,格式为 'YYYY-MM-DD'
|
||||||
|
:param end_date: 查询结束的时间,格式为 'YYYY-MM-DD'
|
||||||
|
:param bucket: 数据存储的 bucket 名称,默认值为 "SCADA_data"
|
||||||
|
:param client: 已初始化的 InfluxDBClient 实例
|
||||||
|
:return:
|
||||||
|
"""
|
||||||
|
if client.ping():
|
||||||
|
print("{} -- Successfully connected to InfluxDB.".format(
|
||||||
|
datetime.now().strftime('%Y-%m-%d %H:%M:%S')))
|
||||||
|
else:
|
||||||
|
print("{} -- Failed to connect to InfluxDB.".format(
|
||||||
|
datetime.now().strftime('%Y-%m-%d %H:%M:%S')))
|
||||||
|
query_api = client.query_api()
|
||||||
|
# 将 start_date 的北京时间转换为 UTC 时间范围
|
||||||
|
start_time = (datetime.strptime(start_date, "%Y-%m-%d") - timedelta(days=1)).replace(hour=16, minute=0, second=0, tzinfo=timezone.utc).isoformat()
|
||||||
|
stop_time = datetime.strptime(end_date, "%Y-%m-%d").replace(hour=15, minute=59, second=59, tzinfo=timezone.utc).isoformat()
|
||||||
|
# 构建 Flux 查询语句,查询指定时间范围内的数据
|
||||||
|
flux_query_link = f'''
|
||||||
|
from(bucket: "{bucket}")
|
||||||
|
|> range(start: {start_time}, stop: {stop_time})
|
||||||
|
|> filter(fn: (r) => r["_measurement"] == "link")
|
||||||
|
'''
|
||||||
|
# 执行查询
|
||||||
|
link_tables = query_api.query(flux_query_link)
|
||||||
|
# 存储link类的数据
|
||||||
|
link_rows = []
|
||||||
|
link_data = {}
|
||||||
|
for table in link_tables:
|
||||||
|
for record in table.records:
|
||||||
|
key = (record.get_time(), record.values.get('ID', None))
|
||||||
|
if key not in link_data:
|
||||||
|
link_data[key] = {}
|
||||||
|
field = record.get_field()
|
||||||
|
link_data[key][field] = record.get_value()
|
||||||
|
link_data[key]['measurement'] = record.get_measurement()
|
||||||
|
link_data[key]['date'] = record.values.get('date', None)
|
||||||
|
# 构建 Flux 查询语句,查询指定时间范围内的数据
|
||||||
|
flux_query_node = f'''
|
||||||
|
from(bucket: "{bucket}")
|
||||||
|
|> range(start: {start_time}, stop: {stop_time})
|
||||||
|
|> filter(fn: (r) => r["_measurement"] == "node")
|
||||||
|
'''
|
||||||
|
# 执行查询
|
||||||
|
node_tables = query_api.query(flux_query_node)
|
||||||
|
# 存储node类的数据
|
||||||
|
node_rows = []
|
||||||
|
node_data = {}
|
||||||
|
for table in node_tables:
|
||||||
|
for record in table.records:
|
||||||
|
key = (record.get_time(), record.values.get('ID', None))
|
||||||
|
if key not in node_data:
|
||||||
|
node_data[key] = {}
|
||||||
|
field = record.get_field()
|
||||||
|
node_data[key][field] = record.get_value()
|
||||||
|
node_data[key]['measurement'] = record.get_measurement()
|
||||||
|
node_data[key]['date'] = record.values.get('date', None)
|
||||||
|
|
||||||
|
for key in set(link_data.keys()):
|
||||||
|
row = {'time': key[0], "ID": key[1]}
|
||||||
|
row.update(link_data.get(key, {}))
|
||||||
|
link_rows.append(row)
|
||||||
|
for key in set(node_data.keys()):
|
||||||
|
row = {'time': key[0], "ID": key[1]}
|
||||||
|
row.update(node_data.get(key, {}))
|
||||||
|
node_rows.append(row)
|
||||||
|
|
||||||
|
# 动态生成 CSV 文件名
|
||||||
|
csv_filename_link = f"realtime_simulation_link_result_{start_date}至{end_date}.csv"
|
||||||
|
csv_filename_node = f"realtime_simulation_node_result_{start_date}至{end_date}.csv"
|
||||||
|
# 写入到 CSV 文件
|
||||||
|
with open(csv_filename_link, mode='w', newline='') as file:
|
||||||
|
writer = csv.DictWriter(file, fieldnames=['time', 'measurement', 'date', 'ID', 'flow', 'leakage', 'velocity', 'headloss', 'status', 'setting', 'quality', 'friction', 'reaction'])
|
||||||
|
writer.writeheader()
|
||||||
|
writer.writerows(link_rows)
|
||||||
|
with open(csv_filename_node, mode='w', newline='') as file:
|
||||||
|
writer = csv.DictWriter(file, fieldnames=['time', 'measurement', 'date', 'ID', 'head', 'pressure', 'actualdemand',
|
||||||
|
'demanddeficit', 'totalExternalOutflow', 'quality'])
|
||||||
|
writer.writeheader()
|
||||||
|
writer.writerows(node_rows)
|
||||||
|
|
||||||
|
print(f"Data exported to {csv_filename_link} and {csv_filename_node} successfully.")
|
||||||
|
|
||||||
|
# 2025/02/18
|
||||||
|
def export_scheme_simulation_result_to_csv_time(start_date: str, end_date: str, bucket: str = "scheme_simulation_result", client: InfluxDBClient = client) -> None:
    """
    Export scheme-simulation results from InfluxDB to CSV files for a day range.

    Queries the given bucket for the "link" and "node" measurements between
    ``start_date`` and ``end_date`` (both interpreted as Beijing-time calendar
    days, inclusive) and writes one CSV per measurement.

    :param start_date: first day to export, formatted 'YYYY-MM-DD' (Beijing time)
    :param end_date: last day to export, formatted 'YYYY-MM-DD' (Beijing time)
    :param bucket: bucket to read from; defaults to "scheme_simulation_result"
    :param client: an initialised InfluxDBClient instance
    :return: None
    """
    # Connectivity check is informational only; the query below runs either way.
    if client.ping():
        print("{} -- Successfully connected to InfluxDB.".format(
            datetime.now().strftime('%Y-%m-%d %H:%M:%S')))
    else:
        print("{} -- Failed to connect to InfluxDB.".format(
            datetime.now().strftime('%Y-%m-%d %H:%M:%S')))
    query_api = client.query_api()

    # Convert the Beijing-time day range to UTC: Beijing 00:00:00 of start_date
    # is 16:00:00 UTC of the previous day; Beijing 23:59:59 of end_date is
    # 15:59:59 UTC of end_date itself.
    start_time = (datetime.strptime(start_date, "%Y-%m-%d") - timedelta(days=1)).replace(
        hour=16, minute=0, second=0, tzinfo=timezone.utc).isoformat()
    stop_time = datetime.strptime(end_date, "%Y-%m-%d").replace(
        hour=15, minute=59, second=59, tzinfo=timezone.utc).isoformat()

    def _collect_rows(measurement: str) -> list:
        """Query one measurement and pivot its records into one row per (time, ID)."""
        flux_query = f'''
        from(bucket: "{bucket}")
          |> range(start: {start_time}, stop: {stop_time})
          |> filter(fn: (r) => r["_measurement"] == "{measurement}")
        '''
        data = {}
        for table in query_api.query(flux_query):
            for record in table.records:
                key = (record.get_time(), record.values.get('ID', None))
                entry = data.setdefault(key, {})
                # One field per record; later records for the same key fill in
                # the remaining fields of the pivoted row.
                entry[record.get_field()] = record.get_value()
                entry['measurement'] = record.get_measurement()
                entry['date'] = record.values.get('date', None)
                entry['scheme_Type'] = record.values.get('scheme_Type', None)
                entry['scheme_Name'] = record.values.get('scheme_Name', None)
        # Iterate the dict directly (insertion order) instead of a set() of keys,
        # so the CSV row order is deterministic.
        return [{'time': t, 'ID': node_id, **vals} for (t, node_id), vals in data.items()]

    link_rows = _collect_rows("link")
    node_rows = _collect_rows("node")

    # Build output filenames from the requested date range.
    csv_filename_link = f"scheme_simulation_link_result_{start_date}至{end_date}.csv"
    csv_filename_node = f"scheme_simulation_node_result_{start_date}至{end_date}.csv"

    with open(csv_filename_link, mode='w', newline='') as file:
        writer = csv.DictWriter(file, fieldnames=['time', 'measurement', 'date', 'scheme_Type', 'scheme_Name', 'ID',
                                                  'flow', 'leakage', 'velocity', 'headloss', 'status', 'setting',
                                                  'quality', 'friction', 'reaction'])
        writer.writeheader()
        writer.writerows(link_rows)
    with open(csv_filename_node, mode='w', newline='') as file:
        writer = csv.DictWriter(file, fieldnames=['time', 'measurement', 'date', 'scheme_Type', 'scheme_Name', 'ID',
                                                  'head', 'pressure', 'actualdemand', 'demanddeficit',
                                                  'totalExternalOutflow', 'quality'])
        writer.writeheader()
        writer.writerows(node_rows)

    print(f"Data exported to {csv_filename_link} and {csv_filename_node} successfully.")
# 2025/02/18
|
||||||
|
def export_scheme_simulation_result_to_csv_scheme(scheme_Type: str, scheme_Name: str, bucket: str = "scheme_simulation_result", client: InfluxDBClient = client, range_start: str = "2025-01-01T00:00:00Z") -> None:
    """
    Export scheme-simulation results from InfluxDB to CSV files for one scheme.

    Queries the given bucket for the "link" and "node" measurements tagged with
    the given ``scheme_Type`` and ``scheme_Name`` and writes one CSV per
    measurement.

    :param scheme_Type: scheme type tag to filter on
    :param scheme_Name: scheme name tag to filter on
    :param bucket: bucket to read from; defaults to "scheme_simulation_result"
    :param client: an initialised InfluxDBClient instance
    :param range_start: earliest timestamp to query, RFC3339; defaults to
        "2025-01-01T00:00:00Z" (previously hard-coded)
    :return: None
    """
    # Connectivity check is informational only; the query below runs either way.
    if client.ping():
        print("{} -- Successfully connected to InfluxDB.".format(
            datetime.now().strftime('%Y-%m-%d %H:%M:%S')))
    else:
        print("{} -- Failed to connect to InfluxDB.".format(
            datetime.now().strftime('%Y-%m-%d %H:%M:%S')))
    query_api = client.query_api()

    def _collect_rows(measurement: str) -> list:
        """Query one measurement for this scheme and pivot records into rows."""
        # NOTE(review): scheme_Type / scheme_Name are interpolated straight into
        # the Flux source. If they can ever come from untrusted input, validate
        # or escape them (Flux has no parameter binding in this query path).
        flux_query = f'''
        from(bucket: "{bucket}")
          |> range(start: {range_start})
          |> filter(fn: (r) => r["_measurement"] == "{measurement}")
          |> filter(fn: (r) => r["scheme_Type"] == "{scheme_Type}")
          |> filter(fn: (r) => r["scheme_Name"] == "{scheme_Name}")
        '''
        data = {}
        for table in query_api.query(flux_query):
            for record in table.records:
                key = (record.get_time(), record.values.get('ID', None))
                entry = data.setdefault(key, {})
                # One field per record; later records for the same key fill in
                # the remaining fields of the pivoted row.
                entry[record.get_field()] = record.get_value()
                entry['measurement'] = record.get_measurement()
                entry['date'] = record.values.get('date', None)
                entry['scheme_Type'] = record.values.get('scheme_Type', None)
                entry['scheme_Name'] = record.values.get('scheme_Name', None)
        # Iterate the dict directly (insertion order) instead of a set() of keys,
        # so the CSV row order is deterministic.
        return [{'time': t, 'ID': node_id, **vals} for (t, node_id), vals in data.items()]

    link_rows = _collect_rows("link")
    node_rows = _collect_rows("node")

    # Build output filenames from the scheme identifiers.
    csv_filename_link = f"scheme_simulation_link_result_{scheme_Name}_of_{scheme_Type}.csv"
    csv_filename_node = f"scheme_simulation_node_result_{scheme_Name}_of_{scheme_Type}.csv"

    with open(csv_filename_link, mode='w', newline='') as file:
        writer = csv.DictWriter(file, fieldnames=['time', 'measurement', 'date', 'scheme_Type', 'scheme_Name', 'ID',
                                                  'flow', 'leakage', 'velocity', 'headloss', 'status', 'setting',
                                                  'quality', 'friction', 'reaction'])
        writer.writeheader()
        writer.writerows(link_rows)
    with open(csv_filename_node, mode='w', newline='') as file:
        writer = csv.DictWriter(file, fieldnames=['time', 'measurement', 'date', 'scheme_Type', 'scheme_Name', 'ID',
                                                  'head', 'pressure', 'actualdemand', 'demanddeficit',
                                                  'totalExternalOutflow', 'quality'])
        writer.writeheader()
        writer.writerows(node_rows)

    print(f"Data exported to {csv_filename_link} and {csv_filename_node} successfully.")
# 示例调用
|
# 示例调用
|
||||||
if __name__ == "__main__":
|
if __name__ == "__main__":
|
||||||
url = influxdb_info.url
|
url = influxdb_info.url
|
||||||
@@ -1438,20 +2148,19 @@ if __name__ == "__main__":
|
|||||||
|
|
||||||
# step3: 查询测试示例
|
# step3: 查询测试示例
|
||||||
with InfluxDBClient(url=url, token=token, org=org_name) as client:
|
with InfluxDBClient(url=url, token=token, org=org_name) as client:
|
||||||
# 示例1:query_latest_record_by_ID
|
|
||||||
bucket_name = "realtime_simulation_result" # 数据存储的 bucket 名称
|
|
||||||
node_id = "ZBBDTZDP000022" # 查询的节点 ID
|
|
||||||
link_id = "ZBBGXSZW000002"
|
|
||||||
|
|
||||||
|
# # 示例1:query_latest_record_by_ID
|
||||||
|
# bucket_name = "realtime_simulation_result" # 数据存储的 bucket 名称
|
||||||
|
# node_id = "ZBBDTZDP000022" # 查询的节点 ID
|
||||||
|
# link_id = "ZBBGXSZW000002"
|
||||||
|
#
|
||||||
# latest_record = query_latest_record_by_ID(ID=node_id, type="node", bucket=bucket_name, client=client)
|
# latest_record = query_latest_record_by_ID(ID=node_id, type="node", bucket=bucket_name, client=client)
|
||||||
# latest_record = query_latest_record_by_ID(ID=link_id, type="link", bucket=bucket_name, client=client)
|
# # # latest_record = query_latest_record_by_ID(ID=link_id, type="link", bucket=bucket_name, client=client)
|
||||||
|
# #
|
||||||
all_records = query_all_record_by_time('2025-02-09T17:30:00+08:00')
|
# if latest_record:
|
||||||
|
# print("最新记录:", latest_record)
|
||||||
if latest_record:
|
# else:
|
||||||
print("最新记录:", latest_record)
|
# print("未找到符合条件的记录。")
|
||||||
else:
|
|
||||||
print("未找到符合条件的记录。")
|
|
||||||
|
|
||||||
# 示例2:query_all_record_by_time
|
# 示例2:query_all_record_by_time
|
||||||
# node_records, link_records = query_all_record_by_time(query_time="2024-11-25T06:00:00+08:00")
|
# node_records, link_records = query_all_record_by_time(query_time="2024-11-25T06:00:00+08:00")
|
||||||
@@ -1464,5 +2173,43 @@ if __name__ == "__main__":
|
|||||||
# print(curve_result)
|
# print(curve_result)
|
||||||
|
|
||||||
# 示例4:query_SCADA_data_by_device_ID_and_time
|
# 示例4:query_SCADA_data_by_device_ID_and_time
|
||||||
# SCADA_result_dict = query_SCADA_data_by_device_ID_and_time(globals.variable_pump_realtime_ids, query_time='2025-02-08T10:30:00+08:00')
|
# SCADA_result_dict = query_SCADA_data_by_device_ID_and_time(globals.variable_pump_realtime_ids, query_time='2025-02-14T23:58:00+08:00')
|
||||||
# print(SCADA_result_dict)
|
# print(SCADA_result_dict)
|
||||||
|
|
||||||
|
# 示例5:query_SCADA_data_curve
|
||||||
|
# SCADA_result = query_SCADA_data_curve(api_query_id='3853', start_date='2025-02-14', end_date='2025-02-16')
|
||||||
|
# print(SCADA_result)
|
||||||
|
|
||||||
|
# 示例6:export_SCADA_data_to_csv
|
||||||
|
# export_SCADA_data_to_csv(start_date='2025-02-13', end_date='2025-02-15')
|
||||||
|
|
||||||
|
# 示例7:export_realtime_simulation_result_to_csv
|
||||||
|
# export_realtime_simulation_result_to_csv(start_date='2025-02-13', end_date='2025-02-15')
|
||||||
|
|
||||||
|
# 示例8:export_scheme_simulation_result_to_csv_time
|
||||||
|
# export_scheme_simulation_result_to_csv_time(start_date='2025-02-13', end_date='2025-02-15')
|
||||||
|
|
||||||
|
# 示例9:export_scheme_simulation_result_to_csv_scheme
|
||||||
|
# export_scheme_simulation_result_to_csv_scheme(scheme_Type='burst_Analysis', scheme_Name='scheme1')
|
||||||
|
|
||||||
|
# 示例10:query_scheme_all_record_by_time
|
||||||
|
# node_records, link_records = query_scheme_all_record_by_time(scheme_Type='burst_Analysis', scheme_Name='scheme1', query_time="2025-02-14T10:30:00+08:00")
|
||||||
|
# print("Node 数据:", node_records)
|
||||||
|
# print("Link 数据:", link_records)
|
||||||
|
|
||||||
|
# 示例11:query_scheme_curve_by_ID_property
|
||||||
|
# curve_result = query_scheme_curve_by_ID_property(scheme_Type='burst_Analysis', scheme_Name='scheme1', ID='ZBBDTZDP000022',
|
||||||
|
# type='node', property='head')
|
||||||
|
# print(curve_result)
|
||||||
|
|
||||||
|
# 示例12:query_all_record_by_date
|
||||||
|
# node_records, link_records = query_all_record_by_date(query_date='2025-02-14')
|
||||||
|
# print("Node 数据:", node_records)
|
||||||
|
# print("Link 数据:", link_records)
|
||||||
|
|
||||||
|
# 示例13:query_scheme_all_record
|
||||||
|
node_records, link_records = query_scheme_all_record(scheme_Type='burst_Analysis', scheme_Name='scheme1')
|
||||||
|
print("Node 数据:", node_records)
|
||||||
|
print("Link 数据:", link_records)
|
||||||
|
|
||||||
|
|
||||||
|
|||||||
Reference in New Issue
Block a user