From b9410b0ff39e88a87fe85f6e8c2ea10d193ee216 Mon Sep 17 00:00:00 2001 From: Jiang Date: Wed, 3 Jun 2026 11:17:37 +0800 Subject: [PATCH] =?UTF-8?q?=E7=BB=9F=E4=B8=80=E5=89=8D=E5=90=8E=E7=AB=AF?= =?UTF-8?q?=E6=97=B6=E9=97=B4=E6=97=B6=E5=8C=BA=E8=AF=B7=E6=B1=82?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- app/api/v1/endpoints/simulation.py | 50 ++++++------- app/services/simulation.py | 12 ++-- cli/tests/unit/test_tjwater_cli.py | 3 +- cli/tjwater_cli/commands_analysis.py | 3 +- cli/tjwater_cli/registry.py | 2 +- scripts/main.py | 41 +++++------ tests/api/test_simulation_endpoints.py | 98 +++++++++++++++++++++++++- 7 files changed, 147 insertions(+), 62 deletions(-) diff --git a/app/api/v1/endpoints/simulation.py b/app/api/v1/endpoints/simulation.py index 3ce8a2d..18b6977 100644 --- a/app/api/v1/endpoints/simulation.py +++ b/app/api/v1/endpoints/simulation.py @@ -35,16 +35,22 @@ from app.services.simulation_ops import ( daily_scheduling_simulation, ) from app.services.valve_isolation import analyze_valve_isolation -from pydantic import BaseModel, Field +from app.services.time_api import parse_aware_time, parse_utc_time +from pydantic import BaseModel, Field, field_validator router = APIRouter() class RunSimulationManuallyByDate(BaseModel): name: str = Field(..., description="管网名称(或数据库名称)") - simulation_date: str = Field(..., description="模拟基准日期 (YYYY-MM-DD)") - start_time: str = Field(..., description="开始时间 (HH:MM 或 HH:MM:SS)") - duration: int = Field(..., description="持续时间 (分钟)") + start_time: str = Field(..., description="开始时间 (ISO 8601 / RFC3339,必须显式带时区)") + duration: int = Field(..., gt=0, description="持续时间 (分钟)") + + @field_validator("start_time") + @classmethod + def validate_start_time_timezone(cls, value: str) -> str: + parse_aware_time(value, field_name="start_time") + return value class BurstAnalysis(BaseModel): @@ -109,28 +115,15 @@ class PressureSensorPlacement(BaseModel): def run_simulation_manually_by_date( - network_name: str, base_date: datetime, start_time: str, duration: int + network_name: str, start_time: datetime, duration: int ) -> None: - time_parts = list(map(int, start_time.split(":"))) - if len(time_parts) == 2: - start_hour, start_minute = time_parts - start_second = 0 - elif len(time_parts) == 3: - start_hour, start_minute, start_second = time_parts - else: - raise ValueError("Invalid start_time format. Use HH:MM or HH:MM:SS") - - start_datetime = base_date.replace( - hour=start_hour, minute=start_minute, second=start_second - ) - end_datetime = start_datetime + timedelta(minutes=duration) - current_time = start_datetime + end_datetime = start_time + timedelta(minutes=duration) + current_time = start_time while current_time < end_datetime: - iso_time = current_time.strftime("%Y-%m-%dT%H:%M:%S") + "+08:00" simulation.run_simulation( name=network_name, simulation_type="realtime", - modify_pattern_start_time=iso_time, + modify_pattern_start_time=current_time.isoformat(timespec="seconds"), ) current_time += timedelta(minutes=15) @@ -767,7 +760,7 @@ async def fastapi_pressure_sensor_placement( return "success" -@router.post("/runsimulationmanuallybydate/", summary="手动运行日期指定模拟", description="根据指定的日期、开始时间和持续时间,手动运行水力模拟。系统将自动查询管网参数并执行模拟。") +@router.post("/runsimulationmanuallybydate/", summary="手动运行日期指定模拟", description="根据指定的开始时间和持续时间,手动运行水力模拟。开始时间必须是显式带时区的 ISO 8601 / RFC3339 时间。") async def fastapi_run_simulation_manually_by_date( data: RunSimulationManuallyByDate = Body(..., description="模拟运行参数"), ) -> dict[str, str]: @@ -776,14 +769,13 @@ async def fastapi_run_simulation_manually_by_date( 请求体参数: - **name**: 管网名称(或数据库名称) - - **simulation_date**: 模拟基准日期(YYYY-MM-DD格式) - - **start_time**: 开始时间(HH:MM或HH:MM:SS格式) + - **start_time**: 开始时间(ISO 8601 / RFC3339,必须显式带时区) - **duration**: 模拟持续时间(分钟) - 系统将从指定日期和时间开始,按15分钟间隔多次运行模拟。 + 系统将从指定时间开始,按15分钟间隔多次运行模拟。 每次模拟间隔15分钟,直至达到指定的总持续时间。 """ - item = data.dict() + item = data.model_dump() try: simulation.query_corresponding_element_id_and_query_id(item["name"]) simulation.query_corresponding_pattern_id_and_query_id(item["name"]) @@ -810,10 +802,10 @@ async def fastapi_run_simulation_manually_by_date( globals.source_outflow_region_id, globals.realtime_region_pipe_flow_and_demand_id, ) - base_date = datetime.strptime(item["simulation_date"], "%Y-%m-%d") + start_time = parse_utc_time(item["start_time"], field_name="start_time") run_simulation_manually_by_date( - item["name"], base_date, item["start_time"], item["duration"] + item["name"], start_time, item["duration"] ) return {"status": "success"} except Exception as exc: - return {"status": "error", "message": str(exc)} + raise HTTPException(status_code=500, detail=str(exc)) from exc diff --git a/app/services/simulation.py b/app/services/simulation.py index 204c572..ee49c09 100644 --- a/app/services/simulation.py +++ b/app/services/simulation.py @@ -34,6 +34,7 @@ import psycopg import logging import app.services.globals as globals import app.services.project_info as project_info +from app.services.time_api import parse_beijing_time from app.core.config import get_pgconn_string from app.infra.db.timescaledb.internal_queries import ( InternalQueries as TimescaleInternalQueries, @@ -661,13 +662,14 @@ def from_seconds_to_clock(secs: int) -> str: def convert_time_format(original_time: str) -> str: """ - 格式转换,将“2024-04-13T08:00:00+08:00"转为“2024-04-13 08:00:00” - :param original_time: str, “2024-04-13T08:00:00+08:00"格式的时间 + 格式转换,将带时区的 ISO 8601 / RFC3339 时间转为北京时间的“YYYY-MM-DD HH:MM:SS” + :param original_time: str,带显式时区的时间 :return: str,“2024-04-13 08:00:00”格式的时间 """ - new_time = original_time.replace("T", " ") - new_time = new_time.replace("+08:00", "") - return new_time + normalized_time = parse_beijing_time( + original_time, field_name="modify_pattern_start_time" + ) + return normalized_time.replace(microsecond=0).strftime("%Y-%m-%d %H:%M:%S") def get_history_pattern_info(project_name, pattern_name): diff --git a/cli/tests/unit/test_tjwater_cli.py b/cli/tests/unit/test_tjwater_cli.py index 99c75d5..2a84941 100644 --- a/cli/tests/unit/test_tjwater_cli.py +++ b/cli/tests/unit/test_tjwater_cli.py @@ -313,8 +313,7 @@ def test_simulation_run_translates_rfc3339(monkeypatch): assert result.exit_code == 0 assert captured["json"] == { "name": "demo", - "simulation_date": "2025-01-02", - "start_time": "03:04:05+08:00", + "start_time": "2025-01-02T03:04:05+08:00", "duration": 30, } assert "tjwater-cli data timeseries realtime links" in result.stdout diff --git a/cli/tjwater_cli/commands_analysis.py b/cli/tjwater_cli/commands_analysis.py index 47164fa..231d69b 100644 --- a/cli/tjwater_cli/commands_analysis.py +++ b/cli/tjwater_cli/commands_analysis.py @@ -45,8 +45,7 @@ def simulation_run( end_time = (parsed + timedelta(minutes=duration)).isoformat() body = { "name": network, - "simulation_date": parsed.date().isoformat(), - "start_time": parsed.timetz().replace(microsecond=0).isoformat(), + "start_time": parsed.replace(microsecond=0).isoformat(), "duration": duration, } emit_api( diff --git a/cli/tjwater_cli/registry.py b/cli/tjwater_cli/registry.py index 6dd5f46..5270696 100644 --- a/cli/tjwater_cli/registry.py +++ b/cli/tjwater_cli/registry.py @@ -102,7 +102,7 @@ COMMAND_DOCS: dict[tuple[str, ...], CommandDoc] = { ("simulation", "run"): CommandDoc( path=("simulation", "run"), summary="触发指定绝对时间的模拟运行", - description="把 RFC3339 start-time 拆成 simulation_date 与 start_time 后调用 /runsimulationmanuallybydate/;接口本身只负责触发运行,结果需后续通过 data timeseries 在对应时间段查询。", + description="把显式带时区的 RFC3339 start-time 直接传给 /runsimulationmanuallybydate/;服务端按带时区时间处理并统一按 UTC 存储结果,实时数据需后续通过 data timeseries 在对应时间段查询。", options=( CommandOptionDoc("start-time", "显式带时区的开始时间", required=True), CommandOptionDoc("duration", "持续分钟数", required=True), diff --git a/scripts/main.py b/scripts/main.py index 070895c..7247e14 100644 --- a/scripts/main.py +++ b/scripts/main.py @@ -31,7 +31,7 @@ from fastapi.middleware.cors import CORSMiddleware from starlette.responses import FileResponse, JSONResponse from contextlib import asynccontextmanager -from pydantic import BaseModel +from pydantic import BaseModel, field_validator from multiprocessing import Value @@ -3654,40 +3654,35 @@ async def fastapi_download_history_data_manually( class Run_Simulation_Manually_by_Date(BaseModel): """ name:数据库名称 - simulation_date:样式如 2025-05-04 - start_time:开始时间,样式如 08:00:00 + start_time:开始时间,样式如 2025-05-04T08:00:00+08:00 duration:持续时间,单位为分钟 """ name: str - simulation_date: str start_time: str duration: int + @field_validator("start_time") + @classmethod + def validate_start_time_timezone(cls, value: str) -> str: + time_api.parse_aware_time(value, field_name="start_time") + return value + def run_simulation_manually_by_date( - network_name: str, base_date: datetime, start_time: str, duration: int + network_name: str, start_time: datetime, duration: int ) -> None: - # 解析开始时间 - start_hour, start_minute, start_second = map(int, start_time.split(":")) - start_datetime = base_date.replace( - hour=start_hour, minute=start_minute, second=start_second - ) - # 计算结束时间 - end_datetime = start_datetime + timedelta(minutes=duration) + end_datetime = start_time + timedelta(minutes=duration) # 生成时间点,每15分钟一个 - current_time = start_datetime + current_time = start_time while current_time < end_datetime: - # 格式化成ISO8601带时区格式 - iso_time = current_time.strftime("%Y-%m-%dT%H:%M:%S") + "+08:00" - ## 执行函数调用 simulation.run_simulation( name=network_name, simulation_type="realtime", - modify_pattern_start_time=iso_time, + modify_pattern_start_time=current_time.isoformat(timespec="seconds"), ) # 增加15分钟 @@ -3698,7 +3693,7 @@ def run_simulation_manually_by_date( async def fastapi_run_simulation_manually_by_date( data: Run_Simulation_Manually_by_Date, ) -> dict[str, str]: - item = data.dict() + item = data.model_dump() print(f"item: {item}") filename = "c:/lock.simulation" @@ -3740,11 +3735,13 @@ async def fastapi_run_simulation_manually_by_date( globals.realtime_region_pipe_flow_and_demand_id, ) - base_date = datetime.strptime(item["simulation_date"], "%Y-%m-%d") + start_time = time_api.parse_utc_time( + item["start_time"], field_name="start_time" + ) thread = threading.Thread( target=lambda: run_simulation_manually_by_date( - item["name"], base_date, item["start_time"], item["duration"] + item["name"], start_time, item["duration"] ) ) @@ -3753,11 +3750,11 @@ async def fastapi_run_simulation_manually_by_date( return {"status": "success"} except Exception as e: - return {"status": "error", "message": str(e)} + raise HTTPException(status_code=500, detail=str(e)) from e # thread.join() # DingZQ 08152025 - # matched_keys = redis_client.keys(f"*{item['simulation_date']}*") + # matched_keys = redis_client.keys(...) # redis_client.delete(*matched_keys) diff --git a/tests/api/test_simulation_endpoints.py b/tests/api/test_simulation_endpoints.py index e9cdb1c..38f777f 100644 --- a/tests/api/test_simulation_endpoints.py +++ b/tests/api/test_simulation_endpoints.py @@ -1,4 +1,5 @@ from pathlib import Path +from datetime import datetime, timezone from fastapi.testclient import TestClient @@ -7,10 +8,39 @@ from tests.conftest import build_test_app, install_stub, load_module_from_path def _load_simulation_module(monkeypatch): install_stub(monkeypatch, "app.services", package=True) + def parse_aware_time(value, field_name="datetime"): + dt = datetime.fromisoformat(str(value).replace("Z", "+00:00")) + if dt.tzinfo is None: + raise ValueError(f"{field_name} is missing timezone information.") + return dt + + def parse_utc_time(value, field_name="datetime"): + return parse_aware_time(value, field_name=field_name).astimezone( + timezone.utc + ) + + install_stub( + monkeypatch, + "app.services.time_api", + { + "parse_aware_time": parse_aware_time, + "parse_utc_time": parse_utc_time, + }, + ) install_stub( monkeypatch, "app.services.simulation", - {"run_simulation": lambda **kwargs: None}, + { + "run_simulation": lambda **kwargs: None, + "query_corresponding_element_id_and_query_id": lambda name: None, + "query_corresponding_pattern_id_and_query_id": lambda name: None, + "query_non_realtime_region": lambda name: [], + "get_source_outflow_region_id": lambda name, region_result: {}, + "query_realtime_region_pipe_flow_and_demand_id": lambda name, region_result: {}, + "query_pipe_flow_region_patterns": lambda name: {}, + "query_non_realtime_region_patterns": lambda name, region_result: {}, + "get_realtime_region_patterns": lambda name, source_outflow_region_id, realtime_region_pipe_flow_and_demand_id: ({}, {}), + }, ) install_stub(monkeypatch, "app.services.globals", {}) install_stub( @@ -173,3 +203,69 @@ def test_network_update_surfaces_service_error(monkeypatch, tmp_path): assert response.status_code == 500 assert "数据库操作失败: write failed" in response.json()["detail"] assert list(Path(tmp_path).glob("network_update_*")) + + +def test_run_simulation_manually_by_date_uses_utc_aware_timestamps(monkeypatch): + module = _load_simulation_module(monkeypatch) + captured_calls = [] + + monkeypatch.setattr( + module.simulation, + "run_simulation", + lambda **kwargs: captured_calls.append(kwargs), + ) + + module.run_simulation_manually_by_date( + "demo", + datetime(2025, 1, 1, 19, 4, 5, tzinfo=timezone.utc), + 30, + ) + + assert [call["modify_pattern_start_time"] for call in captured_calls] == [ + "2025-01-01T19:04:05+00:00", + "2025-01-01T19:19:05+00:00", + ] + + +def test_runsimulationmanuallybydate_endpoint_accepts_timezone_aware_start_time(monkeypatch): + module = _load_simulation_module(monkeypatch) + captured = {} + + def fake_run(network_name, start_time, duration): + captured["network_name"] = network_name + captured["start_time"] = start_time + captured["duration"] = duration + + monkeypatch.setattr(module, "run_simulation_manually_by_date", fake_run) + client = TestClient(build_test_app(module.router, "/api/v1")) + + response = client.post( + "/api/v1/runsimulationmanuallybydate/", + json={ + "name": "demo", + "start_time": "2025-01-02T03:04:05+08:00", + "duration": 30, + }, + ) + + assert response.status_code == 200 + assert response.json() == {"status": "success"} + assert captured["network_name"] == "demo" + assert captured["duration"] == 30 + assert captured["start_time"].isoformat() == "2025-01-01T19:04:05+00:00" + + +def test_runsimulationmanuallybydate_endpoint_rejects_naive_start_time(monkeypatch): + module = _load_simulation_module(monkeypatch) + client = TestClient(build_test_app(module.router, "/api/v1")) + + response = client.post( + "/api/v1/runsimulationmanuallybydate/", + json={ + "name": "demo", + "start_time": "2025-01-02T03:04:05", + "duration": 30, + }, + ) + + assert response.status_code == 422