diff --git a/timescaledb/__init__.py b/timescaledb/__init__.py index 398a9b5..df7b7e3 100644 --- a/timescaledb/__init__.py +++ b/timescaledb/__init__.py @@ -1,3 +1,4 @@ from .router import router from .database import * -from .timescaledb_info import * \ No newline at end of file +from .timescaledb_info import * +from .composite_queries import CompositeQueries \ No newline at end of file diff --git a/timescaledb/composite_queries.py b/timescaledb/composite_queries.py index e69de29..d6c9ff7 100644 --- a/timescaledb/composite_queries.py +++ b/timescaledb/composite_queries.py @@ -0,0 +1,126 @@ +from typing import List, Optional, Dict, Any +from datetime import datetime +from psycopg import AsyncConnection + +from postgresql.scada_info import ScadaRepository as PostgreScadaRepository +from timescaledb.schemas.realtime import RealtimeRepository +from timescaledb.schemas.scada import ScadaRepository + + +class CompositeQueries: + """ + 复合查询类,提供跨表查询功能 + """ + + @staticmethod + async def get_scada_associated_simulation_data( + timescale_conn: AsyncConnection, + postgres_conn: AsyncConnection, + device_id: str, + start_time: datetime, + end_time: datetime, + field: str, + ) -> Optional[Any]: + """ + 获取 SCADA 关联的 link/node 模拟值 + + 根据传入的 SCADA device_id,找到关联的 link/node, + 并根据对应的 type,查询对应的模拟数据 + + Args: + timescale_conn: TimescaleDB 异步连接 + postgres_conn: PostgreSQL 异步连接 + device_id: SCADA 设备ID + start_time: 开始时间 + end_time: 结束时间 + field: 要查询的字段名 + + Returns: + 模拟数据值,如果没有找到则返回 None + + Raises: + ValueError: 当 SCADA 设备未找到或字段无效时 + """ + # 1. 查询所有 SCADA 信息 + scada_infos = await PostgreScadaRepository.get_scadas(postgres_conn) + + # 2. 根据 device_id 找到对应的 SCADA 信息 + target_scada = None + for scada in scada_infos: + if scada["id"] == device_id: + target_scada = scada + break + + if not target_scada: + raise ValueError(f"SCADA device {device_id} not found") + + # 3. 根据 type 和 associated_element_id 查询对应的模拟数据 + element_id = target_scada["associated_element_id"] + scada_type = target_scada["type"] + + if scada_type.lower() == "pipe_flow": + # 查询 link 模拟数据 + return await RealtimeRepository.get_link_field_by_time_range( + timescale_conn, start_time, end_time, element_id, field + ) + elif scada_type.lower() == "pressure": + # 查询 node 模拟数据 + return await RealtimeRepository.get_node_field_by_time_range( + timescale_conn, start_time, end_time, element_id, field + ) + else: + raise ValueError(f"Unknown SCADA type: {scada_type}") + + @staticmethod + async def get_element_associated_scada_data( + timescale_conn: AsyncConnection, + postgres_conn: AsyncConnection, + element_id: str, + start_time: datetime, + end_time: datetime, + use_cleaned: bool = False, + ) -> Optional[Any]: + """ + 获取 link/node 关联的 SCADA 监测值 + + 根据传入的 link/node id,匹配 SCADA 信息, + 如果存在关联的 SCADA device_id,获取实际的监测数据 + + Args: + timescale_conn: TimescaleDB 异步连接 + postgres_conn: PostgreSQL 异步连接 + element_id: link 或 node 的 ID + start_time: 开始时间 + end_time: 结束时间 + use_cleaned: 是否使用清洗后的数据 (True: "cleaned_value", False: "monitored_value") + + Returns: + SCADA 监测数据值,如果没有找到则返回 None + + Raises: + ValueError: 当元素类型无效时 + """ + + # 1. 查询所有 SCADA 信息 + scada_infos = await PostgreScadaRepository.get_scadas(postgres_conn) + + # 2. 根据 element_type 和 element_id 找到关联的 SCADA 设备 + associated_scada = None + for scada in scada_infos: + if scada["associated_element_id"] == element_id: + associated_scada = scada + break + + if not associated_scada: + # 没有找到关联的 SCADA 设备 + return None + + # 3. 通过 SCADA device_id 获取监测数据 + device_id = associated_scada["id"] + + # 根据 use_cleaned 参数选择字段 + data_field = "cleaned_value" if use_cleaned else "monitored_value" + + return await ScadaRepository.get_scada_field_by_id_time_range( + timescale_conn, device_id, start_time, end_time, data_field + ) diff --git a/timescaledb/router.py b/timescaledb/router.py index 680789d..fe2ab72 100644 --- a/timescaledb/router.py +++ b/timescaledb/router.py @@ -7,6 +7,8 @@ from .database import get_database_instance from .schemas.realtime import RealtimeRepository from .schemas.scheme import SchemeRepository from .schemas.scada import ScadaRepository +from .composite_queries import CompositeQueries +from postgresql.database import get_database_instance as get_postgres_database_instance router = APIRouter(prefix="/timescaledb", tags=["TimescaleDB"]) @@ -23,6 +25,18 @@ async def get_database_connection( yield conn +# PostgreSQL 数据库连接依赖函数 +async def get_postgres_connection( + db_name: Optional[str] = Query( + None, description="指定要连接的 PostgreSQL 数据库名称,为空时使用默认数据库" + ) +): + """获取 PostgreSQL 数据库连接,支持通过查询参数指定数据库名称""" + instance = await get_postgres_database_instance(db_name) + async with instance.get_connection() as conn: + yield conn + + # --- Realtime Endpoints --- @@ -40,7 +54,7 @@ async def get_realtime_links( end_time: datetime, conn: AsyncConnection = Depends(get_database_connection), ): - return await RealtimeRepository.get_links_by_time(conn, start_time, end_time) + return await RealtimeRepository.get_links_by_time_range(conn, start_time, end_time) @router.delete("/realtime/links") @@ -49,7 +63,7 @@ async def delete_realtime_links( end_time: datetime, conn: AsyncConnection = Depends(get_database_connection), ): - await RealtimeRepository.delete_links_by_time(conn, start_time, end_time) + await RealtimeRepository.delete_links_by_time_range(conn, start_time, end_time) return {"message": "Deleted successfully"} @@ -82,7 +96,7 @@ async def get_realtime_nodes( end_time: datetime, conn: AsyncConnection = Depends(get_database_connection), ): - return await RealtimeRepository.get_nodes_by_time(conn, start_time, end_time) + return await RealtimeRepository.get_nodes_by_time_range(conn, start_time, end_time) @router.delete("/realtime/nodes") @@ -91,10 +105,56 @@ async def delete_realtime_nodes( end_time: datetime, conn: AsyncConnection = Depends(get_database_connection), ): - await RealtimeRepository.delete_nodes_by_time(conn, start_time, end_time) + await RealtimeRepository.delete_nodes_by_time_range(conn, start_time, end_time) return {"message": "Deleted successfully"} +@router.post("/realtime/simulation/store", status_code=201) +async def store_realtime_simulation_result( + node_result_list: List[dict], + link_result_list: List[dict], + result_start_time: str, + conn: AsyncConnection = Depends(get_database_connection), +): + """Store realtime simulation results to TimescaleDB""" + await RealtimeRepository.store_realtime_simulation_result( + conn, node_result_list, link_result_list, result_start_time + ) + return {"message": "Simulation results stored successfully"} + + +@router.get("/realtime/query/by-time-property") +async def query_realtime_records_by_time_property( + query_time: str, + type: str, + property: str, + conn: AsyncConnection = Depends(get_database_connection), +): + """Query all realtime records by time and property""" + try: + return await RealtimeRepository.query_all_record_by_time_property( + conn, query_time, type, property + ) + except ValueError as e: + raise HTTPException(status_code=400, detail=str(e)) + + +@router.get("/realtime/query/by-id-time") +async def query_realtime_simulation_by_id_time( + ID: str, + type: str, + query_time: str, + conn: AsyncConnection = Depends(get_database_connection), +): + """Query realtime simulation results by ID and time""" + try: + return await RealtimeRepository.query_simulation_result_by_ID_time( + conn, ID, type, query_time + ) + except ValueError as e: + raise HTTPException(status_code=400, detail=str(e)) + + # --- Scheme Endpoints --- @@ -113,7 +173,7 @@ async def get_scheme_links( end_time: datetime, conn: AsyncConnection = Depends(get_database_connection), ): - return await SchemeRepository.get_links_by_scheme_and_time( + return await SchemeRepository.get_links_by_scheme_and_time_range( conn, scheme, start_time, end_time ) @@ -122,13 +182,14 @@ async def get_scheme_links( async def get_scheme_link_field( scheme: str, link_id: str, - time: datetime, + start_time: datetime, + end_time: datetime, field: str, conn: AsyncConnection = Depends(get_database_connection), ): try: - return await SchemeRepository.get_link_field_by_scheme_and_time( - conn, time, scheme, link_id, field + return await SchemeRepository.get_link_field_by_scheme_and_time_range( + conn, scheme, start_time, end_time, link_id, field ) except ValueError as e: raise HTTPException(status_code=400, detail=str(e)) @@ -159,7 +220,7 @@ async def delete_scheme_links( end_time: datetime, conn: AsyncConnection = Depends(get_database_connection), ): - await SchemeRepository.delete_links_by_scheme_and_time( + await SchemeRepository.delete_links_by_scheme_and_time_range( conn, scheme, start_time, end_time ) return {"message": "Deleted successfully"} @@ -177,13 +238,14 @@ async def insert_scheme_nodes( async def get_scheme_node_field( scheme: str, node_id: str, - time: datetime, + start_time: datetime, + end_time: datetime, field: str, conn: AsyncConnection = Depends(get_database_connection), ): try: - return await SchemeRepository.get_node_field_by_scheme_and_time( - conn, time, scheme, node_id, field + return await SchemeRepository.get_node_field_by_scheme_and_time_range( + conn, scheme, start_time, end_time, node_id, field ) except ValueError as e: raise HTTPException(status_code=400, detail=str(e)) @@ -214,12 +276,61 @@ async def delete_scheme_nodes( end_time: datetime, conn: AsyncConnection = Depends(get_database_connection), ): - await SchemeRepository.delete_nodes_by_scheme_and_time( + await SchemeRepository.delete_nodes_by_scheme_and_time_range( conn, scheme, start_time, end_time ) return {"message": "Deleted successfully"} +@router.post("/scheme/simulation/store", status_code=201) +async def store_scheme_simulation_result( + scheme: str, + node_result_list: List[dict], + link_result_list: List[dict], + result_start_time: str, + conn: AsyncConnection = Depends(get_database_connection), +): + """Store scheme simulation results to TimescaleDB""" + await SchemeRepository.store_scheme_simulation_result( + conn, scheme, node_result_list, link_result_list, result_start_time + ) + return {"message": "Scheme simulation results stored successfully"} + + +@router.get("/scheme/query/by-scheme-time-property") +async def query_scheme_records_by_scheme_time_property( + scheme: str, + query_time: str, + type: str, + property: str, + conn: AsyncConnection = Depends(get_database_connection), +): + """Query all scheme records by scheme, time and property""" + try: + return await SchemeRepository.query_all_record_by_scheme_time_property( + conn, scheme, query_time, type, property + ) + except ValueError as e: + raise HTTPException(status_code=400, detail=str(e)) + + +@router.get("/scheme/query/by-id-time") +async def query_scheme_simulation_by_id_time( + scheme: str, + ID: str, + type: str, + query_time: str, + conn: AsyncConnection = Depends(get_database_connection), +): + """Query scheme simulation results by ID and time""" + try: + return await SchemeRepository.query_scheme_simulation_result_by_ID_time( + conn, scheme, ID, type, query_time + ) + except ValueError as e: + raise HTTPException(status_code=400, detail=str(e)) + + # --- SCADA Endpoints --- @@ -285,3 +396,60 @@ async def delete_scada_data( conn, device_id, start_time, end_time ) return {"message": "Deleted successfully"} + + +# --- Composite Query Endpoints --- + + +@router.get("/composite/scada-simulation") +async def get_scada_associated_simulation_data( + device_id: str, + start_time: datetime, + end_time: datetime, + field: str, + timescale_conn: AsyncConnection = Depends(get_database_connection), + postgres_conn: AsyncConnection = Depends(get_postgres_connection), +): + """ + 获取 SCADA 关联的 link/node 模拟值 + + 根据传入的 SCADA device_id,找到关联的 link/node, + 并根据对应的 type,查询对应的模拟数据 + """ + try: + result = await CompositeQueries.get_scada_associated_simulation_data( + timescale_conn, postgres_conn, device_id, start_time, end_time, field + ) + if result is None: + raise HTTPException(status_code=404, detail="No simulation data found") + return result + except ValueError as e: + raise HTTPException(status_code=400, detail=str(e)) + + +@router.get("/composite/element-scada") +async def get_element_associated_scada_data( + element_id: str, + start_time: datetime, + end_time: datetime, + use_cleaned: bool = Query(False, description="是否使用清洗后的数据"), + timescale_conn: AsyncConnection = Depends(get_database_connection), + postgres_conn: AsyncConnection = Depends(get_postgres_connection), +): + """ + 获取 link/node 关联的 SCADA 监测值 + + 根据传入的 link/node id,匹配 SCADA 信息, + 如果存在关联的 SCADA device_id,获取实际的监测数据 + """ + try: + result = await CompositeQueries.get_element_associated_scada_data( + timescale_conn, postgres_conn, element_id, start_time, end_time, use_cleaned + ) + if result is None: + raise HTTPException( + status_code=404, detail="No associated SCADA data found" + ) + return result + except ValueError as e: + raise HTTPException(status_code=400, detail=str(e)) diff --git a/timescaledb/schemas/realtime.py b/timescaledb/schemas/realtime.py index 91bf09f..6305676 100644 --- a/timescaledb/schemas/realtime.py +++ b/timescaledb/schemas/realtime.py @@ -94,7 +94,6 @@ class RealtimeRepository: conn: AsyncConnection, start_time: datetime, end_time: datetime, - link_id: str, field: str, ) -> Any: # Validate field name to prevent SQL injection