From a0519295cbd3d3fa5c223e1a6d9f0dffb9392e40 Mon Sep 17 00:00:00 2001 From: JIANG Date: Wed, 12 Nov 2025 15:06:14 +0800 Subject: [PATCH] =?UTF-8?q?=E6=96=B0=E5=A2=9E=E5=8E=8B=E5=8A=9B=E3=80=81?= =?UTF-8?q?=E6=B5=81=E9=87=8F=E6=95=B0=E6=8D=AE=E6=B8=85=E6=B4=97=E5=89=8D?= =?UTF-8?q?=E7=9A=84=E5=AF=B9=E5=BC=82=E5=B8=B8=E5=80=BC0=E7=9A=84?= =?UTF-8?q?=E9=A2=84=E5=A4=84=E7=90=86=E6=96=B9=E6=B3=95?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- api_ex/Fdataclean.py | 186 +++++++++++++++++++++++++++++++------------ api_ex/Pdataclean.py | 54 +++++++++---- 2 files changed, 171 insertions(+), 69 deletions(-) diff --git a/api_ex/Fdataclean.py b/api_ex/Fdataclean.py index deb2048..28f95db 100644 --- a/api_ex/Fdataclean.py +++ b/api_ex/Fdataclean.py @@ -6,7 +6,6 @@ from pykalman import KalmanFilter import os - def clean_flow_data_kf(input_csv_path: str, show_plot: bool = False) -> str: """ 读取 input_csv_path 中的每列时间序列,使用一维 Kalman 滤波平滑并用预测值替换基于 3σ 检测出的异常点。 @@ -30,7 +29,7 @@ def clean_flow_data_kf(input_csv_path: str, show_plot: bool = False) -> str: initial_state_mean=float(obs[0]), initial_state_covariance=1, observation_covariance=1, - transition_covariance=0.01 + transition_covariance=0.01, ) # 跳过EM学习,使用固定参数以提高性能 state_means, _ = kf.smooth(obs) @@ -55,12 +54,14 @@ def clean_flow_data_kf(input_csv_path: str, show_plot: bool = False) -> str: sensor_residuals = residuals[col] anomaly_mask = (sensor_residuals < lower) | (sensor_residuals > upper) anomaly_idx = data.index[anomaly_mask.fillna(False)] - anomalies_info[col] = pd.DataFrame({ - 'Observed': data.loc[anomaly_idx, col], - 'Kalman_Predicted': data_kf.loc[anomaly_idx, col], - 'Residual': sensor_residuals.loc[anomaly_idx] - }) - cleaned_data.loc[anomaly_idx, f'{col}_cleaned'] = data_kf.loc[anomaly_idx, col] + anomalies_info[col] = pd.DataFrame( + { + "Observed": data.loc[anomaly_idx, col], + "Kalman_Predicted": data_kf.loc[anomaly_idx, col], + "Residual": sensor_residuals.loc[anomaly_idx], + } + ) + cleaned_data.loc[anomaly_idx, f"{col}_cleaned"] = data_kf.loc[anomaly_idx, col] # 构造输出文件名:在输入文件名基础上加后缀 _cleaned.xlsx input_dir = os.path.dirname(os.path.abspath(input_csv_path)) @@ -74,17 +75,38 @@ def clean_flow_data_kf(input_csv_path: str, show_plot: bool = False) -> str: cleaned_data.to_excel(output_path, index=False) # 可选可视化(第一个传感器) - plt.rcParams['font.sans-serif'] = ['SimHei'] - plt.rcParams['axes.unicode_minus'] = False + plt.rcParams["font.sans-serif"] = ["SimHei"] + plt.rcParams["axes.unicode_minus"] = False if show_plot and len(data.columns) > 0: sensor_to_plot = data.columns[0] plt.figure(figsize=(12, 6)) - plt.plot(data.index, data[sensor_to_plot], label="监测值", marker='o', markersize=3, alpha=0.7) - plt.plot(data.index, data_kf[sensor_to_plot], label="Kalman滤波预测值", linewidth=2) + plt.plot( + data.index, + data[sensor_to_plot], + label="监测值", + marker="o", + markersize=3, + alpha=0.7, + ) + plt.plot( + data.index, data_kf[sensor_to_plot], label="Kalman滤波预测值", linewidth=2 + ) anomaly_idx = anomalies_info[sensor_to_plot].index if len(anomaly_idx) > 0: - plt.plot(anomaly_idx, data[sensor_to_plot].loc[anomaly_idx], 'ro', markersize=8, label="监测值异常点") - plt.plot(anomaly_idx, data_kf[sensor_to_plot].loc[anomaly_idx], 'go', markersize=8, label="Kalman修复值") + plt.plot( + anomaly_idx, + data[sensor_to_plot].loc[anomaly_idx], + "ro", + markersize=8, + label="监测值异常点", + ) + plt.plot( + anomaly_idx, + data_kf[sensor_to_plot].loc[anomaly_idx], + "go", + markersize=8, + label="Kalman修复值", + ) plt.xlabel("时间点(序号)") plt.ylabel("监测值") plt.title(f"{sensor_to_plot}:观测值与Kalman滤波预测值(异常点标记)") @@ -94,18 +116,29 @@ def clean_flow_data_kf(input_csv_path: str, show_plot: bool = False) -> str: # 返回输出文件的绝对路径 return os.path.abspath(output_path) + def clean_flow_data_dict(data_dict: dict, show_plot: bool = False) -> dict: """ - 接收一个字典数据结构,其中键为列名,值为时间序列列表,使用一维 Kalman 滤波平滑并用预测值替换基于 3σ 检测出的异常点。 - 返回清洗后的字典数据结构。 + 接收一个字典数据结构,其中键为列名,值为时间序列列表,使用一维 Kalman 滤波平滑并用预测值替换基于 IQR 检测出的异常点。 + 区分合理的0值(流量转换)和异常的0值(连续多个0或孤立0)。 + 返回完整的清洗后的字典数据结构。 """ # 将字典转换为 DataFrame data = pd.DataFrame(data_dict) + # 替换0值,填充NaN值 + data_filled = data.replace(0, np.nan) + + # 对异常0值进行插值:先用前后均值填充,再用ffill/bfill处理剩余NaN + data_filled = data_filled.interpolate(method="linear", limit_direction="both") + + # 处理剩余的0值和NaN值 + data_filled = data_filled.ffill().bfill() + # 存储 Kalman 平滑结果 - data_kf = pd.DataFrame(index=data.index, columns=data.columns) + data_kf = pd.DataFrame(index=data_filled.index, columns=data_filled.columns) # 平滑每一列 - for col in data.columns: - observations = pd.Series(data[col].values).ffill().bfill() + for col in data_filled.columns: + observations = pd.Series(data_filled[col].values).ffill().bfill() if observations.isna().any(): observations = observations.fillna(observations.mean()) obs = observations.values.astype(float) @@ -116,17 +149,16 @@ def clean_flow_data_dict(data_dict: dict, show_plot: bool = False) -> dict: initial_state_mean=float(obs[0]), initial_state_covariance=1, observation_covariance=10, - transition_covariance=10 + transition_covariance=10, ) - # 跳过EM学习,使用固定参数以提高性能 state_means, _ = kf.smooth(obs) data_kf[col] = state_means.flatten() - # 计算残差并用IQR检测异常(更稳健的方法) - residuals = data - data_kf + # 计算残差并用IQR检测异常 + residuals = data_filled - data_kf residual_thresholds = {} - for col in data.columns: - res_values = residuals[col].dropna().values # 移除NaN以计算IQR + for col in data_filled.columns: + res_values = residuals[col].dropna().values q1 = np.percentile(res_values, 25) q3 = np.percentile(res_values, 75) iqr = q3 - q1 @@ -134,40 +166,89 @@ def clean_flow_data_dict(data_dict: dict, show_plot: bool = False) -> dict: upper_threshold = q3 + 1.5 * iqr residual_thresholds[col] = (lower_threshold, upper_threshold) - cleaned_data = data.copy() + # 创建完整的修复数据 + cleaned_data = data_filled.copy() anomalies_info = {} - for col in data.columns: + + for col in data_filled.columns: lower, upper = residual_thresholds[col] sensor_residuals = residuals[col] anomaly_mask = (sensor_residuals < lower) | (sensor_residuals > upper) - anomaly_idx = data.index[anomaly_mask.fillna(False)] - anomalies_info[col] = pd.DataFrame({ - 'Observed': data.loc[anomaly_idx, col], - 'Kalman_Predicted': data_kf.loc[anomaly_idx, col], - 'Residual': sensor_residuals.loc[anomaly_idx] - }) - cleaned_data.loc[anomaly_idx, f'{col}_cleaned'] = data_kf.loc[anomaly_idx, col] + anomaly_idx = data_filled.index[anomaly_mask.fillna(False)] - # 可选可视化(第一个传感器) - plt.rcParams['font.sans-serif'] = ['SimHei'] - plt.rcParams['axes.unicode_minus'] = False + anomalies_info[col] = pd.DataFrame( + { + "Observed": data_filled.loc[anomaly_idx, col], + "Kalman_Predicted": data_kf.loc[anomaly_idx, col], + "Residual": sensor_residuals.loc[anomaly_idx], + } + ) + + # 直接在原列上替换异常值为 Kalman 预测值 + cleaned_data.loc[anomaly_idx, col] = data_kf.loc[anomaly_idx, col] + + # 可选可视化 + plt.rcParams["font.sans-serif"] = ["SimHei"] + plt.rcParams["axes.unicode_minus"] = False if show_plot and len(data.columns) > 0: sensor_to_plot = data.columns[0] - plt.figure(figsize=(12, 6)) - plt.plot(data.index, data[sensor_to_plot], label="监测值", marker='o', markersize=3, alpha=0.7) - plt.plot(data.index, data_kf[sensor_to_plot], label="Kalman滤波预测值", linewidth=2) + plt.figure(figsize=(12, 8)) + + plt.subplot(2, 1, 1) + plt.plot( + data.index, + data[sensor_to_plot], + label="原始监测值", + marker="o", + markersize=3, + alpha=0.7, + ) + abnormal_zero_idx = data.index[data_filled[sensor_to_plot].isna()] + if len(abnormal_zero_idx) > 0: + plt.plot( + abnormal_zero_idx, + data[sensor_to_plot].loc[abnormal_zero_idx], + "mo", + markersize=8, + label="异常0值", + ) + plt.plot( + data.index, data_kf[sensor_to_plot], label="Kalman滤波预测值", linewidth=2 + ) anomaly_idx = anomalies_info[sensor_to_plot].index if len(anomaly_idx) > 0: - plt.plot(anomaly_idx, data[sensor_to_plot].loc[anomaly_idx], 'ro', markersize=8, label="监测值异常点") - plt.plot(anomaly_idx, data_kf[sensor_to_plot].loc[anomaly_idx], 'go', markersize=8, label="Kalman修复值") + plt.plot( + anomaly_idx, + data_filled[sensor_to_plot].loc[anomaly_idx], + "ro", + markersize=8, + label="IQR异常点", + ) plt.xlabel("时间点(序号)") - plt.ylabel("监测值") - plt.title(f"{sensor_to_plot}:观测值与Kalman滤波预测值(异常点标记)") + plt.ylabel("流量值") + plt.title(f"{sensor_to_plot}:原始数据与异常检测") plt.legend() + + plt.subplot(2, 1, 2) + plt.plot( + data.index, + cleaned_data[sensor_to_plot], + label="修复后监测值", + marker="o", + markersize=3, + color="green", + ) + plt.xlabel("时间点(序号)") + plt.ylabel("流量值") + plt.title(f"{sensor_to_plot}:修复后数据") + plt.legend() + + plt.tight_layout() plt.show() - # 返回清洗后的字典 - return cleaned_data.to_dict(orient='list') + # 返回完整的修复后字典 + return cleaned_data.to_dict(orient="list") + # # 测试 # if __name__ == "__main__": @@ -180,27 +261,28 @@ def clean_flow_data_dict(data_dict: dict, show_plot: bool = False) -> dict: # 测试 clean_flow_data_dict 函数 if __name__ == "__main__": import random + # 读取 szh_flow_scada.csv 文件 script_dir = os.path.dirname(os.path.abspath(__file__)) csv_path = os.path.join(script_dir, "szh_flow_scada.csv") data = pd.read_csv(csv_path, header=0, index_col=None, encoding="utf-8") - + # 排除 Time 列,随机选择 5 列 - columns_to_exclude = ['Time'] + columns_to_exclude = ["Time"] available_columns = [col for col in data.columns if col not in columns_to_exclude] selected_columns = random.sample(available_columns, 1) - + # 将选中的列转换为字典 data_dict = {col: data[col].tolist() for col in selected_columns} - + print("选中的列:", selected_columns) print("原始数据长度:", len(data_dict[selected_columns[0]])) - + # 调用函数进行清洗 cleaned_dict = clean_flow_data_dict(data_dict, show_plot=True) # 将清洗后的字典写回 CSV out_csv = os.path.join(script_dir, f"{selected_columns[0]}_clean.csv") - pd.DataFrame(cleaned_dict).to_csv(out_csv, index=False, encoding='utf-8-sig') + pd.DataFrame(cleaned_dict).to_csv(out_csv, index=False, encoding="utf-8-sig") print("已保存清洗结果到:", out_csv) print("清洗后的字典键:", list(cleaned_dict.keys())) print("清洗后的数据长度:", len(cleaned_dict[selected_columns[0]])) diff --git a/api_ex/Pdataclean.py b/api_ex/Pdataclean.py index a5c0bd1..b5e73f4 100644 --- a/api_ex/Pdataclean.py +++ b/api_ex/Pdataclean.py @@ -105,8 +105,15 @@ def clean_pressure_data_dict_km(data_dict: dict, show_plot: bool = False) -> dic data = pd.DataFrame(data_dict) # 填充NaN值 data = data.ffill().bfill() - # 标准化 - data_norm = (data - data.mean()) / data.std() + # 异常值预处理 + # 将0值替换为NaN,然后用线性插值填充 + data_filled = data.replace(0, np.nan) + data_filled = data_filled.interpolate(method="linear", limit_direction="both") + # 如果仍有NaN(全为0的列),用前后值填充 + data_filled = data_filled.ffill().bfill() + + # 标准化(使用填充后的数据) + data_norm = (data_filled - data_filled.mean()) / data_filled.std() # 聚类与异常检测 k = 3 @@ -130,46 +137,59 @@ def clean_pressure_data_dict_km(data_dict: dict, show_plot: bool = False) -> dic anomaly_details[data.index[pos]] = main_sensor # 修复:滚动平均(窗口可调) - data_rolled = data.rolling(window=13, center=True, min_periods=1).mean() - data_repaired = data.copy() + data_rolled = data_filled.rolling(window=13, center=True, min_periods=1).mean() + data_repaired = data_filled.copy() for pos in anomaly_pos: label = data.index[pos] sensor = anomaly_details[label] data_repaired.loc[label, sensor] = data_rolled.loc[label, sensor] # 可选可视化(使用位置作为 x 轴) - plt.rcParams['font.sans-serif'] = ['SimHei'] - plt.rcParams['axes.unicode_minus'] = False + plt.rcParams["font.sans-serif"] = ["SimHei"] + plt.rcParams["axes.unicode_minus"] = False if show_plot and len(data.columns) > 0: n = len(data) time = np.arange(n) plt.figure(figsize=(12, 8)) for col in data.columns: - plt.plot(time, data[col].values, marker='o', markersize=3, label=col) + plt.plot( + time, data[col].values, marker="o", markersize=3, label=col, alpha=0.5 + ) + for col in data_filled.columns: + plt.plot( + time, + data_filled[col].values, + marker="x", + markersize=3, + label=f"{col}_filled", + linestyle="--", + ) for pos in anomaly_pos: sensor = anomaly_details[data.index[pos]] - plt.plot(pos, data.iloc[pos][sensor], 'ro', markersize=8) + plt.plot(pos, data_filled.iloc[pos][sensor], "ro", markersize=8) plt.xlabel("时间点(序号)") plt.ylabel("压力监测值") - plt.title("各传感器折线图(红色标记主要异常点)") + plt.title("各传感器折线图(红色标记主要异常点,虚线为0值填充后)") plt.legend() plt.show() plt.figure(figsize=(12, 8)) for col in data_repaired.columns: - plt.plot(time, data_repaired[col].values, marker='o', markersize=3, label=col) + plt.plot( + time, data_repaired[col].values, marker="o", markersize=3, label=col + ) for pos in anomaly_pos: sensor = anomaly_details[data.index[pos]] - plt.plot(pos, data_repaired.iloc[pos][sensor], 'go', markersize=8) - plt.xlabel("时间点(序号)") - plt.ylabel("修复后压力监测值") - plt.title("修复后各传感器折线图(绿色标记修复值)") - plt.legend() - plt.show() + plt.plot(pos, data_repaired.iloc[pos][sensor], "go", markersize=8) + plt.xlabel("时间点(序号)") + plt.ylabel("修复后压力监测值") + plt.title("修复后各传感器折线图(绿色标记修复值)") + plt.legend() + plt.show() # 返回清洗后的字典 - return data_repaired.to_dict(orient='list') + return data_repaired.to_dict(orient="list") # 测试