From 7426faab2cc021aaf743a11bd1cded8d382b8d67 Mon Sep 17 00:00:00 2001 From: JIANG Date: Fri, 12 Dec 2025 18:04:07 +0800 Subject: [PATCH] =?UTF-8?q?=E6=9B=B4=E6=96=B0=20scada=20=E6=95=B0=E6=8D=AE?= =?UTF-8?q?=E6=B8=85=E6=B4=97=E6=96=B9=E6=B3=95?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- api_ex/Fdataclean.py | 12 +-- api_ex/Pdataclean.py | 63 ++++++++------ timescaledb/composite_queries.py | 138 ++++++++++++++++--------------- timescaledb/router.py | 13 +-- timescaledb/schemas/realtime.py | 6 +- 5 files changed, 129 insertions(+), 103 deletions(-) diff --git a/api_ex/Fdataclean.py b/api_ex/Fdataclean.py index 28f95db..33228f3 100644 --- a/api_ex/Fdataclean.py +++ b/api_ex/Fdataclean.py @@ -117,14 +117,14 @@ def clean_flow_data_kf(input_csv_path: str, show_plot: bool = False) -> str: return os.path.abspath(output_path) -def clean_flow_data_dict(data_dict: dict, show_plot: bool = False) -> dict: +def clean_flow_data_df_kf(data: pd.DataFrame, show_plot: bool = False) -> dict: """ - 接收一个字典数据结构,其中键为列名,值为时间序列列表,使用一维 Kalman 滤波平滑并用预测值替换基于 IQR 检测出的异常点。 + 接收一个 DataFrame 数据结构,使用一维 Kalman 滤波平滑并用预测值替换基于 IQR 检测出的异常点。 区分合理的0值(流量转换)和异常的0值(连续多个0或孤立0)。 返回完整的清洗后的字典数据结构。 """ - # 将字典转换为 DataFrame - data = pd.DataFrame(data_dict) + # 使用传入的 DataFrame + data = data.copy() # 替换0值,填充NaN值 data_filled = data.replace(0, np.nan) @@ -247,7 +247,7 @@ def clean_flow_data_dict(data_dict: dict, show_plot: bool = False) -> dict: plt.show() # 返回完整的修复后字典 - return cleaned_data.to_dict(orient="list") + return cleaned_data # # 测试 @@ -279,7 +279,7 @@ if __name__ == "__main__": print("原始数据长度:", len(data_dict[selected_columns[0]])) # 调用函数进行清洗 - cleaned_dict = clean_flow_data_dict(data_dict, show_plot=True) + cleaned_dict = clean_flow_data_df_kf(data_dict, show_plot=True) # 将清洗后的字典写回 CSV out_csv = os.path.join(script_dir, f"{selected_columns[0]}_clean.csv") pd.DataFrame(cleaned_dict).to_csv(out_csv, index=False, encoding="utf-8-sig") diff --git a/api_ex/Pdataclean.py b/api_ex/Pdataclean.py index b5e73f4..f62ecbe 100644 --- a/api_ex/Pdataclean.py +++ b/api_ex/Pdataclean.py @@ -1,13 +1,11 @@ import pandas as pd import numpy as np import matplotlib.pyplot as plt -from sklearn.decomposition import PCA from sklearn.cluster import KMeans -from sklearn.metrics import silhouette_score +from sklearn.impute import SimpleImputer import os - def clean_pressure_data_km(input_csv_path: str, show_plot: bool = False) -> str: """ 读取输入 CSV,基于 KMeans 检测异常并用滚动平均修复。输出为 _cleaned.xlsx(同目录)。 @@ -50,18 +48,18 @@ def clean_pressure_data_km(input_csv_path: str, show_plot: bool = False) -> str: data_repaired.loc[label, sensor] = data_rolled.loc[label, sensor] # 可选可视化(使用位置作为 x 轴) - plt.rcParams['font.sans-serif'] = ['SimHei'] - plt.rcParams['axes.unicode_minus'] = False + plt.rcParams["font.sans-serif"] = ["SimHei"] + plt.rcParams["axes.unicode_minus"] = False if show_plot and len(data.columns) > 0: n = len(data) time = np.arange(n) plt.figure(figsize=(12, 8)) for col in data.columns: - plt.plot(time, data[col].values, marker='o', markersize=3, label=col) + plt.plot(time, data[col].values, marker="o", markersize=3, label=col) for pos in anomaly_pos: sensor = anomaly_details[data.index[pos]] - plt.plot(pos, data.iloc[pos][sensor], 'ro', markersize=8) + plt.plot(pos, data.iloc[pos][sensor], "ro", markersize=8) plt.xlabel("时间点(序号)") plt.ylabel("压力监测值") plt.title("各传感器折线图(红色标记主要异常点)") @@ -70,10 +68,12 @@ def clean_pressure_data_km(input_csv_path: str, show_plot: bool = False) -> str: plt.figure(figsize=(12, 8)) for col in data_repaired.columns: - plt.plot(time, data_repaired[col].values, marker='o', markersize=3, label=col) + plt.plot( + time, data_repaired[col].values, marker="o", markersize=3, label=col + ) for pos in anomaly_pos: sensor = anomaly_details[data.index[pos]] - plt.plot(pos, data_repaired.iloc[pos][sensor], 'go', markersize=8) + plt.plot(pos, data_repaired.iloc[pos][sensor], "go", markersize=8) plt.xlabel("时间点(序号)") plt.ylabel("修复后压力监测值") plt.title("修复后各传感器折线图(绿色标记修复值)") @@ -87,22 +87,22 @@ def clean_pressure_data_km(input_csv_path: str, show_plot: bool = False) -> str: output_path = os.path.join(input_dir, output_filename) if os.path.exists(output_path): - os.remove(output_path) # 覆盖同名文件 - with pd.ExcelWriter(output_path, engine='openpyxl') as writer: - data.to_excel(writer, sheet_name='raw_pressure_data', index=False) - data_repaired.to_excel(writer, sheet_name='cleaned_pressusre_data', index=False) - + os.remove(output_path) # 覆盖同名文件 + with pd.ExcelWriter(output_path, engine="openpyxl") as writer: + data.to_excel(writer, sheet_name="raw_pressure_data", index=False) + data_repaired.to_excel(writer, sheet_name="cleaned_pressusre_data", index=False) + # 返回输出文件的绝对路径 return os.path.abspath(output_path) -def clean_pressure_data_dict_km(data_dict: dict, show_plot: bool = False) -> dict: +def clean_pressure_data_df_km(data: pd.DataFrame, show_plot: bool = False) -> dict: """ - 接收一个字典数据结构,其中键为列名,值为时间序列列表,使用KMeans聚类检测异常并用滚动平均修复。 + 接收一个 DataFrame 数据结构,使用KMeans聚类检测异常并用滚动平均修复。 返回清洗后的字典数据结构。 """ - # 将字典转换为 DataFrame - data = pd.DataFrame(data_dict) + # 使用传入的 DataFrame + data = data.copy() # 填充NaN值 data = data.ffill().bfill() # 异常值预处理 @@ -115,6 +115,16 @@ def clean_pressure_data_dict_km(data_dict: dict, show_plot: bool = False) -> dic # 标准化(使用填充后的数据) data_norm = (data_filled - data_filled.mean()) / data_filled.std() + # 添加:处理标准化后的 NaN(例如,标准差为0的列),防止异常数据,时间段内所有数据都相同导致计算结果为 NaN + imputer = SimpleImputer( + strategy="constant", fill_value=0, keep_empty_features=True + ) # 用 0 填充 NaN,包括全 NaN,并保留空特征 + data_norm = pd.DataFrame( + imputer.fit_transform(data_norm), + columns=data_norm.columns, + index=data_norm.index, + ) + # 聚类与异常检测 k = 3 kmeans = KMeans(n_clusters=k, init="k-means++", n_init=50, random_state=42) @@ -189,7 +199,7 @@ def clean_pressure_data_dict_km(data_dict: dict, show_plot: bool = False) -> dic plt.show() # 返回清洗后的字典 - return data_repaired.to_dict(orient="list") + return data_repaired # 测试 @@ -203,25 +213,26 @@ def clean_pressure_data_dict_km(data_dict: dict, show_plot: bool = False) -> dic # 测试 clean_pressure_data_dict_km 函数 if __name__ == "__main__": import random + # 读取 szh_pressure_scada.csv 文件 script_dir = os.path.dirname(os.path.abspath(__file__)) csv_path = os.path.join(script_dir, "szh_pressure_scada.csv") data = pd.read_csv(csv_path, header=0, index_col=None, encoding="utf-8") - + # 排除 Time 列,随机选择 5 列 - columns_to_exclude = ['Time'] + columns_to_exclude = ["Time"] available_columns = [col for col in data.columns if col not in columns_to_exclude] selected_columns = random.sample(available_columns, 5) - + # 将选中的列转换为字典 data_dict = {col: data[col].tolist() for col in selected_columns} - + print("选中的列:", selected_columns) print("原始数据长度:", len(data_dict[selected_columns[0]])) - + # 调用函数进行清洗 - cleaned_dict = clean_pressure_data_dict_km(data_dict, show_plot=True) - + cleaned_dict = clean_pressure_data_df_km(data_dict, show_plot=True) + print("清洗后的字典键:", list(cleaned_dict.keys())) print("清洗后的数据长度:", len(cleaned_dict[selected_columns[0]])) print("测试完成:函数运行正常") diff --git a/timescaledb/composite_queries.py b/timescaledb/composite_queries.py index 423118f..61de4c4 100644 --- a/timescaledb/composite_queries.py +++ b/timescaledb/composite_queries.py @@ -2,7 +2,8 @@ from typing import List, Optional, Any from datetime import datetime from psycopg import AsyncConnection import pandas as pd -import api_ex +from api_ex.Fdataclean import clean_flow_data_df_kf +from api_ex.Pdataclean import clean_pressure_data_df_km from postgresql.scada_info import ScadaRepository as PostgreScadaRepository from timescaledb.schemas.realtime import RealtimeRepository @@ -236,87 +237,94 @@ class CompositeQueries: # 将列表转换为字典,以 device_id 为键 scada_device_info_dict = {info["id"]: info for info in scada_infos} - # 按设备类型分组设备 - type_groups = {} - for device_id in device_ids: - device_info = scada_device_info_dict.get(device_id, {}) - device_type = device_info.get("type", "unknown") - if device_type not in type_groups: - type_groups[device_type] = [] - type_groups[device_type].append(device_id) + # 如果 device_ids 为空,则处理所有 SCADA 设备 + if not device_ids: + device_ids = list(scada_device_info_dict.keys()) - # 批量处理每种类型的设备 - for device_type, ids in type_groups.items(): - if device_type not in ["pressure", "pipe_flow"]: - continue # 跳过未知类型 + # 批量查询所有设备的数据 + data = await ScadaRepository.get_scada_field_by_id_time_range( + timescale_conn, device_ids, start_time, end_time, "monitored_value" + ) - # 查询 monitored_value 数据 - data = await ScadaRepository.get_scada_field_by_id_time_range( - timescale_conn, ids, start_time, end_time, "monitored_value" - ) + if not data: + return "error: fetch none scada data" # 没有数据,直接返回 - if not data: - continue + # 将嵌套字典转换为 DataFrame,使用 time 作为索引 + # data 格式: {device_id: [{"time": "...", "value": ...}, ...]} + all_records = [] + for device_id, records in data.items(): + for record in records: + all_records.append( + { + "time": record["time"], + "device_id": device_id, + "value": record["value"], + } + ) - # 将嵌套字典转换为 DataFrame,使用 time 作为索引 - # data 格式: {device_id: [{"time": "...", "value": ...}, ...]} - all_records = [] - for device_id, records in data.items(): - for record in records: - all_records.append( - { - "time": record["time"], - "device_id": device_id, - "value": record["value"], - } - ) + if not all_records: + return "error: fetch none scada data" # 没有数据,直接返回 - if not all_records: - continue + # 创建 DataFrame 并透视,使 device_id 成为列 + df_long = pd.DataFrame(all_records) + df = df_long.pivot(index="time", columns="device_id", values="value") - # 创建 DataFrame 并透视,使 device_id 成为列 - df_long = pd.DataFrame(all_records) - df = df_long.pivot(index="time", columns="device_id", values="value") + # 根据type分类设备 + pressure_ids = [ + id + for id in df.columns + if scada_device_info_dict.get(id, {}).get("type") == "pressure" + ] + flow_ids = [ + id + for id in df.columns + if scada_device_info_dict.get(id, {}).get("type") == "pipe_flow" + ] - # 确保所有请求的设备都在列中(即使没有数据) - for device_id in ids: - if device_id not in df.columns: - df[device_id] = None - - # 只保留请求的设备列 - df = df[ids] + # 处理pressure数据 + # if pressure_ids: + # pressure_df = df[pressure_ids] + # # 重置索引,将 time 变为普通列 + # pressure_df = pressure_df.reset_index() + # # 移除 time 列,准备输入给清洗方法 + # value_df = pressure_df.drop(columns=["time"]) + # # 调用清洗方法 + # cleaned_value_df = clean_pressure_data_df_km(value_df) + # # 添加 time 列到首列 + # cleaned_df = pd.concat([pressure_df["time"], cleaned_value_df], axis=1) + # # 将清洗后的数据写回数据库 + # for device_id in pressure_ids: + # if device_id in cleaned_df.columns: + # cleaned_values = cleaned_df[device_id].tolist() + # time_values = cleaned_df["time"].tolist() + # for i, time_str in enumerate(time_values): + # time_dt = datetime.fromisoformat(time_str) + # value = cleaned_values[i] + # await ScadaRepository.update_scada_field( + # timescale_conn, + # time_dt, + # device_id, + # "cleaned_value", + # value, + # ) + # 处理flow数据 + if flow_ids: + flow_df = df[flow_ids] # 重置索引,将 time 变为普通列 - df = df.reset_index() - + flow_df = flow_df.reset_index() # 移除 time 列,准备输入给清洗方法 - value_df = df.drop(columns=["time"]) - + value_df = flow_df.drop(columns=["time"]) # 调用清洗方法 - if device_type == "pressure": - cleaned_dict = api_ex.Pdataclean.clean_pressure_data_dict_km( - value_df.to_dict(orient="list") - ) - elif device_type == "pipe_flow": - cleaned_dict = api_ex.Fdataclean.clean_flow_data_dict( - value_df.to_dict(orient="list") - ) - else: - continue - - # 将字典转换为 DataFrame(字典键为设备ID,值为值列表) - cleaned_value_df = pd.DataFrame(cleaned_dict) - + cleaned_value_df = clean_flow_data_df_kf(value_df) # 添加 time 列到首列 - cleaned_df = pd.concat([df["time"], cleaned_value_df], axis=1) - + cleaned_df = pd.concat([flow_df["time"], cleaned_value_df], axis=1) # 将清洗后的数据写回数据库 - for device_id in ids: + for device_id in flow_ids: if device_id in cleaned_df.columns: cleaned_values = cleaned_df[device_id].tolist() time_values = cleaned_df["time"].tolist() for i, time_str in enumerate(time_values): - # time_str 已经是 ISO 格式字符串 time_dt = datetime.fromisoformat(time_str) value = cleaned_values[i] await ScadaRepository.update_scada_field( diff --git a/timescaledb/router.py b/timescaledb/router.py index 7576b78..40ad3be 100644 --- a/timescaledb/router.py +++ b/timescaledb/router.py @@ -521,11 +521,14 @@ async def clean_scada_data( 根据 device_ids 查询 monitored_value,清洗后更新 cleaned_value """ try: - device_ids_list = ( - [id.strip() for id in device_ids.split(",") if id.strip()] - if device_ids - else [] - ) + if device_ids == "all": + device_ids_list = [] + else: + device_ids_list = ( + [id.strip() for id in device_ids.split(",") if id.strip()] + if device_ids + else [] + ) return await CompositeQueries.clean_scada_data( timescale_conn, postgres_conn, device_ids_list, start_time, end_time ) diff --git a/timescaledb/schemas/realtime.py b/timescaledb/schemas/realtime.py index 00a2e28..06a32de 100644 --- a/timescaledb/schemas/realtime.py +++ b/timescaledb/schemas/realtime.py @@ -589,7 +589,11 @@ class RealtimeRepository: raise ValueError(f"Invalid type: {type}. Must be 'node' or 'link'") # Format the results - return [{"ID": item["id"], "value": item["value"]} for item in data] + result = [] + for id, items in data.items(): + for item in items: + result.append({"ID": id, "value": item["value"]}) + return result @staticmethod async def query_simulation_result_by_id_time(