260 lines
9.8 KiB
Python
260 lines
9.8 KiB
Python
from __future__ import annotations
|
|
|
|
from typing import Any
|
|
|
|
import numpy as np
|
|
import pandas as pd
|
|
from scipy.fft import fft, ifft
|
|
from sklearn.ensemble import IsolationForest
|
|
|
|
PressureDataInput = (
|
|
pd.DataFrame
|
|
| dict[str, list[Any]]
|
|
| list[dict[str, Any]]
|
|
| list[list[Any]]
|
|
| np.ndarray
|
|
)
|
|
IGNORED_OBSERVATION_COLUMNS = {"time", "timestamp", "datetime", "date"}
|
|
|
|
|
|
class BurstDetector:
|
|
"""FFT + IsolationForest based burst detection for daily aligned pressure data."""
|
|
|
|
def __init__(
|
|
self,
|
|
*,
|
|
mu: int = 100,
|
|
points_per_day: int = 1440,
|
|
iforest_params: dict[str, Any] | None = None,
|
|
) -> None:
|
|
if points_per_day <= 0:
|
|
raise ValueError("points_per_day 必须大于 0。")
|
|
if mu <= 0:
|
|
raise ValueError("mu 必须大于 0。")
|
|
|
|
self.mu = int(mu)
|
|
self.points_per_day = int(points_per_day)
|
|
self.iforest_params = {
|
|
"n_estimators": 50,
|
|
"random_state": 42,
|
|
"contamination": "auto",
|
|
}
|
|
if iforest_params:
|
|
self.iforest_params.update(iforest_params)
|
|
|
|
self.data: np.ndarray | None = None
|
|
self.sensor_names: list[str] = []
|
|
self.high_freq_features: np.ndarray | None = None
|
|
|
|
def load_data(
|
|
self,
|
|
data_source: PressureDataInput,
|
|
*,
|
|
sensor_nodes: list[str] | None = None,
|
|
) -> pd.DataFrame:
|
|
"""
|
|
标准化输入观测数据为 DataFrame。
|
|
|
|
支持的 `data_source` 格式:
|
|
- `pd.DataFrame`
|
|
每一列代表一个传感器,每一行代表一个时间点。
|
|
- `dict[str, list[Any]]`
|
|
键为传感器 ID,值为该传感器按时间顺序排列的压力序列。
|
|
例如:`{"J1": [101.2, 101.0], "J2": [99.8, 99.7]}`。
|
|
- `list[dict[str, Any]]`
|
|
每个字典代表一个时间点,键为传感器 ID,值为该时刻压力。
|
|
例如:`[{"J1": 101.2, "J2": 99.8}, {"J1": 101.0, "J2": 99.7}]`。
|
|
- `list[list[Any]]`
|
|
二维列表,格式为 `(时间点数, 传感器数)`。
|
|
例如:`[[101.2, 99.8], [101.0, 99.7]]`。
|
|
- `np.ndarray`
|
|
二维数组,形状必须为 `(时间点数, 传感器数)`。
|
|
|
|
参数:
|
|
- `sensor_nodes`:
|
|
可选的传感器列筛选列表。传入后,数据中必须包含这些列名。
|
|
|
|
返回:
|
|
- 标准化后的 `pd.DataFrame`,列为传感器,行为时间点。
|
|
"""
|
|
if isinstance(data_source, np.ndarray):
|
|
observation_df = pd.DataFrame(data_source)
|
|
elif isinstance(data_source, pd.DataFrame):
|
|
observation_df = data_source.copy()
|
|
else:
|
|
observation_df = pd.DataFrame(data_source)
|
|
|
|
return self._normalize_observation_frame(
|
|
observation_df=observation_df, sensor_nodes=sensor_nodes
|
|
)
|
|
|
|
def process(
|
|
self,
|
|
observed_pressure_data: PressureDataInput,
|
|
*,
|
|
sensor_nodes: list[str] | None = None,
|
|
) -> np.ndarray:
|
|
"""
|
|
对输入压力序列按天切片,并提取每天末时刻的高频特征。
|
|
|
|
`observed_pressure_data` 的格式与 `load_data()` 一致,统一要求:
|
|
- 数据必须表示为“行=时间点、列=传感器”。
|
|
- 总行数必须是 `points_per_day` 的整数倍。
|
|
- 至少需要 2 天数据,即总行数 `>= 2 * points_per_day`。
|
|
|
|
例如:
|
|
- 当 `points_per_day=1440` 时,15 天数据的形状通常为 `(21600, 传感器数)`。
|
|
- 若传入 `sensor_nodes=["J1", "J2"]`,则输入中必须存在 `J1/J2` 两列。
|
|
|
|
返回:
|
|
- `np.ndarray`,形状为 `(天数, 传感器数)`,
|
|
每个值表示对应传感器在当天末时刻提取出的高频分量。
|
|
"""
|
|
observation_df = self.load_data(
|
|
observed_pressure_data,
|
|
sensor_nodes=sensor_nodes,
|
|
)
|
|
matrix = observation_df.to_numpy(dtype=float)
|
|
total_points, sensor_count = matrix.shape
|
|
if sensor_count == 0:
|
|
raise ValueError("压力观测数据中未找到可用传感器列。")
|
|
if total_points < self.points_per_day * 2:
|
|
raise ValueError("至少需要 2 天的观测数据才能执行爆管侦测。")
|
|
if total_points % self.points_per_day != 0:
|
|
raise ValueError("观测数据长度必须能被每日采样点数整除,以便按天切分。")
|
|
|
|
day_count = total_points // self.points_per_day
|
|
high_freq_features = np.zeros((day_count, sensor_count), dtype=float)
|
|
|
|
for sensor_idx in range(sensor_count):
|
|
sensor_series = matrix[:, sensor_idx]
|
|
for day_idx in range(day_count):
|
|
start = day_idx * self.points_per_day
|
|
end = (day_idx + 1) * self.points_per_day
|
|
day_data = sensor_series[start:end]
|
|
mirrored_data = np.concatenate([day_data, day_data[::-1]])
|
|
transformed = fft(mirrored_data)
|
|
transformed[self.mu : len(mirrored_data) - self.mu + 1] = 0
|
|
low_freq = ifft(transformed).real
|
|
high_freq = day_data - low_freq[: self.points_per_day]
|
|
high_freq_features[day_idx, sensor_idx] = float(high_freq[-1])
|
|
|
|
self.data = matrix
|
|
self.sensor_names = [str(column) for column in observation_df.columns]
|
|
self.high_freq_features = high_freq_features
|
|
return high_freq_features
|
|
|
|
def detect(self) -> pd.DataFrame:
|
|
if self.high_freq_features is None:
|
|
raise ValueError("特征未提取。请先调用 process()。")
|
|
|
|
day_count = self.high_freq_features.shape[0]
|
|
if day_count < 2:
|
|
raise ValueError("孤立森林至少需要 2 天特征数据。")
|
|
|
|
clf = IsolationForest(
|
|
n_estimators=self.iforest_params.get("n_estimators", 50),
|
|
max_samples=day_count,
|
|
random_state=self.iforest_params.get("random_state", 42),
|
|
contamination=self.iforest_params.get("contamination", "auto"),
|
|
**{
|
|
key: value
|
|
for key, value in self.iforest_params.items()
|
|
if key not in {"n_estimators", "random_state", "contamination"}
|
|
},
|
|
)
|
|
clf.fit(self.high_freq_features)
|
|
|
|
scores = clf.decision_function(self.high_freq_features)
|
|
predictions = clf.predict(self.high_freq_features)
|
|
result_df = pd.DataFrame(
|
|
{
|
|
"Day": range(1, day_count + 1),
|
|
"Score": scores.astype(float),
|
|
"Prediction": predictions.astype(int),
|
|
}
|
|
)
|
|
result_df["IsBurst"] = result_df["Prediction"].eq(-1)
|
|
result_df.attrs["sensor_nodes"] = self.sensor_names.copy()
|
|
result_df.attrs["high_freq_features"] = self.high_freq_features.copy()
|
|
result_df.attrs["day_count"] = day_count
|
|
result_df.attrs["points_per_day"] = self.points_per_day
|
|
result_df.attrs["sample_count"] = (
|
|
int(self.data.shape[0]) if self.data is not None else 0
|
|
)
|
|
return result_df
|
|
|
|
def run_detection(
|
|
self,
|
|
observed_pressure_data: PressureDataInput,
|
|
*,
|
|
sensor_nodes: list[str] | None = None,
|
|
) -> pd.DataFrame:
|
|
"""
|
|
执行完整爆管侦测流程。
|
|
|
|
输入格式与 `process()` 相同:
|
|
- `DataFrame` / `dict[str, list[Any]]` / `list[dict[str, Any]]` / `list[list[Any]]` / `np.ndarray`
|
|
- 行表示时间点,列表示传感器
|
|
- 总行数必须能被 `points_per_day` 整除
|
|
|
|
返回结果包含列:
|
|
- `Day`: 第几天(从 1 开始)
|
|
- `Score`: IsolationForest 异常分数,越小越异常
|
|
- `Prediction`: `-1` 表示异常,`1` 表示正常
|
|
- `IsBurst`: 是否判定为异常日
|
|
"""
|
|
self.process(observed_pressure_data, sensor_nodes=sensor_nodes)
|
|
return self.detect()
|
|
|
|
@staticmethod
|
|
def _normalize_observation_frame(
|
|
*,
|
|
observation_df: pd.DataFrame,
|
|
sensor_nodes: list[str] | None,
|
|
) -> pd.DataFrame:
|
|
if observation_df.empty:
|
|
raise ValueError("压力观测数据为空。")
|
|
|
|
normalized_df = observation_df.copy()
|
|
normalized_df.columns = [str(column) for column in normalized_df.columns]
|
|
normalized_df = normalized_df.drop(
|
|
columns=[
|
|
column
|
|
for column in normalized_df.columns
|
|
if column.lower() in IGNORED_OBSERVATION_COLUMNS
|
|
or column.lower().startswith("unnamed:")
|
|
],
|
|
errors="ignore",
|
|
)
|
|
|
|
if sensor_nodes:
|
|
selected_columns = [str(node) for node in sensor_nodes]
|
|
missing_columns = [
|
|
column
|
|
for column in selected_columns
|
|
if column not in normalized_df.columns
|
|
]
|
|
if missing_columns:
|
|
preview = ", ".join(missing_columns[:10])
|
|
raise ValueError(f"观测数据缺少传感器列: {preview}")
|
|
normalized_df = normalized_df.loc[:, selected_columns]
|
|
else:
|
|
candidate_df = normalized_df.apply(pd.to_numeric, errors="coerce")
|
|
normalized_df = candidate_df.loc[:, candidate_df.notna().any(axis=0)]
|
|
|
|
if normalized_df.empty:
|
|
raise ValueError("未识别到可用的数值型压力观测列。")
|
|
|
|
normalized_df = normalized_df.apply(pd.to_numeric, errors="coerce")
|
|
invalid_columns = [
|
|
column
|
|
for column in normalized_df.columns
|
|
if normalized_df[column].isna().any()
|
|
]
|
|
if invalid_columns:
|
|
preview = ", ".join(invalid_columns[:10])
|
|
raise ValueError(f"压力观测数据包含非数值或缺失值: {preview}")
|
|
|
|
return normalized_df.reset_index(drop=True)
|