新增features查询simulation的方法和路由

This commit is contained in:
JIANG
2025-12-17 11:12:47 +08:00
parent 9b5707841b
commit 1fad2fde2c
3 changed files with 170 additions and 5 deletions

View File

@@ -1,4 +1,4 @@
from typing import List, Optional, Any, Dict
from typing import List, Optional, Any, Dict, Tuple
from datetime import datetime
from psycopg import AsyncConnection
import pandas as pd
@@ -36,7 +36,6 @@ class CompositeQueries:
device_ids: SCADA 设备ID列表
start_time: 开始时间
end_time: 结束时间
field: 要查询的字段名
Returns:
模拟数据字典,以 device_id 为键,值为数据列表,每个数据包含 time, value 和 scada_id
@@ -92,7 +91,7 @@ class CompositeQueries:
scheme_name: str,
) -> Dict[str, List[Dict[str, Any]]]:
"""
获取 SCADA 关联的 link/node 模拟值
获取 SCADA 关联的 link/node scheme 模拟值
根据传入的 SCADA device_ids找到关联的 link/node
并根据对应的 type查询对应的模拟数据
@@ -103,7 +102,6 @@ class CompositeQueries:
device_ids: SCADA 设备ID列表
start_time: 开始时间
end_time: 结束时间
field: 要查询的字段名
Returns:
模拟数据字典,以 device_id 为键,值为数据列表,每个数据包含 time, value 和 scada_id
@@ -160,6 +158,113 @@ class CompositeQueries:
result[device_id] = res
return result
@staticmethod
async def get_realtime_simulation_data(
timescale_conn: AsyncConnection,
featureInfos: List[Tuple[str, str]],
start_time: datetime,
end_time: datetime,
) -> Dict[str, List[Dict[str, Any]]]:
"""
获取 link/node 模拟值
根据传入的 featureInfos找到关联的 link/node
并根据对应的 type查询对应的模拟数据
Args:
timescale_conn: TimescaleDB 异步连接
featureInfos: 传入的 feature 信息列表,包含 (element_id, type)
start_time: 开始时间
end_time: 结束时间
Returns:
模拟数据字典,以 feature_id 为键,值为数据列表,每个数据包含 time, value 和 feature_id
Raises:
ValueError: 当 SCADA 设备未找到或字段无效时
"""
result = {}
for feature_id, type in featureInfos:
if type.lower() == "pipe":
# 查询 link 模拟数据
res = await RealtimeRepository.get_link_field_by_time_range(
timescale_conn, start_time, end_time, feature_id, "flow"
)
elif type.lower() == "junction":
# 查询 node 模拟数据
res = await RealtimeRepository.get_node_field_by_time_range(
timescale_conn, start_time, end_time, feature_id, "pressure"
)
else:
raise ValueError(f"Unknown type: {type}")
# 添加 scada_id 到每个数据项
for item in res:
item["feature_id"] = feature_id
result[feature_id] = res
return result
@staticmethod
async def get_scheme_simulation_data(
timescale_conn: AsyncConnection,
featureInfos: List[Tuple[str, str]],
start_time: datetime,
end_time: datetime,
scheme_type: str,
scheme_name: str,
) -> Dict[str, List[Dict[str, Any]]]:
"""
获取 link/node scheme 模拟值
根据传入的 featureInfos找到关联的 link/node
并根据对应的 type查询对应的模拟数据
Args:
timescale_conn: TimescaleDB 异步连接
featureInfos: 传入的 feature 信息列表,包含 (element_id, type)
start_time: 开始时间
end_time: 结束时间
scheme_type: 工况类型
scheme_name: 工况名称
Returns:
模拟数据字典,以 feature_id 为键,值为数据列表,每个数据包含 time, value 和 feature_id
Raises:
ValueError: 当类型无效时
"""
result = {}
for feature_id, type in featureInfos:
if type.lower() == "pipe":
# 查询 link 模拟数据
res = await SchemeRepository.get_link_field_by_scheme_and_time_range(
timescale_conn,
scheme_type,
scheme_name,
start_time,
end_time,
feature_id,
"flow",
)
elif type.lower() == "junction":
# 查询 node 模拟数据
res = await SchemeRepository.get_node_field_by_scheme_and_time_range(
timescale_conn,
scheme_type,
scheme_name,
start_time,
end_time,
feature_id,
"pressure",
)
else:
raise ValueError(f"Unknown type: {type}")
# 添加 feature_id 到每个数据项
for item in res:
item["feature_id"] = feature_id
result[feature_id] = res
return result
@staticmethod
async def get_element_associated_scada_data(
timescale_conn: AsyncConnection,