from typing import List, Optional, Any from datetime import datetime from psycopg import AsyncConnection import pandas as pd import api_ex from postgresql.scada_info import ScadaRepository as PostgreScadaRepository from timescaledb.schemas.realtime import RealtimeRepository from timescaledb.schemas.scheme import SchemeRepository from timescaledb.schemas.scada import ScadaRepository class CompositeQueries: """ 复合查询类,提供跨表查询功能 """ @staticmethod async def get_scada_associated_realtime_simulation_data( timescale_conn: AsyncConnection, postgres_conn: AsyncConnection, device_ids: List[str], start_time: datetime, end_time: datetime, ) -> List[Optional[Any]]: """ 获取 SCADA 关联的 link/node 模拟值 根据传入的 SCADA device_ids,找到关联的 link/node, 并根据对应的 type,查询对应的模拟数据 Args: timescale_conn: TimescaleDB 异步连接 postgres_conn: PostgreSQL 异步连接 device_ids: SCADA 设备ID列表 start_time: 开始时间 end_time: 结束时间 field: 要查询的字段名 Returns: 模拟数据值列表,如果没有找到则对应位置返回 None Raises: ValueError: 当 SCADA 设备未找到或字段无效时 """ results = [] # 1. 查询所有 SCADA 信息 scada_infos = await PostgreScadaRepository.get_scadas(postgres_conn) for device_id in device_ids: # 2. 根据 device_id 找到对应的 SCADA 信息 target_scada = None for scada in scada_infos: if scada["id"] == device_id: target_scada = scada break if not target_scada: raise ValueError(f"SCADA device {device_id} not found") # 3. 根据 type 和 associated_element_id 查询对应的模拟数据 element_id = target_scada["associated_element_id"] scada_type = target_scada["type"] if scada_type.lower() == "pipe_flow": # 查询 link 模拟数据 res = await RealtimeRepository.get_link_field_by_time_range( timescale_conn, start_time, end_time, element_id, "flow" ) elif scada_type.lower() == "pressure": # 查询 node 模拟数据 res = await RealtimeRepository.get_node_field_by_time_range( timescale_conn, start_time, end_time, element_id, "pressure" ) else: raise ValueError(f"Unknown SCADA type: {scada_type}") results.append(res) return results @staticmethod async def get_scada_associated_scheme_simulation_data( timescale_conn: AsyncConnection, postgres_conn: AsyncConnection, device_ids: List[str], start_time: datetime, end_time: datetime, scheme_type: str, scheme_name: str, ) -> List[Optional[Any]]: """ 获取 SCADA 关联的 link/node 模拟值 根据传入的 SCADA device_ids,找到关联的 link/node, 并根据对应的 type,查询对应的模拟数据 Args: timescale_conn: TimescaleDB 异步连接 postgres_conn: PostgreSQL 异步连接 device_ids: SCADA 设备ID列表 start_time: 开始时间 end_time: 结束时间 field: 要查询的字段名 Returns: 模拟数据值列表,如果没有找到则对应位置返回 None Raises: ValueError: 当 SCADA 设备未找到或字段无效时 """ results = [] # 1. 查询所有 SCADA 信息 scada_infos = await PostgreScadaRepository.get_scadas(postgres_conn) for device_id in device_ids: # 2. 根据 device_id 找到对应的 SCADA 信息 target_scada = None for scada in scada_infos: if scada["id"] == device_id: target_scada = scada break if not target_scada: raise ValueError(f"SCADA device {device_id} not found") # 3. 根据 type 和 associated_element_id 查询对应的模拟数据 element_id = target_scada["associated_element_id"] scada_type = target_scada["type"] if scada_type.lower() == "pipe_flow": # 查询 link 模拟数据 res = await SchemeRepository.get_link_field_by_scheme_and_time_range( timescale_conn, scheme_type, scheme_name, start_time, end_time, element_id, "flow", ) elif scada_type.lower() == "pressure": # 查询 node 模拟数据 res = await SchemeRepository.get_node_field_by_scheme_and_time_range( timescale_conn, scheme_type, scheme_name, start_time, end_time, element_id, "pressure", ) else: raise ValueError(f"Unknown SCADA type: {scada_type}") results.append(res) return results @staticmethod async def get_element_associated_scada_data( timescale_conn: AsyncConnection, postgres_conn: AsyncConnection, element_id: str, start_time: datetime, end_time: datetime, use_cleaned: bool = False, ) -> Optional[Any]: """ 获取 link/node 关联的 SCADA 监测值 根据传入的 link/node id,匹配 SCADA 信息, 如果存在关联的 SCADA device_id,获取实际的监测数据 Args: timescale_conn: TimescaleDB 异步连接 postgres_conn: PostgreSQL 异步连接 element_id: link 或 node 的 ID start_time: 开始时间 end_time: 结束时间 use_cleaned: 是否使用清洗后的数据 (True: "cleaned_value", False: "monitored_value") Returns: SCADA 监测数据值,如果没有找到则返回 None Raises: ValueError: 当元素类型无效时 """ # 1. 查询所有 SCADA 信息 scada_infos = await PostgreScadaRepository.get_scadas(postgres_conn) # 2. 根据 element_type 和 element_id 找到关联的 SCADA 设备 associated_scada = None for scada in scada_infos: if scada["associated_element_id"] == element_id: associated_scada = scada break if not associated_scada: # 没有找到关联的 SCADA 设备 return None # 3. 通过 SCADA device_id 获取监测数据 device_id = associated_scada["id"] # 根据 use_cleaned 参数选择字段 data_field = "cleaned_value" if use_cleaned else "monitored_value" return await ScadaRepository.get_scada_field_by_id_time_range( timescale_conn, device_id, start_time, end_time, data_field ) @staticmethod async def clean_scada_data( timescale_conn: AsyncConnection, postgres_conn: AsyncConnection, device_ids: List[str], start_time: datetime, end_time: datetime, ) -> str: """ 清洗 SCADA 数据 根据 device_ids 查询 monitored_value,清洗后更新 cleaned_value Args: timescale_conn: TimescaleDB 连接 postgres_conn: PostgreSQL 连接 device_ids: 设备 ID 列表 start_time: 开始时间 end_time: 结束时间 Returns: "success" 或错误信息 """ try: # 获取所有 SCADA 信息 scada_infos = await PostgreScadaRepository.get_scadas(postgres_conn) # 将列表转换为字典,以 device_id 为键 scada_device_info_dict = {info["id"]: info for info in scada_infos} # 按设备类型分组设备 type_groups = {} for device_id in device_ids: device_info = scada_device_info_dict.get(device_id, {}) device_type = device_info.get("type", "unknown") if device_type not in type_groups: type_groups[device_type] = [] type_groups[device_type].append(device_id) # 批量处理每种类型的设备 for device_type, ids in type_groups.items(): if device_type not in ["pressure", "pipe_flow"]: continue # 跳过未知类型 # 查询 monitored_value 数据 data = await ScadaRepository.get_scada_field_by_id_time_range( timescale_conn, ids, start_time, end_time, "monitored_value" ) if not data: continue # 将嵌套字典转换为 DataFrame,使用 time 作为索引 # data 格式: {device_id: [{"time": "...", "value": ...}, ...]} all_records = [] for device_id, records in data.items(): for record in records: all_records.append( { "time": record["time"], "device_id": device_id, "value": record["value"], } ) if not all_records: continue # 创建 DataFrame 并透视,使 device_id 成为列 df_long = pd.DataFrame(all_records) df = df_long.pivot(index="time", columns="device_id", values="value") # 确保所有请求的设备都在列中(即使没有数据) for device_id in ids: if device_id not in df.columns: df[device_id] = None # 只保留请求的设备列 df = df[ids] # 重置索引,将 time 变为普通列 df = df.reset_index() # 移除 time 列,准备输入给清洗方法 value_df = df.drop(columns=["time"]) # 调用清洗方法 if device_type == "pressure": cleaned_dict = api_ex.Pdataclean.clean_pressure_data_dict_km( value_df.to_dict(orient="list") ) elif device_type == "pipe_flow": cleaned_dict = api_ex.Fdataclean.clean_flow_data_dict( value_df.to_dict(orient="list") ) else: continue # 将字典转换为 DataFrame(字典键为设备ID,值为值列表) cleaned_value_df = pd.DataFrame(cleaned_dict) # 添加 time 列到首列 cleaned_df = pd.concat([df["time"], cleaned_value_df], axis=1) # 将清洗后的数据写回数据库 for device_id in ids: if device_id in cleaned_df.columns: cleaned_values = cleaned_df[device_id].tolist() time_values = cleaned_df["time"].tolist() for i, time_str in enumerate(time_values): # time_str 已经是 ISO 格式字符串 time_dt = datetime.fromisoformat(time_str) value = cleaned_values[i] await ScadaRepository.update_scada_field( timescale_conn, time_dt, device_id, "cleaned_value", value, ) return "success" except Exception as e: return f"error: {str(e)}"