Files
TJWaterServerBinary/app/infra/db/timescaledb/internal_queries.py
2026-01-22 17:00:10 +08:00

123 lines
4.6 KiB
Python

from typing import List
from fastapi.logger import logger
from datetime import datetime, timedelta
import psycopg
import time
from app.infra.db.timescaledb.schemas.scheme import SchemeRepository
from app.infra.db.timescaledb.schemas.realtime import RealtimeRepository
import app.infra.db.timescaledb.timescaledb_info as timescaledb_info
from app.infra.db.timescaledb.schemas.scada import ScadaRepository
class InternalStorage:
@staticmethod
def store_realtime_simulation(
node_result_list: List[dict],
link_result_list: List[dict],
result_start_time: str,
db_name: str = None,
max_retries: int = 3,
):
"""存储实时模拟结果"""
for attempt in range(max_retries):
try:
conn_string = (
timescaledb_info.get_pgconn_string(db_name=db_name)
if db_name
else timescaledb_info.get_pgconn_string()
)
with psycopg.Connection.connect(conn_string) as conn:
RealtimeRepository.store_realtime_simulation_result_sync(
conn, node_result_list, link_result_list, result_start_time
)
break # 成功
except Exception as e:
logger.error(f"存储尝试 {attempt + 1} 失败: {e}")
if attempt < max_retries - 1:
time.sleep(1) # 重试前等待
else:
raise # 达到最大重试次数后抛出异常
@staticmethod
def store_scheme_simulation(
scheme_type: str,
scheme_name: str,
node_result_list: List[dict],
link_result_list: List[dict],
result_start_time: str,
num_periods: int = 1,
db_name: str = None,
max_retries: int = 3,
):
"""存储方案模拟结果"""
for attempt in range(max_retries):
try:
conn_string = (
timescaledb_info.get_pgconn_string(db_name=db_name)
if db_name
else timescaledb_info.get_pgconn_string()
)
with psycopg.Connection.connect(conn_string) as conn:
SchemeRepository.store_scheme_simulation_result_sync(
conn,
scheme_type,
scheme_name,
node_result_list,
link_result_list,
result_start_time,
num_periods,
)
break # 成功
except Exception as e:
logger.error(f"存储尝试 {attempt + 1} 失败: {e}")
if attempt < max_retries - 1:
time.sleep(1) # 重试前等待
else:
raise # 达到最大重试次数后抛出异常
class InternalQueries:
@staticmethod
def query_scada_by_ids_time(
device_ids: List[str],
query_time: str,
db_name: str = None,
max_retries: int = 3,
) -> dict:
"""查询指定时间点的 SCADA 数据"""
# 解析时间,假设是北京时间
beijing_time = datetime.fromisoformat(query_time)
start_time = beijing_time - timedelta(seconds=1)
end_time = beijing_time + timedelta(seconds=1)
for attempt in range(max_retries):
try:
conn_string = (
timescaledb_info.get_pgconn_string(db_name=db_name)
if db_name
else timescaledb_info.get_pgconn_string()
)
with psycopg.Connection.connect(conn_string) as conn:
rows = ScadaRepository.get_scada_by_ids_time_range_sync(
conn, device_ids, start_time, end_time
)
# 处理结果,返回每个 device_id 的第一个值
result = {}
for device_id in device_ids:
device_rows = [
row for row in rows if row["device_id"] == device_id
]
if device_rows:
result[device_id] = device_rows[0]["monitored_value"]
else:
result[device_id] = None
return result
except Exception as e:
logger.error(f"查询尝试 {attempt + 1} 失败: {e}")
if attempt < max_retries - 1:
time.sleep(1)
else:
raise