Files
TJWaterServerBinary/tests/api/test_simulation_endpoints.py
T

272 lines
8.8 KiB
Python

from pathlib import Path
from datetime import datetime, timezone
from fastapi.testclient import TestClient
from tests.conftest import build_test_app, install_stub, load_module_from_path
def _load_simulation_module(monkeypatch):
install_stub(monkeypatch, "app.services", package=True)
def parse_aware_time(value, field_name="datetime"):
dt = datetime.fromisoformat(str(value).replace("Z", "+00:00"))
if dt.tzinfo is None:
raise ValueError(f"{field_name} is missing timezone information.")
return dt
def parse_utc_time(value, field_name="datetime"):
return parse_aware_time(value, field_name=field_name).astimezone(
timezone.utc
)
install_stub(
monkeypatch,
"app.services.time_api",
{
"parse_aware_time": parse_aware_time,
"parse_utc_time": parse_utc_time,
},
)
install_stub(
monkeypatch,
"app.services.simulation",
{
"run_simulation": lambda **kwargs: None,
"query_corresponding_element_id_and_query_id": lambda name: None,
"query_corresponding_pattern_id_and_query_id": lambda name: None,
"query_non_realtime_region": lambda name: [],
"get_source_outflow_region_id": lambda name, region_result: {},
"query_realtime_region_pipe_flow_and_demand_id": lambda name, region_result: {},
"query_pipe_flow_region_patterns": lambda name: {},
"query_non_realtime_region_patterns": lambda name, region_result: {},
"get_realtime_region_patterns": lambda name, source_outflow_region_id, realtime_region_pipe_flow_and_demand_id: ({}, {}),
},
)
install_stub(monkeypatch, "app.services.globals", {})
install_stub(
monkeypatch,
"app.services.tjnetwork",
{
"run_project": lambda network: "report",
"run_project_return_dict": lambda network: {"output": {}, "report": "ok"},
"run_inp": lambda network: "inp-report",
"dump_output": lambda output: f"dump::{output}",
},
)
install_stub(monkeypatch, "app.algorithms", package=True)
install_stub(monkeypatch, "app.algorithms.simulation", package=True)
install_stub(
monkeypatch,
"app.algorithms.simulation.scenarios",
{
"burst_analysis": lambda *args, **kwargs: "burst",
"valve_close_analysis": lambda *args, **kwargs: "valve",
"flushing_analysis": lambda *args, **kwargs: "flush",
"contaminant_simulation": lambda *args, **kwargs: "contaminant",
"age_analysis": lambda *args, **kwargs: "age",
"pressure_regulation": lambda *args, **kwargs: "pressure",
},
)
install_stub(
monkeypatch,
"app.algorithms.sensor",
{
"pressure_sensor_placement_sensitivity": lambda *args, **kwargs: [],
"pressure_sensor_placement_kmeans": lambda *args, **kwargs: [],
},
)
install_stub(
monkeypatch,
"app.services.network_import",
{"network_update": lambda *args, **kwargs: "updated"},
)
install_stub(
monkeypatch,
"app.services.simulation_ops",
{
"project_management": lambda *args, **kwargs: "managed",
"scheduling_simulation": lambda *args, **kwargs: "scheduled",
"daily_scheduling_simulation": lambda *args, **kwargs: "daily",
},
)
install_stub(
monkeypatch,
"app.services.valve_isolation",
{"analyze_valve_isolation": lambda *args, **kwargs: {}},
)
return load_module_from_path(
"tests_simulation_endpoints_module",
"app/api/v1/endpoints/simulation.py",
)
def test_run_project_endpoint_returns_plain_text(monkeypatch):
module = _load_simulation_module(monkeypatch)
monkeypatch.setattr(module, "run_project", lambda network: f"report::{network}")
client = TestClient(build_test_app(module.router, "/api/v1"))
response = client.get("/api/v1/runproject/", params={"network": "demo"})
assert response.status_code == 200
assert response.text == "report::demo"
def test_scheduling_analysis_maps_request_body(monkeypatch):
module = _load_simulation_module(monkeypatch)
captured = {}
def fake_schedule(network, start_time, pump_control, tank_id, water_plant_output_id, time_delta):
captured["args"] = (
network,
start_time,
pump_control,
tank_id,
water_plant_output_id,
time_delta,
)
return "scheduled"
monkeypatch.setattr(module, "scheduling_simulation", fake_schedule)
client = TestClient(build_test_app(module.router, "/api/v1"))
response = client.post(
"/api/v1/scheduling_analysis/",
json={
"network": "demo",
"start_time": "2025-01-01T08:00:00+08:00",
"pump_control": {"P1": [1, 0, 1]},
"tank_id": "T1",
"water_plant_output_id": "R1",
},
)
assert response.status_code == 200
assert response.json() == "scheduled"
assert captured["args"] == (
"demo",
"2025-01-01T08:00:00+08:00",
{"P1": [1, 0, 1]},
"T1",
"R1",
300,
)
def test_project_management_maps_named_arguments(monkeypatch):
module = _load_simulation_module(monkeypatch)
captured = {}
def fake_project_management(**kwargs):
captured.update(kwargs)
return "managed"
monkeypatch.setattr(module, "project_management", fake_project_management)
client = TestClient(build_test_app(module.router, "/api/v1"))
response = client.post(
"/api/v1/project_management/",
json={
"network": "demo",
"start_time": "2025-01-01T08:00:00+08:00",
"pump_control": {"P1": [1]},
"tank_init_level": {"T1": 10.0},
"region_demand": {"R1": 20.0},
},
)
assert response.status_code == 200
assert response.json() == "managed"
assert captured == {
"prj_name": "demo",
"start_datetime": "2025-01-01T08:00:00+08:00",
"pump_control": {"P1": [1]},
"tank_initial_level_control": {"T1": 10.0},
"region_demand_control": {"R1": 20.0},
}
def test_network_update_surfaces_service_error(monkeypatch, tmp_path):
module = _load_simulation_module(monkeypatch)
monkeypatch.chdir(tmp_path)
def boom(_path):
raise RuntimeError("write failed")
monkeypatch.setattr(module, "network_update", boom)
client = TestClient(build_test_app(module.router, "/api/v1"))
response = client.post(
"/api/v1/network_update/",
files={"file": ("update.txt", b"payload")},
)
assert response.status_code == 500
assert "数据库操作失败: write failed" in response.json()["detail"]
assert list(Path(tmp_path).glob("network_update_*"))
def test_run_simulation_manually_by_date_uses_utc_aware_timestamps(monkeypatch):
module = _load_simulation_module(monkeypatch)
captured_calls = []
monkeypatch.setattr(
module.simulation,
"run_simulation",
lambda **kwargs: captured_calls.append(kwargs),
)
module.run_simulation_manually_by_date(
"demo",
datetime(2025, 1, 1, 19, 4, 5, tzinfo=timezone.utc),
30,
)
assert [call["modify_pattern_start_time"] for call in captured_calls] == [
"2025-01-01T19:04:05+00:00",
"2025-01-01T19:19:05+00:00",
]
def test_runsimulationmanuallybydate_endpoint_accepts_timezone_aware_start_time(monkeypatch):
module = _load_simulation_module(monkeypatch)
captured = {}
def fake_run(network_name, start_time, duration):
captured["network_name"] = network_name
captured["start_time"] = start_time
captured["duration"] = duration
monkeypatch.setattr(module, "run_simulation_manually_by_date", fake_run)
client = TestClient(build_test_app(module.router, "/api/v1"))
response = client.post(
"/api/v1/runsimulationmanuallybydate/",
json={
"name": "demo",
"start_time": "2025-01-02T03:04:05+08:00",
"duration": 30,
},
)
assert response.status_code == 200
assert response.json() == {"status": "success"}
assert captured["network_name"] == "demo"
assert captured["duration"] == 30
assert captured["start_time"].isoformat() == "2025-01-01T19:04:05+00:00"
def test_runsimulationmanuallybydate_endpoint_rejects_naive_start_time(monkeypatch):
module = _load_simulation_module(monkeypatch)
client = TestClient(build_test_app(module.router, "/api/v1"))
response = client.post(
"/api/v1/runsimulationmanuallybydate/",
json={
"name": "demo",
"start_time": "2025-01-02T03:04:05",
"duration": 30,
},
)
assert response.status_code == 422