from typing import List, Optional, Dict, Any from datetime import datetime from psycopg import AsyncConnection from postgresql.scada_info import ScadaRepository as PostgreScadaRepository from timescaledb.schemas.realtime import RealtimeRepository from timescaledb.schemas.scheme import SchemeRepository from timescaledb.schemas.scada import ScadaRepository class CompositeQueries: """ 复合查询类,提供跨表查询功能 """ @staticmethod async def get_scada_associated_realtime_simulation_data( timescale_conn: AsyncConnection, postgres_conn: AsyncConnection, device_ids: List[str], start_time: datetime, end_time: datetime, ) -> List[Optional[Any]]: """ 获取 SCADA 关联的 link/node 模拟值 根据传入的 SCADA device_ids,找到关联的 link/node, 并根据对应的 type,查询对应的模拟数据 Args: timescale_conn: TimescaleDB 异步连接 postgres_conn: PostgreSQL 异步连接 device_ids: SCADA 设备ID列表 start_time: 开始时间 end_time: 结束时间 field: 要查询的字段名 Returns: 模拟数据值列表,如果没有找到则对应位置返回 None Raises: ValueError: 当 SCADA 设备未找到或字段无效时 """ results = [] # 1. 查询所有 SCADA 信息 scada_infos = await PostgreScadaRepository.get_scadas(postgres_conn) for device_id in device_ids: # 2. 根据 device_id 找到对应的 SCADA 信息 target_scada = None for scada in scada_infos: if scada["id"] == device_id: target_scada = scada break if not target_scada: raise ValueError(f"SCADA device {device_id} not found") # 3. 根据 type 和 associated_element_id 查询对应的模拟数据 element_id = target_scada["associated_element_id"] scada_type = target_scada["type"] if scada_type.lower() == "pipe_flow": # 查询 link 模拟数据 res = await RealtimeRepository.get_link_field_by_time_range( timescale_conn, start_time, end_time, element_id, "flow" ) elif scada_type.lower() == "pressure": # 查询 node 模拟数据 res = await RealtimeRepository.get_node_field_by_time_range( timescale_conn, start_time, end_time, element_id, "pressure" ) else: raise ValueError(f"Unknown SCADA type: {scada_type}") results.append(res) return results @staticmethod async def get_scada_associated_scheme_simulation_data( timescale_conn: AsyncConnection, postgres_conn: AsyncConnection, device_ids: List[str], start_time: datetime, end_time: datetime, scheme_type: str, scheme_name: str, ) -> List[Optional[Any]]: """ 获取 SCADA 关联的 link/node 模拟值 根据传入的 SCADA device_ids,找到关联的 link/node, 并根据对应的 type,查询对应的模拟数据 Args: timescale_conn: TimescaleDB 异步连接 postgres_conn: PostgreSQL 异步连接 device_ids: SCADA 设备ID列表 start_time: 开始时间 end_time: 结束时间 field: 要查询的字段名 Returns: 模拟数据值列表,如果没有找到则对应位置返回 None Raises: ValueError: 当 SCADA 设备未找到或字段无效时 """ results = [] # 1. 查询所有 SCADA 信息 scada_infos = await PostgreScadaRepository.get_scadas(postgres_conn) for device_id in device_ids: # 2. 根据 device_id 找到对应的 SCADA 信息 target_scada = None for scada in scada_infos: if scada["id"] == device_id: target_scada = scada break if not target_scada: raise ValueError(f"SCADA device {device_id} not found") # 3. 根据 type 和 associated_element_id 查询对应的模拟数据 element_id = target_scada["associated_element_id"] scada_type = target_scada["type"] if scada_type.lower() == "pipe_flow": # 查询 link 模拟数据 res = await SchemeRepository.get_link_field_by_scheme_and_time_range( timescale_conn, scheme_type, scheme_name, start_time, end_time, element_id, "flow", ) elif scada_type.lower() == "pressure": # 查询 node 模拟数据 res = await SchemeRepository.get_node_field_by_scheme_and_time_range( timescale_conn, scheme_type, scheme_name, start_time, end_time, element_id, "pressure", ) else: raise ValueError(f"Unknown SCADA type: {scada_type}") results.append(res) return results @staticmethod async def get_element_associated_scada_data( timescale_conn: AsyncConnection, postgres_conn: AsyncConnection, element_id: str, start_time: datetime, end_time: datetime, use_cleaned: bool = False, ) -> Optional[Any]: """ 获取 link/node 关联的 SCADA 监测值 根据传入的 link/node id,匹配 SCADA 信息, 如果存在关联的 SCADA device_id,获取实际的监测数据 Args: timescale_conn: TimescaleDB 异步连接 postgres_conn: PostgreSQL 异步连接 element_id: link 或 node 的 ID start_time: 开始时间 end_time: 结束时间 use_cleaned: 是否使用清洗后的数据 (True: "cleaned_value", False: "monitored_value") Returns: SCADA 监测数据值,如果没有找到则返回 None Raises: ValueError: 当元素类型无效时 """ # 1. 查询所有 SCADA 信息 scada_infos = await PostgreScadaRepository.get_scadas(postgres_conn) # 2. 根据 element_type 和 element_id 找到关联的 SCADA 设备 associated_scada = None for scada in scada_infos: if scada["associated_element_id"] == element_id: associated_scada = scada break if not associated_scada: # 没有找到关联的 SCADA 设备 return None # 3. 通过 SCADA device_id 获取监测数据 device_id = associated_scada["id"] # 根据 use_cleaned 参数选择字段 data_field = "cleaned_value" if use_cleaned else "monitored_value" return await ScadaRepository.get_scada_field_by_id_time_range( timescale_conn, device_id, start_time, end_time, data_field )