Files
TJWaterServerBinary/api_ex/Fdataclean.py

290 lines
11 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
# ...existing code...
import pandas as pd
import numpy as np
import matplotlib.pyplot as plt
from pykalman import KalmanFilter
import os
def clean_flow_data_kf(input_csv_path: str, show_plot: bool = False) -> str:
"""
读取 input_csv_path 中的每列时间序列,使用一维 Kalman 滤波平滑并用预测值替换基于 3σ 检测出的异常点。
保存输出为:<input_filename>_cleaned.xlsx与输入同目录并返回输出文件的绝对路径。
仅保留输入文件路径作为参数(按要求)。
"""
# 读取 CSV
data = pd.read_csv(input_csv_path, header=0, index_col=None, encoding="utf-8")
# 存储 Kalman 平滑结果
data_kf = pd.DataFrame(index=data.index, columns=data.columns)
# 平滑每一列
for col in data.columns:
observations = pd.Series(data[col].values).ffill().bfill()
if observations.isna().any():
observations = observations.fillna(observations.mean())
obs = observations.values.astype(float)
kf = KalmanFilter(
transition_matrices=[1],
observation_matrices=[1],
initial_state_mean=float(obs[0]),
initial_state_covariance=1,
observation_covariance=1,
transition_covariance=0.01,
)
# 跳过EM学习使用固定参数以提高性能
state_means, _ = kf.smooth(obs)
data_kf[col] = state_means.flatten()
# 计算残差并用IQR检测异常更稳健的方法
residuals = data - data_kf
residual_thresholds = {}
for col in data.columns:
res_values = residuals[col].dropna().values # 移除NaN以计算IQR
q1 = np.percentile(res_values, 25)
q3 = np.percentile(res_values, 75)
iqr = q3 - q1
lower_threshold = q1 - 1.5 * iqr
upper_threshold = q3 + 1.5 * iqr
residual_thresholds[col] = (lower_threshold, upper_threshold)
cleaned_data = data.copy()
anomalies_info = {}
for col in data.columns:
lower, upper = residual_thresholds[col]
sensor_residuals = residuals[col]
anomaly_mask = (sensor_residuals < lower) | (sensor_residuals > upper)
anomaly_idx = data.index[anomaly_mask.fillna(False)]
anomalies_info[col] = pd.DataFrame(
{
"Observed": data.loc[anomaly_idx, col],
"Kalman_Predicted": data_kf.loc[anomaly_idx, col],
"Residual": sensor_residuals.loc[anomaly_idx],
}
)
cleaned_data.loc[anomaly_idx, f"{col}_cleaned"] = data_kf.loc[anomaly_idx, col]
# 构造输出文件名:在输入文件名基础上加后缀 _cleaned.xlsx
input_dir = os.path.dirname(os.path.abspath(input_csv_path))
input_base = os.path.splitext(os.path.basename(input_csv_path))[0]
output_filename = f"{input_base}_cleaned.xlsx"
output_path = os.path.join(input_dir, output_filename)
# 覆盖同名文件
if os.path.exists(output_path):
os.remove(output_path)
cleaned_data.to_excel(output_path, index=False)
# 可选可视化(第一个传感器)
plt.rcParams["font.sans-serif"] = ["SimHei"]
plt.rcParams["axes.unicode_minus"] = False
if show_plot and len(data.columns) > 0:
sensor_to_plot = data.columns[0]
plt.figure(figsize=(12, 6))
plt.plot(
data.index,
data[sensor_to_plot],
label="监测值",
marker="o",
markersize=3,
alpha=0.7,
)
plt.plot(
data.index, data_kf[sensor_to_plot], label="Kalman滤波预测值", linewidth=2
)
anomaly_idx = anomalies_info[sensor_to_plot].index
if len(anomaly_idx) > 0:
plt.plot(
anomaly_idx,
data[sensor_to_plot].loc[anomaly_idx],
"ro",
markersize=8,
label="监测值异常点",
)
plt.plot(
anomaly_idx,
data_kf[sensor_to_plot].loc[anomaly_idx],
"go",
markersize=8,
label="Kalman修复值",
)
plt.xlabel("时间点(序号)")
plt.ylabel("监测值")
plt.title(f"{sensor_to_plot}观测值与Kalman滤波预测值异常点标记")
plt.legend()
plt.show()
# 返回输出文件的绝对路径
return os.path.abspath(output_path)
def clean_flow_data_dict(data_dict: dict, show_plot: bool = False) -> dict:
"""
接收一个字典数据结构,其中键为列名,值为时间序列列表,使用一维 Kalman 滤波平滑并用预测值替换基于 IQR 检测出的异常点。
区分合理的0值流量转换和异常的0值连续多个0或孤立0
返回完整的清洗后的字典数据结构。
"""
# 将字典转换为 DataFrame
data = pd.DataFrame(data_dict)
# 替换0值填充NaN值
data_filled = data.replace(0, np.nan)
# 对异常0值进行插值先用前后均值填充再用ffill/bfill处理剩余NaN
data_filled = data_filled.interpolate(method="linear", limit_direction="both")
# 处理剩余的0值和NaN值
data_filled = data_filled.ffill().bfill()
# 存储 Kalman 平滑结果
data_kf = pd.DataFrame(index=data_filled.index, columns=data_filled.columns)
# 平滑每一列
for col in data_filled.columns:
observations = pd.Series(data_filled[col].values).ffill().bfill()
if observations.isna().any():
observations = observations.fillna(observations.mean())
obs = observations.values.astype(float)
kf = KalmanFilter(
transition_matrices=[1],
observation_matrices=[1],
initial_state_mean=float(obs[0]),
initial_state_covariance=1,
observation_covariance=10,
transition_covariance=10,
)
state_means, _ = kf.smooth(obs)
data_kf[col] = state_means.flatten()
# 计算残差并用IQR检测异常
residuals = data_filled - data_kf
residual_thresholds = {}
for col in data_filled.columns:
res_values = residuals[col].dropna().values
q1 = np.percentile(res_values, 25)
q3 = np.percentile(res_values, 75)
iqr = q3 - q1
lower_threshold = q1 - 1.5 * iqr
upper_threshold = q3 + 1.5 * iqr
residual_thresholds[col] = (lower_threshold, upper_threshold)
# 创建完整的修复数据
cleaned_data = data_filled.copy()
anomalies_info = {}
for col in data_filled.columns:
lower, upper = residual_thresholds[col]
sensor_residuals = residuals[col]
anomaly_mask = (sensor_residuals < lower) | (sensor_residuals > upper)
anomaly_idx = data_filled.index[anomaly_mask.fillna(False)]
anomalies_info[col] = pd.DataFrame(
{
"Observed": data_filled.loc[anomaly_idx, col],
"Kalman_Predicted": data_kf.loc[anomaly_idx, col],
"Residual": sensor_residuals.loc[anomaly_idx],
}
)
# 直接在原列上替换异常值为 Kalman 预测值
cleaned_data.loc[anomaly_idx, col] = data_kf.loc[anomaly_idx, col]
# 可选可视化
plt.rcParams["font.sans-serif"] = ["SimHei"]
plt.rcParams["axes.unicode_minus"] = False
if show_plot and len(data.columns) > 0:
sensor_to_plot = data.columns[0]
plt.figure(figsize=(12, 8))
plt.subplot(2, 1, 1)
plt.plot(
data.index,
data[sensor_to_plot],
label="原始监测值",
marker="o",
markersize=3,
alpha=0.7,
)
abnormal_zero_idx = data.index[data_filled[sensor_to_plot].isna()]
if len(abnormal_zero_idx) > 0:
plt.plot(
abnormal_zero_idx,
data[sensor_to_plot].loc[abnormal_zero_idx],
"mo",
markersize=8,
label="异常0值",
)
plt.plot(
data.index, data_kf[sensor_to_plot], label="Kalman滤波预测值", linewidth=2
)
anomaly_idx = anomalies_info[sensor_to_plot].index
if len(anomaly_idx) > 0:
plt.plot(
anomaly_idx,
data_filled[sensor_to_plot].loc[anomaly_idx],
"ro",
markersize=8,
label="IQR异常点",
)
plt.xlabel("时间点(序号)")
plt.ylabel("流量值")
plt.title(f"{sensor_to_plot}:原始数据与异常检测")
plt.legend()
plt.subplot(2, 1, 2)
plt.plot(
data.index,
cleaned_data[sensor_to_plot],
label="修复后监测值",
marker="o",
markersize=3,
color="green",
)
plt.xlabel("时间点(序号)")
plt.ylabel("流量值")
plt.title(f"{sensor_to_plot}:修复后数据")
plt.legend()
plt.tight_layout()
plt.show()
# 返回完整的修复后字典
return cleaned_data.to_dict(orient="list")
# # 测试
# if __name__ == "__main__":
# # 默认:脚本目录下同名 CSV 文件
# script_dir = os.path.dirname(os.path.abspath(__file__))
# default_csv = os.path.join(script_dir, "pipe_flow_data_to_clean2.0.csv")
# out = clean_flow_data_kf(default_csv)
# print("清洗后的数据已保存到:", out)
# 测试 clean_flow_data_dict 函数
if __name__ == "__main__":
import random
# 读取 szh_flow_scada.csv 文件
script_dir = os.path.dirname(os.path.abspath(__file__))
csv_path = os.path.join(script_dir, "szh_flow_scada.csv")
data = pd.read_csv(csv_path, header=0, index_col=None, encoding="utf-8")
# 排除 Time 列,随机选择 5 列
columns_to_exclude = ["Time"]
available_columns = [col for col in data.columns if col not in columns_to_exclude]
selected_columns = random.sample(available_columns, 1)
# 将选中的列转换为字典
data_dict = {col: data[col].tolist() for col in selected_columns}
print("选中的列:", selected_columns)
print("原始数据长度:", len(data_dict[selected_columns[0]]))
# 调用函数进行清洗
cleaned_dict = clean_flow_data_dict(data_dict, show_plot=True)
# 将清洗后的字典写回 CSV
out_csv = os.path.join(script_dir, f"{selected_columns[0]}_clean.csv")
pd.DataFrame(cleaned_dict).to_csv(out_csv, index=False, encoding="utf-8-sig")
print("已保存清洗结果到:", out_csv)
print("清洗后的字典键:", list(cleaned_dict.keys()))
print("清洗后的数据长度:", len(cleaned_dict[selected_columns[0]]))
print("测试完成:函数运行正常")