# ...existing code... import pandas as pd import numpy as np import matplotlib.pyplot as plt from pykalman import KalmanFilter import os def clean_flow_data_kf(input_csv_path: str, show_plot: bool = False) -> str: """ 读取 input_csv_path 中的每列时间序列,使用一维 Kalman 滤波平滑并用预测值替换基于 3σ 检测出的异常点。 保存输出为:_cleaned.xlsx(与输入同目录),并返回输出文件的绝对路径。 仅保留输入文件路径作为参数(按要求)。 """ # 读取 CSV data = pd.read_csv(input_csv_path, header=0, index_col=None, encoding="utf-8") # 存储 Kalman 平滑结果 data_kf = pd.DataFrame(index=data.index, columns=data.columns) # 平滑每一列 for col in data.columns: observations = pd.Series(data[col].values).ffill().bfill() if observations.isna().any(): observations = observations.fillna(observations.mean()) obs = observations.values.astype(float) kf = KalmanFilter( transition_matrices=[1], observation_matrices=[1], initial_state_mean=float(obs[0]), initial_state_covariance=1, observation_covariance=1, transition_covariance=0.01 ) # 跳过EM学习,使用固定参数以提高性能 state_means, _ = kf.smooth(obs) data_kf[col] = state_means.flatten() # 计算残差并用IQR检测异常(更稳健的方法) residuals = data - data_kf residual_thresholds = {} for col in data.columns: res_values = residuals[col].dropna().values # 移除NaN以计算IQR q1 = np.percentile(res_values, 25) q3 = np.percentile(res_values, 75) iqr = q3 - q1 lower_threshold = q1 - 1.5 * iqr upper_threshold = q3 + 1.5 * iqr residual_thresholds[col] = (lower_threshold, upper_threshold) cleaned_data = data.copy() anomalies_info = {} for col in data.columns: lower, upper = residual_thresholds[col] sensor_residuals = residuals[col] anomaly_mask = (sensor_residuals < lower) | (sensor_residuals > upper) anomaly_idx = data.index[anomaly_mask.fillna(False)] anomalies_info[col] = pd.DataFrame({ 'Observed': data.loc[anomaly_idx, col], 'Kalman_Predicted': data_kf.loc[anomaly_idx, col], 'Residual': sensor_residuals.loc[anomaly_idx] }) cleaned_data.loc[anomaly_idx, f'{col}_cleaned'] = data_kf.loc[anomaly_idx, col] # 构造输出文件名:在输入文件名基础上加后缀 _cleaned.xlsx input_dir = os.path.dirname(os.path.abspath(input_csv_path)) input_base = os.path.splitext(os.path.basename(input_csv_path))[0] output_filename = f"{input_base}_cleaned.xlsx" output_path = os.path.join(input_dir, output_filename) # 覆盖同名文件 if os.path.exists(output_path): os.remove(output_path) cleaned_data.to_excel(output_path, index=False) # 可选可视化(第一个传感器) plt.rcParams['font.sans-serif'] = ['SimHei'] plt.rcParams['axes.unicode_minus'] = False if show_plot and len(data.columns) > 0: sensor_to_plot = data.columns[0] plt.figure(figsize=(12, 6)) plt.plot(data.index, data[sensor_to_plot], label="监测值", marker='o', markersize=3, alpha=0.7) plt.plot(data.index, data_kf[sensor_to_plot], label="Kalman滤波预测值", linewidth=2) anomaly_idx = anomalies_info[sensor_to_plot].index if len(anomaly_idx) > 0: plt.plot(anomaly_idx, data[sensor_to_plot].loc[anomaly_idx], 'ro', markersize=8, label="监测值异常点") plt.plot(anomaly_idx, data_kf[sensor_to_plot].loc[anomaly_idx], 'go', markersize=8, label="Kalman修复值") plt.xlabel("时间点(序号)") plt.ylabel("监测值") plt.title(f"{sensor_to_plot}:观测值与Kalman滤波预测值(异常点标记)") plt.legend() plt.show() # 返回输出文件的绝对路径 return os.path.abspath(output_path) def clean_flow_data_dict(data_dict: dict, show_plot: bool = False) -> dict: """ 接收一个字典数据结构,其中键为列名,值为时间序列列表,使用一维 Kalman 滤波平滑并用预测值替换基于 3σ 检测出的异常点。 返回清洗后的字典数据结构。 """ # 将字典转换为 DataFrame data = pd.DataFrame(data_dict) # 存储 Kalman 平滑结果 data_kf = pd.DataFrame(index=data.index, columns=data.columns) # 平滑每一列 for col in data.columns: observations = pd.Series(data[col].values).ffill().bfill() if observations.isna().any(): observations = observations.fillna(observations.mean()) obs = observations.values.astype(float) kf = KalmanFilter( transition_matrices=[1], observation_matrices=[1], initial_state_mean=float(obs[0]), initial_state_covariance=1, observation_covariance=10, transition_covariance=10 ) # 跳过EM学习,使用固定参数以提高性能 state_means, _ = kf.smooth(obs) data_kf[col] = state_means.flatten() # 计算残差并用IQR检测异常(更稳健的方法) residuals = data - data_kf residual_thresholds = {} for col in data.columns: res_values = residuals[col].dropna().values # 移除NaN以计算IQR q1 = np.percentile(res_values, 25) q3 = np.percentile(res_values, 75) iqr = q3 - q1 lower_threshold = q1 - 1.5 * iqr upper_threshold = q3 + 1.5 * iqr residual_thresholds[col] = (lower_threshold, upper_threshold) cleaned_data = data.copy() anomalies_info = {} for col in data.columns: lower, upper = residual_thresholds[col] sensor_residuals = residuals[col] anomaly_mask = (sensor_residuals < lower) | (sensor_residuals > upper) anomaly_idx = data.index[anomaly_mask.fillna(False)] anomalies_info[col] = pd.DataFrame({ 'Observed': data.loc[anomaly_idx, col], 'Kalman_Predicted': data_kf.loc[anomaly_idx, col], 'Residual': sensor_residuals.loc[anomaly_idx] }) cleaned_data.loc[anomaly_idx, f'{col}_cleaned'] = data_kf.loc[anomaly_idx, col] # 可选可视化(第一个传感器) plt.rcParams['font.sans-serif'] = ['SimHei'] plt.rcParams['axes.unicode_minus'] = False if show_plot and len(data.columns) > 0: sensor_to_plot = data.columns[0] plt.figure(figsize=(12, 6)) plt.plot(data.index, data[sensor_to_plot], label="监测值", marker='o', markersize=3, alpha=0.7) plt.plot(data.index, data_kf[sensor_to_plot], label="Kalman滤波预测值", linewidth=2) anomaly_idx = anomalies_info[sensor_to_plot].index if len(anomaly_idx) > 0: plt.plot(anomaly_idx, data[sensor_to_plot].loc[anomaly_idx], 'ro', markersize=8, label="监测值异常点") plt.plot(anomaly_idx, data_kf[sensor_to_plot].loc[anomaly_idx], 'go', markersize=8, label="Kalman修复值") plt.xlabel("时间点(序号)") plt.ylabel("监测值") plt.title(f"{sensor_to_plot}:观测值与Kalman滤波预测值(异常点标记)") plt.legend() plt.show() # 返回清洗后的字典 return cleaned_data.to_dict(orient='list') # # 测试 # if __name__ == "__main__": # # 默认:脚本目录下同名 CSV 文件 # script_dir = os.path.dirname(os.path.abspath(__file__)) # default_csv = os.path.join(script_dir, "pipe_flow_data_to_clean2.0.csv") # out = clean_flow_data_kf(default_csv) # print("清洗后的数据已保存到:", out) # 测试 clean_flow_data_dict 函数 if __name__ == "__main__": import random # 读取 szh_flow_scada.csv 文件 script_dir = os.path.dirname(os.path.abspath(__file__)) csv_path = os.path.join(script_dir, "szh_flow_scada.csv") data = pd.read_csv(csv_path, header=0, index_col=None, encoding="utf-8") # 排除 Time 列,随机选择 5 列 columns_to_exclude = ['Time'] available_columns = [col for col in data.columns if col not in columns_to_exclude] selected_columns = random.sample(available_columns, 1) # 将选中的列转换为字典 data_dict = {col: data[col].tolist() for col in selected_columns} print("选中的列:", selected_columns) print("原始数据长度:", len(data_dict[selected_columns[0]])) # 调用函数进行清洗 cleaned_dict = clean_flow_data_dict(data_dict, show_plot=True) # 将清洗后的字典写回 CSV out_csv = os.path.join(script_dir, f"{selected_columns[0]}_clean.csv") pd.DataFrame(cleaned_dict).to_csv(out_csv, index=False, encoding='utf-8-sig') print("已保存清洗结果到:", out_csv) print("清洗后的字典键:", list(cleaned_dict.keys())) print("清洗后的数据长度:", len(cleaned_dict[selected_columns[0]])) print("测试完成:函数运行正常")