修复数据清洗时间轴填补后的对齐问题

This commit is contained in:
2026-02-02 15:16:23 +08:00
parent 3c7e2c5806
commit 9be2028e4c
3 changed files with 38 additions and 20 deletions

View File

@@ -37,8 +37,9 @@ def fill_time_gaps(
start=data_indexed.index.min(), end=data_indexed.index.max(), freq=freq start=data_indexed.index.min(), end=data_indexed.index.max(), freq=freq
) )
# 重索引以补齐缺失时间点 # 重索引以补齐缺失时间点,同时保留原始时间戳
data_reindexed = data_indexed.reindex(full_range) combined_index = data_indexed.index.union(full_range).sort_values().unique()
data_reindexed = data_indexed.reindex(combined_index)
# 按列处理缺口 # 按列处理缺口
for col in data_reindexed.columns: for col in data_reindexed.columns:
@@ -49,12 +50,12 @@ def fill_time_gaps(
missing_groups = (is_missing != is_missing.shift()).cumsum() missing_groups = (is_missing != is_missing.shift()).cumsum()
gap_lengths = is_missing.groupby(missing_groups).transform("sum") gap_lengths = is_missing.groupby(missing_groups).transform("sum")
# 短缺口:线性插值 # 短缺口:时间插值
short_gap_mask = is_missing & (gap_lengths <= short_gap_threshold) short_gap_mask = is_missing & (gap_lengths <= short_gap_threshold)
if short_gap_mask.any(): if short_gap_mask.any():
data_reindexed.loc[short_gap_mask, col] = ( data_reindexed.loc[short_gap_mask, col] = (
data_reindexed[col] data_reindexed[col]
.interpolate(method="linear", limit_area="inside") .interpolate(method="time", limit_area="inside")
.loc[short_gap_mask] .loc[short_gap_mask]
) )
@@ -231,6 +232,12 @@ def clean_flow_data_df_kf(data: pd.DataFrame, show_plot: bool = False) -> dict:
data_filled = fill_time_gaps( data_filled = fill_time_gaps(
data, time_col="time", freq="1min", short_gap_threshold=10 data, time_col="time", freq="1min", short_gap_threshold=10
) )
# 保存 time 列用于最后合并
time_col_series = None
if "time" in data_filled.columns:
time_col_series = data_filled["time"]
# 移除 time 列用于后续清洗 # 移除 time 列用于后续清洗
data_filled = data_filled.drop(columns=["time"]) data_filled = data_filled.drop(columns=["time"])
@@ -292,7 +299,7 @@ def clean_flow_data_df_kf(data: pd.DataFrame, show_plot: bool = False) -> dict:
plt.rcParams["axes.unicode_minus"] = False plt.rcParams["axes.unicode_minus"] = False
if show_plot and len(data.columns) > 0: if show_plot and len(data.columns) > 0:
sensor_to_plot = data.columns[0] sensor_to_plot = data.columns[0]
# 定义x轴 # 定义x轴
n = len(data) n = len(data)
time = np.arange(n) time = np.arange(n)
@@ -310,7 +317,7 @@ def clean_flow_data_df_kf(data: pd.DataFrame, show_plot: bool = False) -> dict:
markersize=3, markersize=3,
alpha=0.7, alpha=0.7,
) )
# 修正:检查 data_filled 的异常值,绘制在 time_filled 上 # 修正:检查 data_filled 的异常值,绘制在 time_filled 上
abnormal_zero_mask = data_filled[sensor_to_plot].isna() abnormal_zero_mask = data_filled[sensor_to_plot].isna()
# 如果目的是检查0值应该用 == 0。这里保留 isna() 但修正索引引用防止crash。 # 如果目的是检查0值应该用 == 0。这里保留 isna() 但修正索引引用防止crash。
@@ -318,11 +325,11 @@ def clean_flow_data_df_kf(data: pd.DataFrame, show_plot: bool = False) -> dict:
# 基于 "异常0值" 的标签,改为检查 0 值更合理,但为了保险起见, # 基于 "异常0值" 的标签,改为检查 0 值更合理,但为了保险起见,
# 如果 isna() 返回空,就不画。防止索引越界是主要的。 # 如果 isna() 返回空,就不画。防止索引越界是主要的。
abnormal_zero_idx = data_filled.index[abnormal_zero_mask] abnormal_zero_idx = data_filled.index[abnormal_zero_mask]
if len(abnormal_zero_idx) > 0: if len(abnormal_zero_idx) > 0:
# 注意:如果 abnormal_zero_idx 是基于 data_filled 的索引0..M-1 # 注意:如果 abnormal_zero_idx 是基于 data_filled 的索引0..M-1
# 直接作为 x 坐标即可,因为 time_filled 也是 0..M-1 # 直接作为 x 坐标即可,因为 time_filled 也是 0..M-1
# 而 y 值应该取自 data_filled 或 data_kf取 data 会越界 # 而 y 值应该取自 data_filled 或 data_kf取 data 会越界
plt.plot( plt.plot(
abnormal_zero_idx, abnormal_zero_idx,
data_filled[sensor_to_plot].loc[abnormal_zero_idx], data_filled[sensor_to_plot].loc[abnormal_zero_idx],
@@ -365,6 +372,10 @@ def clean_flow_data_df_kf(data: pd.DataFrame, show_plot: bool = False) -> dict:
plt.tight_layout() plt.tight_layout()
plt.show() plt.show()
# 将 time 列添加回结果
if time_col_series is not None:
cleaned_data.insert(0, "time", time_col_series)
# 返回完整的修复后字典 # 返回完整的修复后字典
return cleaned_data return cleaned_data

View File

@@ -37,8 +37,9 @@ def fill_time_gaps(
start=data_indexed.index.min(), end=data_indexed.index.max(), freq=freq start=data_indexed.index.min(), end=data_indexed.index.max(), freq=freq
) )
# 重索引以补齐缺失时间点 # 重索引以补齐缺失时间点,同时保留原始时间戳
data_reindexed = data_indexed.reindex(full_range) combined_index = data_indexed.index.union(full_range).sort_values().unique()
data_reindexed = data_indexed.reindex(combined_index)
# 按列处理缺口 # 按列处理缺口
for col in data_reindexed.columns: for col in data_reindexed.columns:
@@ -49,12 +50,12 @@ def fill_time_gaps(
missing_groups = (is_missing != is_missing.shift()).cumsum() missing_groups = (is_missing != is_missing.shift()).cumsum()
gap_lengths = is_missing.groupby(missing_groups).transform("sum") gap_lengths = is_missing.groupby(missing_groups).transform("sum")
# 短缺口:线性插值 # 短缺口:时间插值
short_gap_mask = is_missing & (gap_lengths <= short_gap_threshold) short_gap_mask = is_missing & (gap_lengths <= short_gap_threshold)
if short_gap_mask.any(): if short_gap_mask.any():
data_reindexed.loc[short_gap_mask, col] = ( data_reindexed.loc[short_gap_mask, col] = (
data_reindexed[col] data_reindexed[col]
.interpolate(method="linear", limit_area="inside") .interpolate(method="time", limit_area="inside")
.loc[short_gap_mask] .loc[short_gap_mask]
) )
@@ -213,6 +214,12 @@ def clean_pressure_data_df_km(data: pd.DataFrame, show_plot: bool = False) -> di
data_filled = fill_time_gaps( data_filled = fill_time_gaps(
data, time_col="time", freq="1min", short_gap_threshold=10 data, time_col="time", freq="1min", short_gap_threshold=10
) )
# 保存 time 列用于最后合并
time_col_series = None
if "time" in data_filled.columns:
time_col_series = data_filled["time"]
# 移除 time 列用于后续清洗 # 移除 time 列用于后续清洗
data_filled = data_filled.drop(columns=["time"]) data_filled = data_filled.drop(columns=["time"])
@@ -304,6 +311,10 @@ def clean_pressure_data_df_km(data: pd.DataFrame, show_plot: bool = False) -> di
plt.legend() plt.legend()
plt.show() plt.show()
# 将 time 列添加回结果
if time_col_series is not None:
data_repaired.insert(0, "time", time_col_series)
# 返回清洗后的字典 # 返回清洗后的字典
return data_repaired return data_repaired

View File

@@ -406,9 +406,7 @@ class CompositeQueries:
# 重置索引,将 time 变为普通列 # 重置索引,将 time 变为普通列
pressure_df = pressure_df.reset_index() pressure_df = pressure_df.reset_index()
# 调用清洗方法 # 调用清洗方法
cleaned_value_df = clean_pressure_data_df_km(pressure_df) cleaned_df = clean_pressure_data_df_km(pressure_df)
# 添加 time 列到首列
cleaned_df = pd.concat([pressure_df["time"], cleaned_value_df], axis=1)
# 将清洗后的数据写回数据库 # 将清洗后的数据写回数据库
for device_id in pressure_ids: for device_id in pressure_ids:
if device_id in cleaned_df.columns: if device_id in cleaned_df.columns:
@@ -431,9 +429,7 @@ class CompositeQueries:
# 重置索引,将 time 变为普通列 # 重置索引,将 time 变为普通列
flow_df = flow_df.reset_index() flow_df = flow_df.reset_index()
# 调用清洗方法 # 调用清洗方法
cleaned_value_df = clean_flow_data_df_kf(flow_df) cleaned_df = clean_flow_data_df_kf(flow_df)
# 添加 time 列到首列
cleaned_df = pd.concat([flow_df["time"], cleaned_value_df], axis=1)
# 将清洗后的数据写回数据库 # 将清洗后的数据写回数据库
for device_id in flow_ids: for device_id in flow_ids:
if device_id in cleaned_df.columns: if device_id in cleaned_df.columns: