import time from typing import List, Optional, Any, Dict, Tuple from datetime import datetime, timedelta from psycopg import AsyncConnection import pandas as pd import numpy as np from app.algorithms.cleaning.flow import clean_flow_data_df_kf from app.algorithms.cleaning.pressure import clean_pressure_data_df_km from app.algorithms.health.analyzer import PipelineHealthAnalyzer import app.native.wndb as wndb from app.infra.db.timescaledb.repositories.realtime import RealtimeRepository from app.infra.db.timescaledb.repositories.scheme import SchemeRepository from app.infra.db.timescaledb.repositories.scada import ScadaRepository from app.services import project_info class CompositeQueries: """ 复合查询类,提供跨表查询功能 """ @staticmethod async def get_scada_associated_realtime_simulation_data( timescale_conn: AsyncConnection, postgres_conn: AsyncConnection, device_ids: List[str], start_time: datetime, end_time: datetime, ) -> Dict[str, List[Dict[str, Any]]]: """ 获取 SCADA 关联的 link/node 模拟值 根据传入的 SCADA device_ids,找到关联的 link/node, 并根据对应的 type,查询对应的模拟数据 Args: timescale_conn: TimescaleDB 异步连接 postgres_conn: PostgreSQL 异步连接 device_ids: SCADA 设备ID列表 start_time: 开始时间 end_time: 结束时间 Returns: 模拟数据字典,以 device_id 为键,值为数据列表,每个数据包含 time, value 和 scada_id Raises: ValueError: 当 SCADA 设备未找到或字段无效时 """ result = {} # 1. 查询所有 SCADA 信息 network_name = project_info.name scada_infos = wndb.get_all_scada_info(network_name) if network_name else [] for device_id in device_ids: # 2. 根据 device_id 找到对应的 SCADA 信息 target_scada = None for scada in scada_infos: if scada["id"] == device_id: target_scada = scada break if not target_scada: raise ValueError(f"SCADA device {device_id} not found") # 3. 根据 type 和 associated_element_id 查询对应的模拟数据 element_id = target_scada["associated_element_id"] scada_type = target_scada["type"] if scada_type.lower() == "pipe_flow": # 查询 link 模拟数据 res = await RealtimeRepository.get_link_field_by_time_range( timescale_conn, start_time, end_time, element_id, "flow" ) elif scada_type.lower() == "pressure": # 查询 node 模拟数据 res = await RealtimeRepository.get_node_field_by_time_range( timescale_conn, start_time, end_time, element_id, "pressure" ) else: raise ValueError(f"Unknown SCADA type: {scada_type}") # 添加 scada_id 到每个数据项 for item in res: item["scada_id"] = device_id result[device_id] = res return result @staticmethod async def get_scada_associated_scheme_simulation_data( timescale_conn: AsyncConnection, postgres_conn: AsyncConnection, device_ids: List[str], start_time: datetime, end_time: datetime, scheme_type: str, scheme_name: str, ) -> Dict[str, List[Dict[str, Any]]]: """ 获取 SCADA 关联的 link/node scheme 模拟值 根据传入的 SCADA device_ids,找到关联的 link/node, 并根据对应的 type,查询对应的模拟数据 Args: timescale_conn: TimescaleDB 异步连接 postgres_conn: PostgreSQL 异步连接 device_ids: SCADA 设备ID列表 start_time: 开始时间 end_time: 结束时间 Returns: 模拟数据字典,以 device_id 为键,值为数据列表,每个数据包含 time, value 和 scada_id Raises: ValueError: 当 SCADA 设备未找到或字段无效时 """ result = {} # 1. 查询所有 SCADA 信息 network_name = project_info.name scada_infos = wndb.get_all_scada_info(network_name) if network_name else [] for device_id in device_ids: # 2. 根据 device_id 找到对应的 SCADA 信息 target_scada = None for scada in scada_infos: if scada["id"] == device_id: target_scada = scada break if not target_scada: raise ValueError(f"SCADA device {device_id} not found") # 3. 根据 type 和 associated_element_id 查询对应的模拟数据 element_id = target_scada["associated_element_id"] scada_type = target_scada["type"] if scada_type.lower() == "pipe_flow": # 查询 link 模拟数据 res = await SchemeRepository.get_link_field_by_scheme_and_time_range( timescale_conn, scheme_type, scheme_name, start_time, end_time, element_id, "flow", ) elif scada_type.lower() == "pressure": # 查询 node 模拟数据 res = await SchemeRepository.get_node_field_by_scheme_and_time_range( timescale_conn, scheme_type, scheme_name, start_time, end_time, element_id, "pressure", ) else: raise ValueError(f"Unknown SCADA type: {scada_type}") # 添加 scada_id 到每个数据项 for item in res: item["scada_id"] = device_id result[device_id] = res return result @staticmethod async def get_realtime_simulation_data( timescale_conn: AsyncConnection, featureInfos: List[Tuple[str, str]], start_time: datetime, end_time: datetime, ) -> Dict[str, List[Dict[str, Any]]]: """ 获取 link/node 模拟值 根据传入的 featureInfos,找到关联的 link/node, 并根据对应的 type,查询对应的模拟数据 Args: timescale_conn: TimescaleDB 异步连接 featureInfos: 传入的 feature 信息列表,包含 (element_id, type) start_time: 开始时间 end_time: 结束时间 Returns: 模拟数据字典,以 feature_id 为键,值为数据列表,每个数据包含 time, value 和 feature_id Raises: ValueError: 当 SCADA 设备未找到或字段无效时 """ result = {} for feature_id, type in featureInfos: if type.lower() == "pipe": # 查询 link 模拟数据 res = await RealtimeRepository.get_link_field_by_time_range( timescale_conn, start_time, end_time, feature_id, "flow" ) elif type.lower() == "junction": # 查询 node 模拟数据 res = await RealtimeRepository.get_node_field_by_time_range( timescale_conn, start_time, end_time, feature_id, "pressure" ) else: raise ValueError(f"Unknown type: {type}") # 添加 scada_id 到每个数据项 for item in res: item["feature_id"] = feature_id result[feature_id] = res return result @staticmethod async def get_scheme_simulation_data( timescale_conn: AsyncConnection, featureInfos: List[Tuple[str, str]], start_time: datetime, end_time: datetime, scheme_type: str, scheme_name: str, ) -> Dict[str, List[Dict[str, Any]]]: """ 获取 link/node scheme 模拟值 根据传入的 featureInfos,找到关联的 link/node, 并根据对应的 type,查询对应的模拟数据 Args: timescale_conn: TimescaleDB 异步连接 featureInfos: 传入的 feature 信息列表,包含 (element_id, type) start_time: 开始时间 end_time: 结束时间 scheme_type: 工况类型 scheme_name: 工况名称 Returns: 模拟数据字典,以 feature_id 为键,值为数据列表,每个数据包含 time, value 和 feature_id Raises: ValueError: 当类型无效时 """ result = {} for feature_id, type in featureInfos: if type.lower() == "pipe": # 查询 link 模拟数据 res = await SchemeRepository.get_link_field_by_scheme_and_time_range( timescale_conn, scheme_type, scheme_name, start_time, end_time, feature_id, "flow", ) elif type.lower() == "junction": # 查询 node 模拟数据 res = await SchemeRepository.get_node_field_by_scheme_and_time_range( timescale_conn, scheme_type, scheme_name, start_time, end_time, feature_id, "pressure", ) else: raise ValueError(f"Unknown type: {type}") # 添加 feature_id 到每个数据项 for item in res: item["feature_id"] = feature_id result[feature_id] = res return result @staticmethod async def get_element_associated_scada_data( timescale_conn: AsyncConnection, postgres_conn: AsyncConnection, element_id: str, start_time: datetime, end_time: datetime, use_cleaned: bool = False, ) -> Optional[Any]: """ 获取 link/node 关联的 SCADA 监测值 根据传入的 link/node id,匹配 SCADA 信息, 如果存在关联的 SCADA device_id,获取实际的监测数据 Args: timescale_conn: TimescaleDB 异步连接 postgres_conn: PostgreSQL 异步连接 element_id: link 或 node 的 ID start_time: 开始时间 end_time: 结束时间 use_cleaned: 是否使用清洗后的数据 (True: "cleaned_value", False: "monitored_value") Returns: SCADA 监测数据值,如果没有找到则返回 None Raises: ValueError: 当元素类型无效时 """ # 1. 查询所有 SCADA 信息 network_name = project_info.name scada_infos = wndb.get_all_scada_info(network_name) if network_name else [] # 2. 根据 element_type 和 element_id 找到关联的 SCADA 设备 associated_scada = None for scada in scada_infos: if scada["associated_element_id"] == element_id: associated_scada = scada break if not associated_scada: # 没有找到关联的 SCADA 设备 return None # 3. 通过 SCADA device_id 获取监测数据 device_id = associated_scada["id"] # 根据 use_cleaned 参数选择字段 data_field = "cleaned_value" if use_cleaned else "monitored_value" # 保证 device_id 以列表形式传递 res = await ScadaRepository.get_scada_field_by_id_time_range( timescale_conn, [device_id], start_time, end_time, data_field ) # 将 device_id 替换为 element_id 返回 return {element_id: res.get(device_id, [])} @staticmethod async def clean_scada_data( timescale_conn: AsyncConnection, postgres_conn: AsyncConnection, device_ids: List[str], start_time: datetime, end_time: datetime, ) -> str: """ 清洗 SCADA 数据 根据 device_ids 查询 monitored_value,清洗后更新 cleaned_value Args: timescale_conn: TimescaleDB 连接 postgres_conn: PostgreSQL 连接 device_ids: 设备 ID 列表 start_time: 开始时间 end_time: 结束时间 Returns: "success" 或错误信息 """ try: # 获取所有 SCADA 信息 network_name = project_info.name scada_infos = wndb.get_all_scada_info(network_name) if network_name else [] # 将列表转换为字典,以 device_id 为键 scada_device_info_dict = {info["id"]: info for info in scada_infos} # 如果 device_ids 为空,则处理所有 SCADA 设备 if not device_ids: device_ids = list(scada_device_info_dict.keys()) # 批量查询所有设备的数据 data = await ScadaRepository.get_scada_field_by_id_time_range( timescale_conn, device_ids, start_time, end_time, "monitored_value" ) if not data: return "error: fetch none scada data" # 没有数据,直接返回 # 将嵌套字典转换为 DataFrame,使用 time 作为索引 # data 格式: {device_id: [{"time": "...", "value": ...}, ...]} all_records = [] for device_id, records in data.items(): for record in records: all_records.append( { "time": record["time"], "device_id": device_id, "value": record["value"], } ) if not all_records: return "error: fetch none scada data" # 没有数据,直接返回 # 创建 DataFrame 并透视,使 device_id 成为列 df_long = pd.DataFrame(all_records) df = df_long.pivot(index="time", columns="device_id", values="value") # 根据type分类设备 pressure_ids = [ id for id in df.columns if scada_device_info_dict.get(id, {}).get("type") == "pressure" ] flow_ids = [ id for id in df.columns if scada_device_info_dict.get(id, {}).get("type") == "pipe_flow" ] # 处理pressure数据 if pressure_ids: pressure_df = df[pressure_ids] # 重置索引,将 time 变为普通列 pressure_df = pressure_df.reset_index() # 调用清洗方法 cleaned_df = clean_pressure_data_df_km(pressure_df) # 将清洗后的数据写回数据库 for device_id in pressure_ids: if device_id in cleaned_df.columns: cleaned_values = cleaned_df[device_id].tolist() time_values = cleaned_df["time"].tolist() for i, time_str in enumerate(time_values): time_dt = datetime.fromisoformat(time_str) value = cleaned_values[i] await ScadaRepository.update_scada_field( timescale_conn, time_dt, device_id, "cleaned_value", value, ) # 处理flow数据 if flow_ids: flow_df = df[flow_ids] # 重置索引,将 time 变为普通列 flow_df = flow_df.reset_index() # 调用清洗方法 cleaned_df = clean_flow_data_df_kf(flow_df) # 将清洗后的数据写回数据库 for device_id in flow_ids: if device_id in cleaned_df.columns: cleaned_values = cleaned_df[device_id].tolist() time_values = cleaned_df["time"].tolist() for i, time_str in enumerate(time_values): time_dt = datetime.fromisoformat(time_str) value = cleaned_values[i] await ScadaRepository.update_scada_field( timescale_conn, time_dt, device_id, "cleaned_value", value, ) return "success" except Exception as e: return f"error: {str(e)}" @staticmethod async def predict_pipeline_health( timescale_conn: AsyncConnection, network_name: str, query_time: datetime, ) -> List[Dict[str, Any]]: """ 预测管道健康状况 根据管网名称和当前时间,查询管道信息和实时数据, 使用随机生存森林模型预测管道的生存概率 Args: timescale_conn: TimescaleDB 异步连接 db_name: 管网数据库名称 query_time: 查询时间 property_conditions: 可选的管道筛选条件,如 {"diameter": 300} Returns: 预测结果列表,每个元素包含 link_id 和对应的生存函数 Raises: ValueError: 当参数无效或数据不足时 FileNotFoundError: 当模型文件未找到时 """ try: # 1. 准备时间范围(查询时间前后1秒) start_time = query_time - timedelta(seconds=1) end_time = query_time + timedelta(seconds=1) # 2. 先查询流速数据(velocity),获取有数据的管道ID列表 velocity_data = await RealtimeRepository.get_links_field_by_time_range( timescale_conn, start_time, end_time, "velocity" ) if not velocity_data: raise ValueError("未找到流速数据") # 3. 只查询有流速数据的管道的基本信息 valid_link_ids = list(velocity_data.keys()) # 批量查询这些管道的详细信息 fields = ["id", "diameter", "node1", "node2"] all_links = wndb.get_pipes_by_property(network_name, fields=fields) # 转换为字典以快速查找 links_dict = {link["id"]: link for link in all_links} # 获取所有需要查询的节点ID node_ids = set() for link_id in valid_link_ids: if link_id in links_dict: link = links_dict[link_id] node_ids.add(link["node1"]) node_ids.add(link["node2"]) # 4. 批量查询压力数据(pressure) pressure_data = await RealtimeRepository.get_nodes_field_by_time_range( timescale_conn, start_time, end_time, "pressure" ) # 5. 组合数据结构 materials = [] diameters = [] velocities = [] pressures = [] link_ids = [] for link_id in valid_link_ids: # 跳过不在管道字典中的ID(如泵等其他元素) if link_id not in links_dict: continue link = links_dict[link_id] diameter = link["diameter"] node1 = link["node1"] node2 = link["node2"] # 获取流速数据 velocity_values = velocity_data[link_id] velocity = velocity_values[-1]["value"] if velocity_values else 0 # 获取node1和node2的压力数据,计算平均值 node1_pressure = 0 node2_pressure = 0 if node1 in pressure_data and pressure_data[node1]: pressure_values = pressure_data[node1] node1_pressure = ( pressure_values[-1]["value"] if pressure_values else 0 ) if node2 in pressure_data and pressure_data[node2]: pressure_values = pressure_data[node2] node2_pressure = ( pressure_values[-1]["value"] if pressure_values else 0 ) # 计算平均压力 avg_pressure = (node1_pressure + node2_pressure) / 2 # 添加到列表 link_ids.append(link_id) materials.append(7) # 默认材料类型为7,可根据实际情况调整 diameters.append(diameter) velocities.append(velocity) pressures.append(avg_pressure) if not link_ids: raise ValueError("没有找到有效的管道数据用于预测") # 6. 创建DataFrame data = pd.DataFrame( { "Material": materials, "Diameter": diameters, "Flow Velocity": velocities, "Pressure": pressures, } ) # 7. 使用PipelineHealthAnalyzer进行预测 analyzer = PipelineHealthAnalyzer() survival_functions = analyzer.predict_survival(data) # 8. 组合结果 results = [] for i, link_id in enumerate(link_ids): sf = survival_functions[i] results.append( { "link_id": link_id, "survival_function": { "x": sf.x.tolist(), # 时间点(年) "y": sf.y.tolist(), # 生存概率 }, } ) return results except Exception as e: raise ValueError(f"管道健康预测失败: {str(e)}")