Files
TJWaterServerBinary/app/infra/db/timescaledb/composite_queries.py

605 lines
22 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
import time
from typing import List, Optional, Any, Dict, Tuple
from datetime import datetime, timedelta
from psycopg import AsyncConnection
import pandas as pd
import numpy as np
from app.algorithms.api_ex.Fdataclean import clean_flow_data_df_kf
from app.algorithms.api_ex.Pdataclean import clean_pressure_data_df_km
from app.algorithms.api_ex.pipeline_health_analyzer import PipelineHealthAnalyzer
from app.infra.db.postgresql.internal_queries import InternalQueries
from app.infra.db.postgresql.scada_info import ScadaRepository as PostgreScadaRepository
from app.infra.db.timescaledb.schemas.realtime import RealtimeRepository
from app.infra.db.timescaledb.schemas.scheme import SchemeRepository
from app.infra.db.timescaledb.schemas.scada import ScadaRepository
class CompositeQueries:
    """
    Composite query helpers that join data across tables/databases
    (PostgreSQL SCADA metadata + TimescaleDB time-series data).
    """
@staticmethod
async def get_scada_associated_realtime_simulation_data(
    timescale_conn: AsyncConnection,
    postgres_conn: AsyncConnection,
    device_ids: List[str],
    start_time: datetime,
    end_time: datetime,
) -> Dict[str, List[Dict[str, Any]]]:
    """
    Fetch realtime link/node simulation values associated with SCADA devices.

    For each SCADA device id, resolve its associated network element and,
    based on the device type, query the matching simulated field:
    "flow" for pipe-flow devices, "pressure" for pressure devices.

    Args:
        timescale_conn: TimescaleDB async connection.
        postgres_conn: PostgreSQL async connection.
        device_ids: SCADA device id list.
        start_time: Range start.
        end_time: Range end.

    Returns:
        Dict keyed by device_id; each value is a list of records containing
        time, value and the originating scada_id.

    Raises:
        ValueError: If a SCADA device is unknown or its type is unsupported.
    """
    result: Dict[str, List[Dict[str, Any]]] = {}
    # Fetch all SCADA metadata once and index by id so each device lookup
    # is O(1) instead of a linear scan per requested device.
    scada_infos = await PostgreScadaRepository.get_scadas(postgres_conn)
    scada_by_id = {scada["id"]: scada for scada in scada_infos}
    for device_id in device_ids:
        target_scada = scada_by_id.get(device_id)
        if not target_scada:
            raise ValueError(f"SCADA device {device_id} not found")
        element_id = target_scada["associated_element_id"]
        scada_type = target_scada["type"]
        if scada_type.lower() == "pipe_flow":
            # Pipe-flow devices map onto link "flow" simulation data.
            res = await RealtimeRepository.get_link_field_by_time_range(
                timescale_conn, start_time, end_time, element_id, "flow"
            )
        elif scada_type.lower() == "pressure":
            # Pressure devices map onto node "pressure" simulation data.
            res = await RealtimeRepository.get_node_field_by_time_range(
                timescale_conn, start_time, end_time, element_id, "pressure"
            )
        else:
            raise ValueError(f"Unknown SCADA type: {scada_type}")
        # Tag every record with the originating SCADA device id.
        for item in res:
            item["scada_id"] = device_id
        result[device_id] = res
    return result
@staticmethod
async def get_scada_associated_scheme_simulation_data(
    timescale_conn: AsyncConnection,
    postgres_conn: AsyncConnection,
    device_ids: List[str],
    start_time: datetime,
    end_time: datetime,
    scheme_type: str,
    scheme_name: str,
) -> Dict[str, List[Dict[str, Any]]]:
    """
    Fetch scheme link/node simulation values associated with SCADA devices.

    For each SCADA device id, resolve its associated network element and,
    based on the device type, query the matching scheme-simulated field:
    "flow" for pipe-flow devices, "pressure" for pressure devices.

    Args:
        timescale_conn: TimescaleDB async connection.
        postgres_conn: PostgreSQL async connection.
        device_ids: SCADA device id list.
        start_time: Range start.
        end_time: Range end.
        scheme_type: Scheme (operating-condition) type.
        scheme_name: Scheme (operating-condition) name.

    Returns:
        Dict keyed by device_id; each value is a list of records containing
        time, value and the originating scada_id.

    Raises:
        ValueError: If a SCADA device is unknown or its type is unsupported.
    """
    result: Dict[str, List[Dict[str, Any]]] = {}
    # Fetch all SCADA metadata once and index by id so each device lookup
    # is O(1) instead of a linear scan per requested device.
    scada_infos = await PostgreScadaRepository.get_scadas(postgres_conn)
    scada_by_id = {scada["id"]: scada for scada in scada_infos}
    for device_id in device_ids:
        target_scada = scada_by_id.get(device_id)
        if not target_scada:
            raise ValueError(f"SCADA device {device_id} not found")
        element_id = target_scada["associated_element_id"]
        scada_type = target_scada["type"]
        if scada_type.lower() == "pipe_flow":
            # Pipe-flow devices map onto link "flow" scheme data.
            res = await SchemeRepository.get_link_field_by_scheme_and_time_range(
                timescale_conn,
                scheme_type,
                scheme_name,
                start_time,
                end_time,
                element_id,
                "flow",
            )
        elif scada_type.lower() == "pressure":
            # Pressure devices map onto node "pressure" scheme data.
            res = await SchemeRepository.get_node_field_by_scheme_and_time_range(
                timescale_conn,
                scheme_type,
                scheme_name,
                start_time,
                end_time,
                element_id,
                "pressure",
            )
        else:
            raise ValueError(f"Unknown SCADA type: {scada_type}")
        # Tag every record with the originating SCADA device id.
        for item in res:
            item["scada_id"] = device_id
        result[device_id] = res
    return result
@staticmethod
async def get_realtime_simulation_data(
    timescale_conn: AsyncConnection,
    featureInfos: List[Tuple[str, str]],
    start_time: datetime,
    end_time: datetime,
) -> Dict[str, List[Dict[str, Any]]]:
    """
    Fetch realtime link/node simulation values for the given features.

    For each (element_id, type) pair, query the matching simulated field:
    "flow" for pipes, "pressure" for junctions.

    Args:
        timescale_conn: TimescaleDB async connection.
        featureInfos: List of (element_id, type) tuples.
        start_time: Range start.
        end_time: Range end.

    Returns:
        Dict keyed by feature_id; each value is a list of records containing
        time, value and the feature_id.

    Raises:
        ValueError: If a feature type is unsupported.
    """
    result: Dict[str, List[Dict[str, Any]]] = {}
    # NOTE: renamed loop variable from `type` to avoid shadowing the builtin.
    for feature_id, feature_type in featureInfos:
        if feature_type.lower() == "pipe":
            # Pipes map onto link "flow" simulation data.
            res = await RealtimeRepository.get_link_field_by_time_range(
                timescale_conn, start_time, end_time, feature_id, "flow"
            )
        elif feature_type.lower() == "junction":
            # Junctions map onto node "pressure" simulation data.
            res = await RealtimeRepository.get_node_field_by_time_range(
                timescale_conn, start_time, end_time, feature_id, "pressure"
            )
        else:
            raise ValueError(f"Unknown type: {feature_type}")
        # Tag every record with its feature id.
        for item in res:
            item["feature_id"] = feature_id
        result[feature_id] = res
    return result
@staticmethod
async def get_scheme_simulation_data(
    timescale_conn: AsyncConnection,
    featureInfos: List[Tuple[str, str]],
    start_time: datetime,
    end_time: datetime,
    scheme_type: str,
    scheme_name: str,
) -> Dict[str, List[Dict[str, Any]]]:
    """
    Fetch scheme link/node simulation values for the given features.

    For each (element_id, type) pair, query the matching scheme-simulated
    field: "flow" for pipes, "pressure" for junctions.

    Args:
        timescale_conn: TimescaleDB async connection.
        featureInfos: List of (element_id, type) tuples.
        start_time: Range start.
        end_time: Range end.
        scheme_type: Scheme (operating-condition) type.
        scheme_name: Scheme (operating-condition) name.

    Returns:
        Dict keyed by feature_id; each value is a list of records containing
        time, value and the feature_id.

    Raises:
        ValueError: If a feature type is unsupported.
    """
    result: Dict[str, List[Dict[str, Any]]] = {}
    # NOTE: renamed loop variable from `type` to avoid shadowing the builtin.
    for feature_id, feature_type in featureInfos:
        if feature_type.lower() == "pipe":
            # Pipes map onto link "flow" scheme data.
            res = await SchemeRepository.get_link_field_by_scheme_and_time_range(
                timescale_conn,
                scheme_type,
                scheme_name,
                start_time,
                end_time,
                feature_id,
                "flow",
            )
        elif feature_type.lower() == "junction":
            # Junctions map onto node "pressure" scheme data.
            res = await SchemeRepository.get_node_field_by_scheme_and_time_range(
                timescale_conn,
                scheme_type,
                scheme_name,
                start_time,
                end_time,
                feature_id,
                "pressure",
            )
        else:
            raise ValueError(f"Unknown type: {feature_type}")
        # Tag every record with its feature id.
        for item in res:
            item["feature_id"] = feature_id
        result[feature_id] = res
    return result
@staticmethod
async def get_element_associated_scada_data(
    timescale_conn: AsyncConnection,
    postgres_conn: AsyncConnection,
    element_id: str,
    start_time: datetime,
    end_time: datetime,
    use_cleaned: bool = False,
) -> Optional[Any]:
    """
    Fetch the SCADA monitoring data associated with a link/node element.

    Matches the element id against SCADA metadata; when an associated
    device exists, returns its measured series for the requested range.

    Args:
        timescale_conn: TimescaleDB async connection.
        postgres_conn: PostgreSQL async connection.
        element_id: Link or node id.
        start_time: Range start.
        end_time: Range end.
        use_cleaned: Read "cleaned_value" when True, else "monitored_value".

    Returns:
        {element_id: [records]} when an associated device exists, else None.
    """
    scada_infos = await PostgreScadaRepository.get_scadas(postgres_conn)
    # Locate the first SCADA device bound to this element, if any.
    associated_scada = next(
        (s for s in scada_infos if s["associated_element_id"] == element_id),
        None,
    )
    if not associated_scada:
        # No SCADA device is associated with this element.
        return None
    device_id = associated_scada["id"]
    # Pick the measurement column based on the use_cleaned flag.
    data_field = "cleaned_value" if use_cleaned else "monitored_value"
    # The repository expects device ids as a list.
    res = await ScadaRepository.get_scada_field_by_id_time_range(
        timescale_conn, [device_id], start_time, end_time, data_field
    )
    # Re-key the payload by element id rather than device id.
    return {element_id: res.get(device_id, [])}
@staticmethod
async def clean_scada_data(
    timescale_conn: AsyncConnection,
    postgres_conn: AsyncConnection,
    device_ids: List[str],
    start_time: datetime,
    end_time: datetime,
) -> str:
    """
    Clean SCADA measurements and persist the results.

    Reads "monitored_value" for the given devices, runs the type-specific
    cleaning algorithm (pressure vs. pipe flow) and writes the cleaned
    series back into "cleaned_value".

    Args:
        timescale_conn: TimescaleDB connection.
        postgres_conn: PostgreSQL connection.
        device_ids: Device ids to clean; an empty list means all devices.
        start_time: Range start.
        end_time: Range end.

    Returns:
        "success" on completion, otherwise an "error: ..." message.
    """

    async def _clean_and_store(df, ids, clean_fn):
        # Shared clean-and-persist step for one device category
        # (previously duplicated verbatim for pressure and flow).
        sub_df = df[ids].reset_index()  # make "time" a regular column
        value_df = sub_df.drop(columns=["time"])  # cleaner takes values only
        cleaned_value_df = clean_fn(value_df)
        cleaned_df = pd.concat([sub_df["time"], cleaned_value_df], axis=1)
        for dev_id in ids:
            if dev_id not in cleaned_df.columns:
                continue
            cleaned_values = cleaned_df[dev_id].tolist()
            time_values = cleaned_df["time"].tolist()
            for i, time_str in enumerate(time_values):
                # NOTE(review): assumes the time column holds ISO-8601
                # strings; a datetime object here would raise and surface
                # as an "error: ..." return — confirm upstream format.
                time_dt = datetime.fromisoformat(time_str)
                await ScadaRepository.update_scada_field(
                    timescale_conn,
                    time_dt,
                    dev_id,
                    "cleaned_value",
                    cleaned_values[i],
                )

    try:
        # Index SCADA metadata by device id for type lookups below.
        scada_infos = await PostgreScadaRepository.get_scadas(postgres_conn)
        scada_device_info_dict = {info["id"]: info for info in scada_infos}
        # An empty device_ids list means "clean every known device".
        if not device_ids:
            device_ids = list(scada_device_info_dict.keys())
        # Batch-fetch raw measurements for all devices at once.
        data = await ScadaRepository.get_scada_field_by_id_time_range(
            timescale_conn, device_ids, start_time, end_time, "monitored_value"
        )
        if not data:
            return "error: fetch none scada data"  # nothing to clean
        # Flatten {device_id: [{"time", "value"}, ...]} into long records.
        all_records = [
            {
                "time": record["time"],
                "device_id": device_id,
                "value": record["value"],
            }
            for device_id, records in data.items()
            for record in records
        ]
        if not all_records:
            return "error: fetch none scada data"  # nothing to clean
        # Pivot to wide form: one column per device, indexed by time.
        df = pd.DataFrame(all_records).pivot(
            index="time", columns="device_id", values="value"
        )
        # Split columns by SCADA type so each category uses its own cleaner.
        pressure_ids = [
            col
            for col in df.columns
            if scada_device_info_dict.get(col, {}).get("type") == "pressure"
        ]
        flow_ids = [
            col
            for col in df.columns
            if scada_device_info_dict.get(col, {}).get("type") == "pipe_flow"
        ]
        if pressure_ids:
            await _clean_and_store(df, pressure_ids, clean_pressure_data_df_km)
        if flow_ids:
            await _clean_and_store(df, flow_ids, clean_flow_data_df_kf)
        return "success"
    except Exception as e:
        # Broad catch is intentional: callers consume the error string.
        return f"error: {str(e)}"
@staticmethod
async def predict_pipeline_health(
    timescale_conn: AsyncConnection,
    network_name: str,
    query_time: datetime,
) -> List[Dict[str, Any]]:
    """
    Predict pipeline health.

    Combines pipe attributes with realtime velocity/pressure data around
    `query_time` and runs a random-survival-forest model to estimate each
    pipe's survival function.

    Args:
        timescale_conn: TimescaleDB async connection.
        network_name: Network database name used to look up pipe info.
        query_time: Point in time at which to sample simulation data.

    Returns:
        List of {"link_id": ..., "survival_function": {"x": [...], "y": [...]}}
        where x are time points (years) and y the survival probabilities.

    Raises:
        ValueError: When input data is missing/invalid or prediction fails.
    """
    try:
        # 1. Sample a +/- 1 second window around the query time.
        start_time = query_time - timedelta(seconds=1)
        end_time = query_time + timedelta(seconds=1)
        # 2. Velocity data determines which links are eligible at all.
        velocity_data = await RealtimeRepository.get_links_field_by_time_range(
            timescale_conn, start_time, end_time, "velocity"
        )
        if not velocity_data:
            raise ValueError("未找到流速数据")
        valid_link_ids = list(velocity_data.keys())
        # 3. Fetch pipe metadata and index by id for fast lookup.
        fields = ["id", "diameter", "node1", "node2"]
        all_links = InternalQueries.get_links_by_property(
            fields=fields,
            db_name=network_name,
        )
        links_dict = {link["id"]: link for link in all_links}
        # 4. Batch-fetch node pressures for the same window.
        pressure_data = await RealtimeRepository.get_nodes_field_by_time_range(
            timescale_conn, start_time, end_time, "pressure"
        )

        def _latest(records):
            # Latest recorded value in the window, 0 when absent.
            return records[-1]["value"] if records else 0

        # 5. Assemble the feature columns for the model.
        materials = []
        diameters = []
        velocities = []
        pressures = []
        link_ids = []
        for link_id in valid_link_ids:
            # Skip ids without pipe metadata (e.g. pumps or other elements).
            if link_id not in links_dict:
                continue
            link = links_dict[link_id]
            velocity = _latest(velocity_data[link_id])
            # Average the pressures at the pipe's two end nodes.
            node1_pressure = _latest(pressure_data.get(link["node1"], []))
            node2_pressure = _latest(pressure_data.get(link["node2"], []))
            avg_pressure = (node1_pressure + node2_pressure) / 2
            link_ids.append(link_id)
            materials.append(7)  # default material class; adjust when real data exists
            diameters.append(link["diameter"])
            velocities.append(velocity)
            pressures.append(avg_pressure)
        if not link_ids:
            raise ValueError("没有找到有效的管道数据用于预测")
        # 6. Build the model input frame (column names are model contract).
        data = pd.DataFrame(
            {
                "Material": materials,
                "Diameter": diameters,
                "Flow Velocity": velocities,
                "Pressure": pressures,
            }
        )
        # 7. Run the survival prediction.
        analyzer = PipelineHealthAnalyzer()
        survival_functions = analyzer.predict_survival(data)
        # 8. Pair each link with its survival function.
        results = []
        for link_id, sf in zip(link_ids, survival_functions):
            results.append(
                {
                    "link_id": link_id,
                    "survival_function": {
                        "x": sf.x.tolist(),  # time points (years)
                        "y": sf.y.tolist(),  # survival probabilities
                    },
                }
            )
        return results
    except Exception as e:
        # Chain the cause so the original traceback is not lost.
        raise ValueError(f"管道健康预测失败: {str(e)}") from e