优化时间范围查询,添加 UTC 时间标准化处理
This commit is contained in:
@@ -8,6 +8,10 @@ from .dependencies import get_timescale_connection
|
|||||||
|
|
||||||
router = APIRouter()
|
router = APIRouter()
|
||||||
|
|
||||||
|
TIME_WITH_TZ_DESC = "ISO 8601 / RFC 3339 时间,必须显式带时区;可直接传 UTC+8,服务端会先转换为 UTC 再处理。"
|
||||||
|
TIME_RANGE_START_DESC = f"时间范围开始时间。{TIME_WITH_TZ_DESC}"
|
||||||
|
TIME_RANGE_END_DESC = f"时间范围结束时间。{TIME_WITH_TZ_DESC}"
|
||||||
|
|
||||||
|
|
||||||
@router.post("/realtime/links/batch", status_code=201, summary="批量插入实时管道数据")
|
@router.post("/realtime/links/batch", status_code=201, summary="批量插入实时管道数据")
|
||||||
async def insert_realtime_links(
|
async def insert_realtime_links(
|
||||||
@@ -29,16 +33,21 @@ async def insert_realtime_links(
|
|||||||
return {"message": f"Inserted {len(data)} records"}
|
return {"message": f"Inserted {len(data)} records"}
|
||||||
|
|
||||||
|
|
||||||
@router.get("/realtime/links", summary="查询实时管道数据")
|
@router.get(
|
||||||
|
"/realtime/links",
|
||||||
|
summary="查询实时管道数据",
|
||||||
|
description="按时间范围查询实时管道数据。start_time 和 end_time 必须显式带时区;允许传 UTC+8,服务端会先归一化为 UTC 再执行查询。",
|
||||||
|
)
|
||||||
async def get_realtime_links(
|
async def get_realtime_links(
|
||||||
start_time: datetime = Query(..., description="查询开始时间"),
|
start_time: datetime = Query(..., description=TIME_RANGE_START_DESC),
|
||||||
end_time: datetime = Query(..., description="查询结束时间"),
|
end_time: datetime = Query(..., description=TIME_RANGE_END_DESC),
|
||||||
conn: AsyncConnection = Depends(get_timescale_connection),
|
conn: AsyncConnection = Depends(get_timescale_connection),
|
||||||
):
|
):
|
||||||
"""
|
"""
|
||||||
查询指定时间范围内的实时管道数据
|
查询指定时间范围内的实时管道数据
|
||||||
|
|
||||||
根据时间范围查询所有实时管道的监测值。
|
根据时间范围查询所有实时管道的监测值。传入时间必须显式包含时区,
|
||||||
|
可以直接使用 UTC+8,服务端会先统一转换为 UTC 再参与数据库查询。
|
||||||
|
|
||||||
Args:
|
Args:
|
||||||
start_time: 查询开始时间
|
start_time: 查询开始时间
|
||||||
@@ -50,10 +59,14 @@ async def get_realtime_links(
|
|||||||
return await RealtimeRepository.get_links_by_time_range(conn, start_time, end_time)
|
return await RealtimeRepository.get_links_by_time_range(conn, start_time, end_time)
|
||||||
|
|
||||||
|
|
||||||
@router.delete("/realtime/links", summary="删除实时管道数据")
|
@router.delete(
|
||||||
|
"/realtime/links",
|
||||||
|
summary="删除实时管道数据",
|
||||||
|
description="按时间范围删除实时管道数据。start_time 和 end_time 必须显式带时区;允许传 UTC+8,服务端按请求中的绝对时间删除对应 UTC 数据。",
|
||||||
|
)
|
||||||
async def delete_realtime_links(
|
async def delete_realtime_links(
|
||||||
start_time: datetime = Query(..., description="删除开始时间"),
|
start_time: datetime = Query(..., description=TIME_RANGE_START_DESC),
|
||||||
end_time: datetime = Query(..., description="删除结束时间"),
|
end_time: datetime = Query(..., description=TIME_RANGE_END_DESC),
|
||||||
conn: AsyncConnection = Depends(get_timescale_connection),
|
conn: AsyncConnection = Depends(get_timescale_connection),
|
||||||
):
|
):
|
||||||
"""
|
"""
|
||||||
@@ -75,7 +88,7 @@ async def delete_realtime_links(
|
|||||||
@router.patch("/realtime/links/{link_id}/field", summary="更新实时管道字段")
|
@router.patch("/realtime/links/{link_id}/field", summary="更新实时管道字段")
|
||||||
async def update_realtime_link_field(
|
async def update_realtime_link_field(
|
||||||
link_id: str = Path(..., description="管道ID"),
|
link_id: str = Path(..., description="管道ID"),
|
||||||
time: datetime = Query(..., description="更新数据的时间戳"),
|
time: datetime = Query(..., description=f"要更新记录的时间戳。{TIME_WITH_TZ_DESC}"),
|
||||||
field: str = Query(..., description="要更新的字段名称"),
|
field: str = Query(..., description="要更新的字段名称"),
|
||||||
value: float = Query(..., description="更新的字段值"),
|
value: float = Query(..., description="更新的字段值"),
|
||||||
conn: AsyncConnection = Depends(get_timescale_connection),
|
conn: AsyncConnection = Depends(get_timescale_connection),
|
||||||
@@ -124,16 +137,21 @@ async def insert_realtime_nodes(
|
|||||||
return {"message": f"Inserted {len(data)} records"}
|
return {"message": f"Inserted {len(data)} records"}
|
||||||
|
|
||||||
|
|
||||||
@router.get("/realtime/nodes", summary="查询实时节点数据")
|
@router.get(
|
||||||
|
"/realtime/nodes",
|
||||||
|
summary="查询实时节点数据",
|
||||||
|
description="按时间范围查询实时节点数据。start_time 和 end_time 必须显式带时区;允许传 UTC+8,服务端会先归一化为 UTC 再执行查询。",
|
||||||
|
)
|
||||||
async def get_realtime_nodes(
|
async def get_realtime_nodes(
|
||||||
start_time: datetime = Query(..., description="查询开始时间"),
|
start_time: datetime = Query(..., description=TIME_RANGE_START_DESC),
|
||||||
end_time: datetime = Query(..., description="查询结束时间"),
|
end_time: datetime = Query(..., description=TIME_RANGE_END_DESC),
|
||||||
conn: AsyncConnection = Depends(get_timescale_connection),
|
conn: AsyncConnection = Depends(get_timescale_connection),
|
||||||
):
|
):
|
||||||
"""
|
"""
|
||||||
查询指定时间范围内的实时节点数据
|
查询指定时间范围内的实时节点数据
|
||||||
|
|
||||||
根据时间范围查询所有实时节点的监测值。
|
根据时间范围查询所有实时节点的监测值。传入时间必须显式包含时区,
|
||||||
|
可以直接使用 UTC+8,服务端会先统一转换为 UTC 再参与数据库查询。
|
||||||
|
|
||||||
Args:
|
Args:
|
||||||
start_time: 查询开始时间
|
start_time: 查询开始时间
|
||||||
@@ -145,10 +163,14 @@ async def get_realtime_nodes(
|
|||||||
return await RealtimeRepository.get_nodes_by_time_range(conn, start_time, end_time)
|
return await RealtimeRepository.get_nodes_by_time_range(conn, start_time, end_time)
|
||||||
|
|
||||||
|
|
||||||
@router.delete("/realtime/nodes", summary="删除实时节点数据")
|
@router.delete(
|
||||||
|
"/realtime/nodes",
|
||||||
|
summary="删除实时节点数据",
|
||||||
|
description="按时间范围删除实时节点数据。start_time 和 end_time 必须显式带时区;允许传 UTC+8,服务端按请求中的绝对时间删除对应 UTC 数据。",
|
||||||
|
)
|
||||||
async def delete_realtime_nodes(
|
async def delete_realtime_nodes(
|
||||||
start_time: datetime = Query(..., description="删除开始时间"),
|
start_time: datetime = Query(..., description=TIME_RANGE_START_DESC),
|
||||||
end_time: datetime = Query(..., description="删除结束时间"),
|
end_time: datetime = Query(..., description=TIME_RANGE_END_DESC),
|
||||||
conn: AsyncConnection = Depends(get_timescale_connection),
|
conn: AsyncConnection = Depends(get_timescale_connection),
|
||||||
):
|
):
|
||||||
"""
|
"""
|
||||||
@@ -173,7 +195,7 @@ async def delete_realtime_nodes(
|
|||||||
async def store_realtime_simulation_result(
|
async def store_realtime_simulation_result(
|
||||||
node_result_list: List[dict] = Body(..., description="节点模拟结果列表"),
|
node_result_list: List[dict] = Body(..., description="节点模拟结果列表"),
|
||||||
link_result_list: List[dict] = Body(..., description="管道模拟结果列表"),
|
link_result_list: List[dict] = Body(..., description="管道模拟结果列表"),
|
||||||
result_start_time: str = Query(..., description="模拟结果开始时间"),
|
result_start_time: str = Query(..., description=f"模拟结果开始时间。{TIME_WITH_TZ_DESC}"),
|
||||||
conn: AsyncConnection = Depends(get_timescale_connection),
|
conn: AsyncConnection = Depends(get_timescale_connection),
|
||||||
):
|
):
|
||||||
"""
|
"""
|
||||||
@@ -195,9 +217,13 @@ async def store_realtime_simulation_result(
|
|||||||
return {"message": "Simulation results stored successfully"}
|
return {"message": "Simulation results stored successfully"}
|
||||||
|
|
||||||
|
|
||||||
@router.get("/realtime/query/by-time-property", summary="按时间和属性查询实时数据")
|
@router.get(
|
||||||
|
"/realtime/query/by-time-property",
|
||||||
|
summary="按时间和属性查询实时数据",
|
||||||
|
description="查询指定时间点的实时属性值。query_time 必须显式带时区;允许传 UTC+8,服务端会先归一化为 UTC 再执行查询。",
|
||||||
|
)
|
||||||
async def query_realtime_records_by_time_property(
|
async def query_realtime_records_by_time_property(
|
||||||
query_time: str = Query(..., description="查询时间"),
|
query_time: str = Query(..., description=f"查询时间。{TIME_WITH_TZ_DESC}"),
|
||||||
type: str = Query(..., description="数据类型,pipe(管道)或 junction(节点)"),
|
type: str = Query(..., description="数据类型,pipe(管道)或 junction(节点)"),
|
||||||
property: str = Query(..., description="要查询的属性名称"),
|
property: str = Query(..., description="要查询的属性名称"),
|
||||||
conn: AsyncConnection = Depends(get_timescale_connection),
|
conn: AsyncConnection = Depends(get_timescale_connection),
|
||||||
@@ -227,11 +253,15 @@ async def query_realtime_records_by_time_property(
|
|||||||
raise HTTPException(status_code=400, detail=str(e))
|
raise HTTPException(status_code=400, detail=str(e))
|
||||||
|
|
||||||
|
|
||||||
@router.get("/realtime/query/by-id-time", summary="按ID和时间查询实时模拟数据")
|
@router.get(
|
||||||
|
"/realtime/query/by-id-time",
|
||||||
|
summary="按ID和时间查询实时模拟数据",
|
||||||
|
description="查询指定元素在某一时间点的实时模拟结果。query_time 必须显式带时区;允许传 UTC+8,服务端会先归一化为 UTC 再执行查询。",
|
||||||
|
)
|
||||||
async def query_realtime_simulation_by_id_time(
|
async def query_realtime_simulation_by_id_time(
|
||||||
id: str = Query(..., description="元素ID(管道ID或节点ID)"),
|
id: str = Query(..., description="元素ID(管道ID或节点ID)"),
|
||||||
type: str = Query(..., description="元素类型,pipe(管道)或 junction(节点)"),
|
type: str = Query(..., description="元素类型,pipe(管道)或 junction(节点)"),
|
||||||
query_time: str = Query(..., description="查询时间"),
|
query_time: str = Query(..., description=f"查询时间。{TIME_WITH_TZ_DESC}"),
|
||||||
conn: AsyncConnection = Depends(get_timescale_connection),
|
conn: AsyncConnection = Depends(get_timescale_connection),
|
||||||
):
|
):
|
||||||
"""
|
"""
|
||||||
|
|||||||
@@ -100,10 +100,12 @@ class RealtimeRepository:
|
|||||||
async def get_links_by_time_range(
|
async def get_links_by_time_range(
|
||||||
conn: AsyncConnection, start_time: datetime, end_time: datetime
|
conn: AsyncConnection, start_time: datetime, end_time: datetime
|
||||||
) -> List[dict]:
|
) -> List[dict]:
|
||||||
|
normalized_start_time = parse_utc_time(start_time, field_name="start_time")
|
||||||
|
normalized_end_time = parse_utc_time(end_time, field_name="end_time")
|
||||||
async with conn.cursor() as cur:
|
async with conn.cursor() as cur:
|
||||||
await cur.execute(
|
await cur.execute(
|
||||||
"SELECT * FROM realtime.link_simulation WHERE time >= %s AND time <= %s",
|
"SELECT * FROM realtime.link_simulation WHERE time >= %s AND time <= %s",
|
||||||
(start_time, end_time),
|
(normalized_start_time, normalized_end_time),
|
||||||
)
|
)
|
||||||
return await cur.fetchall()
|
return await cur.fetchall()
|
||||||
|
|
||||||
@@ -296,10 +298,12 @@ class RealtimeRepository:
|
|||||||
async def get_nodes_by_time_range(
|
async def get_nodes_by_time_range(
|
||||||
conn: AsyncConnection, start_time: datetime, end_time: datetime
|
conn: AsyncConnection, start_time: datetime, end_time: datetime
|
||||||
) -> List[dict]:
|
) -> List[dict]:
|
||||||
|
normalized_start_time = parse_utc_time(start_time, field_name="start_time")
|
||||||
|
normalized_end_time = parse_utc_time(end_time, field_name="end_time")
|
||||||
async with conn.cursor() as cur:
|
async with conn.cursor() as cur:
|
||||||
await cur.execute(
|
await cur.execute(
|
||||||
"SELECT * FROM realtime.node_simulation WHERE time >= %s AND time <= %s",
|
"SELECT * FROM realtime.node_simulation WHERE time >= %s AND time <= %s",
|
||||||
(start_time, end_time),
|
(normalized_start_time, normalized_end_time),
|
||||||
)
|
)
|
||||||
return await cur.fetchall()
|
return await cur.fetchall()
|
||||||
|
|
||||||
|
|||||||
@@ -0,0 +1,110 @@
|
|||||||
|
import asyncio
|
||||||
|
from datetime import datetime, timezone
|
||||||
|
import importlib.util
|
||||||
|
from pathlib import Path
|
||||||
|
import sys
|
||||||
|
from types import ModuleType
|
||||||
|
|
||||||
|
|
||||||
|
def _load_time_api_module():
|
||||||
|
module_path = (
|
||||||
|
Path(__file__).resolve().parents[2] / "app" / "services" / "time_api.py"
|
||||||
|
)
|
||||||
|
spec = importlib.util.spec_from_file_location("tests_time_api_under_test", module_path)
|
||||||
|
module = importlib.util.module_from_spec(spec)
|
||||||
|
assert spec and spec.loader
|
||||||
|
spec.loader.exec_module(module)
|
||||||
|
return module
|
||||||
|
|
||||||
|
|
||||||
|
def _load_realtime_repository():
|
||||||
|
time_api_module = _load_time_api_module()
|
||||||
|
app_module = ModuleType("app")
|
||||||
|
services_module = ModuleType("app.services")
|
||||||
|
services_module.time_api = time_api_module
|
||||||
|
app_module.services = services_module
|
||||||
|
sys.modules["app"] = app_module
|
||||||
|
sys.modules["app.services"] = services_module
|
||||||
|
sys.modules["app.services.time_api"] = time_api_module
|
||||||
|
|
||||||
|
module_path = (
|
||||||
|
Path(__file__).resolve().parents[2]
|
||||||
|
/ "app"
|
||||||
|
/ "infra"
|
||||||
|
/ "db"
|
||||||
|
/ "timescaledb"
|
||||||
|
/ "repositories"
|
||||||
|
/ "realtime.py"
|
||||||
|
)
|
||||||
|
spec = importlib.util.spec_from_file_location(
|
||||||
|
"tests_realtime_repo_under_test", module_path
|
||||||
|
)
|
||||||
|
module = importlib.util.module_from_spec(spec)
|
||||||
|
assert spec and spec.loader
|
||||||
|
spec.loader.exec_module(module)
|
||||||
|
return module.RealtimeRepository
|
||||||
|
|
||||||
|
|
||||||
|
class _FakeCursor:
|
||||||
|
def __init__(self):
|
||||||
|
self.calls: list[tuple[str, tuple]] = []
|
||||||
|
|
||||||
|
async def __aenter__(self):
|
||||||
|
return self
|
||||||
|
|
||||||
|
async def __aexit__(self, exc_type, exc, tb):
|
||||||
|
return False
|
||||||
|
|
||||||
|
async def execute(self, query, params):
|
||||||
|
self.calls.append((str(query), params))
|
||||||
|
|
||||||
|
async def fetchall(self):
|
||||||
|
return []
|
||||||
|
|
||||||
|
|
||||||
|
class _FakeConnection:
|
||||||
|
def __init__(self):
|
||||||
|
self.cursor_instance = _FakeCursor()
|
||||||
|
|
||||||
|
def cursor(self):
|
||||||
|
return self.cursor_instance
|
||||||
|
|
||||||
|
|
||||||
|
def test_get_links_by_time_range_normalizes_inputs_to_utc():
|
||||||
|
RealtimeRepository = _load_realtime_repository()
|
||||||
|
conn = _FakeConnection()
|
||||||
|
|
||||||
|
asyncio.run(
|
||||||
|
RealtimeRepository.get_links_by_time_range(
|
||||||
|
conn,
|
||||||
|
datetime.fromisoformat("2026-06-01T08:00:00+08:00"),
|
||||||
|
datetime.fromisoformat("2026-06-01T09:00:00+08:00"),
|
||||||
|
)
|
||||||
|
)
|
||||||
|
|
||||||
|
assert len(conn.cursor_instance.calls) == 1
|
||||||
|
_, params = conn.cursor_instance.calls[0]
|
||||||
|
assert params == (
|
||||||
|
datetime(2026, 6, 1, 0, 0, tzinfo=timezone.utc),
|
||||||
|
datetime(2026, 6, 1, 1, 0, tzinfo=timezone.utc),
|
||||||
|
)
|
||||||
|
|
||||||
|
|
||||||
|
def test_get_nodes_by_time_range_normalizes_inputs_to_utc():
|
||||||
|
RealtimeRepository = _load_realtime_repository()
|
||||||
|
conn = _FakeConnection()
|
||||||
|
|
||||||
|
asyncio.run(
|
||||||
|
RealtimeRepository.get_nodes_by_time_range(
|
||||||
|
conn,
|
||||||
|
datetime.fromisoformat("2026-06-01T08:00:00+08:00"),
|
||||||
|
datetime.fromisoformat("2026-06-01T09:00:00+08:00"),
|
||||||
|
)
|
||||||
|
)
|
||||||
|
|
||||||
|
assert len(conn.cursor_instance.calls) == 1
|
||||||
|
_, params = conn.cursor_instance.calls[0]
|
||||||
|
assert params == (
|
||||||
|
datetime(2026, 6, 1, 0, 0, tzinfo=timezone.utc),
|
||||||
|
datetime(2026, 6, 1, 1, 0, tzinfo=timezone.utc),
|
||||||
|
)
|
||||||
Reference in New Issue
Block a user