新增pg和timescaledb的复合查询方法,用于查询 scada 关联的模拟、原始、清洗数据
This commit is contained in:
@@ -1,3 +1,4 @@
|
|||||||
from .router import router
|
from .router import router
|
||||||
from .database import *
|
from .database import *
|
||||||
from .timescaledb_info import *
|
from .timescaledb_info import *
|
||||||
|
from .composite_queries import CompositeQueries
|
||||||
@@ -0,0 +1,126 @@
|
|||||||
|
from typing import List, Optional, Dict, Any
|
||||||
|
from datetime import datetime
|
||||||
|
from psycopg import AsyncConnection
|
||||||
|
|
||||||
|
from postgresql.scada_info import ScadaRepository as PostgreScadaRepository
|
||||||
|
from timescaledb.schemas.realtime import RealtimeRepository
|
||||||
|
from timescaledb.schemas.scada import ScadaRepository
|
||||||
|
|
||||||
|
|
||||||
|
class CompositeQueries:
|
||||||
|
"""
|
||||||
|
复合查询类,提供跨表查询功能
|
||||||
|
"""
|
||||||
|
|
||||||
|
@staticmethod
|
||||||
|
async def get_scada_associated_simulation_data(
|
||||||
|
timescale_conn: AsyncConnection,
|
||||||
|
postgres_conn: AsyncConnection,
|
||||||
|
device_id: str,
|
||||||
|
start_time: datetime,
|
||||||
|
end_time: datetime,
|
||||||
|
field: str,
|
||||||
|
) -> Optional[Any]:
|
||||||
|
"""
|
||||||
|
获取 SCADA 关联的 link/node 模拟值
|
||||||
|
|
||||||
|
根据传入的 SCADA device_id,找到关联的 link/node,
|
||||||
|
并根据对应的 type,查询对应的模拟数据
|
||||||
|
|
||||||
|
Args:
|
||||||
|
timescale_conn: TimescaleDB 异步连接
|
||||||
|
postgres_conn: PostgreSQL 异步连接
|
||||||
|
device_id: SCADA 设备ID
|
||||||
|
start_time: 开始时间
|
||||||
|
end_time: 结束时间
|
||||||
|
field: 要查询的字段名
|
||||||
|
|
||||||
|
Returns:
|
||||||
|
模拟数据值,如果没有找到则返回 None
|
||||||
|
|
||||||
|
Raises:
|
||||||
|
ValueError: 当 SCADA 设备未找到或字段无效时
|
||||||
|
"""
|
||||||
|
# 1. 查询所有 SCADA 信息
|
||||||
|
scada_infos = await PostgreScadaRepository.get_scadas(postgres_conn)
|
||||||
|
|
||||||
|
# 2. 根据 device_id 找到对应的 SCADA 信息
|
||||||
|
target_scada = None
|
||||||
|
for scada in scada_infos:
|
||||||
|
if scada["id"] == device_id:
|
||||||
|
target_scada = scada
|
||||||
|
break
|
||||||
|
|
||||||
|
if not target_scada:
|
||||||
|
raise ValueError(f"SCADA device {device_id} not found")
|
||||||
|
|
||||||
|
# 3. 根据 type 和 associated_element_id 查询对应的模拟数据
|
||||||
|
element_id = target_scada["associated_element_id"]
|
||||||
|
scada_type = target_scada["type"]
|
||||||
|
|
||||||
|
if scada_type.lower() == "pipe_flow":
|
||||||
|
# 查询 link 模拟数据
|
||||||
|
return await RealtimeRepository.get_link_field_by_time_range(
|
||||||
|
timescale_conn, start_time, end_time, element_id, field
|
||||||
|
)
|
||||||
|
elif scada_type.lower() == "pressure":
|
||||||
|
# 查询 node 模拟数据
|
||||||
|
return await RealtimeRepository.get_node_field_by_time_range(
|
||||||
|
timescale_conn, start_time, end_time, element_id, field
|
||||||
|
)
|
||||||
|
else:
|
||||||
|
raise ValueError(f"Unknown SCADA type: {scada_type}")
|
||||||
|
|
||||||
|
@staticmethod
|
||||||
|
async def get_element_associated_scada_data(
|
||||||
|
timescale_conn: AsyncConnection,
|
||||||
|
postgres_conn: AsyncConnection,
|
||||||
|
element_id: str,
|
||||||
|
start_time: datetime,
|
||||||
|
end_time: datetime,
|
||||||
|
use_cleaned: bool = False,
|
||||||
|
) -> Optional[Any]:
|
||||||
|
"""
|
||||||
|
获取 link/node 关联的 SCADA 监测值
|
||||||
|
|
||||||
|
根据传入的 link/node id,匹配 SCADA 信息,
|
||||||
|
如果存在关联的 SCADA device_id,获取实际的监测数据
|
||||||
|
|
||||||
|
Args:
|
||||||
|
timescale_conn: TimescaleDB 异步连接
|
||||||
|
postgres_conn: PostgreSQL 异步连接
|
||||||
|
element_id: link 或 node 的 ID
|
||||||
|
start_time: 开始时间
|
||||||
|
end_time: 结束时间
|
||||||
|
use_cleaned: 是否使用清洗后的数据 (True: "cleaned_value", False: "monitored_value")
|
||||||
|
|
||||||
|
Returns:
|
||||||
|
SCADA 监测数据值,如果没有找到则返回 None
|
||||||
|
|
||||||
|
Raises:
|
||||||
|
ValueError: 当元素类型无效时
|
||||||
|
"""
|
||||||
|
|
||||||
|
# 1. 查询所有 SCADA 信息
|
||||||
|
scada_infos = await PostgreScadaRepository.get_scadas(postgres_conn)
|
||||||
|
|
||||||
|
# 2. 根据 element_type 和 element_id 找到关联的 SCADA 设备
|
||||||
|
associated_scada = None
|
||||||
|
for scada in scada_infos:
|
||||||
|
if scada["associated_element_id"] == element_id:
|
||||||
|
associated_scada = scada
|
||||||
|
break
|
||||||
|
|
||||||
|
if not associated_scada:
|
||||||
|
# 没有找到关联的 SCADA 设备
|
||||||
|
return None
|
||||||
|
|
||||||
|
# 3. 通过 SCADA device_id 获取监测数据
|
||||||
|
device_id = associated_scada["id"]
|
||||||
|
|
||||||
|
# 根据 use_cleaned 参数选择字段
|
||||||
|
data_field = "cleaned_value" if use_cleaned else "monitored_value"
|
||||||
|
|
||||||
|
return await ScadaRepository.get_scada_field_by_id_time_range(
|
||||||
|
timescale_conn, device_id, start_time, end_time, data_field
|
||||||
|
)
|
||||||
|
|||||||
@@ -7,6 +7,8 @@ from .database import get_database_instance
|
|||||||
from .schemas.realtime import RealtimeRepository
|
from .schemas.realtime import RealtimeRepository
|
||||||
from .schemas.scheme import SchemeRepository
|
from .schemas.scheme import SchemeRepository
|
||||||
from .schemas.scada import ScadaRepository
|
from .schemas.scada import ScadaRepository
|
||||||
|
from .composite_queries import CompositeQueries
|
||||||
|
from postgresql.database import get_database_instance as get_postgres_database_instance
|
||||||
|
|
||||||
router = APIRouter(prefix="/timescaledb", tags=["TimescaleDB"])
|
router = APIRouter(prefix="/timescaledb", tags=["TimescaleDB"])
|
||||||
|
|
||||||
@@ -23,6 +25,18 @@ async def get_database_connection(
|
|||||||
yield conn
|
yield conn
|
||||||
|
|
||||||
|
|
||||||
|
# PostgreSQL 数据库连接依赖函数
|
||||||
|
async def get_postgres_connection(
|
||||||
|
db_name: Optional[str] = Query(
|
||||||
|
None, description="指定要连接的 PostgreSQL 数据库名称,为空时使用默认数据库"
|
||||||
|
)
|
||||||
|
):
|
||||||
|
"""获取 PostgreSQL 数据库连接,支持通过查询参数指定数据库名称"""
|
||||||
|
instance = await get_postgres_database_instance(db_name)
|
||||||
|
async with instance.get_connection() as conn:
|
||||||
|
yield conn
|
||||||
|
|
||||||
|
|
||||||
# --- Realtime Endpoints ---
|
# --- Realtime Endpoints ---
|
||||||
|
|
||||||
|
|
||||||
@@ -40,7 +54,7 @@ async def get_realtime_links(
|
|||||||
end_time: datetime,
|
end_time: datetime,
|
||||||
conn: AsyncConnection = Depends(get_database_connection),
|
conn: AsyncConnection = Depends(get_database_connection),
|
||||||
):
|
):
|
||||||
return await RealtimeRepository.get_links_by_time(conn, start_time, end_time)
|
return await RealtimeRepository.get_links_by_time_range(conn, start_time, end_time)
|
||||||
|
|
||||||
|
|
||||||
@router.delete("/realtime/links")
|
@router.delete("/realtime/links")
|
||||||
@@ -49,7 +63,7 @@ async def delete_realtime_links(
|
|||||||
end_time: datetime,
|
end_time: datetime,
|
||||||
conn: AsyncConnection = Depends(get_database_connection),
|
conn: AsyncConnection = Depends(get_database_connection),
|
||||||
):
|
):
|
||||||
await RealtimeRepository.delete_links_by_time(conn, start_time, end_time)
|
await RealtimeRepository.delete_links_by_time_range(conn, start_time, end_time)
|
||||||
return {"message": "Deleted successfully"}
|
return {"message": "Deleted successfully"}
|
||||||
|
|
||||||
|
|
||||||
@@ -82,7 +96,7 @@ async def get_realtime_nodes(
|
|||||||
end_time: datetime,
|
end_time: datetime,
|
||||||
conn: AsyncConnection = Depends(get_database_connection),
|
conn: AsyncConnection = Depends(get_database_connection),
|
||||||
):
|
):
|
||||||
return await RealtimeRepository.get_nodes_by_time(conn, start_time, end_time)
|
return await RealtimeRepository.get_nodes_by_time_range(conn, start_time, end_time)
|
||||||
|
|
||||||
|
|
||||||
@router.delete("/realtime/nodes")
|
@router.delete("/realtime/nodes")
|
||||||
@@ -91,10 +105,56 @@ async def delete_realtime_nodes(
|
|||||||
end_time: datetime,
|
end_time: datetime,
|
||||||
conn: AsyncConnection = Depends(get_database_connection),
|
conn: AsyncConnection = Depends(get_database_connection),
|
||||||
):
|
):
|
||||||
await RealtimeRepository.delete_nodes_by_time(conn, start_time, end_time)
|
await RealtimeRepository.delete_nodes_by_time_range(conn, start_time, end_time)
|
||||||
return {"message": "Deleted successfully"}
|
return {"message": "Deleted successfully"}
|
||||||
|
|
||||||
|
|
||||||
|
@router.post("/realtime/simulation/store", status_code=201)
|
||||||
|
async def store_realtime_simulation_result(
|
||||||
|
node_result_list: List[dict],
|
||||||
|
link_result_list: List[dict],
|
||||||
|
result_start_time: str,
|
||||||
|
conn: AsyncConnection = Depends(get_database_connection),
|
||||||
|
):
|
||||||
|
"""Store realtime simulation results to TimescaleDB"""
|
||||||
|
await RealtimeRepository.store_realtime_simulation_result(
|
||||||
|
conn, node_result_list, link_result_list, result_start_time
|
||||||
|
)
|
||||||
|
return {"message": "Simulation results stored successfully"}
|
||||||
|
|
||||||
|
|
||||||
|
@router.get("/realtime/query/by-time-property")
|
||||||
|
async def query_realtime_records_by_time_property(
|
||||||
|
query_time: str,
|
||||||
|
type: str,
|
||||||
|
property: str,
|
||||||
|
conn: AsyncConnection = Depends(get_database_connection),
|
||||||
|
):
|
||||||
|
"""Query all realtime records by time and property"""
|
||||||
|
try:
|
||||||
|
return await RealtimeRepository.query_all_record_by_time_property(
|
||||||
|
conn, query_time, type, property
|
||||||
|
)
|
||||||
|
except ValueError as e:
|
||||||
|
raise HTTPException(status_code=400, detail=str(e))
|
||||||
|
|
||||||
|
|
||||||
|
@router.get("/realtime/query/by-id-time")
|
||||||
|
async def query_realtime_simulation_by_id_time(
|
||||||
|
ID: str,
|
||||||
|
type: str,
|
||||||
|
query_time: str,
|
||||||
|
conn: AsyncConnection = Depends(get_database_connection),
|
||||||
|
):
|
||||||
|
"""Query realtime simulation results by ID and time"""
|
||||||
|
try:
|
||||||
|
return await RealtimeRepository.query_simulation_result_by_ID_time(
|
||||||
|
conn, ID, type, query_time
|
||||||
|
)
|
||||||
|
except ValueError as e:
|
||||||
|
raise HTTPException(status_code=400, detail=str(e))
|
||||||
|
|
||||||
|
|
||||||
# --- Scheme Endpoints ---
|
# --- Scheme Endpoints ---
|
||||||
|
|
||||||
|
|
||||||
@@ -113,7 +173,7 @@ async def get_scheme_links(
|
|||||||
end_time: datetime,
|
end_time: datetime,
|
||||||
conn: AsyncConnection = Depends(get_database_connection),
|
conn: AsyncConnection = Depends(get_database_connection),
|
||||||
):
|
):
|
||||||
return await SchemeRepository.get_links_by_scheme_and_time(
|
return await SchemeRepository.get_links_by_scheme_and_time_range(
|
||||||
conn, scheme, start_time, end_time
|
conn, scheme, start_time, end_time
|
||||||
)
|
)
|
||||||
|
|
||||||
@@ -122,13 +182,14 @@ async def get_scheme_links(
|
|||||||
async def get_scheme_link_field(
|
async def get_scheme_link_field(
|
||||||
scheme: str,
|
scheme: str,
|
||||||
link_id: str,
|
link_id: str,
|
||||||
time: datetime,
|
start_time: datetime,
|
||||||
|
end_time: datetime,
|
||||||
field: str,
|
field: str,
|
||||||
conn: AsyncConnection = Depends(get_database_connection),
|
conn: AsyncConnection = Depends(get_database_connection),
|
||||||
):
|
):
|
||||||
try:
|
try:
|
||||||
return await SchemeRepository.get_link_field_by_scheme_and_time(
|
return await SchemeRepository.get_link_field_by_scheme_and_time_range(
|
||||||
conn, time, scheme, link_id, field
|
conn, scheme, start_time, end_time, link_id, field
|
||||||
)
|
)
|
||||||
except ValueError as e:
|
except ValueError as e:
|
||||||
raise HTTPException(status_code=400, detail=str(e))
|
raise HTTPException(status_code=400, detail=str(e))
|
||||||
@@ -159,7 +220,7 @@ async def delete_scheme_links(
|
|||||||
end_time: datetime,
|
end_time: datetime,
|
||||||
conn: AsyncConnection = Depends(get_database_connection),
|
conn: AsyncConnection = Depends(get_database_connection),
|
||||||
):
|
):
|
||||||
await SchemeRepository.delete_links_by_scheme_and_time(
|
await SchemeRepository.delete_links_by_scheme_and_time_range(
|
||||||
conn, scheme, start_time, end_time
|
conn, scheme, start_time, end_time
|
||||||
)
|
)
|
||||||
return {"message": "Deleted successfully"}
|
return {"message": "Deleted successfully"}
|
||||||
@@ -177,13 +238,14 @@ async def insert_scheme_nodes(
|
|||||||
async def get_scheme_node_field(
|
async def get_scheme_node_field(
|
||||||
scheme: str,
|
scheme: str,
|
||||||
node_id: str,
|
node_id: str,
|
||||||
time: datetime,
|
start_time: datetime,
|
||||||
|
end_time: datetime,
|
||||||
field: str,
|
field: str,
|
||||||
conn: AsyncConnection = Depends(get_database_connection),
|
conn: AsyncConnection = Depends(get_database_connection),
|
||||||
):
|
):
|
||||||
try:
|
try:
|
||||||
return await SchemeRepository.get_node_field_by_scheme_and_time(
|
return await SchemeRepository.get_node_field_by_scheme_and_time_range(
|
||||||
conn, time, scheme, node_id, field
|
conn, scheme, start_time, end_time, node_id, field
|
||||||
)
|
)
|
||||||
except ValueError as e:
|
except ValueError as e:
|
||||||
raise HTTPException(status_code=400, detail=str(e))
|
raise HTTPException(status_code=400, detail=str(e))
|
||||||
@@ -214,12 +276,61 @@ async def delete_scheme_nodes(
|
|||||||
end_time: datetime,
|
end_time: datetime,
|
||||||
conn: AsyncConnection = Depends(get_database_connection),
|
conn: AsyncConnection = Depends(get_database_connection),
|
||||||
):
|
):
|
||||||
await SchemeRepository.delete_nodes_by_scheme_and_time(
|
await SchemeRepository.delete_nodes_by_scheme_and_time_range(
|
||||||
conn, scheme, start_time, end_time
|
conn, scheme, start_time, end_time
|
||||||
)
|
)
|
||||||
return {"message": "Deleted successfully"}
|
return {"message": "Deleted successfully"}
|
||||||
|
|
||||||
|
|
||||||
|
@router.post("/scheme/simulation/store", status_code=201)
|
||||||
|
async def store_scheme_simulation_result(
|
||||||
|
scheme: str,
|
||||||
|
node_result_list: List[dict],
|
||||||
|
link_result_list: List[dict],
|
||||||
|
result_start_time: str,
|
||||||
|
conn: AsyncConnection = Depends(get_database_connection),
|
||||||
|
):
|
||||||
|
"""Store scheme simulation results to TimescaleDB"""
|
||||||
|
await SchemeRepository.store_scheme_simulation_result(
|
||||||
|
conn, scheme, node_result_list, link_result_list, result_start_time
|
||||||
|
)
|
||||||
|
return {"message": "Scheme simulation results stored successfully"}
|
||||||
|
|
||||||
|
|
||||||
|
@router.get("/scheme/query/by-scheme-time-property")
|
||||||
|
async def query_scheme_records_by_scheme_time_property(
|
||||||
|
scheme: str,
|
||||||
|
query_time: str,
|
||||||
|
type: str,
|
||||||
|
property: str,
|
||||||
|
conn: AsyncConnection = Depends(get_database_connection),
|
||||||
|
):
|
||||||
|
"""Query all scheme records by scheme, time and property"""
|
||||||
|
try:
|
||||||
|
return await SchemeRepository.query_all_record_by_scheme_time_property(
|
||||||
|
conn, scheme, query_time, type, property
|
||||||
|
)
|
||||||
|
except ValueError as e:
|
||||||
|
raise HTTPException(status_code=400, detail=str(e))
|
||||||
|
|
||||||
|
|
||||||
|
@router.get("/scheme/query/by-id-time")
|
||||||
|
async def query_scheme_simulation_by_id_time(
|
||||||
|
scheme: str,
|
||||||
|
ID: str,
|
||||||
|
type: str,
|
||||||
|
query_time: str,
|
||||||
|
conn: AsyncConnection = Depends(get_database_connection),
|
||||||
|
):
|
||||||
|
"""Query scheme simulation results by ID and time"""
|
||||||
|
try:
|
||||||
|
return await SchemeRepository.query_scheme_simulation_result_by_ID_time(
|
||||||
|
conn, scheme, ID, type, query_time
|
||||||
|
)
|
||||||
|
except ValueError as e:
|
||||||
|
raise HTTPException(status_code=400, detail=str(e))
|
||||||
|
|
||||||
|
|
||||||
# --- SCADA Endpoints ---
|
# --- SCADA Endpoints ---
|
||||||
|
|
||||||
|
|
||||||
@@ -285,3 +396,60 @@ async def delete_scada_data(
|
|||||||
conn, device_id, start_time, end_time
|
conn, device_id, start_time, end_time
|
||||||
)
|
)
|
||||||
return {"message": "Deleted successfully"}
|
return {"message": "Deleted successfully"}
|
||||||
|
|
||||||
|
|
||||||
|
# --- Composite Query Endpoints ---
|
||||||
|
|
||||||
|
|
||||||
|
@router.get("/composite/scada-simulation")
|
||||||
|
async def get_scada_associated_simulation_data(
|
||||||
|
device_id: str,
|
||||||
|
start_time: datetime,
|
||||||
|
end_time: datetime,
|
||||||
|
field: str,
|
||||||
|
timescale_conn: AsyncConnection = Depends(get_database_connection),
|
||||||
|
postgres_conn: AsyncConnection = Depends(get_postgres_connection),
|
||||||
|
):
|
||||||
|
"""
|
||||||
|
获取 SCADA 关联的 link/node 模拟值
|
||||||
|
|
||||||
|
根据传入的 SCADA device_id,找到关联的 link/node,
|
||||||
|
并根据对应的 type,查询对应的模拟数据
|
||||||
|
"""
|
||||||
|
try:
|
||||||
|
result = await CompositeQueries.get_scada_associated_simulation_data(
|
||||||
|
timescale_conn, postgres_conn, device_id, start_time, end_time, field
|
||||||
|
)
|
||||||
|
if result is None:
|
||||||
|
raise HTTPException(status_code=404, detail="No simulation data found")
|
||||||
|
return result
|
||||||
|
except ValueError as e:
|
||||||
|
raise HTTPException(status_code=400, detail=str(e))
|
||||||
|
|
||||||
|
|
||||||
|
@router.get("/composite/element-scada")
|
||||||
|
async def get_element_associated_scada_data(
|
||||||
|
element_id: str,
|
||||||
|
start_time: datetime,
|
||||||
|
end_time: datetime,
|
||||||
|
use_cleaned: bool = Query(False, description="是否使用清洗后的数据"),
|
||||||
|
timescale_conn: AsyncConnection = Depends(get_database_connection),
|
||||||
|
postgres_conn: AsyncConnection = Depends(get_postgres_connection),
|
||||||
|
):
|
||||||
|
"""
|
||||||
|
获取 link/node 关联的 SCADA 监测值
|
||||||
|
|
||||||
|
根据传入的 link/node id,匹配 SCADA 信息,
|
||||||
|
如果存在关联的 SCADA device_id,获取实际的监测数据
|
||||||
|
"""
|
||||||
|
try:
|
||||||
|
result = await CompositeQueries.get_element_associated_scada_data(
|
||||||
|
timescale_conn, postgres_conn, element_id, start_time, end_time, use_cleaned
|
||||||
|
)
|
||||||
|
if result is None:
|
||||||
|
raise HTTPException(
|
||||||
|
status_code=404, detail="No associated SCADA data found"
|
||||||
|
)
|
||||||
|
return result
|
||||||
|
except ValueError as e:
|
||||||
|
raise HTTPException(status_code=400, detail=str(e))
|
||||||
|
|||||||
@@ -94,7 +94,6 @@ class RealtimeRepository:
|
|||||||
conn: AsyncConnection,
|
conn: AsyncConnection,
|
||||||
start_time: datetime,
|
start_time: datetime,
|
||||||
end_time: datetime,
|
end_time: datetime,
|
||||||
link_id: str,
|
|
||||||
field: str,
|
field: str,
|
||||||
) -> Any:
|
) -> Any:
|
||||||
# Validate field name to prevent SQL injection
|
# Validate field name to prevent SQL injection
|
||||||
|
|||||||
Reference in New Issue
Block a user