import os

import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
from sklearn.decomposition import PCA  # NOTE(review): not used by the code below
from sklearn.cluster import KMeans
from sklearn.metrics import silhouette_score  # NOTE(review): not used by the code below


def _standardize(frame: pd.DataFrame) -> pd.DataFrame:
    """Return the column-wise z-score of *frame*, safe to feed into KMeans.

    Columns whose standard deviation is zero or undefined (constant columns,
    fewer than two observations) are only centred instead of being divided
    by zero, and any remaining NaN is replaced by 0.0 -- the column mean in
    standardized units -- because KMeans rejects NaN input.
    """
    std = frame.std()
    std = std.mask((std == 0) | std.isna(), 1.0)
    return ((frame - frame.mean()) / std).fillna(0.0)


def _detect_anomalies_km(data_norm: pd.DataFrame, n_clusters: int = 3, n_sigma: float = 3.0) -> tuple:
    """Flag rows that lie unusually far from their KMeans cluster centre.

    Args:
        data_norm: standardized numeric data without NaN, one row per time point.
        n_clusters: requested number of clusters; clamped to the row count
            because KMeans needs at least as many samples as clusters.
        n_sigma: a row is anomalous when its distance to its centre exceeds
            mean + n_sigma * std of all rows' distances.

    Returns:
        ``(anomaly_pos, anomaly_details)``: an int array of positional row
        indices of anomalous rows, and a dict mapping each such position to
        the column label with the largest absolute deviation from the cluster
        centre (the "main" anomalous sensor of that row).
    """
    n_rows, n_cols = data_norm.shape
    if n_rows == 0 or n_cols == 0:
        return np.array([], dtype=int), {}

    # Pass a plain ndarray so sklearn does not validate (possibly mixed-type) feature names.
    values = data_norm.to_numpy(dtype=float)
    kmeans = KMeans(
        n_clusters=min(n_clusters, n_rows), init="k-means++", n_init=50, random_state=42
    )
    clusters = kmeans.fit_predict(values)
    deviations = values - kmeans.cluster_centers_[clusters]
    distances = np.linalg.norm(deviations, axis=1)
    threshold = distances.mean() + n_sigma * distances.std()
    anomaly_pos = np.where(distances > threshold)[0]
    anomaly_details = {
        int(pos): data_norm.columns[int(np.argmax(np.abs(deviations[pos])))]
        for pos in anomaly_pos
    }
    return anomaly_pos, anomaly_details


def _repair_with_rolling_mean(
    frame: pd.DataFrame, anomaly_pos, anomaly_details: dict, window: int = 13
) -> pd.DataFrame:
    """Return a copy of *frame* with each flagged (row, sensor) cell replaced.

    The replacement is the centred rolling mean (``min_periods=1``) of that
    sensor at that row. The window includes the anomalous value itself.
    Only the single "main" sensor per anomalous row is modified.
    """
    repaired = frame.copy()
    if len(anomaly_pos) == 0:
        return repaired
    rolled = frame.rolling(window=window, center=True, min_periods=1).mean()
    for pos in anomaly_pos:
        pos = int(pos)
        sensor = anomaly_details[pos]
        if not pd.api.types.is_float_dtype(repaired[sensor]):
            # Upcast explicitly: silently writing a float into an int column
            # is deprecated in pandas >= 2.1.
            repaired[sensor] = repaired[sensor].astype(float)
        repaired.iat[pos, repaired.columns.get_loc(sensor)] = rolled[sensor].iloc[pos]
    return repaired


def _configure_cjk_font() -> None:
    """Configure matplotlib so the Chinese labels below render (SimHei font)."""
    plt.rcParams["font.sans-serif"] = ["SimHei"]
    plt.rcParams["axes.unicode_minus"] = False


def _plot_original(raw: pd.DataFrame, anomaly_pos, anomaly_details: dict, filled=None) -> None:
    """Plot every sensor series and mark each anomaly's main sensor in red.

    When *filled* is given, *raw* is drawn semi-transparent, *filled* is
    overlaid as dashed lines, and the red markers are placed on *filled*.
    """
    time = np.arange(len(raw))
    plt.figure(figsize=(12, 8))
    if filled is None:
        for col in raw.columns:
            plt.plot(time, raw[col].values, marker="o", markersize=3, label=col)
        marked = raw
        title = "各传感器折线图(红色标记主要异常点)"
    else:
        for col in raw.columns:
            plt.plot(time, raw[col].values, marker="o", markersize=3, label=col, alpha=0.5)
        for col in filled.columns:
            plt.plot(
                time,
                filled[col].values,
                marker="x",
                markersize=3,
                label=f"{col}_filled",
                linestyle="--",
            )
        marked = filled
        title = "各传感器折线图(红色标记主要异常点,虚线为0值填充后)"
    for pos in anomaly_pos:
        sensor = anomaly_details[int(pos)]
        plt.plot(pos, marked[sensor].iloc[pos], "ro", markersize=8)
    plt.xlabel("时间点(序号)")
    plt.ylabel("压力监测值")
    plt.title(title)
    plt.legend()
    plt.show()


def _plot_repaired(repaired: pd.DataFrame, anomaly_pos, anomaly_details: dict) -> None:
    """Plot the repaired series and mark each repaired value in green."""
    time = np.arange(len(repaired))
    plt.figure(figsize=(12, 8))
    for col in repaired.columns:
        plt.plot(time, repaired[col].values, marker="o", markersize=3, label=col)
    for pos in anomaly_pos:
        sensor = anomaly_details[int(pos)]
        plt.plot(pos, repaired[sensor].iloc[pos], "go", markersize=8)
    plt.xlabel("时间点(序号)")
    plt.ylabel("修复后压力监测值")
    plt.title("修复后各传感器折线图(绿色标记修复值)")
    plt.legend()
    plt.show()


def clean_pressure_data_km(
    input_csv_path: str,
    show_plot: bool = False,
    *,
    n_clusters: int = 3,
    window: int = 13,
    n_sigma: float = 3.0,
) -> str:
    """Detect anomalies in a pressure CSV with KMeans and repair them with a rolling mean.

    The output workbook ``<csv name>_cleaned.xlsx`` is written next to the
    input (an existing file of that name is overwritten). Sheet
    'raw_pressure_data' holds the data as read; sheet 'cleaned_pressusre_data'
    holds the repaired data. Only numeric columns are analysed and repaired;
    other columns (e.g. a timestamp) are copied through unchanged.

    Args:
        input_csv_path: path to a UTF-8 CSV with a header row.
        show_plot: if True, show before/after line plots.
        n_clusters: number of KMeans clusters (clamped to the row count).
        window: centred rolling-mean window used for the repair.
        n_sigma: distance threshold in standard deviations above the mean.

    Returns:
        Absolute path of the written .xlsx file.
    """
    input_csv_path = os.path.abspath(input_csv_path)
    data = pd.read_csv(input_csv_path, header=0, index_col=None, encoding="utf-8")
    numeric = data.select_dtypes(include="number")

    anomaly_pos, anomaly_details = _detect_anomalies_km(
        _standardize(numeric), n_clusters=n_clusters, n_sigma=n_sigma
    )
    repaired_numeric = _repair_with_rolling_mean(numeric, anomaly_pos, anomaly_details, window=window)
    data_repaired = data.copy()
    for col in repaired_numeric.columns:
        data_repaired[col] = repaired_numeric[col]

    if show_plot and len(numeric.columns) > 0:
        _configure_cjk_font()
        _plot_original(numeric, anomaly_pos, anomaly_details)
        _plot_repaired(repaired_numeric, anomaly_pos, anomaly_details)

    input_dir = os.path.dirname(input_csv_path)
    input_base = os.path.splitext(os.path.basename(input_csv_path))[0]
    output_path = os.path.join(input_dir, f"{input_base}_cleaned.xlsx")
    if os.path.exists(output_path):
        os.remove(output_path)  # overwrite a file of the same name
    with pd.ExcelWriter(output_path, engine="openpyxl") as writer:
        data.to_excel(writer, sheet_name="raw_pressure_data", index=False)
        # The sheet name's misspelling ("pressusre") is kept on purpose:
        # consumers may select this sheet by its exact name.
        data_repaired.to_excel(writer, sheet_name="cleaned_pressusre_data", index=False)
    return os.path.abspath(output_path)


def clean_pressure_data_dict_km(
    data_dict: dict,
    show_plot: bool = False,
    *,
    n_clusters: int = 3,
    window: int = 13,
    n_sigma: float = 3.0,
) -> dict:
    """Detect anomalies in column -> series data with KMeans and repair them.

    Preprocessing:
    - Missing values are forward- then backward-filled.
    - Zeros are treated as missing and linearly interpolated.
    - A column that is entirely zero keeps its original values.

    Anomalous cells are then replaced with a centred rolling mean.

    Args:
        data_dict: mapping of column name to an equal-length list of values.
        show_plot: if True, show before/after line plots.
        n_clusters: number of KMeans clusters (clamped to the row count).
        window: centred rolling-mean window used for the repair.
        n_sigma: distance threshold in standard deviations above the mean.

    Returns:
        The cleaned data as ``{column: list_of_values}``.
    """
    data = pd.DataFrame(data_dict)
    if data.empty:
        # Nothing to clean; interpolate() would raise on a frame with no rows/columns.
        return data.to_dict(orient="list")

    data = data.ffill().bfill()

    # Treat 0 as a missing reading and interpolate over it.
    data_filled = data.replace(0, np.nan)
    data_filled = data_filled.interpolate(method="linear", limit_direction="both")
    data_filled = data_filled.ffill().bfill()
    # A column that was entirely 0 is still all-NaN here (no neighbours to
    # fill from); restore its original values instead of returning NaN.
    data_filled = data_filled.fillna(data)

    anomaly_pos, anomaly_details = _detect_anomalies_km(
        _standardize(data_filled), n_clusters=n_clusters, n_sigma=n_sigma
    )
    data_repaired = _repair_with_rolling_mean(data_filled, anomaly_pos, anomaly_details, window=window)

    if show_plot and len(data.columns) > 0:
        _configure_cjk_font()
        _plot_original(data, anomaly_pos, anomaly_details, filled=data_filled)
        _plot_repaired(data_repaired, anomaly_pos, anomaly_details)

    return data_repaired.to_dict(orient="list")


# Alternative demo for the CSV entry point: run
# clean_pressure_data_km(os.path.join(script_dir, "pressure_raw_data.csv"))
# and print the returned output path.

# Demo for clean_pressure_data_dict_km.
if __name__ == "__main__":
    import random

    # Read szh_pressure_scada.csv located next to this script.
    script_dir = os.path.dirname(os.path.abspath(__file__))
    csv_path = os.path.join(script_dir, "szh_pressure_scada.csv")
    data = pd.read_csv(csv_path, header=0, index_col=None, encoding="utf-8")

    # Exclude the Time column and pick up to 5 sensor columns at random.
    columns_to_exclude = ["Time"]
    available_columns = [col for col in data.columns if col not in columns_to_exclude]
    if not available_columns:
        raise SystemExit(f"{csv_path}: no sensor columns besides {columns_to_exclude}")
    selected_columns = random.sample(available_columns, min(5, len(available_columns)))

    # Convert the selected columns into the dict format the function expects.
    data_dict = {col: data[col].tolist() for col in selected_columns}
    print("选中的列:", selected_columns)
    print("原始数据长度:", len(data_dict[selected_columns[0]]))

    cleaned_dict = clean_pressure_data_dict_km(data_dict, show_plot=True)
    print("清洗后的字典键:", list(cleaned_dict.keys()))
    print("清洗后的数据长度:", len(cleaned_dict[selected_columns[0]]))
    print("测试完成:函数运行正常")