341 lines
13 KiB
Python
341 lines
13 KiB
Python
from typing import List, Optional, Any
|
||
from datetime import datetime
|
||
from psycopg import AsyncConnection
|
||
import pandas as pd
|
||
from api_ex.Fdataclean import clean_flow_data_df_kf
|
||
from api_ex.Pdataclean import clean_pressure_data_df_km
|
||
|
||
from postgresql.scada_info import ScadaRepository as PostgreScadaRepository
|
||
from timescaledb.schemas.realtime import RealtimeRepository
|
||
from timescaledb.schemas.scheme import SchemeRepository
|
||
from timescaledb.schemas.scada import ScadaRepository
|
||
|
||
|
||
class CompositeQueries:
|
||
"""
|
||
复合查询类,提供跨表查询功能
|
||
"""
|
||
|
||
@staticmethod
|
||
async def get_scada_associated_realtime_simulation_data(
|
||
timescale_conn: AsyncConnection,
|
||
postgres_conn: AsyncConnection,
|
||
device_ids: List[str],
|
||
start_time: datetime,
|
||
end_time: datetime,
|
||
) -> List[Optional[Any]]:
|
||
"""
|
||
获取 SCADA 关联的 link/node 模拟值
|
||
|
||
根据传入的 SCADA device_ids,找到关联的 link/node,
|
||
并根据对应的 type,查询对应的模拟数据
|
||
|
||
Args:
|
||
timescale_conn: TimescaleDB 异步连接
|
||
postgres_conn: PostgreSQL 异步连接
|
||
device_ids: SCADA 设备ID列表
|
||
start_time: 开始时间
|
||
end_time: 结束时间
|
||
field: 要查询的字段名
|
||
|
||
Returns:
|
||
模拟数据值列表,如果没有找到则对应位置返回 None
|
||
|
||
Raises:
|
||
ValueError: 当 SCADA 设备未找到或字段无效时
|
||
"""
|
||
results = []
|
||
# 1. 查询所有 SCADA 信息
|
||
scada_infos = await PostgreScadaRepository.get_scadas(postgres_conn)
|
||
|
||
for device_id in device_ids:
|
||
# 2. 根据 device_id 找到对应的 SCADA 信息
|
||
target_scada = None
|
||
for scada in scada_infos:
|
||
if scada["id"] == device_id:
|
||
target_scada = scada
|
||
break
|
||
|
||
if not target_scada:
|
||
raise ValueError(f"SCADA device {device_id} not found")
|
||
|
||
# 3. 根据 type 和 associated_element_id 查询对应的模拟数据
|
||
element_id = target_scada["associated_element_id"]
|
||
scada_type = target_scada["type"]
|
||
|
||
if scada_type.lower() == "pipe_flow":
|
||
# 查询 link 模拟数据
|
||
res = await RealtimeRepository.get_link_field_by_time_range(
|
||
timescale_conn, start_time, end_time, element_id, "flow"
|
||
)
|
||
elif scada_type.lower() == "pressure":
|
||
# 查询 node 模拟数据
|
||
res = await RealtimeRepository.get_node_field_by_time_range(
|
||
timescale_conn, start_time, end_time, element_id, "pressure"
|
||
)
|
||
else:
|
||
raise ValueError(f"Unknown SCADA type: {scada_type}")
|
||
results.append(res)
|
||
return results
|
||
|
||
@staticmethod
|
||
async def get_scada_associated_scheme_simulation_data(
|
||
timescale_conn: AsyncConnection,
|
||
postgres_conn: AsyncConnection,
|
||
device_ids: List[str],
|
||
start_time: datetime,
|
||
end_time: datetime,
|
||
scheme_type: str,
|
||
scheme_name: str,
|
||
) -> List[Optional[Any]]:
|
||
"""
|
||
获取 SCADA 关联的 link/node 模拟值
|
||
|
||
根据传入的 SCADA device_ids,找到关联的 link/node,
|
||
并根据对应的 type,查询对应的模拟数据
|
||
|
||
Args:
|
||
timescale_conn: TimescaleDB 异步连接
|
||
postgres_conn: PostgreSQL 异步连接
|
||
device_ids: SCADA 设备ID列表
|
||
start_time: 开始时间
|
||
end_time: 结束时间
|
||
field: 要查询的字段名
|
||
|
||
Returns:
|
||
模拟数据值列表,如果没有找到则对应位置返回 None
|
||
|
||
Raises:
|
||
ValueError: 当 SCADA 设备未找到或字段无效时
|
||
"""
|
||
results = []
|
||
# 1. 查询所有 SCADA 信息
|
||
scada_infos = await PostgreScadaRepository.get_scadas(postgres_conn)
|
||
|
||
for device_id in device_ids:
|
||
# 2. 根据 device_id 找到对应的 SCADA 信息
|
||
target_scada = None
|
||
for scada in scada_infos:
|
||
if scada["id"] == device_id:
|
||
target_scada = scada
|
||
break
|
||
|
||
if not target_scada:
|
||
raise ValueError(f"SCADA device {device_id} not found")
|
||
|
||
# 3. 根据 type 和 associated_element_id 查询对应的模拟数据
|
||
element_id = target_scada["associated_element_id"]
|
||
scada_type = target_scada["type"]
|
||
|
||
if scada_type.lower() == "pipe_flow":
|
||
# 查询 link 模拟数据
|
||
res = await SchemeRepository.get_link_field_by_scheme_and_time_range(
|
||
timescale_conn,
|
||
scheme_type,
|
||
scheme_name,
|
||
start_time,
|
||
end_time,
|
||
element_id,
|
||
"flow",
|
||
)
|
||
elif scada_type.lower() == "pressure":
|
||
# 查询 node 模拟数据
|
||
res = await SchemeRepository.get_node_field_by_scheme_and_time_range(
|
||
timescale_conn,
|
||
scheme_type,
|
||
scheme_name,
|
||
start_time,
|
||
end_time,
|
||
element_id,
|
||
"pressure",
|
||
)
|
||
else:
|
||
raise ValueError(f"Unknown SCADA type: {scada_type}")
|
||
results.append(res)
|
||
return results
|
||
|
||
@staticmethod
|
||
async def get_element_associated_scada_data(
|
||
timescale_conn: AsyncConnection,
|
||
postgres_conn: AsyncConnection,
|
||
element_id: str,
|
||
start_time: datetime,
|
||
end_time: datetime,
|
||
use_cleaned: bool = False,
|
||
) -> Optional[Any]:
|
||
"""
|
||
获取 link/node 关联的 SCADA 监测值
|
||
|
||
根据传入的 link/node id,匹配 SCADA 信息,
|
||
如果存在关联的 SCADA device_id,获取实际的监测数据
|
||
|
||
Args:
|
||
timescale_conn: TimescaleDB 异步连接
|
||
postgres_conn: PostgreSQL 异步连接
|
||
element_id: link 或 node 的 ID
|
||
start_time: 开始时间
|
||
end_time: 结束时间
|
||
use_cleaned: 是否使用清洗后的数据 (True: "cleaned_value", False: "monitored_value")
|
||
|
||
Returns:
|
||
SCADA 监测数据值,如果没有找到则返回 None
|
||
|
||
Raises:
|
||
ValueError: 当元素类型无效时
|
||
"""
|
||
|
||
# 1. 查询所有 SCADA 信息
|
||
scada_infos = await PostgreScadaRepository.get_scadas(postgres_conn)
|
||
|
||
# 2. 根据 element_type 和 element_id 找到关联的 SCADA 设备
|
||
associated_scada = None
|
||
for scada in scada_infos:
|
||
if scada["associated_element_id"] == element_id:
|
||
associated_scada = scada
|
||
break
|
||
|
||
if not associated_scada:
|
||
# 没有找到关联的 SCADA 设备
|
||
return None
|
||
|
||
# 3. 通过 SCADA device_id 获取监测数据
|
||
device_id = associated_scada["id"]
|
||
|
||
# 根据 use_cleaned 参数选择字段
|
||
data_field = "cleaned_value" if use_cleaned else "monitored_value"
|
||
|
||
return await ScadaRepository.get_scada_field_by_id_time_range(
|
||
timescale_conn, device_id, start_time, end_time, data_field
|
||
)
|
||
|
||
@staticmethod
|
||
async def clean_scada_data(
|
||
timescale_conn: AsyncConnection,
|
||
postgres_conn: AsyncConnection,
|
||
device_ids: List[str],
|
||
start_time: datetime,
|
||
end_time: datetime,
|
||
) -> str:
|
||
"""
|
||
清洗 SCADA 数据
|
||
|
||
根据 device_ids 查询 monitored_value,清洗后更新 cleaned_value
|
||
|
||
Args:
|
||
timescale_conn: TimescaleDB 连接
|
||
postgres_conn: PostgreSQL 连接
|
||
device_ids: 设备 ID 列表
|
||
start_time: 开始时间
|
||
end_time: 结束时间
|
||
|
||
Returns:
|
||
"success" 或错误信息
|
||
"""
|
||
try:
|
||
# 获取所有 SCADA 信息
|
||
scada_infos = await PostgreScadaRepository.get_scadas(postgres_conn)
|
||
# 将列表转换为字典,以 device_id 为键
|
||
scada_device_info_dict = {info["id"]: info for info in scada_infos}
|
||
|
||
# 如果 device_ids 为空,则处理所有 SCADA 设备
|
||
if not device_ids:
|
||
device_ids = list(scada_device_info_dict.keys())
|
||
|
||
# 批量查询所有设备的数据
|
||
data = await ScadaRepository.get_scada_field_by_id_time_range(
|
||
timescale_conn, device_ids, start_time, end_time, "monitored_value"
|
||
)
|
||
|
||
if not data:
|
||
return "error: fetch none scada data" # 没有数据,直接返回
|
||
|
||
# 将嵌套字典转换为 DataFrame,使用 time 作为索引
|
||
# data 格式: {device_id: [{"time": "...", "value": ...}, ...]}
|
||
all_records = []
|
||
for device_id, records in data.items():
|
||
for record in records:
|
||
all_records.append(
|
||
{
|
||
"time": record["time"],
|
||
"device_id": device_id,
|
||
"value": record["value"],
|
||
}
|
||
)
|
||
|
||
if not all_records:
|
||
return "error: fetch none scada data" # 没有数据,直接返回
|
||
|
||
# 创建 DataFrame 并透视,使 device_id 成为列
|
||
df_long = pd.DataFrame(all_records)
|
||
df = df_long.pivot(index="time", columns="device_id", values="value")
|
||
|
||
# 根据type分类设备
|
||
pressure_ids = [
|
||
id
|
||
for id in df.columns
|
||
if scada_device_info_dict.get(id, {}).get("type") == "pressure"
|
||
]
|
||
flow_ids = [
|
||
id
|
||
for id in df.columns
|
||
if scada_device_info_dict.get(id, {}).get("type") == "pipe_flow"
|
||
]
|
||
|
||
# 处理pressure数据
|
||
# if pressure_ids:
|
||
# pressure_df = df[pressure_ids]
|
||
# # 重置索引,将 time 变为普通列
|
||
# pressure_df = pressure_df.reset_index()
|
||
# # 移除 time 列,准备输入给清洗方法
|
||
# value_df = pressure_df.drop(columns=["time"])
|
||
# # 调用清洗方法
|
||
# cleaned_value_df = clean_pressure_data_df_km(value_df)
|
||
# # 添加 time 列到首列
|
||
# cleaned_df = pd.concat([pressure_df["time"], cleaned_value_df], axis=1)
|
||
# # 将清洗后的数据写回数据库
|
||
# for device_id in pressure_ids:
|
||
# if device_id in cleaned_df.columns:
|
||
# cleaned_values = cleaned_df[device_id].tolist()
|
||
# time_values = cleaned_df["time"].tolist()
|
||
# for i, time_str in enumerate(time_values):
|
||
# time_dt = datetime.fromisoformat(time_str)
|
||
# value = cleaned_values[i]
|
||
# await ScadaRepository.update_scada_field(
|
||
# timescale_conn,
|
||
# time_dt,
|
||
# device_id,
|
||
# "cleaned_value",
|
||
# value,
|
||
# )
|
||
|
||
# 处理flow数据
|
||
if flow_ids:
|
||
flow_df = df[flow_ids]
|
||
# 重置索引,将 time 变为普通列
|
||
flow_df = flow_df.reset_index()
|
||
# 移除 time 列,准备输入给清洗方法
|
||
value_df = flow_df.drop(columns=["time"])
|
||
# 调用清洗方法
|
||
cleaned_value_df = clean_flow_data_df_kf(value_df)
|
||
# 添加 time 列到首列
|
||
cleaned_df = pd.concat([flow_df["time"], cleaned_value_df], axis=1)
|
||
# 将清洗后的数据写回数据库
|
||
for device_id in flow_ids:
|
||
if device_id in cleaned_df.columns:
|
||
cleaned_values = cleaned_df[device_id].tolist()
|
||
time_values = cleaned_df["time"].tolist()
|
||
for i, time_str in enumerate(time_values):
|
||
time_dt = datetime.fromisoformat(time_str)
|
||
value = cleaned_values[i]
|
||
await ScadaRepository.update_scada_field(
|
||
timescale_conn,
|
||
time_dt,
|
||
device_id,
|
||
"cleaned_value",
|
||
value,
|
||
)
|
||
|
||
return "success"
|
||
except Exception as e:
|
||
return f"error: {str(e)}"
|