Files
TJWaterServerBinary/tests/unit/test_pressure_cleaning.py

109 lines
3.8 KiB
Python

import importlib.util
import sys
import types
from pathlib import Path
import numpy as np
import pandas as pd
def _load_pressure_cleaning_module():
    """Load ``app/algorithms/cleaning/pressure.py`` directly from its file path.

    Stand-in ``app`` and ``app.algorithms`` module objects are placed in
    ``sys.modules`` (existing entries are reused), then the real
    ``app/algorithms/_utils.py`` is executed and registered as
    ``app.algorithms._utils`` before the pressure module itself is executed.

    NOTE(review): presumably this lets ``pressure.py`` resolve an import of
    ``app.algorithms._utils`` without importing the real ``app`` package --
    confirm against ``pressure.py``.

    Returns:
        A freshly executed module object for ``pressure.py``. It is created
        under the name ``tests_pressure_under_test`` and is not added to
        ``sys.modules``.
    """
    algorithms_dir = Path(__file__).resolve().parents[2] / "app" / "algorithms"

    # Reuse whatever is already registered; otherwise fall back to bare modules.
    app_pkg = sys.modules.setdefault("app", types.ModuleType("app"))
    algorithms_pkg = sys.modules.setdefault(
        "app.algorithms", types.ModuleType("app.algorithms")
    )
    app_pkg.algorithms = algorithms_pkg

    helpers_spec = importlib.util.spec_from_file_location(
        "app.algorithms._utils", algorithms_dir / "_utils.py"
    )
    assert helpers_spec and helpers_spec.loader
    helpers = importlib.util.module_from_spec(helpers_spec)
    # Register before executing so the name is importable while the code runs.
    sys.modules["app.algorithms._utils"] = helpers
    helpers_spec.loader.exec_module(helpers)

    target_spec = importlib.util.spec_from_file_location(
        "tests_pressure_under_test",
        algorithms_dir / "cleaning" / "pressure.py",
    )
    assert target_spec and target_spec.loader
    target = importlib.util.module_from_spec(target_spec)
    target_spec.loader.exec_module(target)
    return target
def test_clean_pressure_data_df_km_repairs_long_form_pressure_series():
    """Check ``clean_pressure_data_df_km`` on the long-form noisy fixture.

    The cleaned frame must have as many rows as the raw fixture, exactly the
    columns ``time``/``id``/``pressure``, and no missing pressures. Against the
    raw fixture its RMSE must be below 0.35 and below half of the noisy RMSE,
    and its MAE must be below the noisy MAE. Two individual readings are also
    compared with hard-coded reference values.
    """
    pressure_cleaning = _load_pressure_cleaning_module()
    data_dir = Path(__file__).resolve().parents[3] / "data"
    raw_df = pd.read_csv(data_dir / "node_simulation.csv")
    noisy_df = pd.read_csv(data_dir / "node_simulation_noisy.csv")

    # The cleaner receives the frame exactly as read from CSV; timestamps are
    # parsed only afterwards, for the joins and lookups below.
    cleaned_df = pressure_cleaning.clean_pressure_data_df_km(noisy_df)
    for frame in (raw_df, noisy_df, cleaned_df):
        frame["time"] = pd.to_datetime(frame["time"])

    assert len(cleaned_df) == len(raw_df)
    assert set(cleaned_df.columns) == {"time", "id", "pressure"}
    assert cleaned_df["pressure"].isna().sum() == 0

    join_keys = ["time", "id"]
    noisy_joined = raw_df.merge(
        noisy_df, on=join_keys, how="inner", suffixes=("_raw", "_noisy")
    )
    cleaned_joined = raw_df.merge(
        cleaned_df, on=join_keys, how="inner", suffixes=("_raw", "_clean")
    )

    # Per-row deviation from the raw fixture, before and after cleaning.
    noisy_error = noisy_joined["pressure_raw"] - noisy_joined["pressure_noisy"]
    cleaned_error = cleaned_joined["pressure_raw"] - cleaned_joined["pressure_clean"]

    noisy_rmse = float(np.sqrt(np.mean(noisy_error ** 2)))
    cleaned_rmse = float(np.sqrt(np.mean(cleaned_error ** 2)))
    noisy_mae = float(np.mean(np.abs(noisy_error)))
    cleaned_mae = float(np.mean(np.abs(cleaned_error)))

    assert cleaned_rmse < 0.35
    assert cleaned_rmse < noisy_rmse * 0.5
    assert cleaned_mae < noisy_mae

    def cleaned_pressure_at(sensor_id, timestamp):
        """Return the first cleaned pressure for one sensor at one timestamp."""
        at_point = (cleaned_df["id"] == sensor_id) & (
            cleaned_df["time"] == pd.Timestamp(timestamp)
        )
        return cleaned_df.loc[at_point, "pressure"].iloc[0]

    # NOTE(review): the original variable names suggest a filled-in gap and a
    # removed spike at these points -- confirm against the noisy fixture.
    repaired_gap = cleaned_pressure_at(170490, "2026-01-01T05:00:00+08:00")
    assert abs(repaired_gap - 30.62433433532715) < 1.0

    spike_row = cleaned_pressure_at(42563, "2026-01-01T03:45:00+08:00")
    assert abs(spike_row - 28.018701553344727) < 2.0
def test_clean_pressure_data_df_km_accepts_single_sensor_wide_frame_with_utc_strings():
    """Check ``clean_pressure_data_df_km`` on a one-sensor, wide-layout frame.

    Rows for sensor 170490 are taken from the noisy fixture, the ``pressure``
    column is renamed to ``"170490"``, and ``time`` is rewritten as UTC strings
    formatted ``%Y-%m-%dT%H:%M:%SZ``. The cleaned result must contain 192 rows
    and no missing values in the ``"170490"`` column.
    """
    pressure_cleaning = _load_pressure_cleaning_module()
    noisy_csv = Path(__file__).resolve().parents[3] / "data" / "node_simulation_noisy.csv"
    noisy_df = pd.read_csv(noisy_csv)

    # Keep one sensor's readings and name the value column after the sensor.
    sensor_rows = noisy_df[noisy_df["id"] == 170490]
    single_sensor = (
        sensor_rows[["time", "pressure"]].rename(columns={"pressure": "170490"}).copy()
    )

    # Convert the timestamps to UTC and store them back as plain strings.
    utc_times = pd.to_datetime(single_sensor["time"], utc=True)
    single_sensor["time"] = utc_times.dt.strftime("%Y-%m-%dT%H:%M:%SZ")

    cleaned_df = pressure_cleaning.clean_pressure_data_df_km(single_sensor)

    assert len(cleaned_df) == 192
    assert cleaned_df["170490"].isna().sum() == 0