diff --git a/app/algorithms/api_ex/Fdataclean.py b/app/algorithms/api_ex/Fdataclean.py index 33228f3..88ed379 100644 --- a/app/algorithms/api_ex/Fdataclean.py +++ b/app/algorithms/api_ex/Fdataclean.py @@ -6,14 +6,107 @@ from pykalman import KalmanFilter import os -def clean_flow_data_kf(input_csv_path: str, show_plot: bool = False) -> str: +def fill_time_gaps( + data: pd.DataFrame, + time_col: str = "time", + freq: str = "1min", + short_gap_threshold: int = 10, +) -> pd.DataFrame: + """ + 补齐缺失时间戳并填补数据缺口。 + + Args: + data: 包含时间列的 DataFrame + time_col: 时间列名(默认 'time') + freq: 重采样频率(默认 '1min') + short_gap_threshold: 短缺口阈值(分钟),<=此值用线性插值,>此值用前向填充 + + Returns: + 补齐时间后的 DataFrame(保留原时间列格式) + """ + if time_col not in data.columns: + raise ValueError(f"时间列 '{time_col}' 不存在于数据中") + + # 解析时间列并设为索引 + data = data.copy() + data[time_col] = pd.to_datetime(data[time_col], utc=True) + data_indexed = data.set_index(time_col) + + # 生成完整时间范围 + full_range = pd.date_range( + start=data_indexed.index.min(), end=data_indexed.index.max(), freq=freq + ) + + # 重索引以补齐缺失时间点 + data_reindexed = data_indexed.reindex(full_range) + + # 按列处理缺口 + for col in data_reindexed.columns: + # 识别缺失值位置 + is_missing = data_reindexed[col].isna() + + # 计算连续缺失的长度 + missing_groups = (is_missing != is_missing.shift()).cumsum() + gap_lengths = is_missing.groupby(missing_groups).transform("sum") + + # 短缺口:线性插值 + short_gap_mask = is_missing & (gap_lengths <= short_gap_threshold) + if short_gap_mask.any(): + data_reindexed.loc[short_gap_mask, col] = ( + data_reindexed[col] + .interpolate(method="linear", limit_area="inside") + .loc[short_gap_mask] + ) + + # 长缺口:前向填充 + long_gap_mask = is_missing & (gap_lengths > short_gap_threshold) + if long_gap_mask.any(): + data_reindexed.loc[long_gap_mask, col] = ( + data_reindexed[col].ffill().loc[long_gap_mask] + ) + + # 重置索引并恢复时间列(保留原格式) + data_result = data_reindexed.reset_index() + data_result.rename(columns={"index": time_col}, inplace=True) + + # 保留时区信息 + data_result[time_col] = data_result[time_col].dt.strftime("%Y-%m-%dT%H:%M:%S%z") + # 修正时区格式(Python的%z输出为+0000,需转为+00:00) + data_result[time_col] = data_result[time_col].str.replace( + r"(\+\d{2})(\d{2})$", r"\1:\2", regex=True + ) + + return data_result + + +def clean_flow_data_kf( + input_csv_path: str, show_plot: bool = False, fill_gaps: bool = True +) -> str: """ 读取 input_csv_path 中的每列时间序列,使用一维 Kalman 滤波平滑并用预测值替换基于 3σ 检测出的异常点。 保存输出为:_cleaned.xlsx(与输入同目录),并返回输出文件的绝对路径。 仅保留输入文件路径作为参数(按要求)。 + + Args: + input_csv_path: CSV 文件路径 + show_plot: 是否显示可视化 + fill_gaps: 是否先补齐时间缺口(默认 True) """ # 读取 CSV data = pd.read_csv(input_csv_path, header=0, index_col=None, encoding="utf-8") + + # 补齐时间缺口(如果数据包含 time 列) + if fill_gaps and "time" in data.columns: + data = fill_time_gaps( + data, time_col="time", freq="1min", short_gap_threshold=10 + ) + + # 分离时间列和数值列 + time_col_data = None + if "time" in data.columns: + time_col_data = data["time"] + data = data.drop(columns=["time"]) + # 存储 Kalman 平滑结果 data_kf = pd.DataFrame(index=data.index, columns=data.columns) # 平滑每一列 @@ -63,6 +156,10 @@ def clean_flow_data_kf(input_csv_path: str, show_plot: bool = False) -> str: ) cleaned_data.loc[anomaly_idx, f"{col}_cleaned"] = data_kf.loc[anomaly_idx, col] + # 如果原始数据包含时间列,将其添加回结果 + if time_col_data is not None: + cleaned_data.insert(0, "time", time_col_data) + # 构造输出文件名:在输入文件名基础上加后缀 _cleaned.xlsx input_dir = os.path.dirname(os.path.abspath(input_csv_path)) input_base = os.path.splitext(os.path.basename(input_csv_path))[0] @@ -122,17 +219,20 @@ def clean_flow_data_df_kf(data: pd.DataFrame, show_plot: bool = False) -> dict: 接收一个 DataFrame 数据结构,使用一维 Kalman 滤波平滑并用预测值替换基于 IQR 检测出的异常点。 区分合理的0值(流量转换)和异常的0值(连续多个0或孤立0)。 返回完整的清洗后的字典数据结构。 + + Args: + data: 输入 DataFrame(可包含 time 列) + show_plot: 是否显示可视化 """ # 使用传入的 DataFrame data = data.copy() - # 替换0值,填充NaN值 - data_filled = data.replace(0, np.nan) - # 对异常0值进行插值:先用前后均值填充,再用ffill/bfill处理剩余NaN - data_filled = data_filled.interpolate(method="linear", limit_direction="both") - - # 处理剩余的0值和NaN值 - data_filled = data_filled.ffill().bfill() + # 补齐时间缺口(如果启用且数据包含 time 列) + data_filled = fill_time_gaps( + data, time_col="time", freq="1min", short_gap_threshold=10 + ) + # 移除 time 列用于后续清洗 + data_filled = data_filled.drop(columns=["time"]) # 存储 Kalman 平滑结果 data_kf = pd.DataFrame(index=data_filled.index, columns=data_filled.columns) diff --git a/app/algorithms/api_ex/Pdataclean.py b/app/algorithms/api_ex/Pdataclean.py index f62ecbe..af4bab5 100644 --- a/app/algorithms/api_ex/Pdataclean.py +++ b/app/algorithms/api_ex/Pdataclean.py @@ -6,15 +6,107 @@ from sklearn.impute import SimpleImputer import os -def clean_pressure_data_km(input_csv_path: str, show_plot: bool = False) -> str: +def fill_time_gaps( + data: pd.DataFrame, + time_col: str = "time", + freq: str = "1min", + short_gap_threshold: int = 10, +) -> pd.DataFrame: + """ + 补齐缺失时间戳并填补数据缺口。 + + Args: + data: 包含时间列的 DataFrame + time_col: 时间列名(默认 'time') + freq: 重采样频率(默认 '1min') + short_gap_threshold: 短缺口阈值(分钟),<=此值用线性插值,>此值用前向填充 + + Returns: + 补齐时间后的 DataFrame(保留原时间列格式) + """ + if time_col not in data.columns: + raise ValueError(f"时间列 '{time_col}' 不存在于数据中") + + # 解析时间列并设为索引 + data = data.copy() + data[time_col] = pd.to_datetime(data[time_col], utc=True) + data_indexed = data.set_index(time_col) + + # 生成完整时间范围 + full_range = pd.date_range( + start=data_indexed.index.min(), end=data_indexed.index.max(), freq=freq + ) + + # 重索引以补齐缺失时间点 + data_reindexed = data_indexed.reindex(full_range) + + # 按列处理缺口 + for col in data_reindexed.columns: + # 识别缺失值位置 + is_missing = data_reindexed[col].isna() + + # 计算连续缺失的长度 + missing_groups = (is_missing != is_missing.shift()).cumsum() + gap_lengths = is_missing.groupby(missing_groups).transform("sum") + + # 短缺口:线性插值 + short_gap_mask = is_missing & (gap_lengths <= short_gap_threshold) + if short_gap_mask.any(): + data_reindexed.loc[short_gap_mask, col] = ( + data_reindexed[col] + .interpolate(method="linear", limit_area="inside") + .loc[short_gap_mask] + ) + + # 长缺口:前向填充 + long_gap_mask = is_missing & (gap_lengths > short_gap_threshold) + if long_gap_mask.any(): + data_reindexed.loc[long_gap_mask, col] = ( + data_reindexed[col].ffill().loc[long_gap_mask] + ) + + # 重置索引并恢复时间列(保留原格式) + data_result = data_reindexed.reset_index() + data_result.rename(columns={"index": time_col}, inplace=True) + + # 保留时区信息 + data_result[time_col] = data_result[time_col].dt.strftime("%Y-%m-%dT%H:%M:%S%z") + # 修正时区格式(Python的%z输出为+0000,需转为+00:00) + data_result[time_col] = data_result[time_col].str.replace( + r"(\+\d{2})(\d{2})$", r"\1:\2", regex=True + ) + + return data_result + + +def clean_pressure_data_km( + input_csv_path: str, show_plot: bool = False, fill_gaps: bool = True +) -> str: """ 读取输入 CSV,基于 KMeans 检测异常并用滚动平均修复。输出为 _cleaned.xlsx(同目录)。 原始数据在 sheet 'raw_pressure_data',处理后数据在 sheet 'cleaned_pressusre_data'。 返回输出文件的绝对路径。 + + Args: + input_csv_path: CSV 文件路径 + show_plot: 是否显示可视化 + fill_gaps: 是否先补齐时间缺口(默认 True) """ # 读取 CSV input_csv_path = os.path.abspath(input_csv_path) data = pd.read_csv(input_csv_path, header=0, index_col=None, encoding="utf-8") + + # 补齐时间缺口(如果数据包含 time 列) + if fill_gaps and "time" in data.columns: + data = fill_time_gaps( + data, time_col="time", freq="1min", short_gap_threshold=10 + ) + + # 分离时间列和数值列 + time_col_data = None + if "time" in data.columns: + time_col_data = data["time"] + data = data.drop(columns=["time"]) # 标准化 data_norm = (data - data.mean()) / data.std() @@ -86,11 +178,20 @@ def clean_pressure_data_km(input_csv_path: str, show_plot: bool = False) -> str: output_filename = f"{input_base}_cleaned.xlsx" output_path = os.path.join(input_dir, output_filename) + # 如果原始数据包含时间列,将其添加回结果 + data_for_save = data.copy() + data_repaired_for_save = data_repaired.copy() + if time_col_data is not None: + data_for_save.insert(0, "time", time_col_data) + data_repaired_for_save.insert(0, "time", time_col_data) + if os.path.exists(output_path): os.remove(output_path) # 覆盖同名文件 with pd.ExcelWriter(output_path, engine="openpyxl") as writer: - data.to_excel(writer, sheet_name="raw_pressure_data", index=False) - data_repaired.to_excel(writer, sheet_name="cleaned_pressusre_data", index=False) + data_for_save.to_excel(writer, sheet_name="raw_pressure_data", index=False) + data_repaired_for_save.to_excel( + writer, sheet_name="cleaned_pressusre_data", index=False + ) # 返回输出文件的绝对路径 return os.path.abspath(output_path) @@ -100,17 +201,20 @@ def clean_pressure_data_df_km(data: pd.DataFrame, show_plot: bool = False) -> di """ 接收一个 DataFrame 数据结构,使用KMeans聚类检测异常并用滚动平均修复。 返回清洗后的字典数据结构。 + + Args: + data: 输入 DataFrame(可包含 time 列) + show_plot: 是否显示可视化 """ # 使用传入的 DataFrame data = data.copy() - # 填充NaN值 - data = data.ffill().bfill() - # 异常值预处理 - # 将0值替换为NaN,然后用线性插值填充 - data_filled = data.replace(0, np.nan) - data_filled = data_filled.interpolate(method="linear", limit_direction="both") - # 如果仍有NaN(全为0的列),用前后值填充 - data_filled = data_filled.ffill().bfill() + + # 补齐时间缺口(如果启用且数据包含 time 列) + data_filled = fill_time_gaps( + data, time_col="time", freq="1min", short_gap_threshold=10 + ) + # 移除 time 列用于后续清洗 + data_filled = data_filled.drop(columns=["time"]) # 标准化(使用填充后的数据) data_norm = (data_filled - data_filled.mean()) / data_filled.std() diff --git a/app/api/v1/endpoints/simulation.py b/app/api/v1/endpoints/simulation.py index b2893f8..5289088 100644 --- a/app/api/v1/endpoints/simulation.py +++ b/app/api/v1/endpoints/simulation.py @@ -237,7 +237,7 @@ async def fastapi_valve_close_analysis( return result or "success" -@router.get("/valveisolation/") +@router.get("/valve_isolation_analysis/") async def valve_isolation_endpoint(network: str, accident_element: str): return analyze_valve_isolation(network, accident_element) @@ -587,11 +587,10 @@ async def fastapi_scada_device_data_cleaning( if device_id in type_scada_data: values = [record["value"] for record in type_scada_data[device_id]] df[device_id] = values - value_df = df.drop(columns=["time"]) if device_type == "pressure": - cleaned_value_df = Pdataclean.clean_pressure_data_df_km(value_df) + cleaned_value_df = Pdataclean.clean_pressure_data_df_km(df) elif device_type == "pipe_flow": - cleaned_value_df = Fdataclean.clean_flow_data_df_kf(value_df) + cleaned_value_df = Fdataclean.clean_flow_data_df_kf(df) cleaned_value_df = pd.DataFrame(cleaned_value_df) cleaned_df = pd.concat([df["time"], cleaned_value_df], axis=1) influxdb_api.import_multicolumn_data_from_dict( diff --git a/app/infra/db/timescaledb/composite_queries.py b/app/infra/db/timescaledb/composite_queries.py index 47a75f9..412d93a 100644 --- a/app/infra/db/timescaledb/composite_queries.py +++ b/app/infra/db/timescaledb/composite_queries.py @@ -405,10 +405,8 @@ class CompositeQueries: pressure_df = df[pressure_ids] # 重置索引,将 time 变为普通列 pressure_df = pressure_df.reset_index() - # 移除 time 列,准备输入给清洗方法 - value_df = pressure_df.drop(columns=["time"]) # 调用清洗方法 - cleaned_value_df = clean_pressure_data_df_km(value_df) + cleaned_value_df = clean_pressure_data_df_km(pressure_df) # 添加 time 列到首列 cleaned_df = pd.concat([pressure_df["time"], cleaned_value_df], axis=1) # 将清洗后的数据写回数据库 @@ -432,10 +430,8 @@ class CompositeQueries: flow_df = df[flow_ids] # 重置索引,将 time 变为普通列 flow_df = flow_df.reset_index() - # 移除 time 列,准备输入给清洗方法 - value_df = flow_df.drop(columns=["time"]) # 调用清洗方法 - cleaned_value_df = clean_flow_data_df_kf(value_df) + cleaned_value_df = clean_flow_data_df_kf(flow_df) # 添加 time 列到首列 cleaned_df = pd.concat([flow_df["time"], cleaned_value_df], axis=1) # 将清洗后的数据写回数据库