From 3b712ea467c97f59eb76c4e7c6ae948c2c31c08b Mon Sep 17 00:00:00 2001 From: Jiang Date: Fri, 17 Apr 2026 17:21:50 +0800 Subject: [PATCH] =?UTF-8?q?=E4=BC=98=E5=8C=96=E4=BC=A0=E6=84=9F=E5=99=A8?= =?UTF-8?q?=E5=B8=83=E7=BD=AE=E7=AE=97=E6=B3=95=EF=BC=8C=E4=BF=AE=E5=A4=8D?= =?UTF-8?q?=E6=95=B0=E6=8D=AE=E5=BA=93=E6=9B=B4=E6=96=B0=E9=80=BB=E8=BE=91?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- app/algorithms/cleaning/pressure.py | 654 +++++++++++++----- app/algorithms/sensor/kmeans.py | 88 +-- app/algorithms/sensor/sensitivity.py | 136 ++-- app/api/v1/endpoints/simulation.py | 4 +- .../db/timescaledb/repositories/scada.py | 9 +- tests/unit/test_pressure_cleaning.py | 108 +++ tests/unit/test_scada_repository.py | 87 +++ 7 files changed, 795 insertions(+), 291 deletions(-) create mode 100644 tests/unit/test_pressure_cleaning.py create mode 100644 tests/unit/test_scada_repository.py diff --git a/app/algorithms/cleaning/pressure.py b/app/algorithms/cleaning/pressure.py index 6fc545e..2287ba3 100644 --- a/app/algorithms/cleaning/pressure.py +++ b/app/algorithms/cleaning/pressure.py @@ -1,18 +1,435 @@ import pandas as pd import numpy as np import matplotlib.pyplot as plt -from sklearn.cluster import KMeans -from sklearn.impute import SimpleImputer import os -from app.algorithms._utils import fill_time_gaps +ID_LIKE_COLUMNS = { + "id", + "device_id", + "node_id", + "sensor_id", + "monitor_id", + "junction_id", +} + + +def _normalize_time_frame(data: pd.DataFrame) -> pd.DataFrame: + """返回按时间排序的副本,并尽量将 time 列解析为时间类型。""" + data = data.copy() + if "time" in data.columns: + data["time"] = pd.to_datetime(data["time"], errors="coerce") + data = data.sort_values(["time"]).reset_index(drop=True) + return data + + +def _select_pressure_columns(data: pd.DataFrame) -> tuple[list[str], list[str]]: + """区分需要清洗的数值列与需要原样保留的列。""" + value_cols: list[str] = [] + keep_cols: list[str] = [] + for col in data.columns: + if col == "time": + continue + col_key = col.lower() + if col_key in ID_LIKE_COLUMNS or col_key.endswith("_id"): + keep_cols.append(col) + continue + numeric = pd.to_numeric(data[col], errors="coerce") + if numeric.notna().sum() == 0 or numeric.nunique(dropna=True) <= 1: + keep_cols.append(col) + else: + value_cols.append(col) + return value_cols, keep_cols + + +def _robust_scale(values: pd.Series) -> float: + """基于 MAD 计算稳健尺度。""" + series = pd.to_numeric(values, errors="coerce").dropna() + if series.empty: + return 1.0 + median = series.median() + mad = (series - median).abs().median() + if pd.notna(mad) and mad > 0: + return float(1.4826 * mad) + iqr = series.quantile(0.75) - series.quantile(0.25) + if pd.notna(iqr) and iqr > 0: + return float(iqr / 1.349) + std = series.std() + if pd.notna(std) and std > 0: + return float(std) + return 1.0 + + +def _shrink_toward_baseline(observed: float, baseline: float, scale: float) -> float: + """把观测值向基线值收缩,scale 越小,修复越强。""" + if pd.isna(observed): + return baseline + if pd.isna(baseline): + return observed + diff = observed - baseline + weight = scale / (abs(diff) + scale) + return float(baseline + diff * weight) + + +def _infer_time_frequency(time_values: pd.Series | pd.Index) -> pd.Timedelta: + """从时间序列中推断采样频率,失败时默认 15 分钟。""" + parsed = pd.to_datetime(pd.Series(time_values), errors="coerce").dropna().sort_values() + if len(parsed) < 2: + return pd.Timedelta(minutes=15) + + diffs = parsed.diff().dropna() + diffs = diffs[diffs > pd.Timedelta(0)] + if diffs.empty: + return pd.Timedelta(minutes=15) + + mode = diffs.mode() + return mode.iloc[0] if not mode.empty else diffs.median() + + +def _build_local_pressure_baseline(series: pd.Series) -> pd.Series: + """基于局部插值与中值滤波构造平滑基线。""" + baseline = _safe_time_interpolate(series) + baseline = baseline.rolling(window=5, center=True, min_periods=1).median() + baseline = _safe_time_interpolate(baseline) + return baseline.ffill().bfill() + + +def _build_seasonal_pressure_baseline(series: pd.Series) -> pd.Series: + """按一天内的同一时刻构造季节性基线,适合日周期压力数据。""" + if not isinstance(series.index, pd.DatetimeIndex): + return pd.Series(np.nan, index=series.index, dtype=float) + + slot_labels = pd.Series(series.index.strftime("%H:%M:%S"), index=series.index) + return series.groupby(slot_labels).transform("median") + + +def _detect_pressure_spikes(series: pd.Series, local_baseline: pd.Series) -> pd.Series: + """识别单点异常上升/下降尖峰,避免过度修正正常波动。""" + residual = series - local_baseline + neighbor_center = (series.shift(1) + series.shift(-1)) / 2 + curvature = series - neighbor_center + + residual_scale = max(_robust_scale(residual), 1e-6) + curvature_scale = max(_robust_scale(curvature), 1e-6) + direction_flip = ((series - series.shift(1)) * (series.shift(-1) - series) < 0).fillna(False) + + return ( + residual.abs() > 3.5 * residual_scale + ) & ( + curvature.abs() > 3.0 * curvature_scale + ) & direction_flip + + +def _fill_pressure_gaps( + original: pd.Series, + repaired: pd.Series, + local_baseline: pd.Series, + seasonal_baseline: pd.Series, +) -> pd.Series: + """短缺口用局部插值,长缺口优先使用同一时刻的季节性轨迹。""" + missing_mask = original.isna() + if not missing_mask.any(): + return repaired + + gap_groups = (missing_mask != missing_mask.shift(fill_value=False)).cumsum() + gap_lengths = missing_mask.groupby(gap_groups).transform("sum").where(missing_mask, 0) + + filled = repaired.copy() + short_gap_mask = missing_mask & (gap_lengths < 4) + long_gap_mask = missing_mask & ~short_gap_mask + + filled[short_gap_mask] = local_baseline[short_gap_mask] + long_gap_fill = seasonal_baseline.where(seasonal_baseline.notna(), local_baseline) + filled[long_gap_mask] = long_gap_fill[long_gap_mask] + return filled + + +def _clean_pressure_series(series: pd.Series) -> pd.Series: + """清洗单个压力时间序列。""" + series = pd.to_numeric(series, errors="coerce").astype(float) + local_baseline = _build_local_pressure_baseline(series) + spike_mask = _detect_pressure_spikes(series, local_baseline) + + repaired = series.copy() + repaired[spike_mask] = local_baseline[spike_mask] + + seasonal_baseline = _build_seasonal_pressure_baseline(repaired) + repaired = _fill_pressure_gaps(series, repaired, local_baseline, seasonal_baseline) + + if repaired.isna().any(): + repaired = repaired.where(repaired.notna(), local_baseline) + return repaired.ffill().bfill() + + +def _format_time_column(data: pd.DataFrame) -> pd.DataFrame: + """统一输出时间格式,方便下游直接按 ISO 字符串解析。""" + if "time" not in data.columns: + return data + + formatted = data.copy() + time_values = pd.to_datetime(formatted["time"], errors="coerce") + if time_values.isna().all(): + return formatted + + if time_values.dt.tz is not None: + time_strings = time_values.dt.strftime("%Y-%m-%dT%H:%M:%S%z") + time_strings = time_strings.str.replace( + r"([+-]\d{2})(\d{2})$", + r"\1:\2", + regex=True, + ) + else: + time_strings = time_values.dt.strftime("%Y-%m-%dT%H:%M:%S") + + formatted["time"] = time_strings.where(time_values.notna(), formatted["time"]) + return formatted + + +def _expand_snapshot_time_grid(data: pd.DataFrame, freq: pd.Timedelta) -> pd.DataFrame: + """仅补齐时间轴,不提前填充值,避免长缺口丢失原始形状特征。""" + expanded = data.copy() + expanded["time"] = pd.to_datetime(expanded["time"], errors="coerce") + expanded = expanded.dropna(subset=["time"]).sort_values("time") + if expanded.empty: + return data + + indexed = expanded.set_index("time") + full_index = pd.date_range(indexed.index.min(), indexed.index.max(), freq=freq) + indexed = indexed.reindex(full_index) + indexed.index.name = "time" + return indexed.reset_index() + + +def _safe_datetime_index(values: pd.Series | pd.Index | list[object]) -> pd.DatetimeIndex | None: + """尽量把时间值标准化为 DatetimeIndex;失败则返回 None。""" + parsed = pd.to_datetime(values, errors="coerce") + try: + datetime_index = pd.DatetimeIndex(parsed) + except (TypeError, ValueError): + return None + + if datetime_index.isna().all(): + return None + return datetime_index + + +def _safe_time_interpolate(series: pd.Series) -> pd.Series: + """仅在索引确实是 DatetimeIndex 时使用 time interpolation。""" + if isinstance(series.index, pd.DatetimeIndex): + return series.interpolate(method="time", limit_direction="both") + return series.interpolate(limit_direction="both") + + +def _detect_long_form_identifier(data: pd.DataFrame, value_cols: list[str], keep_cols: list[str]) -> str | None: + """识别 time/id/value 长表结构。""" + if "time" not in data.columns or len(value_cols) != 1: + return None + + identifier_candidates = [ + col + for col in keep_cols + if col.lower() in ID_LIKE_COLUMNS or col.lower().endswith("_id") + ] + if len(identifier_candidates) != 1: + return None + if not data["time"].duplicated().any(): + return None + return identifier_candidates[0] + + +def _clean_long_form_pressure( + data: pd.DataFrame, + value_col: str, + identifier_col: str, + keep_cols: list[str], + fill_gaps: bool, +) -> pd.DataFrame: + """按测点拆分 long-form 压力数据,再逐列清洗后恢复原结构。""" + data = _normalize_time_frame(data) + wide_df = ( + data[[identifier_col, "time", value_col]] + .pivot(index="time", columns=identifier_col, values=value_col) + .reset_index() + ) + + sensor_cols = [col for col in wide_df.columns if col != "time"] + cleaned_wide = _clean_snapshot_pressure(wide_df, sensor_cols, keep_cols=[], fill_gaps=fill_gaps) + + cleaned_long = cleaned_wide.melt( + id_vars="time", + var_name=identifier_col, + value_name=value_col, + ) + + passthrough_cols = [col for col in keep_cols if col != identifier_col] + if passthrough_cols: + metadata = data[[identifier_col] + passthrough_cols].drop_duplicates(subset=[identifier_col]) + cleaned_long = cleaned_long.merge(metadata, on=identifier_col, how="left") + + try: + cleaned_long[identifier_col] = cleaned_long[identifier_col].astype(data[identifier_col].dtype) + except (TypeError, ValueError): + pass + + cleaned_long = cleaned_long.sort_values(["time", identifier_col]).reset_index(drop=True) + ordered_cols = ["time", identifier_col] + passthrough_cols + [value_col] + cleaned_long = cleaned_long[[col for col in ordered_cols if col in cleaned_long.columns]] + return cleaned_long + + +def _build_time_slot_frame( + data: pd.DataFrame, value_col: str, expected_slots: int +) -> pd.DataFrame: + """把重复时间点整理成 time x slot 的矩阵。""" + grouped = data.groupby("time", sort=True) + times = list(grouped.groups.keys()) + slot_frame = pd.DataFrame(index=pd.Index(times, name="time"), columns=range(expected_slots), dtype=float) + + for time_value, group in grouped: + values = pd.to_numeric(group[value_col], errors="coerce").tolist() + for slot_idx, value in enumerate(values[:expected_slots]): + slot_frame.loc[time_value, slot_idx] = value + return slot_frame + + +def _slot_baseline(slot_frame: pd.DataFrame) -> pd.DataFrame: + """对每个槽位做时间插值和平滑,得到基线轨迹。""" + baseline = pd.DataFrame(index=slot_frame.index, columns=slot_frame.columns, dtype=float) + for col in slot_frame.columns: + series = slot_frame[col].astype(float) + series = _safe_time_interpolate(series) + series = series.rolling(window=5, center=True, min_periods=1).median() + series = _safe_time_interpolate(series).ffill().bfill() + baseline[col] = series + return baseline + + +def _choose_insertion_position( + observed: list[float], baseline_row: pd.Series, expected_slots: int +) -> int: + """为少一个观测值的时间组选择最合理的插入位置。""" + missing_count = expected_slots - len(observed) + if missing_count <= 0: + return 0 + + best_pos = 0 + best_cost = float("inf") + for insert_pos in range(expected_slots): + cost = 0.0 + obs_idx = 0 + for slot_idx in range(expected_slots): + if slot_idx == insert_pos: + continue + obs_value = observed[obs_idx] + base_value = float(baseline_row.iloc[slot_idx]) + if pd.notna(obs_value) and pd.notna(base_value): + cost += abs(obs_value - base_value) + obs_idx += 1 + if cost < best_cost: + best_cost = cost + best_pos = insert_pos + return best_pos + + +def _clean_repeated_timestamp_pressure( + data: pd.DataFrame, value_col: str, keep_cols: list[str] +) -> pd.DataFrame: + """针对同一时间点重复采样的压力数据进行修复。""" + data = _normalize_time_frame(data) + grouped_sizes = data.groupby("time").size() + if grouped_sizes.empty: + return data + + expected_slots = int(grouped_sizes.mode().iloc[0]) if not grouped_sizes.mode().empty else int(grouped_sizes.max()) + expected_slots = max(expected_slots, int(grouped_sizes.max())) + slot_frame = _build_time_slot_frame(data, value_col, expected_slots) + baseline_frame = _slot_baseline(slot_frame) + + residuals = slot_frame - baseline_frame + slot_scales = { + col: max(_robust_scale(residuals[col]), 1e-6) for col in residuals.columns + } + + cleaned_rows: list[dict[str, object]] = [] + grouped = data.groupby("time", sort=True) + for time_value, group in grouped: + observed_values = pd.to_numeric(group[value_col], errors="coerce").tolist() + baseline_row = baseline_frame.loc[time_value] + insert_pos = _choose_insertion_position(observed_values, baseline_row, expected_slots) + + cleaned_values: list[float] = [] + obs_idx = 0 + for slot_idx in range(expected_slots): + if slot_idx == insert_pos and len(observed_values) < expected_slots: + cleaned_values.append(float(baseline_row.iloc[slot_idx])) + continue + + if obs_idx >= len(observed_values): + cleaned_values.append(float(baseline_row.iloc[slot_idx])) + continue + + observed = observed_values[obs_idx] + baseline = float(baseline_row.iloc[slot_idx]) + cleaned_values.append( + _shrink_toward_baseline(observed, baseline, slot_scales.get(slot_idx, 1.0)) + ) + obs_idx += 1 + + # 其余字段原样保留;常量列(如 id)直接复制第一条记录即可 + template_row = group.iloc[0].to_dict() + for slot_idx, cleaned_value in enumerate(cleaned_values): + row = dict(template_row) + row["time"] = time_value + row[value_col] = cleaned_value + cleaned_rows.append(row) + + cleaned_df = pd.DataFrame(cleaned_rows) + cleaned_df = cleaned_df.sort_values(["time"]).reset_index(drop=True) + ordered_cols = ["time"] + keep_cols + [value_col] + ordered_cols = [col for col in ordered_cols if col in cleaned_df.columns] + remaining_cols = [col for col in cleaned_df.columns if col not in ordered_cols] + cleaned_df = cleaned_df[ordered_cols + remaining_cols] + return _format_time_column(cleaned_df) + + +def _clean_snapshot_pressure( + data: pd.DataFrame, value_cols: list[str], keep_cols: list[str], fill_gaps: bool +) -> pd.DataFrame: + """针对单条时间序列或多列快照数据进行稳健修复。""" + data = _normalize_time_frame(data) + if fill_gaps and "time" in data.columns: + freq = _infer_time_frequency(data["time"]) + data = _expand_snapshot_time_grid(data, freq) + data["time"] = pd.to_datetime(data["time"], errors="coerce") + data = data.sort_values(["time"]).reset_index(drop=True) + + cleaned_df = data.copy() + time_index = ( + _safe_datetime_index(cleaned_df["time"]) + if "time" in cleaned_df.columns + else None + ) + if time_index is None: + time_index = pd.RangeIndex(start=0, stop=len(cleaned_df)) + for col in value_cols: + series = pd.Series( + pd.to_numeric(cleaned_df[col], errors="coerce").to_numpy(), + index=time_index, + dtype=float, + ) + cleaned_df[col] = _clean_pressure_series(series).to_numpy() + + ordered_cols = ["time"] + keep_cols + value_cols + ordered_cols = [col for col in ordered_cols if col in cleaned_df.columns] + remaining_cols = [col for col in cleaned_df.columns if col not in ordered_cols] + cleaned_df = cleaned_df[ordered_cols + remaining_cols] + return _format_time_column(cleaned_df) def clean_pressure_data_km( input_csv_path: str, show_plot: bool = False, fill_gaps: bool = True ) -> str: """ - 读取输入 CSV,基于 KMeans 检测异常并用滚动平均修复。输出为 _cleaned.xlsx(同目录)。 + 读取输入 CSV,基于时间结构进行稳健修复。输出为 _cleaned.xlsx(同目录)。 原始数据在 sheet 'raw_pressure_data',处理后数据在 sheet 'cleaned_pressusre_data'。 返回输出文件的绝对路径。 @@ -24,80 +441,38 @@ def clean_pressure_data_km( # 读取 CSV input_csv_path = os.path.abspath(input_csv_path) data = pd.read_csv(input_csv_path, header=0, index_col=None, encoding="utf-8") + data = _normalize_time_frame(data) + value_cols, keep_cols = _select_pressure_columns(data) + has_repeated_time = "time" in data.columns and data["time"].duplicated().any() + identifier_col = _detect_long_form_identifier(data, value_cols, keep_cols) - # 补齐时间缺口(如果数据包含 time 列) - if fill_gaps and "time" in data.columns: - data = fill_time_gaps( - data, time_col="time", freq="1min", short_gap_threshold=10 + if identifier_col is not None: + data_repaired = _clean_long_form_pressure( + data, + value_cols[0], + identifier_col, + keep_cols, + fill_gaps, ) + elif has_repeated_time and len(value_cols) == 1: + data_repaired = _clean_repeated_timestamp_pressure(data, value_cols[0], keep_cols) + else: + data_repaired = _clean_snapshot_pressure(data, value_cols, keep_cols, fill_gaps) - # 分离时间列和数值列 - time_col_data = None - if "time" in data.columns: - time_col_data = data["time"] - data = data.drop(columns=["time"]) - # 标准化 - data_norm = (data - data.mean()) / data.std() - - # 聚类与异常检测 - k = 3 - kmeans = KMeans(n_clusters=k, init="k-means++", n_init=50, random_state=42) - clusters = kmeans.fit_predict(data_norm) - centers = kmeans.cluster_centers_ - - distances = np.linalg.norm(data_norm.values - centers[clusters], axis=1) - threshold = distances.mean() + 3 * distances.std() - - anomaly_pos = np.where(distances > threshold)[0] - anomaly_indices = data.index[anomaly_pos] - - anomaly_details = {} - for pos in anomaly_pos: - row_norm = data_norm.iloc[pos] - cluster_idx = clusters[pos] - center = centers[cluster_idx] - diff = abs(row_norm - center) - main_sensor = diff.idxmax() - anomaly_details[data.index[pos]] = main_sensor - - # 修复:滚动平均(窗口可调) - data_rolled = data.rolling(window=13, center=True, min_periods=1).mean() - data_repaired = data.copy() - for pos in anomaly_pos: - label = data.index[pos] - sensor = anomaly_details[label] - data_repaired.loc[label, sensor] = data_rolled.loc[label, sensor] - - # 可选可视化(使用位置作为 x 轴) + # 可选可视化(只展示首个数值列) plt.rcParams["font.sans-serif"] = ["SimHei"] plt.rcParams["axes.unicode_minus"] = False - - if show_plot and len(data.columns) > 0: - n = len(data) - time = np.arange(n) - plt.figure(figsize=(12, 8)) - for col in data.columns: - plt.plot(time, data[col].values, marker="o", markersize=3, label=col) - for pos in anomaly_pos: - sensor = anomaly_details[data.index[pos]] - plt.plot(pos, data.iloc[pos][sensor], "ro", markersize=8) - plt.xlabel("时间点(序号)") + if show_plot and value_cols: + plot_col = value_cols[0] + if "time" in data_repaired.columns: + x = pd.to_datetime(data_repaired["time"], errors="coerce") + else: + x = np.arange(len(data_repaired)) + plt.figure(figsize=(12, 6)) + plt.plot(x, pd.to_numeric(data_repaired[plot_col], errors="coerce"), label="cleaned") + plt.xlabel("时间" if "time" in data_repaired.columns else "序号") plt.ylabel("压力监测值") - plt.title("各传感器折线图(红色标记主要异常点)") - plt.legend() - plt.show() - - plt.figure(figsize=(12, 8)) - for col in data_repaired.columns: - plt.plot( - time, data_repaired[col].values, marker="o", markersize=3, label=col - ) - for pos in anomaly_pos: - sensor = anomaly_details[data.index[pos]] - plt.plot(pos, data_repaired.iloc[pos][sensor], "go", markersize=8) - plt.xlabel("时间点(序号)") - plt.ylabel("修复后压力监测值") - plt.title("修复后各传感器折线图(绿色标记修复值)") + plt.title(f"{plot_col} 清洗结果") plt.legend() plt.show() @@ -110,9 +485,6 @@ def clean_pressure_data_km( # 如果原始数据包含时间列,将其添加回结果 data_for_save = data.copy() data_repaired_for_save = data_repaired.copy() - if time_col_data is not None: - data_for_save.insert(0, "time", time_col_data) - data_repaired_for_save.insert(0, "time", time_col_data) if os.path.exists(output_path): os.remove(output_path) # 覆盖同名文件 @@ -126,10 +498,10 @@ def clean_pressure_data_km( return os.path.abspath(output_path) -def clean_pressure_data_df_km(data: pd.DataFrame, show_plot: bool = False) -> dict: +def clean_pressure_data_df_km(data: pd.DataFrame, show_plot: bool = False) -> pd.DataFrame: """ - 接收一个 DataFrame 数据结构,使用KMeans聚类检测异常并用滚动平均修复。 - 返回清洗后的字典数据结构。 + 接收一个 DataFrame 数据结构,使用时间感知的稳健修复方法清洗压力数据。 + 返回清洗后的 DataFrame。 Args: data: 输入 DataFrame(可包含 time 列) @@ -137,113 +509,37 @@ def clean_pressure_data_df_km(data: pd.DataFrame, show_plot: bool = False) -> di """ # 使用传入的 DataFrame data = data.copy() + data = _normalize_time_frame(data) + value_cols, keep_cols = _select_pressure_columns(data) + has_repeated_time = "time" in data.columns and data["time"].duplicated().any() + identifier_col = _detect_long_form_identifier(data, value_cols, keep_cols) - # 补齐时间缺口(如果启用且数据包含 time 列) - data_filled = fill_time_gaps( - data, time_col="time", freq="1min", short_gap_threshold=10 - ) + if identifier_col is not None: + data_repaired = _clean_long_form_pressure( + data, + value_cols[0], + identifier_col, + keep_cols, + fill_gaps=True, + ) + elif has_repeated_time and len(value_cols) == 1: + data_repaired = _clean_repeated_timestamp_pressure(data, value_cols[0], keep_cols) + else: + data_repaired = _clean_snapshot_pressure(data, value_cols, keep_cols, fill_gaps=True) - # 保存 time 列用于最后合并 - time_col_series = None - if "time" in data_filled.columns: - time_col_series = data_filled["time"] - - # 移除 time 列用于后续清洗 - data_filled = data_filled.drop(columns=["time"]) - - # 标准化(使用填充后的数据) - data_norm = (data_filled - data_filled.mean()) / data_filled.std() - - # 添加:处理标准化后的 NaN(例如,标准差为0的列),防止异常数据,时间段内所有数据都相同导致计算结果为 NaN - imputer = SimpleImputer( - strategy="constant", fill_value=0, keep_empty_features=True - ) # 用 0 填充 NaN,包括全 NaN,并保留空特征 - data_norm = pd.DataFrame( - imputer.fit_transform(data_norm), - columns=data_norm.columns, - index=data_norm.index, - ) - - # 聚类与异常检测 - k = 3 - kmeans = KMeans(n_clusters=k, init="k-means++", n_init=50, random_state=42) - clusters = kmeans.fit_predict(data_norm) - centers = kmeans.cluster_centers_ - - distances = np.linalg.norm(data_norm.values - centers[clusters], axis=1) - threshold = distances.mean() + 3 * distances.std() - - anomaly_pos = np.where(distances > threshold)[0] - anomaly_indices = data_filled.index[anomaly_pos] - - anomaly_details = {} - for pos in anomaly_pos: - row_norm = data_norm.iloc[pos] - cluster_idx = clusters[pos] - center = centers[cluster_idx] - diff = abs(row_norm - center) - main_sensor = diff.idxmax() - anomaly_details[data_filled.index[pos]] = main_sensor - - # 修复:滚动平均(窗口可调) - data_rolled = data_filled.rolling(window=13, center=True, min_periods=1).mean() - data_repaired = data_filled.copy() - for pos in anomaly_pos: - label = data_filled.index[pos] - sensor = anomaly_details[label] - data_repaired.loc[label, sensor] = data_rolled.loc[label, sensor] - - # 可选可视化(使用位置作为 x 轴) - plt.rcParams["font.sans-serif"] = ["SimHei"] - plt.rcParams["axes.unicode_minus"] = False - - if show_plot and len(data.columns) > 0: - n = len(data) - time = np.arange(n) - n_filled = len(data_filled) - time_filled = np.arange(n_filled) - plt.figure(figsize=(12, 8)) - for col in data.columns: - plt.plot( - time, data[col].values, marker="o", markersize=3, label=col, alpha=0.5 - ) - for col in data_filled.columns: - plt.plot( - time_filled, - data_filled[col].values, - marker="x", - markersize=3, - label=f"{col}_filled", - linestyle="--", - ) - for pos in anomaly_pos: - sensor = anomaly_details[data_filled.index[pos]] - plt.plot(pos, data_filled.iloc[pos][sensor], "ro", markersize=8) - plt.xlabel("时间点(序号)") + if show_plot and value_cols: + plt.rcParams["font.sans-serif"] = ["SimHei"] + plt.rcParams["axes.unicode_minus"] = False + plot_col = value_cols[0] + x = pd.to_datetime(data_repaired["time"], errors="coerce") if "time" in data_repaired.columns else np.arange(len(data_repaired)) + plt.figure(figsize=(12, 6)) + plt.plot(x, pd.to_numeric(data_repaired[plot_col], errors="coerce"), label="cleaned") + plt.xlabel("时间" if "time" in data_repaired.columns else "序号") plt.ylabel("压力监测值") - plt.title("各传感器折线图(红色标记主要异常点,虚线为0值填充后)") + plt.title(f"{plot_col} 清洗结果") plt.legend() plt.show() - plt.figure(figsize=(12, 8)) - for col in data_repaired.columns: - plt.plot( - time_filled, data_repaired[col].values, marker="o", markersize=3, label=col - ) - for pos in anomaly_pos: - sensor = anomaly_details[data_filled.index[pos]] - plt.plot(pos, data_repaired.iloc[pos][sensor], "go", markersize=8) - plt.xlabel("时间点(序号)") - plt.ylabel("修复后压力监测值") - plt.title("修复后各传感器折线图(绿色标记修复值)") - plt.legend() - plt.show() - - # 将 time 列添加回结果 - if time_col_series is not None: - data_repaired.insert(0, "time", time_col_series) - - # 返回清洗后的字典 return data_repaired diff --git a/app/algorithms/sensor/kmeans.py b/app/algorithms/sensor/kmeans.py index e30c70e..84c37d3 100644 --- a/app/algorithms/sensor/kmeans.py +++ b/app/algorithms/sensor/kmeans.py @@ -6,104 +6,66 @@ import sklearn.cluster import os - class QD_KMeans(object): def __init__(self, wn, num_monitors): # self.inp = inp - self.cluster_num = num_monitors # 聚类中心个数,也即测压点个数 - self.wn=wn + self.cluster_num = num_monitors # 聚类中心个数,也即测压点个数 + self.wn = wn self.monitor_nodes = [] self.coords = [] self.junction_nodes = {} # Added missing initialization - def get_junctions_coordinates(self): - - for junction_name in self.wn.junction_name_list: + + for junction_name in self.wn.junction_name_list: junction = self.wn.get_node(junction_name) self.junction_nodes[junction_name] = junction.coordinates - self.coords.append(junction.coordinates ) + self.coords.append(junction.coordinates) - # print(f"Total junctions: {self.junction_coordinates}") + # print(f"Total junctions: {self.junction_coordinates}") def select_monitoring_points(self): if not self.coords: # Add check if coordinates are collected self.get_junctions_coordinates() coords = np.array(self.coords) - coords_normalized = (coords - coords.min(axis=0)) / (coords.max(axis=0) - coords.min(axis=0)) - kmeans = sklearn.cluster.KMeans(n_clusters= self.cluster_num, random_state=42) - kmeans.fit(coords_normalized) + coords_normalized = (coords - coords.min(axis=0)) / ( + coords.max(axis=0) - coords.min(axis=0) + ) + kmeans = sklearn.cluster.KMeans(n_clusters=self.cluster_num, random_state=42) + kmeans.fit(coords_normalized) for center in kmeans.cluster_centers_: distances = np.sum((coords_normalized - center) ** 2, axis=1) nearest_node = self.wn.junction_name_list[np.argmin(distances)] - self.monitor_nodes.append(nearest_node) + self.monitor_nodes.append(nearest_node) return self.monitor_nodes - def visualize_network(self): """Visualize network with monitoring points""" - ax=wntr.graphics.plot_network(self.wn, - node_attribute=self.monitor_nodes, - node_size=30, - title='Optimal sensor') - plt.show() + ax = wntr.graphics.plot_network( + self.wn, + node_attribute=self.monitor_nodes, + node_size=30, + title="Optimal sensor", + ) + plt.show() - - def kmeans_sensor_placement(name: str, sensor_num: int, min_diameter: int) -> list: - inp_name = f'./db_inp/{name}.db.inp' - wn= wntr.network.WaterNetworkModel(inp_name) - wn_cluster=QD_KMeans(wn, sensor_num) + inp_name = f"./db_inp/{name}.db.inp" + wn = wntr.network.WaterNetworkModel(inp_name) + wn_cluster = QD_KMeans(wn, sensor_num) # Select monitoring pointse - sensor_ids= wn_cluster.select_monitoring_points() + sensor_ids = wn_cluster.select_monitoring_points() # wn_cluster.visualize_network() return sensor_ids - if __name__ == "__main__": - #sensorindex = get_ID(name='suzhouhe_2024_cloud_0817', sensor_num=30, min_diameter=500) - sensorindex = kmeans_sensor_placement(name='szh', sensor_num=50, min_diameter=300) + # sensorindex = get_ID(name='suzhouhe_2024_cloud_0817', sensor_num=30, min_diameter=500) + sensorindex = kmeans_sensor_placement(name="szh", sensor_num=50, min_diameter=300) print(sensorindex) - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - diff --git a/app/algorithms/sensor/sensitivity.py b/app/algorithms/sensor/sensitivity.py index 1f19925..ef09fad 100644 --- a/app/algorithms/sensor/sensitivity.py +++ b/app/algorithms/sensor/sensitivity.py @@ -20,6 +20,7 @@ import geopandas as gpd from sklearn.metrics import pairwise_distances import app.services.project_info as project_info + # 2025/03/12 # Step1: 获取节点坐标 def getCoor(wn: wntr.network.WaterNetworkModel) -> pandas.DataFrame: @@ -31,7 +32,7 @@ def getCoor(wn: wntr.network.WaterNetworkModel) -> pandas.DataFrame: # site: pandas.Series # index:节点名称(wn.node_name_list) # values:每个节点的坐标,格式为 tuple(如 (x, y) 或 (x, y, z)) - site = wn.query_node_attribute('coordinates') + site = wn.query_node_attribute("coordinates") # Coor: pandas.Series # index:与site相同(节点名称)。 # values:坐标转换为numpy.ndarray(如array([10.5, 20.3])) @@ -43,9 +44,9 @@ def getCoor(wn: wntr.network.WaterNetworkModel) -> pandas.DataFrame: x.append(Coor.values[i][0]) # 将 x 坐标存入 x 列表。 y.append(Coor.values[i][1]) # 将 y 坐标存入 y 列表 # xy: dict[str, list], x、y 坐标的字典 - xy = {'x': x, 'y': y} + xy = {"x": x, "y": y} # Coor_node: pandas.DataFrame, 存储节点 x, y 坐标的 DataFrame - Coor_node = pd.DataFrame(xy, index=wn.node_name_list, columns=['x', 'y']) + Coor_node = pd.DataFrame(xy, index=wn.node_name_list, columns=["x", "y"]) return Coor_node @@ -87,23 +88,23 @@ def skater_partition(G, n_clusters): 字典形式的聚类结果,键为区域编号,值为该区域内的节点列表。 """ # 1. 获取所有节点坐标,假设每个节点都有 'pos' 属性 - pos = nx.get_node_attributes(G, 'pos') + pos = nx.get_node_attributes(G, "pos") nodes = list(G.nodes()) # 构造坐标数组:每行为 [x, y] coords = np.array([pos[node] for node in nodes]) # 2. 构造 GeoDataFrame:创建 DataFrame 并生成 geometry 列 - df = pd.DataFrame(coords, columns=['x', 'y'], index=nodes) + df = pd.DataFrame(coords, columns=["x", "y"], index=nodes) # 利用 shapely 的 Point 构造空间位置 - df['geometry'] = df.apply(lambda row: Point(row['x'], row['y']), axis=1) - gdf = gpd.GeoDataFrame(df, geometry='geometry') + df["geometry"] = df.apply(lambda row: Point(row["x"], row["y"]), axis=1) + gdf = gpd.GeoDataFrame(df, geometry="geometry") # 3. 构造空间权重矩阵,使用 4 近邻方法(k=4,可根据实际情况调整) w = ps.weights.KNN.from_array(coords, k=4) - w.transform = 'R' + w.transform = "R" # 4. 调用 SKATER:新版本 API 要求传入 gdf, w 以及 attrs_name(这里使用 'x' 和 'y' 作为属性) - skater = Skater(gdf, w, attrs_name=['x', 'y'], n_clusters=n_clusters) + skater = Skater(gdf, w, attrs_name=["x", "y"], n_clusters=n_clusters) skater.solve() # 5. 获取聚类标签,构造成字典格式 @@ -133,24 +134,24 @@ def spectral_partition(G, n_clusters): 键为聚类标签,值为该聚类对应的节点列表。 """ # 1. 获取节点空间坐标,注意保证每个节点都有 'pos' 属性 - pos_dict = nx.get_node_attributes(G, 'pos') + pos_dict = nx.get_node_attributes(G, "pos") nodes = list(G.nodes()) coords = np.array([pos_dict[node] for node in nodes]) # 2. 计算节点之间的欧氏距离矩阵 - D = pairwise_distances(coords, metric='euclidean') + D = pairwise_distances(coords, metric="euclidean") # 3. 计算 sigma 值:这里取所有距离的均值,当然也可以根据实际情况调整 sigma = np.mean(D) # 4. 构造相似度矩阵:使用高斯核函数 # A(i, j) = exp( -d(i,j)^2 / (2*sigma^2) ) - A = np.exp(- (D ** 2) / (2 * sigma ** 2)) + A = np.exp(-(D**2) / (2 * sigma**2)) # 5. 使用谱聚类进行图分区 - clustering = SpectralClustering(n_clusters=n_clusters, - affinity='precomputed', - random_state=0) + clustering = SpectralClustering( + n_clusters=n_clusters, affinity="precomputed", random_state=0 + ) labels = clustering.fit_predict(A) # 6. 构造字典形式的分区结果 @@ -160,6 +161,7 @@ def spectral_partition(G, n_clusters): return groups + # 2025/03/12 # Step3: wn_func类,水力计算 # wn_func 主要用于计算: @@ -181,7 +183,7 @@ class wn_func(object): self.results = wntr.sim.EpanetSimulator(wn).run_sim() # 存储运行结果 self.wn = wn # self.q:pandas.DataFrame,管道流量,索引为时间步长,列为管道名称 - self.q = self.results.link['flowrate'] + self.q = self.results.link["flowrate"] # ReservoirIndex / Tankindex: list[str],水库 / 水箱节点名称列表 ReservoirIndex = wn.reservoir_name_list Tankindex = wn.tank_name_list @@ -191,7 +193,7 @@ class wn_func(object): # self.nodes: list[str],所有节点的名称 self.nodes = wn.node_name_list # self.coordinates:pandas.Series,节点坐标,索引为节点名,值为 (x, y) 坐标的 tuple - self.coordinates = wn.query_node_attribute('coordinates') + self.coordinates = wn.query_node_attribute("coordinates") # allpumps / allvalves: list[str],所有泵/阀门名称列表 allpumps = wn.pump_name_list allvalves = wn.valve_name_list @@ -222,17 +224,27 @@ class wn_func(object): # 泵的起终点、tank、reservoir # self.delnodes: list[str],需要删除的节点(包括水库、泵、阀门连接的节点) self.delnodes = list( - set(ReservoirIndex).union(Tankindex, pumpstnode, pumpednode, valvestnode, valveednode, Reservoirednode)) + set(ReservoirIndex).union( + Tankindex, + pumpstnode, + pumpednode, + valvestnode, + valveednode, + Reservoirednode, + ) + ) # 泵、起终点为tank、reservoir的管道 # self.delpipes: list[str],需要删除的管道(包括水库、泵、阀门连接的管道) - self.delpipes = list(set(wn.pump_name_list).union(wn.valve_name_list).union(Reservoirpipe)) + self.delpipes = list( + set(wn.pump_name_list).union(wn.valve_name_list).union(Reservoirpipe) + ) self.pipes = [pipe for pipe in wn.pipe_name_list if pipe not in self.delpipes] # self.L: list[float],所有管道的长度(以米为单位) - self.L = wn.query_link_attribute('length')[self.pipes].tolist() + self.L = wn.query_link_attribute("length")[self.pipes].tolist() self.n = len(self.nodes) self.m = len(self.pipes) # self.unit_headloss: list[float],单位水头损失(headloss 数据的第一行,单位:米/km) - self.unit_headloss = self.results.link['headloss'].iloc[0, :].tolist() + self.unit_headloss = self.results.link["headloss"].iloc[0, :].tolist() ## self.delnodes1 = list(set(ReservoirIndex).union(Tankindex)) @@ -245,7 +257,9 @@ class wn_func(object): end_node = wn.links[pipe].end_node.name self.less_than_min_diameter_junction_list.extend([start_node, end_node]) # 去重 - self.less_than_min_diameter_junction_list = list(set(self.less_than_min_diameter_junction_list)) + self.less_than_min_diameter_junction_list = list( + set(self.less_than_min_diameter_junction_list) + ) # Step3.2: 计算水力距离 def CtoS(self): @@ -266,7 +280,7 @@ class wn_func(object): q = self.q L = self.L # H1:pandas.DataFrame,水头数据,索引为时间步长,列为节点名 - H1 = self.results.node['head'].T + H1 = self.results.node["head"].T # hh:list[float],计算管道两端水头之差 hh = [] # 水头损失 @@ -280,8 +294,18 @@ class wn_func(object): # headloss:pandas.DataFrame,管道水头损失矩阵 headloss = pd.DataFrame(hh, index=pipes).T # s1:管道阻力系数,s2:将管道阻力系数与管道的起始节点和终止节点对应 - hf = pd.DataFrame(np.array([0] * (n ** 2)).reshape(n, n), index=nodes, columns=nodes, dtype=float) - weightL = pd.DataFrame(np.array([0] * (n ** 2)).reshape(n, n), index=nodes, columns=nodes, dtype=float) + hf = pd.DataFrame( + np.array([0] * (n**2)).reshape(n, n), + index=nodes, + columns=nodes, + dtype=float, + ) + weightL = pd.DataFrame( + np.array([0] * (n**2)).reshape(n, n), + index=nodes, + columns=nodes, + dtype=float, + ) # s2为对应管道起始节点与终止节点的粗糙度系数矩阵,index代表起始节点,columns代表终止节点 G = nx.DiGraph() for i in range(0, m): @@ -298,11 +322,16 @@ class wn_func(object): weightL.loc[b, a] = headloss.loc[0, pipe] * L[i] G.add_weighted_edges_from([(b, a, weightL.loc[b, a])]) - hydraulicL = pd.DataFrame(np.array([0] * (n ** 2)).reshape(n, n), index=nodes, columns=nodes, dtype=float) + hydraulicL = pd.DataFrame( + np.array([0] * (n**2)).reshape(n, n), + index=nodes, + columns=nodes, + dtype=float, + ) for a in nodes: if a in G.nodes: - d = nx.shortest_path_length(G, source=a, weight='weight') + d = nx.shortest_path_length(G, source=a, weight="weight") for b in list(d.keys()): hydraulicL.loc[a, b] = d[b] @@ -331,11 +360,17 @@ class wn_func(object): for t in self.wn.tanks(): self.nonjunc_index.append(t[0]) # Conn:numpy.matrix,节点-管道连接矩阵,起点 -1,终点 1 - Conn = np.mat(np.zeros([n, m - p - v])) # 节点和管道的关系矩阵,行为节点,列为管道,起点为-1,终点为1 + Conn = np.mat( + np.zeros([n, m - p - v]) + ) # 节点和管道的关系矩阵,行为节点,列为管道,起点为-1,终点为1 # NConn:numpy.matrix,节点-节点连接矩阵,有管道相连的地方设为 1 NConn = np.mat(np.zeros([n, n])) # 节点之间的关系,之间有管道为1,反之为0 # pipes:list[str],去除泵和阀门的管道列表 - pipes = [pipe for pipe in self.wn.pipes() if pipe not in self.wn.pumps() and pipe not in self.wn.valves()] + pipes = [ + pipe + for pipe in self.wn.pipes() + if pipe not in self.wn.pumps() and pipe not in self.wn.valves() + ] for pipe_name, pipe in pipes: start = self.wn.node_name_list.index(pipe.start_node_name) end = self.wn.node_name_list.index(pipe.end_node_name) @@ -345,12 +380,21 @@ class wn_func(object): NConn[start, end] = 1 NConn[end, start] = 1 self.A = Conn - link_name_list = [link for link in self.wn.link_name_list if - link not in self.wn.pump_name_list and link not in self.wn.valve_name_list] - self.A2 = pd.DataFrame(self.A, index=self.wn.node_name_list, columns=link_name_list) + link_name_list = [ + link + for link in self.wn.link_name_list + if link not in self.wn.pump_name_list + and link not in self.wn.valve_name_list + ] + self.A2 = pd.DataFrame( + self.A, index=self.wn.node_name_list, columns=link_name_list + ) self.A2 = self.A2.drop(self.delnodes) for pipe in self.delpipes: - if pipe not in self.wn.pump_name_list and pipe not in self.wn.valve_name_list: + if ( + pipe not in self.wn.pump_name_list + and pipe not in self.wn.valve_name_list + ): self.A2 = self.A2.drop(columns=pipe) self.junc_list = self.A2.index self.A2 = np.mat(self.A2) # 节点管道关系 @@ -372,10 +416,10 @@ class wn_func(object): except EpanetException: pass finally: - h = result.link['headloss'][self.pipes].values[0] - q = result.link['flowrate'][self.pipes].values[0] - l = self.wn.query_link_attribute('length')[self.pipes] - C = self.wn.query_link_attribute('roughness')[self.pipes] + h = result.link["headloss"][self.pipes].values[0] + q = result.link["flowrate"][self.pipes].values[0] + l = self.wn.query_link_attribute("length")[self.pipes] + C = self.wn.query_link_attribute("roughness")[self.pipes] # headloss:numpy.ndarray,水头损失数组 headloss = np.array(h) # 调整流量方向 @@ -393,7 +437,7 @@ class wn_func(object): try: det = np.linalg.det(X) except RuntimeError as e: - sign, logdet = slogdet(X) # 防止溢出 + sign, logdet = slogdet(X) # 防止溢出 det = sign * np.exp(logdet) if det != 0: J_H_Cw = X.I * A * S @@ -430,7 +474,10 @@ class Sensorplacement(wn_func): """ Sensorplacement 类继承了 wn_func 类,并且用于计算和优化传感器布置的位置。 """ - def __init__(self, wn: wntr.network.WaterNetworkModel, sensornum: int, min_diameter: int): + + def __init__( + self, wn: wntr.network.WaterNetworkModel, sensornum: int, min_diameter: int + ): """ :param wn: 由wntr生成的模型 @@ -442,7 +489,9 @@ class Sensorplacement(wn_func): # 1.某个节点到所有节点的加权距离之和 # 2.某个节点到该组内所有节点的加权距离之和 - def sensor(self, SS: pandas.DataFrame, G: networkx.Graph, group: dict[int, list[str]]): + def sensor( + self, SS: pandas.DataFrame, G: networkx.Graph, group: dict[int, list[str]] + ): """ sensor 方法是用来根据灵敏度矩阵 SS 和加权图 G 来确定传感器布置位置的 :param SS: 灵敏度矩阵,每个节点的行和列代表不同节点,矩阵元素表示节点间的灵敏度。SS.iloc[i, :] 表示第 i 行对应节点 i 到所有其他节点的灵敏度 @@ -527,7 +576,7 @@ def get_ID(name: str, sensor_num: int, min_diameter: int) -> list[str]: :return: 测压点节点ID """ # inp_file_real:str,输入文件名,表示原始水力模型文件的路径,该文件格式为 EPANET 输入文件(.inp),包含管网的结构信息、节点、管道、泵等数据 - inp_file_real = f'./db_inp/{name}.db.inp' + inp_file_real = f"./db_inp/{name}.db.inp" # sensornum:int,需要布置的传感器数量 # sensornum = sensor_num # wn_real:wntr.network.WaterNetworkModel,加载 EPANET 水力模型 @@ -538,7 +587,7 @@ def get_ID(name: str, sensor_num: int, min_diameter: int) -> list[str]: results_real = sim_real.run_sim() # real_C:list[float],包含所有管道粗糙度的列表 - real_C = wn_real.query_link_attribute('roughness').tolist() + real_C = wn_real.query_link_attribute("roughness").tolist() # wn_fun1:wn_func(继承自 object),创建 wn_func 类的实例,传入 wn_real 水力模型对象。wn_func 用于计算管网相关的水力属性,比如水力距离、灵敏度等 wn_fun1 = wn_func(wn_real, min_diameter=min_diameter) # nodes:list[str],管网的节点名称列表 @@ -598,7 +647,6 @@ def get_ID(name: str, sensor_num: int, min_diameter: int) -> list[str]: sensorindex, sensorindex_2 = wn_fun.sensor(SS, G, group) # 初始的sensorindex # print(str(sensor_num), "个测压点,测压点位置:", sensorindex) - # 重新打开数据库 # if is_project_open(name=name): # close_project(name=name) @@ -637,7 +685,7 @@ def get_ID(name: str, sensor_num: int, min_diameter: int) -> list[str]: return sensorindex -if __name__ == '__main__': +if __name__ == "__main__": sensorindex = get_ID(name=project_info.name, sensor_num=20, min_diameter=300) print(sensorindex) diff --git a/app/api/v1/endpoints/simulation.py b/app/api/v1/endpoints/simulation.py index ac3a679..25d4e83 100644 --- a/app/api/v1/endpoints/simulation.py +++ b/app/api/v1/endpoints/simulation.py @@ -6,7 +6,6 @@ import shutil import threading from fastapi import APIRouter, HTTPException, File, UploadFile, Query, Path, Body from fastapi.responses import PlainTextResponse -import app.infra.db.influxdb.api as influxdb_api import app.services.simulation as simulation import app.services.globals as globals from app.services.tjnetwork import ( @@ -28,8 +27,7 @@ from app.algorithms.sensor import ( pressure_sensor_placement_sensitivity, pressure_sensor_placement_kmeans, ) -import app.algorithms.cleaning.flow as flow_data_clean -import app.algorithms.cleaning.pressure as pressure_data_clean + from app.services.network_import import network_update from app.services.simulation_ops import ( project_management, diff --git a/app/infra/db/timescaledb/repositories/scada.py b/app/infra/db/timescaledb/repositories/scada.py index bc8717f..d5a6348 100644 --- a/app/infra/db/timescaledb/repositories/scada.py +++ b/app/infra/db/timescaledb/repositories/scada.py @@ -89,12 +89,17 @@ class ScadaRepository: if field not in valid_fields: raise ValueError(f"Invalid field: {field}") - query = sql.SQL( + update_query = sql.SQL( "UPDATE scada.scada_data SET {} = %s WHERE time = %s AND device_id = %s" ).format(sql.Identifier(field)) + insert_query = sql.SQL( + "INSERT INTO scada.scada_data (time, device_id, {}) VALUES (%s, %s, %s)" + ).format(sql.Identifier(field)) async with conn.cursor() as cur: - await cur.execute(query, (value, time, device_id)) + await cur.execute(update_query, (value, time, device_id)) + if cur.rowcount == 0: + await cur.execute(insert_query, (time, device_id, value)) @staticmethod async def delete_scada_by_id_time_range( diff --git a/tests/unit/test_pressure_cleaning.py b/tests/unit/test_pressure_cleaning.py new file mode 100644 index 0000000..9ccd9d8 --- /dev/null +++ b/tests/unit/test_pressure_cleaning.py @@ -0,0 +1,108 @@ +import importlib.util +import sys +import types +from pathlib import Path + +import numpy as np +import pandas as pd + + +def _load_pressure_cleaning_module(): + project_root = Path(__file__).resolve().parents[2] + utils_path = project_root / "app" / "algorithms" / "_utils.py" + pressure_path = project_root / "app" / "algorithms" / "cleaning" / "pressure.py" + + app_module = sys.modules.setdefault("app", types.ModuleType("app")) + algorithms_module = sys.modules.setdefault( + "app.algorithms", + types.ModuleType("app.algorithms"), + ) + setattr(app_module, "algorithms", algorithms_module) + + utils_spec = importlib.util.spec_from_file_location("app.algorithms._utils", utils_path) + assert utils_spec and utils_spec.loader + utils_module = importlib.util.module_from_spec(utils_spec) + sys.modules["app.algorithms._utils"] = utils_module + utils_spec.loader.exec_module(utils_module) + + pressure_spec = importlib.util.spec_from_file_location( + "tests_pressure_under_test", + pressure_path, + ) + assert pressure_spec and pressure_spec.loader + pressure_module = importlib.util.module_from_spec(pressure_spec) + pressure_spec.loader.exec_module(pressure_module) + return pressure_module + + +def test_clean_pressure_data_df_km_repairs_long_form_pressure_series(): + module = _load_pressure_cleaning_module() + repo_root = Path(__file__).resolve().parents[3] + + raw_df = pd.read_csv(repo_root / "data" / "node_simulation.csv") + noisy_df = pd.read_csv(repo_root / "data" / "node_simulation_noisy.csv") + cleaned_df = module.clean_pressure_data_df_km(noisy_df) + + for df in (raw_df, noisy_df, cleaned_df): + df["time"] = pd.to_datetime(df["time"]) + + assert len(cleaned_df) == len(raw_df) + assert set(cleaned_df.columns) == {"time", "id", "pressure"} + assert cleaned_df["pressure"].isna().sum() == 0 + + noisy_joined = raw_df.merge(noisy_df, on=["time", "id"], how="inner", suffixes=("_raw", "_noisy")) + cleaned_joined = raw_df.merge( + cleaned_df, + on=["time", "id"], + how="inner", + suffixes=("_raw", "_clean"), + ) + + noisy_rmse = float( + np.sqrt(np.mean((noisy_joined["pressure_raw"] - noisy_joined["pressure_noisy"]) ** 2)) + ) + cleaned_rmse = float( + np.sqrt(np.mean((cleaned_joined["pressure_raw"] - cleaned_joined["pressure_clean"]) ** 2)) + ) + noisy_mae = float( + np.mean(np.abs(noisy_joined["pressure_raw"] - noisy_joined["pressure_noisy"])) + ) + cleaned_mae = float( + np.mean(np.abs(cleaned_joined["pressure_raw"] - cleaned_joined["pressure_clean"])) + ) + + assert cleaned_rmse < 0.35 + assert cleaned_rmse < noisy_rmse * 0.5 + assert cleaned_mae < noisy_mae + + repaired_gap = cleaned_df[ + (cleaned_df["id"] == 170490) + & (cleaned_df["time"] == pd.Timestamp("2026-01-01T05:00:00+08:00")) + ]["pressure"].iloc[0] + assert abs(repaired_gap - 30.62433433532715) < 1.0 + + spike_row = cleaned_df[ + (cleaned_df["id"] == 42563) + & (cleaned_df["time"] == pd.Timestamp("2026-01-01T03:45:00+08:00")) + ]["pressure"].iloc[0] + assert abs(spike_row - 28.018701553344727) < 2.0 + + +def test_clean_pressure_data_df_km_accepts_single_sensor_wide_frame_with_utc_strings(): + module = _load_pressure_cleaning_module() + repo_root = Path(__file__).resolve().parents[3] + + noisy_df = pd.read_csv(repo_root / "data" / "node_simulation_noisy.csv") + single_sensor = ( + noisy_df[noisy_df["id"] == 170490][["time", "pressure"]] + .rename(columns={"pressure": "170490"}) + .copy() + ) + single_sensor["time"] = ( + pd.to_datetime(single_sensor["time"], utc=True).dt.strftime("%Y-%m-%dT%H:%M:%SZ") + ) + + cleaned_df = module.clean_pressure_data_df_km(single_sensor) + + assert len(cleaned_df) == 192 + assert cleaned_df["170490"].isna().sum() == 0 diff --git a/tests/unit/test_scada_repository.py b/tests/unit/test_scada_repository.py new file mode 100644 index 0000000..98fbcbb --- /dev/null +++ b/tests/unit/test_scada_repository.py @@ -0,0 +1,87 @@ +from datetime import datetime, timezone +import importlib.util +from pathlib import Path + +import pytest + + +def _load_scada_repository(): + module_path = ( + Path(__file__).resolve().parents[2] + / "app" + / "infra" + / "db" + / "timescaledb" + / "repositories" + / "scada.py" + ) + spec = importlib.util.spec_from_file_location("tests_scada_repo_under_test", module_path) + module = importlib.util.module_from_spec(spec) + assert spec and spec.loader + spec.loader.exec_module(module) + return module.ScadaRepository + + +class _FakeCursor: + def __init__(self, initial_rowcount: int): + self.initial_rowcount = initial_rowcount + self.rowcount = 0 + self.calls: list[tuple[str, tuple]] = [] + + async def __aenter__(self): + return self + + async def __aexit__(self, exc_type, exc, tb): + return False + + async def execute(self, query, params): + self.calls.append((str(query), params)) + if len(self.calls) == 1: + self.rowcount = self.initial_rowcount + else: + self.rowcount = 1 + + +class _FakeConnection: + def __init__(self, initial_rowcount: int): + self.cursor_instance = _FakeCursor(initial_rowcount) + + def cursor(self): + return self.cursor_instance + + +@pytest.mark.asyncio +async def test_update_scada_field_inserts_when_update_hits_no_rows(): + ScadaRepository = _load_scada_repository() + conn = _FakeConnection(initial_rowcount=0) + point_time = datetime(2026, 1, 1, 0, 0, tzinfo=timezone.utc) + + await ScadaRepository.update_scada_field( + conn, + point_time, + "170490", + "cleaned_value", + 26.5, + ) + + assert len(conn.cursor_instance.calls) == 2 + assert "UPDATE scada.scada_data SET" in conn.cursor_instance.calls[0][0] + assert "INSERT INTO scada.scada_data" in conn.cursor_instance.calls[1][0] + + +@pytest.mark.asyncio +async def test_update_scada_field_skips_insert_when_update_succeeds(): + ScadaRepository = _load_scada_repository() + conn = _FakeConnection(initial_rowcount=1) + point_time = datetime(2026, 1, 1, 0, 0, tzinfo=timezone.utc) + + await ScadaRepository.update_scada_field( + conn, + point_time, + "170490", + "cleaned_value", + 26.5, + ) + + assert len(conn.cursor_instance.calls) == 1 + assert "UPDATE scada.scada_data SET" in conn.cursor_instance.calls[0][0]