Files
TJWaterServerBinary/tests/unit/test_realtime_repository.py
T

111 lines
3.0 KiB
Python

import asyncio
from datetime import datetime, timezone
import importlib.util
from pathlib import Path
import sys
from types import ModuleType
def _load_time_api_module():
module_path = (
Path(__file__).resolve().parents[2] / "app" / "services" / "time_api.py"
)
spec = importlib.util.spec_from_file_location("tests_time_api_under_test", module_path)
module = importlib.util.module_from_spec(spec)
assert spec and spec.loader
spec.loader.exec_module(module)
return module
def _load_realtime_repository():
time_api_module = _load_time_api_module()
app_module = ModuleType("app")
services_module = ModuleType("app.services")
services_module.time_api = time_api_module
app_module.services = services_module
sys.modules["app"] = app_module
sys.modules["app.services"] = services_module
sys.modules["app.services.time_api"] = time_api_module
module_path = (
Path(__file__).resolve().parents[2]
/ "app"
/ "infra"
/ "db"
/ "timescaledb"
/ "repositories"
/ "realtime.py"
)
spec = importlib.util.spec_from_file_location(
"tests_realtime_repo_under_test", module_path
)
module = importlib.util.module_from_spec(spec)
assert spec and spec.loader
spec.loader.exec_module(module)
return module.RealtimeRepository
class _FakeCursor:
def __init__(self):
self.calls: list[tuple[str, tuple]] = []
async def __aenter__(self):
return self
async def __aexit__(self, exc_type, exc, tb):
return False
async def execute(self, query, params):
self.calls.append((str(query), params))
async def fetchall(self):
return []
class _FakeConnection:
def __init__(self):
self.cursor_instance = _FakeCursor()
def cursor(self):
return self.cursor_instance
def test_get_links_by_time_range_normalizes_inputs_to_utc():
RealtimeRepository = _load_realtime_repository()
conn = _FakeConnection()
asyncio.run(
RealtimeRepository.get_links_by_time_range(
conn,
datetime.fromisoformat("2026-06-01T08:00:00+08:00"),
datetime.fromisoformat("2026-06-01T09:00:00+08:00"),
)
)
assert len(conn.cursor_instance.calls) == 1
_, params = conn.cursor_instance.calls[0]
assert params == (
datetime(2026, 6, 1, 0, 0, tzinfo=timezone.utc),
datetime(2026, 6, 1, 1, 0, tzinfo=timezone.utc),
)
def test_get_nodes_by_time_range_normalizes_inputs_to_utc():
RealtimeRepository = _load_realtime_repository()
conn = _FakeConnection()
asyncio.run(
RealtimeRepository.get_nodes_by_time_range(
conn,
datetime.fromisoformat("2026-06-01T08:00:00+08:00"),
datetime.fromisoformat("2026-06-01T09:00:00+08:00"),
)
)
assert len(conn.cursor_instance.calls) == 1
_, params = conn.cursor_instance.calls[0]
assert params == (
datetime(2026, 6, 1, 0, 0, tzinfo=timezone.utc),
datetime(2026, 6, 1, 1, 0, tzinfo=timezone.utc),
)