from __future__ import annotations from typing import Any import numpy as np import pandas as pd from scipy.fft import fft, ifft from sklearn.ensemble import IsolationForest PressureDataInput = ( pd.DataFrame | dict[str, list[Any]] | list[dict[str, Any]] | list[list[Any]] | np.ndarray ) IGNORED_OBSERVATION_COLUMNS = {"time", "timestamp", "datetime", "date"} class BurstDetector: """FFT + IsolationForest based burst detection for daily aligned pressure data.""" def __init__( self, *, mu: int = 100, points_per_day: int = 1440, iforest_params: dict[str, Any] | None = None, ) -> None: if points_per_day <= 0: raise ValueError("points_per_day 必须大于 0。") if mu <= 0: raise ValueError("mu 必须大于 0。") self.mu = int(mu) self.points_per_day = int(points_per_day) self.iforest_params = { "n_estimators": 50, "random_state": 42, "contamination": "auto", } if iforest_params: self.iforest_params.update(iforest_params) self.data: np.ndarray | None = None self.sensor_names: list[str] = [] self.high_freq_features: np.ndarray | None = None def load_data( self, data_source: PressureDataInput, *, sensor_nodes: list[str] | None = None, ) -> pd.DataFrame: """ 标准化输入观测数据为 DataFrame。 支持的 `data_source` 格式: - `pd.DataFrame` 每一列代表一个传感器,每一行代表一个时间点。 - `dict[str, list[Any]]` 键为传感器 ID,值为该传感器按时间顺序排列的压力序列。 例如:`{"J1": [101.2, 101.0], "J2": [99.8, 99.7]}`。 - `list[dict[str, Any]]` 每个字典代表一个时间点,键为传感器 ID,值为该时刻压力。 例如:`[{"J1": 101.2, "J2": 99.8}, {"J1": 101.0, "J2": 99.7}]`。 - `list[list[Any]]` 二维列表,格式为 `(时间点数, 传感器数)`。 例如:`[[101.2, 99.8], [101.0, 99.7]]`。 - `np.ndarray` 二维数组,形状必须为 `(时间点数, 传感器数)`。 参数: - `sensor_nodes`: 可选的传感器列筛选列表。传入后,数据中必须包含这些列名。 返回: - 标准化后的 `pd.DataFrame`,列为传感器,行为时间点。 """ if isinstance(data_source, np.ndarray): observation_df = pd.DataFrame(data_source) elif isinstance(data_source, pd.DataFrame): observation_df = data_source.copy() else: observation_df = pd.DataFrame(data_source) return self._normalize_observation_frame( observation_df=observation_df, sensor_nodes=sensor_nodes ) def process( self, observed_pressure_data: PressureDataInput, *, sensor_nodes: list[str] | None = None, ) -> np.ndarray: """ 对输入压力序列按天切片,并提取每天末时刻的高频特征。 `observed_pressure_data` 的格式与 `load_data()` 一致,统一要求: - 数据必须表示为“行=时间点、列=传感器”。 - 总行数必须是 `points_per_day` 的整数倍。 - 至少需要 2 天数据,即总行数 `>= 2 * points_per_day`。 例如: - 当 `points_per_day=1440` 时,15 天数据的形状通常为 `(21600, 传感器数)`。 - 若传入 `sensor_nodes=["J1", "J2"]`,则输入中必须存在 `J1/J2` 两列。 返回: - `np.ndarray`,形状为 `(天数, 传感器数)`, 每个值表示对应传感器在当天末时刻提取出的高频分量。 """ observation_df = self.load_data( observed_pressure_data, sensor_nodes=sensor_nodes, ) matrix = observation_df.to_numpy(dtype=float) total_points, sensor_count = matrix.shape if sensor_count == 0: raise ValueError("压力观测数据中未找到可用传感器列。") if total_points < self.points_per_day * 2: raise ValueError("至少需要 2 天的观测数据才能执行爆管侦测。") if total_points % self.points_per_day != 0: raise ValueError("观测数据长度必须能被每日采样点数整除,以便按天切分。") day_count = total_points // self.points_per_day high_freq_features = np.zeros((day_count, sensor_count), dtype=float) for sensor_idx in range(sensor_count): sensor_series = matrix[:, sensor_idx] for day_idx in range(day_count): start = day_idx * self.points_per_day end = (day_idx + 1) * self.points_per_day day_data = sensor_series[start:end] mirrored_data = np.concatenate([day_data, day_data[::-1]]) transformed = fft(mirrored_data) transformed[self.mu : len(mirrored_data) - self.mu + 1] = 0 low_freq = ifft(transformed).real high_freq = day_data - low_freq[: self.points_per_day] high_freq_features[day_idx, sensor_idx] = float(high_freq[-1]) self.data = matrix self.sensor_names = [str(column) for column in observation_df.columns] self.high_freq_features = high_freq_features return high_freq_features def detect(self) -> pd.DataFrame: if self.high_freq_features is None: raise ValueError("特征未提取。请先调用 process()。") day_count = self.high_freq_features.shape[0] if day_count < 2: raise ValueError("孤立森林至少需要 2 天特征数据。") clf = IsolationForest( n_estimators=self.iforest_params.get("n_estimators", 50), max_samples=day_count, random_state=self.iforest_params.get("random_state", 42), contamination=self.iforest_params.get("contamination", "auto"), **{ key: value for key, value in self.iforest_params.items() if key not in {"n_estimators", "random_state", "contamination"} }, ) clf.fit(self.high_freq_features) scores = clf.decision_function(self.high_freq_features) predictions = clf.predict(self.high_freq_features) result_df = pd.DataFrame( { "Day": range(1, day_count + 1), "Score": scores.astype(float), "Prediction": predictions.astype(int), } ) result_df["IsBurst"] = result_df["Prediction"].eq(-1) result_df.attrs["sensor_nodes"] = self.sensor_names.copy() result_df.attrs["high_freq_features"] = self.high_freq_features.copy() result_df.attrs["day_count"] = day_count result_df.attrs["points_per_day"] = self.points_per_day result_df.attrs["sample_count"] = ( int(self.data.shape[0]) if self.data is not None else 0 ) return result_df def run_detection( self, observed_pressure_data: PressureDataInput, *, sensor_nodes: list[str] | None = None, ) -> pd.DataFrame: """ 执行完整爆管侦测流程。 输入格式与 `process()` 相同: - `DataFrame` / `dict[str, list[Any]]` / `list[dict[str, Any]]` / `list[list[Any]]` / `np.ndarray` - 行表示时间点,列表示传感器 - 总行数必须能被 `points_per_day` 整除 返回结果包含列: - `Day`: 第几天(从 1 开始) - `Score`: IsolationForest 异常分数,越小越异常 - `Prediction`: `-1` 表示异常,`1` 表示正常 - `IsBurst`: 是否判定为异常日 """ self.process(observed_pressure_data, sensor_nodes=sensor_nodes) return self.detect() @staticmethod def _normalize_observation_frame( *, observation_df: pd.DataFrame, sensor_nodes: list[str] | None, ) -> pd.DataFrame: if observation_df.empty: raise ValueError("压力观测数据为空。") normalized_df = observation_df.copy() normalized_df.columns = [str(column) for column in normalized_df.columns] normalized_df = normalized_df.drop( columns=[ column for column in normalized_df.columns if column.lower() in IGNORED_OBSERVATION_COLUMNS or column.lower().startswith("unnamed:") ], errors="ignore", ) if sensor_nodes: selected_columns = [str(node) for node in sensor_nodes] missing_columns = [ column for column in selected_columns if column not in normalized_df.columns ] if missing_columns: preview = ", ".join(missing_columns[:10]) raise ValueError(f"观测数据缺少传感器列: {preview}") normalized_df = normalized_df.loc[:, selected_columns] else: candidate_df = normalized_df.apply(pd.to_numeric, errors="coerce") normalized_df = candidate_df.loc[:, candidate_df.notna().any(axis=0)] if normalized_df.empty: raise ValueError("未识别到可用的数值型压力观测列。") normalized_df = normalized_df.apply(pd.to_numeric, errors="coerce") invalid_columns = [ column for column in normalized_df.columns if normalized_df[column].isna().any() ] if invalid_columns: preview = ", ".join(invalid_columns[:10]) raise ValueError(f"压力观测数据包含非数值或缺失值: {preview}") return normalized_df.reset_index(drop=True)