From 9be2028e4c712874454089d5098307f8ecc24dc6 Mon Sep 17 00:00:00 2001 From: Jiang Date: Mon, 2 Feb 2026 15:16:23 +0800 Subject: [PATCH] =?UTF-8?q?=E4=BF=AE=E5=A4=8D=E6=95=B0=E6=8D=AE=E6=B8=85?= =?UTF-8?q?=E6=B4=97=E6=97=B6=E9=97=B4=E8=BD=B4=E5=A1=AB=E8=A1=A5=E5=90=8E?= =?UTF-8?q?=E7=9A=84=E5=AF=B9=E9=BD=90=E9=97=AE=E9=A2=98?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- app/algorithms/api_ex/flow_data_clean.py | 31 +++++++++++++------ app/algorithms/api_ex/pressure_data_clean.py | 19 +++++++++--- app/infra/db/timescaledb/composite_queries.py | 8 ++--- 3 files changed, 38 insertions(+), 20 deletions(-) diff --git a/app/algorithms/api_ex/flow_data_clean.py b/app/algorithms/api_ex/flow_data_clean.py index 1e689a9..526275c 100644 --- a/app/algorithms/api_ex/flow_data_clean.py +++ b/app/algorithms/api_ex/flow_data_clean.py @@ -37,8 +37,9 @@ def fill_time_gaps( start=data_indexed.index.min(), end=data_indexed.index.max(), freq=freq ) - # 重索引以补齐缺失时间点 - data_reindexed = data_indexed.reindex(full_range) + # 重索引以补齐缺失时间点,同时保留原始时间戳 + combined_index = data_indexed.index.union(full_range).sort_values().unique() + data_reindexed = data_indexed.reindex(combined_index) # 按列处理缺口 for col in data_reindexed.columns: @@ -49,12 +50,12 @@ def fill_time_gaps( missing_groups = (is_missing != is_missing.shift()).cumsum() gap_lengths = is_missing.groupby(missing_groups).transform("sum") - # 短缺口:线性插值 + # 短缺口:时间插值 short_gap_mask = is_missing & (gap_lengths <= short_gap_threshold) if short_gap_mask.any(): data_reindexed.loc[short_gap_mask, col] = ( data_reindexed[col] - .interpolate(method="linear", limit_area="inside") + .interpolate(method="time", limit_area="inside") .loc[short_gap_mask] ) @@ -231,6 +232,12 @@ def clean_flow_data_df_kf(data: pd.DataFrame, show_plot: bool = False) -> dict: data_filled = fill_time_gaps( data, time_col="time", freq="1min", short_gap_threshold=10 ) + + # 保存 time 列用于最后合并 + time_col_series = None + if "time" in data_filled.columns: + time_col_series = data_filled["time"] + # 移除 time 列用于后续清洗 data_filled = data_filled.drop(columns=["time"]) @@ -292,7 +299,7 @@ def clean_flow_data_df_kf(data: pd.DataFrame, show_plot: bool = False) -> dict: plt.rcParams["axes.unicode_minus"] = False if show_plot and len(data.columns) > 0: sensor_to_plot = data.columns[0] - + # 定义x轴 n = len(data) time = np.arange(n) @@ -310,7 +317,7 @@ def clean_flow_data_df_kf(data: pd.DataFrame, show_plot: bool = False) -> dict: markersize=3, alpha=0.7, ) - + # 修正:检查 data_filled 的异常值,绘制在 time_filled 上 abnormal_zero_mask = data_filled[sensor_to_plot].isna() # 如果目的是检查0值,应该用 == 0。这里保留 isna() 但修正索引引用,防止crash。 @@ -318,11 +325,11 @@ def clean_flow_data_df_kf(data: pd.DataFrame, show_plot: bool = False) -> dict: # 基于 "异常0值" 的标签,改为检查 0 值更合理,但为了保险起见, # 如果 isna() 返回空,就不画。防止索引越界是主要的。 abnormal_zero_idx = data_filled.index[abnormal_zero_mask] - + if len(abnormal_zero_idx) > 0: - # 注意:如果 abnormal_zero_idx 是基于 data_filled 的索引(0..M-1), - # 直接作为 x 坐标即可,因为 time_filled 也是 0..M-1 - # 而 y 值应该取自 data_filled 或 data_kf,取 data 会越界 + # 注意:如果 abnormal_zero_idx 是基于 data_filled 的索引(0..M-1), + # 直接作为 x 坐标即可,因为 time_filled 也是 0..M-1 + # 而 y 值应该取自 data_filled 或 data_kf,取 data 会越界 plt.plot( abnormal_zero_idx, data_filled[sensor_to_plot].loc[abnormal_zero_idx], @@ -365,6 +372,10 @@ def clean_flow_data_df_kf(data: pd.DataFrame, show_plot: bool = False) -> dict: plt.tight_layout() plt.show() + # 将 time 列添加回结果 + if time_col_series is not None: + cleaned_data.insert(0, "time", time_col_series) + # 返回完整的修复后字典 return cleaned_data diff --git a/app/algorithms/api_ex/pressure_data_clean.py b/app/algorithms/api_ex/pressure_data_clean.py index 4afdcf4..fcfa20c 100644 --- a/app/algorithms/api_ex/pressure_data_clean.py +++ b/app/algorithms/api_ex/pressure_data_clean.py @@ -37,8 +37,9 @@ def fill_time_gaps( start=data_indexed.index.min(), end=data_indexed.index.max(), freq=freq ) - # 重索引以补齐缺失时间点 - data_reindexed = data_indexed.reindex(full_range) + # 重索引以补齐缺失时间点,同时保留原始时间戳 + combined_index = data_indexed.index.union(full_range).sort_values().unique() + data_reindexed = data_indexed.reindex(combined_index) # 按列处理缺口 for col in data_reindexed.columns: @@ -49,12 +50,12 @@ def fill_time_gaps( missing_groups = (is_missing != is_missing.shift()).cumsum() gap_lengths = is_missing.groupby(missing_groups).transform("sum") - # 短缺口:线性插值 + # 短缺口:时间插值 short_gap_mask = is_missing & (gap_lengths <= short_gap_threshold) if short_gap_mask.any(): data_reindexed.loc[short_gap_mask, col] = ( data_reindexed[col] - .interpolate(method="linear", limit_area="inside") + .interpolate(method="time", limit_area="inside") .loc[short_gap_mask] ) @@ -213,6 +214,12 @@ def clean_pressure_data_df_km(data: pd.DataFrame, show_plot: bool = False) -> di data_filled = fill_time_gaps( data, time_col="time", freq="1min", short_gap_threshold=10 ) + + # 保存 time 列用于最后合并 + time_col_series = None + if "time" in data_filled.columns: + time_col_series = data_filled["time"] + # 移除 time 列用于后续清洗 data_filled = data_filled.drop(columns=["time"]) @@ -304,6 +311,10 @@ def clean_pressure_data_df_km(data: pd.DataFrame, show_plot: bool = False) -> di plt.legend() plt.show() + # 将 time 列添加回结果 + if time_col_series is not None: + data_repaired.insert(0, "time", time_col_series) + # 返回清洗后的字典 return data_repaired diff --git a/app/infra/db/timescaledb/composite_queries.py b/app/infra/db/timescaledb/composite_queries.py index 5d2536c..c62efe1 100644 --- a/app/infra/db/timescaledb/composite_queries.py +++ b/app/infra/db/timescaledb/composite_queries.py @@ -406,9 +406,7 @@ class CompositeQueries: # 重置索引,将 time 变为普通列 pressure_df = pressure_df.reset_index() # 调用清洗方法 - cleaned_value_df = clean_pressure_data_df_km(pressure_df) - # 添加 time 列到首列 - cleaned_df = pd.concat([pressure_df["time"], cleaned_value_df], axis=1) + cleaned_df = clean_pressure_data_df_km(pressure_df) # 将清洗后的数据写回数据库 for device_id in pressure_ids: if device_id in cleaned_df.columns: @@ -431,9 +429,7 @@ class CompositeQueries: # 重置索引,将 time 变为普通列 flow_df = flow_df.reset_index() # 调用清洗方法 - cleaned_value_df = clean_flow_data_df_kf(flow_df) - # 添加 time 列到首列 - cleaned_df = pd.concat([flow_df["time"], cleaned_value_df], axis=1) + cleaned_df = clean_flow_data_df_kf(flow_df) # 将清洗后的数据写回数据库 for device_id in flow_ids: if device_id in cleaned_df.columns: