from typing import List, Optional, Any
from datetime import datetime

from psycopg import AsyncConnection
import pandas as pd

from api_ex.Fdataclean import clean_flow_data_df_kf
from api_ex.Pdataclean import clean_pressure_data_df_km
from postgresql.scada_info import ScadaRepository as PostgreScadaRepository
from timescaledb.schemas.realtime import RealtimeRepository
from timescaledb.schemas.scheme import SchemeRepository
from timescaledb.schemas.scada import ScadaRepository


class CompositeQueries:
    """Composite queries that combine SCADA metadata with time-series data.

    SCADA metadata is read through ``PostgreScadaRepository`` (PostgreSQL
    connection); simulation and monitoring series are read/written through
    the TimescaleDB repositories.
    """

    @staticmethod
    def _type_matches(scada_type: Any, expected: str) -> bool:
        """Return True when ``scada_type`` is a string equal to ``expected``, ignoring case."""
        return isinstance(scada_type, str) and scada_type.lower() == expected

    @staticmethod
    async def _collect_simulation_data(
        postgres_conn: AsyncConnection,
        device_ids: List[str],
        fetch_link_flow: Any,
        fetch_node_pressure: Any,
    ) -> List[Optional[Any]]:
        """Resolve each SCADA device to its element and fetch its simulated series.

        Args:
            postgres_conn: PostgreSQL async connection used to load SCADA metadata.
            device_ids: SCADA device IDs to resolve, in output order.
            fetch_link_flow: Callable ``(element_id) -> awaitable`` used for
                devices whose type is ``pipe_flow``.
            fetch_node_pressure: Callable ``(element_id) -> awaitable`` used for
                devices whose type is ``pressure``.

        Returns:
            One fetched result per entry of ``device_ids``, in the same order.

        Raises:
            ValueError: If a device ID is unknown or its type is neither
                ``pipe_flow`` nor ``pressure`` (case-insensitive).
        """
        scada_infos = await PostgreScadaRepository.get_scadas(postgres_conn)

        # Index once instead of scanning the list for every device.
        # setdefault keeps the first record per id, like the linear scan did.
        scada_by_id = {}
        for scada in scada_infos:
            scada_by_id.setdefault(scada["id"], scada)

        results = []
        for device_id in device_ids:
            target_scada = scada_by_id.get(device_id)
            if not target_scada:
                raise ValueError(f"SCADA device {device_id} not found")

            element_id = target_scada["associated_element_id"]
            scada_type = target_scada["type"]

            if CompositeQueries._type_matches(scada_type, "pipe_flow"):
                res = await fetch_link_flow(element_id)
            elif CompositeQueries._type_matches(scada_type, "pressure"):
                res = await fetch_node_pressure(element_id)
            else:
                # Also covers a missing/non-string type, which previously
                # surfaced as an AttributeError from ``.lower()``.
                raise ValueError(f"Unknown SCADA type: {scada_type}")
            results.append(res)
        return results

    @staticmethod
    async def get_scada_associated_realtime_simulation_data(
        timescale_conn: AsyncConnection,
        postgres_conn: AsyncConnection,
        device_ids: List[str],
        start_time: datetime,
        end_time: datetime,
    ) -> List[Optional[Any]]:
        """Fetch realtime simulation data for the elements tied to SCADA devices.

        Each device is mapped to its associated element; ``pipe_flow`` devices
        query the link ``flow`` field and ``pressure`` devices query the node
        ``pressure`` field from ``RealtimeRepository``.

        Args:
            timescale_conn: TimescaleDB async connection.
            postgres_conn: PostgreSQL async connection.
            device_ids: SCADA device IDs.
            start_time: Start of the queried time range.
            end_time: End of the queried time range.

        Returns:
            One repository result per device ID, in input order.

        Raises:
            ValueError: If a SCADA device is not found or its type is unknown.
        """
        return await CompositeQueries._collect_simulation_data(
            postgres_conn,
            device_ids,
            lambda element_id: RealtimeRepository.get_link_field_by_time_range(
                timescale_conn, start_time, end_time, element_id, "flow"
            ),
            lambda element_id: RealtimeRepository.get_node_field_by_time_range(
                timescale_conn, start_time, end_time, element_id, "pressure"
            ),
        )

    @staticmethod
    async def get_scada_associated_scheme_simulation_data(
        timescale_conn: AsyncConnection,
        postgres_conn: AsyncConnection,
        device_ids: List[str],
        start_time: datetime,
        end_time: datetime,
        scheme_type: str,
        scheme_name: str,
    ) -> List[Optional[Any]]:
        """Fetch scheme simulation data for the elements tied to SCADA devices.

        Same resolution as the realtime variant, but the series are read from
        ``SchemeRepository`` for the given scheme.

        Args:
            timescale_conn: TimescaleDB async connection.
            postgres_conn: PostgreSQL async connection.
            device_ids: SCADA device IDs.
            start_time: Start of the queried time range.
            end_time: End of the queried time range.
            scheme_type: Scheme type forwarded to the scheme repository.
            scheme_name: Scheme name forwarded to the scheme repository.

        Returns:
            One repository result per device ID, in input order.

        Raises:
            ValueError: If a SCADA device is not found or its type is unknown.
        """
        return await CompositeQueries._collect_simulation_data(
            postgres_conn,
            device_ids,
            lambda element_id: SchemeRepository.get_link_field_by_scheme_and_time_range(
                timescale_conn,
                scheme_type,
                scheme_name,
                start_time,
                end_time,
                element_id,
                "flow",
            ),
            lambda element_id: SchemeRepository.get_node_field_by_scheme_and_time_range(
                timescale_conn,
                scheme_type,
                scheme_name,
                start_time,
                end_time,
                element_id,
                "pressure",
            ),
        )

    @staticmethod
    async def get_element_associated_scada_data(
        timescale_conn: AsyncConnection,
        postgres_conn: AsyncConnection,
        element_id: str,
        start_time: datetime,
        end_time: datetime,
        use_cleaned: bool = False,
    ) -> Optional[Any]:
        """Fetch SCADA monitoring data for the device tied to a link/node.

        Args:
            timescale_conn: TimescaleDB async connection.
            postgres_conn: PostgreSQL async connection.
            element_id: ID of the link or node.
            start_time: Start of the queried time range.
            end_time: End of the queried time range.
            use_cleaned: When True read ``cleaned_value``; otherwise read
                ``monitored_value``.

        Returns:
            The repository result for the first SCADA device whose
            ``associated_element_id`` equals ``element_id``, or None when no
            device is associated with the element.
        """
        scada_infos = await PostgreScadaRepository.get_scadas(postgres_conn)

        # First match wins when several devices reference the same element.
        associated_scada = next(
            (
                scada
                for scada in scada_infos
                if scada["associated_element_id"] == element_id
            ),
            None,
        )
        if not associated_scada:
            return None

        device_id = associated_scada["id"]
        data_field = "cleaned_value" if use_cleaned else "monitored_value"
        return await ScadaRepository.get_scada_field_by_id_time_range(
            timescale_conn, device_id, start_time, end_time, data_field
        )

    @staticmethod
    async def _clean_and_store(
        timescale_conn: AsyncConnection,
        df: pd.DataFrame,
        device_ids: List[Any],
        clean_fn: Any,
    ) -> None:
        """Clean the selected device columns and write them to ``cleaned_value``.

        Args:
            timescale_conn: TimescaleDB async connection.
            df: Wide frame indexed by ``time`` with one column per device.
            device_ids: Columns of ``df`` to clean and write back.
            clean_fn: Cleaning function taking and returning a DataFrame of
                value columns (no ``time`` column).
        """
        # Move ``time`` out of the index so it can be re-attached afterwards.
        selected_df = df[device_ids].reset_index()
        cleaned_value_df = clean_fn(selected_df.drop(columns=["time"]))
        cleaned_df = pd.concat([selected_df["time"], cleaned_value_df], axis=1)

        time_values = cleaned_df["time"].tolist()
        for device_id in device_ids:
            if device_id not in cleaned_df.columns:
                continue
            cleaned_values = cleaned_df[device_id].tolist()
            for time_value, value in zip(time_values, cleaned_values):
                # Accept both ISO strings and datetime/Timestamp objects;
                # fromisoformat() raises TypeError for non-strings.
                if isinstance(time_value, datetime):
                    time_dt = time_value
                else:
                    time_dt = datetime.fromisoformat(time_value)
                await ScadaRepository.update_scada_field(
                    timescale_conn,
                    time_dt,
                    device_id,
                    "cleaned_value",
                    value,
                )

    @staticmethod
    async def clean_scada_data(
        timescale_conn: AsyncConnection,
        postgres_conn: AsyncConnection,
        device_ids: List[str],
        start_time: datetime,
        end_time: datetime,
    ) -> str:
        """Clean SCADA ``monitored_value`` series and store them as ``cleaned_value``.

        Only ``pipe_flow`` devices are cleaned at present; pressure cleaning
        is disabled (see the note in the body).

        Args:
            timescale_conn: TimescaleDB async connection.
            postgres_conn: PostgreSQL async connection.
            device_ids: Device IDs to process; an empty list means every
                SCADA device known to PostgreSQL.
            start_time: Start of the queried time range.
            end_time: End of the queried time range.

        Returns:
            ``"success"``, or a string starting with ``"error: "`` describing
            the failure. Exceptions are not propagated.
        """
        try:
            scada_infos = await PostgreScadaRepository.get_scadas(postgres_conn)
            scada_device_info_dict = {info["id"]: info for info in scada_infos}

            if not device_ids:
                device_ids = list(scada_device_info_dict.keys())

            # NOTE(review): called with a list of ids here; the result is
            # consumed as {device_id: [{"time": ..., "value": ...}, ...]}.
            data = await ScadaRepository.get_scada_field_by_id_time_range(
                timescale_conn, device_ids, start_time, end_time, "monitored_value"
            )
            if not data:
                return "error: fetch none scada data"

            all_records = [
                {
                    "time": record["time"],
                    "device_id": device_id,
                    "value": record["value"],
                }
                for device_id, records in data.items()
                for record in records
            ]
            if not all_records:
                return "error: fetch none scada data"

            # Long -> wide: one row per timestamp, one column per device.
            df_long = pd.DataFrame(all_records)
            df = df_long.pivot(index="time", columns="device_id", values="value")

            # Case-insensitive type match, consistent with the simulation
            # queries in this class.
            flow_ids = [
                column
                for column in df.columns
                if CompositeQueries._type_matches(
                    scada_device_info_dict.get(column, {}).get("type"), "pipe_flow"
                )
            ]

            # NOTE: pressure cleaning is currently disabled. To re-enable it,
            # select the "pressure" columns the same way and call
            #   await CompositeQueries._clean_and_store(
            #       timescale_conn, df, pressure_ids, clean_pressure_data_df_km)

            if flow_ids:
                await CompositeQueries._clean_and_store(
                    timescale_conn, df, flow_ids, clean_flow_data_df_kf
                )

            return "success"
        except Exception as e:
            # Boundary by contract: callers receive an error string.
            return f"error: {str(e)}"