新增压力、流量数据清洗前的对异常值0的预处理方法
This commit is contained in:
@@ -6,7 +6,6 @@ from pykalman import KalmanFilter
|
||||
import os
|
||||
|
||||
|
||||
|
||||
def clean_flow_data_kf(input_csv_path: str, show_plot: bool = False) -> str:
|
||||
"""
|
||||
读取 input_csv_path 中的每列时间序列,使用一维 Kalman 滤波平滑并用预测值替换基于 3σ 检测出的异常点。
|
||||
@@ -30,7 +29,7 @@ def clean_flow_data_kf(input_csv_path: str, show_plot: bool = False) -> str:
|
||||
initial_state_mean=float(obs[0]),
|
||||
initial_state_covariance=1,
|
||||
observation_covariance=1,
|
||||
transition_covariance=0.01
|
||||
transition_covariance=0.01,
|
||||
)
|
||||
# 跳过EM学习,使用固定参数以提高性能
|
||||
state_means, _ = kf.smooth(obs)
|
||||
@@ -55,12 +54,14 @@ def clean_flow_data_kf(input_csv_path: str, show_plot: bool = False) -> str:
|
||||
sensor_residuals = residuals[col]
|
||||
anomaly_mask = (sensor_residuals < lower) | (sensor_residuals > upper)
|
||||
anomaly_idx = data.index[anomaly_mask.fillna(False)]
|
||||
anomalies_info[col] = pd.DataFrame({
|
||||
'Observed': data.loc[anomaly_idx, col],
|
||||
'Kalman_Predicted': data_kf.loc[anomaly_idx, col],
|
||||
'Residual': sensor_residuals.loc[anomaly_idx]
|
||||
})
|
||||
cleaned_data.loc[anomaly_idx, f'{col}_cleaned'] = data_kf.loc[anomaly_idx, col]
|
||||
anomalies_info[col] = pd.DataFrame(
|
||||
{
|
||||
"Observed": data.loc[anomaly_idx, col],
|
||||
"Kalman_Predicted": data_kf.loc[anomaly_idx, col],
|
||||
"Residual": sensor_residuals.loc[anomaly_idx],
|
||||
}
|
||||
)
|
||||
cleaned_data.loc[anomaly_idx, f"{col}_cleaned"] = data_kf.loc[anomaly_idx, col]
|
||||
|
||||
# 构造输出文件名:在输入文件名基础上加后缀 _cleaned.xlsx
|
||||
input_dir = os.path.dirname(os.path.abspath(input_csv_path))
|
||||
@@ -74,17 +75,38 @@ def clean_flow_data_kf(input_csv_path: str, show_plot: bool = False) -> str:
|
||||
cleaned_data.to_excel(output_path, index=False)
|
||||
|
||||
# 可选可视化(第一个传感器)
|
||||
plt.rcParams['font.sans-serif'] = ['SimHei']
|
||||
plt.rcParams['axes.unicode_minus'] = False
|
||||
plt.rcParams["font.sans-serif"] = ["SimHei"]
|
||||
plt.rcParams["axes.unicode_minus"] = False
|
||||
if show_plot and len(data.columns) > 0:
|
||||
sensor_to_plot = data.columns[0]
|
||||
plt.figure(figsize=(12, 6))
|
||||
plt.plot(data.index, data[sensor_to_plot], label="监测值", marker='o', markersize=3, alpha=0.7)
|
||||
plt.plot(data.index, data_kf[sensor_to_plot], label="Kalman滤波预测值", linewidth=2)
|
||||
plt.plot(
|
||||
data.index,
|
||||
data[sensor_to_plot],
|
||||
label="监测值",
|
||||
marker="o",
|
||||
markersize=3,
|
||||
alpha=0.7,
|
||||
)
|
||||
plt.plot(
|
||||
data.index, data_kf[sensor_to_plot], label="Kalman滤波预测值", linewidth=2
|
||||
)
|
||||
anomaly_idx = anomalies_info[sensor_to_plot].index
|
||||
if len(anomaly_idx) > 0:
|
||||
plt.plot(anomaly_idx, data[sensor_to_plot].loc[anomaly_idx], 'ro', markersize=8, label="监测值异常点")
|
||||
plt.plot(anomaly_idx, data_kf[sensor_to_plot].loc[anomaly_idx], 'go', markersize=8, label="Kalman修复值")
|
||||
plt.plot(
|
||||
anomaly_idx,
|
||||
data[sensor_to_plot].loc[anomaly_idx],
|
||||
"ro",
|
||||
markersize=8,
|
||||
label="监测值异常点",
|
||||
)
|
||||
plt.plot(
|
||||
anomaly_idx,
|
||||
data_kf[sensor_to_plot].loc[anomaly_idx],
|
||||
"go",
|
||||
markersize=8,
|
||||
label="Kalman修复值",
|
||||
)
|
||||
plt.xlabel("时间点(序号)")
|
||||
plt.ylabel("监测值")
|
||||
plt.title(f"{sensor_to_plot}:观测值与Kalman滤波预测值(异常点标记)")
|
||||
@@ -94,18 +116,29 @@ def clean_flow_data_kf(input_csv_path: str, show_plot: bool = False) -> str:
|
||||
# 返回输出文件的绝对路径
|
||||
return os.path.abspath(output_path)
|
||||
|
||||
|
||||
def clean_flow_data_dict(data_dict: dict, show_plot: bool = False) -> dict:
|
||||
"""
|
||||
接收一个字典数据结构,其中键为列名,值为时间序列列表,使用一维 Kalman 滤波平滑并用预测值替换基于 3σ 检测出的异常点。
|
||||
返回清洗后的字典数据结构。
|
||||
接收一个字典数据结构,其中键为列名,值为时间序列列表,使用一维 Kalman 滤波平滑并用预测值替换基于 IQR 检测出的异常点。
|
||||
区分合理的0值(流量转换)和异常的0值(连续多个0或孤立0)。
|
||||
返回完整的清洗后的字典数据结构。
|
||||
"""
|
||||
# 将字典转换为 DataFrame
|
||||
data = pd.DataFrame(data_dict)
|
||||
# 替换0值,填充NaN值
|
||||
data_filled = data.replace(0, np.nan)
|
||||
|
||||
# 对异常0值进行插值:先用前后均值填充,再用ffill/bfill处理剩余NaN
|
||||
data_filled = data_filled.interpolate(method="linear", limit_direction="both")
|
||||
|
||||
# 处理剩余的0值和NaN值
|
||||
data_filled = data_filled.ffill().bfill()
|
||||
|
||||
# 存储 Kalman 平滑结果
|
||||
data_kf = pd.DataFrame(index=data.index, columns=data.columns)
|
||||
data_kf = pd.DataFrame(index=data_filled.index, columns=data_filled.columns)
|
||||
# 平滑每一列
|
||||
for col in data.columns:
|
||||
observations = pd.Series(data[col].values).ffill().bfill()
|
||||
for col in data_filled.columns:
|
||||
observations = pd.Series(data_filled[col].values).ffill().bfill()
|
||||
if observations.isna().any():
|
||||
observations = observations.fillna(observations.mean())
|
||||
obs = observations.values.astype(float)
|
||||
@@ -116,17 +149,16 @@ def clean_flow_data_dict(data_dict: dict, show_plot: bool = False) -> dict:
|
||||
initial_state_mean=float(obs[0]),
|
||||
initial_state_covariance=1,
|
||||
observation_covariance=10,
|
||||
transition_covariance=10
|
||||
transition_covariance=10,
|
||||
)
|
||||
# 跳过EM学习,使用固定参数以提高性能
|
||||
state_means, _ = kf.smooth(obs)
|
||||
data_kf[col] = state_means.flatten()
|
||||
|
||||
# 计算残差并用IQR检测异常(更稳健的方法)
|
||||
residuals = data - data_kf
|
||||
# 计算残差并用IQR检测异常
|
||||
residuals = data_filled - data_kf
|
||||
residual_thresholds = {}
|
||||
for col in data.columns:
|
||||
res_values = residuals[col].dropna().values # 移除NaN以计算IQR
|
||||
for col in data_filled.columns:
|
||||
res_values = residuals[col].dropna().values
|
||||
q1 = np.percentile(res_values, 25)
|
||||
q3 = np.percentile(res_values, 75)
|
||||
iqr = q3 - q1
|
||||
@@ -134,40 +166,89 @@ def clean_flow_data_dict(data_dict: dict, show_plot: bool = False) -> dict:
|
||||
upper_threshold = q3 + 1.5 * iqr
|
||||
residual_thresholds[col] = (lower_threshold, upper_threshold)
|
||||
|
||||
cleaned_data = data.copy()
|
||||
# 创建完整的修复数据
|
||||
cleaned_data = data_filled.copy()
|
||||
anomalies_info = {}
|
||||
for col in data.columns:
|
||||
|
||||
for col in data_filled.columns:
|
||||
lower, upper = residual_thresholds[col]
|
||||
sensor_residuals = residuals[col]
|
||||
anomaly_mask = (sensor_residuals < lower) | (sensor_residuals > upper)
|
||||
anomaly_idx = data.index[anomaly_mask.fillna(False)]
|
||||
anomalies_info[col] = pd.DataFrame({
|
||||
'Observed': data.loc[anomaly_idx, col],
|
||||
'Kalman_Predicted': data_kf.loc[anomaly_idx, col],
|
||||
'Residual': sensor_residuals.loc[anomaly_idx]
|
||||
})
|
||||
cleaned_data.loc[anomaly_idx, f'{col}_cleaned'] = data_kf.loc[anomaly_idx, col]
|
||||
anomaly_idx = data_filled.index[anomaly_mask.fillna(False)]
|
||||
|
||||
# 可选可视化(第一个传感器)
|
||||
plt.rcParams['font.sans-serif'] = ['SimHei']
|
||||
plt.rcParams['axes.unicode_minus'] = False
|
||||
anomalies_info[col] = pd.DataFrame(
|
||||
{
|
||||
"Observed": data_filled.loc[anomaly_idx, col],
|
||||
"Kalman_Predicted": data_kf.loc[anomaly_idx, col],
|
||||
"Residual": sensor_residuals.loc[anomaly_idx],
|
||||
}
|
||||
)
|
||||
|
||||
# 直接在原列上替换异常值为 Kalman 预测值
|
||||
cleaned_data.loc[anomaly_idx, col] = data_kf.loc[anomaly_idx, col]
|
||||
|
||||
# 可选可视化
|
||||
plt.rcParams["font.sans-serif"] = ["SimHei"]
|
||||
plt.rcParams["axes.unicode_minus"] = False
|
||||
if show_plot and len(data.columns) > 0:
|
||||
sensor_to_plot = data.columns[0]
|
||||
plt.figure(figsize=(12, 6))
|
||||
plt.plot(data.index, data[sensor_to_plot], label="监测值", marker='o', markersize=3, alpha=0.7)
|
||||
plt.plot(data.index, data_kf[sensor_to_plot], label="Kalman滤波预测值", linewidth=2)
|
||||
plt.figure(figsize=(12, 8))
|
||||
|
||||
plt.subplot(2, 1, 1)
|
||||
plt.plot(
|
||||
data.index,
|
||||
data[sensor_to_plot],
|
||||
label="原始监测值",
|
||||
marker="o",
|
||||
markersize=3,
|
||||
alpha=0.7,
|
||||
)
|
||||
abnormal_zero_idx = data.index[data_filled[sensor_to_plot].isna()]
|
||||
if len(abnormal_zero_idx) > 0:
|
||||
plt.plot(
|
||||
abnormal_zero_idx,
|
||||
data[sensor_to_plot].loc[abnormal_zero_idx],
|
||||
"mo",
|
||||
markersize=8,
|
||||
label="异常0值",
|
||||
)
|
||||
plt.plot(
|
||||
data.index, data_kf[sensor_to_plot], label="Kalman滤波预测值", linewidth=2
|
||||
)
|
||||
anomaly_idx = anomalies_info[sensor_to_plot].index
|
||||
if len(anomaly_idx) > 0:
|
||||
plt.plot(anomaly_idx, data[sensor_to_plot].loc[anomaly_idx], 'ro', markersize=8, label="监测值异常点")
|
||||
plt.plot(anomaly_idx, data_kf[sensor_to_plot].loc[anomaly_idx], 'go', markersize=8, label="Kalman修复值")
|
||||
plt.plot(
|
||||
anomaly_idx,
|
||||
data_filled[sensor_to_plot].loc[anomaly_idx],
|
||||
"ro",
|
||||
markersize=8,
|
||||
label="IQR异常点",
|
||||
)
|
||||
plt.xlabel("时间点(序号)")
|
||||
plt.ylabel("监测值")
|
||||
plt.title(f"{sensor_to_plot}:观测值与Kalman滤波预测值(异常点标记)")
|
||||
plt.ylabel("流量值")
|
||||
plt.title(f"{sensor_to_plot}:原始数据与异常检测")
|
||||
plt.legend()
|
||||
|
||||
plt.subplot(2, 1, 2)
|
||||
plt.plot(
|
||||
data.index,
|
||||
cleaned_data[sensor_to_plot],
|
||||
label="修复后监测值",
|
||||
marker="o",
|
||||
markersize=3,
|
||||
color="green",
|
||||
)
|
||||
plt.xlabel("时间点(序号)")
|
||||
plt.ylabel("流量值")
|
||||
plt.title(f"{sensor_to_plot}:修复后数据")
|
||||
plt.legend()
|
||||
|
||||
plt.tight_layout()
|
||||
plt.show()
|
||||
|
||||
# 返回清洗后的字典
|
||||
return cleaned_data.to_dict(orient='list')
|
||||
# 返回完整的修复后字典
|
||||
return cleaned_data.to_dict(orient="list")
|
||||
|
||||
|
||||
# # 测试
|
||||
# if __name__ == "__main__":
|
||||
@@ -180,27 +261,28 @@ def clean_flow_data_dict(data_dict: dict, show_plot: bool = False) -> dict:
|
||||
# 测试 clean_flow_data_dict 函数
|
||||
if __name__ == "__main__":
|
||||
import random
|
||||
|
||||
# 读取 szh_flow_scada.csv 文件
|
||||
script_dir = os.path.dirname(os.path.abspath(__file__))
|
||||
csv_path = os.path.join(script_dir, "szh_flow_scada.csv")
|
||||
data = pd.read_csv(csv_path, header=0, index_col=None, encoding="utf-8")
|
||||
|
||||
|
||||
# 排除 Time 列,随机选择 5 列
|
||||
columns_to_exclude = ['Time']
|
||||
columns_to_exclude = ["Time"]
|
||||
available_columns = [col for col in data.columns if col not in columns_to_exclude]
|
||||
selected_columns = random.sample(available_columns, 1)
|
||||
|
||||
|
||||
# 将选中的列转换为字典
|
||||
data_dict = {col: data[col].tolist() for col in selected_columns}
|
||||
|
||||
|
||||
print("选中的列:", selected_columns)
|
||||
print("原始数据长度:", len(data_dict[selected_columns[0]]))
|
||||
|
||||
|
||||
# 调用函数进行清洗
|
||||
cleaned_dict = clean_flow_data_dict(data_dict, show_plot=True)
|
||||
# 将清洗后的字典写回 CSV
|
||||
out_csv = os.path.join(script_dir, f"{selected_columns[0]}_clean.csv")
|
||||
pd.DataFrame(cleaned_dict).to_csv(out_csv, index=False, encoding='utf-8-sig')
|
||||
pd.DataFrame(cleaned_dict).to_csv(out_csv, index=False, encoding="utf-8-sig")
|
||||
print("已保存清洗结果到:", out_csv)
|
||||
print("清洗后的字典键:", list(cleaned_dict.keys()))
|
||||
print("清洗后的数据长度:", len(cleaned_dict[selected_columns[0]]))
|
||||
|
||||
@@ -105,8 +105,15 @@ def clean_pressure_data_dict_km(data_dict: dict, show_plot: bool = False) -> dic
|
||||
data = pd.DataFrame(data_dict)
|
||||
# 填充NaN值
|
||||
data = data.ffill().bfill()
|
||||
# 标准化
|
||||
data_norm = (data - data.mean()) / data.std()
|
||||
# 异常值预处理
|
||||
# 将0值替换为NaN,然后用线性插值填充
|
||||
data_filled = data.replace(0, np.nan)
|
||||
data_filled = data_filled.interpolate(method="linear", limit_direction="both")
|
||||
# 如果仍有NaN(全为0的列),用前后值填充
|
||||
data_filled = data_filled.ffill().bfill()
|
||||
|
||||
# 标准化(使用填充后的数据)
|
||||
data_norm = (data_filled - data_filled.mean()) / data_filled.std()
|
||||
|
||||
# 聚类与异常检测
|
||||
k = 3
|
||||
@@ -130,46 +137,59 @@ def clean_pressure_data_dict_km(data_dict: dict, show_plot: bool = False) -> dic
|
||||
anomaly_details[data.index[pos]] = main_sensor
|
||||
|
||||
# 修复:滚动平均(窗口可调)
|
||||
data_rolled = data.rolling(window=13, center=True, min_periods=1).mean()
|
||||
data_repaired = data.copy()
|
||||
data_rolled = data_filled.rolling(window=13, center=True, min_periods=1).mean()
|
||||
data_repaired = data_filled.copy()
|
||||
for pos in anomaly_pos:
|
||||
label = data.index[pos]
|
||||
sensor = anomaly_details[label]
|
||||
data_repaired.loc[label, sensor] = data_rolled.loc[label, sensor]
|
||||
|
||||
# 可选可视化(使用位置作为 x 轴)
|
||||
plt.rcParams['font.sans-serif'] = ['SimHei']
|
||||
plt.rcParams['axes.unicode_minus'] = False
|
||||
plt.rcParams["font.sans-serif"] = ["SimHei"]
|
||||
plt.rcParams["axes.unicode_minus"] = False
|
||||
|
||||
if show_plot and len(data.columns) > 0:
|
||||
n = len(data)
|
||||
time = np.arange(n)
|
||||
plt.figure(figsize=(12, 8))
|
||||
for col in data.columns:
|
||||
plt.plot(time, data[col].values, marker='o', markersize=3, label=col)
|
||||
plt.plot(
|
||||
time, data[col].values, marker="o", markersize=3, label=col, alpha=0.5
|
||||
)
|
||||
for col in data_filled.columns:
|
||||
plt.plot(
|
||||
time,
|
||||
data_filled[col].values,
|
||||
marker="x",
|
||||
markersize=3,
|
||||
label=f"{col}_filled",
|
||||
linestyle="--",
|
||||
)
|
||||
for pos in anomaly_pos:
|
||||
sensor = anomaly_details[data.index[pos]]
|
||||
plt.plot(pos, data.iloc[pos][sensor], 'ro', markersize=8)
|
||||
plt.plot(pos, data_filled.iloc[pos][sensor], "ro", markersize=8)
|
||||
plt.xlabel("时间点(序号)")
|
||||
plt.ylabel("压力监测值")
|
||||
plt.title("各传感器折线图(红色标记主要异常点)")
|
||||
plt.title("各传感器折线图(红色标记主要异常点,虚线为0值填充后)")
|
||||
plt.legend()
|
||||
plt.show()
|
||||
|
||||
plt.figure(figsize=(12, 8))
|
||||
for col in data_repaired.columns:
|
||||
plt.plot(time, data_repaired[col].values, marker='o', markersize=3, label=col)
|
||||
plt.plot(
|
||||
time, data_repaired[col].values, marker="o", markersize=3, label=col
|
||||
)
|
||||
for pos in anomaly_pos:
|
||||
sensor = anomaly_details[data.index[pos]]
|
||||
plt.plot(pos, data_repaired.iloc[pos][sensor], 'go', markersize=8)
|
||||
plt.xlabel("时间点(序号)")
|
||||
plt.ylabel("修复后压力监测值")
|
||||
plt.title("修复后各传感器折线图(绿色标记修复值)")
|
||||
plt.legend()
|
||||
plt.show()
|
||||
plt.plot(pos, data_repaired.iloc[pos][sensor], "go", markersize=8)
|
||||
plt.xlabel("时间点(序号)")
|
||||
plt.ylabel("修复后压力监测值")
|
||||
plt.title("修复后各传感器折线图(绿色标记修复值)")
|
||||
plt.legend()
|
||||
plt.show()
|
||||
|
||||
# 返回清洗后的字典
|
||||
return data_repaired.to_dict(orient='list')
|
||||
return data_repaired.to_dict(orient="list")
|
||||
|
||||
|
||||
# 测试
|
||||
|
||||
Reference in New Issue
Block a user