优化传感器布置算法,修复数据库更新逻辑
This commit is contained in:
@@ -0,0 +1,87 @@
|
||||
from datetime import datetime, timezone
|
||||
import importlib.util
|
||||
from pathlib import Path
|
||||
|
||||
import pytest
|
||||
|
||||
|
||||
def _load_scada_repository():
|
||||
module_path = (
|
||||
Path(__file__).resolve().parents[2]
|
||||
/ "app"
|
||||
/ "infra"
|
||||
/ "db"
|
||||
/ "timescaledb"
|
||||
/ "repositories"
|
||||
/ "scada.py"
|
||||
)
|
||||
spec = importlib.util.spec_from_file_location("tests_scada_repo_under_test", module_path)
|
||||
module = importlib.util.module_from_spec(spec)
|
||||
assert spec and spec.loader
|
||||
spec.loader.exec_module(module)
|
||||
return module.ScadaRepository
|
||||
|
||||
|
||||
class _FakeCursor:
|
||||
def __init__(self, initial_rowcount: int):
|
||||
self.initial_rowcount = initial_rowcount
|
||||
self.rowcount = 0
|
||||
self.calls: list[tuple[str, tuple]] = []
|
||||
|
||||
async def __aenter__(self):
|
||||
return self
|
||||
|
||||
async def __aexit__(self, exc_type, exc, tb):
|
||||
return False
|
||||
|
||||
async def execute(self, query, params):
|
||||
self.calls.append((str(query), params))
|
||||
if len(self.calls) == 1:
|
||||
self.rowcount = self.initial_rowcount
|
||||
else:
|
||||
self.rowcount = 1
|
||||
|
||||
|
||||
class _FakeConnection:
|
||||
def __init__(self, initial_rowcount: int):
|
||||
self.cursor_instance = _FakeCursor(initial_rowcount)
|
||||
|
||||
def cursor(self):
|
||||
return self.cursor_instance
|
||||
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_update_scada_field_inserts_when_update_hits_no_rows():
|
||||
ScadaRepository = _load_scada_repository()
|
||||
conn = _FakeConnection(initial_rowcount=0)
|
||||
point_time = datetime(2026, 1, 1, 0, 0, tzinfo=timezone.utc)
|
||||
|
||||
await ScadaRepository.update_scada_field(
|
||||
conn,
|
||||
point_time,
|
||||
"170490",
|
||||
"cleaned_value",
|
||||
26.5,
|
||||
)
|
||||
|
||||
assert len(conn.cursor_instance.calls) == 2
|
||||
assert "UPDATE scada.scada_data SET" in conn.cursor_instance.calls[0][0]
|
||||
assert "INSERT INTO scada.scada_data" in conn.cursor_instance.calls[1][0]
|
||||
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_update_scada_field_skips_insert_when_update_succeeds():
|
||||
ScadaRepository = _load_scada_repository()
|
||||
conn = _FakeConnection(initial_rowcount=1)
|
||||
point_time = datetime(2026, 1, 1, 0, 0, tzinfo=timezone.utc)
|
||||
|
||||
await ScadaRepository.update_scada_field(
|
||||
conn,
|
||||
point_time,
|
||||
"170490",
|
||||
"cleaned_value",
|
||||
26.5,
|
||||
)
|
||||
|
||||
assert len(conn.cursor_instance.calls) == 1
|
||||
assert "UPDATE scada.scada_data SET" in conn.cursor_instance.calls[0][0]
|
||||
Reference in New Issue
Block a user