Files
TJWaterServerBinary/timescaledb/internal_queries.py

46 lines
1.4 KiB
Python

from typing import List
from timescaledb.schemas.scheme import SchemeRepository
from timescaledb.schemas.realtime import RealtimeRepository
from timescaledb.database import get_database_instance
# 内部使用存储类
class InternalStorage:
@staticmethod
async def store_realtime_simulation(
node_result_list: List[dict],
link_result_list: List[dict],
result_start_time: str,
db_name: str = None,
):
"""存储实时模拟结果"""
instance = await get_database_instance(db_name)
async with instance.get_connection() as conn:
await RealtimeRepository.store_realtime_simulation_result(
conn, node_result_list, link_result_list, result_start_time
)
@staticmethod
async def store_scheme_simulation(
scheme_type: str,
scheme_name: str,
node_result_list: List[dict],
link_result_list: List[dict],
result_start_time: str,
num_periods: int = 1,
db_name: str = None,
):
"""存储方案模拟结果"""
instance = await get_database_instance(db_name)
async with instance.get_connection() as conn:
await SchemeRepository.store_scheme_simulation_result(
conn,
scheme_type,
scheme_name,
node_result_list,
link_result_list,
result_start_time,
num_periods,
)