diff --git a/app/api/v1/endpoints/timeseries/realtime.py b/app/api/v1/endpoints/timeseries/realtime.py index 5725eb6..eb6ee8b 100644 --- a/app/api/v1/endpoints/timeseries/realtime.py +++ b/app/api/v1/endpoints/timeseries/realtime.py @@ -8,6 +8,10 @@ from .dependencies import get_timescale_connection router = APIRouter() +TIME_WITH_TZ_DESC = "ISO 8601 / RFC 3339 时间,必须显式带时区;可直接传 UTC+8,服务端会先转换为 UTC 再处理。" +TIME_RANGE_START_DESC = f"时间范围开始时间。{TIME_WITH_TZ_DESC}" +TIME_RANGE_END_DESC = f"时间范围结束时间。{TIME_WITH_TZ_DESC}" + @router.post("/realtime/links/batch", status_code=201, summary="批量插入实时管道数据") async def insert_realtime_links( @@ -29,16 +33,21 @@ async def insert_realtime_links( return {"message": f"Inserted {len(data)} records"} -@router.get("/realtime/links", summary="查询实时管道数据") +@router.get( + "/realtime/links", + summary="查询实时管道数据", + description="按时间范围查询实时管道数据。start_time 和 end_time 必须显式带时区;允许传 UTC+8,服务端会先归一化为 UTC 再执行查询。", +) async def get_realtime_links( - start_time: datetime = Query(..., description="查询开始时间"), - end_time: datetime = Query(..., description="查询结束时间"), + start_time: datetime = Query(..., description=TIME_RANGE_START_DESC), + end_time: datetime = Query(..., description=TIME_RANGE_END_DESC), conn: AsyncConnection = Depends(get_timescale_connection), ): """ 查询指定时间范围内的实时管道数据 - 根据时间范围查询所有实时管道的监测值。 + 根据时间范围查询所有实时管道的监测值。传入时间必须显式包含时区, + 可以直接使用 UTC+8,服务端会先统一转换为 UTC 再参与数据库查询。 Args: start_time: 查询开始时间 @@ -50,10 +59,14 @@ async def get_realtime_links( return await RealtimeRepository.get_links_by_time_range(conn, start_time, end_time) -@router.delete("/realtime/links", summary="删除实时管道数据") +@router.delete( + "/realtime/links", + summary="删除实时管道数据", + description="按时间范围删除实时管道数据。start_time 和 end_time 必须显式带时区;允许传 UTC+8,服务端按请求中的绝对时间删除对应 UTC 数据。", +) async def delete_realtime_links( - start_time: datetime = Query(..., description="删除开始时间"), - end_time: datetime = Query(..., description="删除结束时间"), + start_time: datetime = Query(..., description=TIME_RANGE_START_DESC), + end_time: datetime = Query(..., description=TIME_RANGE_END_DESC), conn: AsyncConnection = Depends(get_timescale_connection), ): """ @@ -75,7 +88,7 @@ async def delete_realtime_links( @router.patch("/realtime/links/{link_id}/field", summary="更新实时管道字段") async def update_realtime_link_field( link_id: str = Path(..., description="管道ID"), - time: datetime = Query(..., description="更新数据的时间戳"), + time: datetime = Query(..., description=f"要更新记录的时间戳。{TIME_WITH_TZ_DESC}"), field: str = Query(..., description="要更新的字段名称"), value: float = Query(..., description="更新的字段值"), conn: AsyncConnection = Depends(get_timescale_connection), @@ -124,16 +137,21 @@ async def insert_realtime_nodes( return {"message": f"Inserted {len(data)} records"} -@router.get("/realtime/nodes", summary="查询实时节点数据") +@router.get( + "/realtime/nodes", + summary="查询实时节点数据", + description="按时间范围查询实时节点数据。start_time 和 end_time 必须显式带时区;允许传 UTC+8,服务端会先归一化为 UTC 再执行查询。", +) async def get_realtime_nodes( - start_time: datetime = Query(..., description="查询开始时间"), - end_time: datetime = Query(..., description="查询结束时间"), + start_time: datetime = Query(..., description=TIME_RANGE_START_DESC), + end_time: datetime = Query(..., description=TIME_RANGE_END_DESC), conn: AsyncConnection = Depends(get_timescale_connection), ): """ 查询指定时间范围内的实时节点数据 - 根据时间范围查询所有实时节点的监测值。 + 根据时间范围查询所有实时节点的监测值。传入时间必须显式包含时区, + 可以直接使用 UTC+8,服务端会先统一转换为 UTC 再参与数据库查询。 Args: start_time: 查询开始时间 @@ -145,10 +163,14 @@ async def get_realtime_nodes( return await RealtimeRepository.get_nodes_by_time_range(conn, start_time, end_time) -@router.delete("/realtime/nodes", summary="删除实时节点数据") +@router.delete( + "/realtime/nodes", + summary="删除实时节点数据", + description="按时间范围删除实时节点数据。start_time 和 end_time 必须显式带时区;允许传 UTC+8,服务端按请求中的绝对时间删除对应 UTC 数据。", +) async def delete_realtime_nodes( - start_time: datetime = Query(..., description="删除开始时间"), - end_time: datetime = Query(..., description="删除结束时间"), + start_time: datetime = Query(..., description=TIME_RANGE_START_DESC), + end_time: datetime = Query(..., description=TIME_RANGE_END_DESC), conn: AsyncConnection = Depends(get_timescale_connection), ): """ @@ -173,7 +195,7 @@ async def delete_realtime_nodes( async def store_realtime_simulation_result( node_result_list: List[dict] = Body(..., description="节点模拟结果列表"), link_result_list: List[dict] = Body(..., description="管道模拟结果列表"), - result_start_time: str = Query(..., description="模拟结果开始时间"), + result_start_time: str = Query(..., description=f"模拟结果开始时间。{TIME_WITH_TZ_DESC}"), conn: AsyncConnection = Depends(get_timescale_connection), ): """ @@ -195,9 +217,13 @@ async def store_realtime_simulation_result( return {"message": "Simulation results stored successfully"} -@router.get("/realtime/query/by-time-property", summary="按时间和属性查询实时数据") +@router.get( + "/realtime/query/by-time-property", + summary="按时间和属性查询实时数据", + description="查询指定时间点的实时属性值。query_time 必须显式带时区;允许传 UTC+8,服务端会先归一化为 UTC 再执行查询。", +) async def query_realtime_records_by_time_property( - query_time: str = Query(..., description="查询时间"), + query_time: str = Query(..., description=f"查询时间。{TIME_WITH_TZ_DESC}"), type: str = Query(..., description="数据类型,pipe(管道)或 junction(节点)"), property: str = Query(..., description="要查询的属性名称"), conn: AsyncConnection = Depends(get_timescale_connection), @@ -227,11 +253,15 @@ async def query_realtime_records_by_time_property( raise HTTPException(status_code=400, detail=str(e)) -@router.get("/realtime/query/by-id-time", summary="按ID和时间查询实时模拟数据") +@router.get( + "/realtime/query/by-id-time", + summary="按ID和时间查询实时模拟数据", + description="查询指定元素在某一时间点的实时模拟结果。query_time 必须显式带时区;允许传 UTC+8,服务端会先归一化为 UTC 再执行查询。", +) async def query_realtime_simulation_by_id_time( id: str = Query(..., description="元素ID(管道ID或节点ID)"), type: str = Query(..., description="元素类型,pipe(管道)或 junction(节点)"), - query_time: str = Query(..., description="查询时间"), + query_time: str = Query(..., description=f"查询时间。{TIME_WITH_TZ_DESC}"), conn: AsyncConnection = Depends(get_timescale_connection), ): """ diff --git a/app/infra/db/timescaledb/repositories/realtime.py b/app/infra/db/timescaledb/repositories/realtime.py index 7c0facb..6b26fa4 100644 --- a/app/infra/db/timescaledb/repositories/realtime.py +++ b/app/infra/db/timescaledb/repositories/realtime.py @@ -100,10 +100,12 @@ class RealtimeRepository: async def get_links_by_time_range( conn: AsyncConnection, start_time: datetime, end_time: datetime ) -> List[dict]: + normalized_start_time = parse_utc_time(start_time, field_name="start_time") + normalized_end_time = parse_utc_time(end_time, field_name="end_time") async with conn.cursor() as cur: await cur.execute( "SELECT * FROM realtime.link_simulation WHERE time >= %s AND time <= %s", - (start_time, end_time), + (normalized_start_time, normalized_end_time), ) return await cur.fetchall() @@ -296,10 +298,12 @@ class RealtimeRepository: async def get_nodes_by_time_range( conn: AsyncConnection, start_time: datetime, end_time: datetime ) -> List[dict]: + normalized_start_time = parse_utc_time(start_time, field_name="start_time") + normalized_end_time = parse_utc_time(end_time, field_name="end_time") async with conn.cursor() as cur: await cur.execute( "SELECT * FROM realtime.node_simulation WHERE time >= %s AND time <= %s", - (start_time, end_time), + (normalized_start_time, normalized_end_time), ) return await cur.fetchall() diff --git a/tests/unit/test_realtime_repository.py b/tests/unit/test_realtime_repository.py new file mode 100644 index 0000000..31f64f6 --- /dev/null +++ b/tests/unit/test_realtime_repository.py @@ -0,0 +1,110 @@ +import asyncio +from datetime import datetime, timezone +import importlib.util +from pathlib import Path +import sys +from types import ModuleType + + +def _load_time_api_module(): + module_path = ( + Path(__file__).resolve().parents[2] / "app" / "services" / "time_api.py" + ) + spec = importlib.util.spec_from_file_location("tests_time_api_under_test", module_path) + module = importlib.util.module_from_spec(spec) + assert spec and spec.loader + spec.loader.exec_module(module) + return module + + +def _load_realtime_repository(): + time_api_module = _load_time_api_module() + app_module = ModuleType("app") + services_module = ModuleType("app.services") + services_module.time_api = time_api_module + app_module.services = services_module + sys.modules["app"] = app_module + sys.modules["app.services"] = services_module + sys.modules["app.services.time_api"] = time_api_module + + module_path = ( + Path(__file__).resolve().parents[2] + / "app" + / "infra" + / "db" + / "timescaledb" + / "repositories" + / "realtime.py" + ) + spec = importlib.util.spec_from_file_location( + "tests_realtime_repo_under_test", module_path + ) + module = importlib.util.module_from_spec(spec) + assert spec and spec.loader + spec.loader.exec_module(module) + return module.RealtimeRepository + + +class _FakeCursor: + def __init__(self): + self.calls: list[tuple[str, tuple]] = [] + + async def __aenter__(self): + return self + + async def __aexit__(self, exc_type, exc, tb): + return False + + async def execute(self, query, params): + self.calls.append((str(query), params)) + + async def fetchall(self): + return [] + + +class _FakeConnection: + def __init__(self): + self.cursor_instance = _FakeCursor() + + def cursor(self): + return self.cursor_instance + + +def test_get_links_by_time_range_normalizes_inputs_to_utc(): + RealtimeRepository = _load_realtime_repository() + conn = _FakeConnection() + + asyncio.run( + RealtimeRepository.get_links_by_time_range( + conn, + datetime.fromisoformat("2026-06-01T08:00:00+08:00"), + datetime.fromisoformat("2026-06-01T09:00:00+08:00"), + ) + ) + + assert len(conn.cursor_instance.calls) == 1 + _, params = conn.cursor_instance.calls[0] + assert params == ( + datetime(2026, 6, 1, 0, 0, tzinfo=timezone.utc), + datetime(2026, 6, 1, 1, 0, tzinfo=timezone.utc), + ) + + +def test_get_nodes_by_time_range_normalizes_inputs_to_utc(): + RealtimeRepository = _load_realtime_repository() + conn = _FakeConnection() + + asyncio.run( + RealtimeRepository.get_nodes_by_time_range( + conn, + datetime.fromisoformat("2026-06-01T08:00:00+08:00"), + datetime.fromisoformat("2026-06-01T09:00:00+08:00"), + ) + ) + + assert len(conn.cursor_instance.calls) == 1 + _, params = conn.cursor_instance.calls[0] + assert params == ( + datetime(2026, 6, 1, 0, 0, tzinfo=timezone.utc), + datetime(2026, 6, 1, 1, 0, tzinfo=timezone.utc), + )