From 559d5bb8e31c2c09b3c214b83d3e5fd23abaddfe Mon Sep 17 00:00:00 2001 From: Jiang Date: Mon, 9 Mar 2026 18:20:46 +0800 Subject: [PATCH] =?UTF-8?q?app/infra/db=E4=B8=AD=20router=20=E8=BF=81?= =?UTF-8?q?=E7=A7=BB=E5=B9=B6=E6=9B=B4=E6=96=B0=EF=BC=8C=E6=B8=85=E7=90=86?= =?UTF-8?q?=20infra=20=E5=B1=82=E7=9A=84=E6=97=A7=20router?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../v1/endpoints/project_data.py} | 9 +- app/api/v1/endpoints/timeseries/__init__.py | 0 app/api/v1/endpoints/timeseries/composite.py | 205 ++++++ .../v1/endpoints/timeseries/dependencies.py | 19 + app/api/v1/endpoints/timeseries/realtime.py | 126 ++++ app/api/v1/endpoints/timeseries/scada.py | 81 +++ app/api/v1/endpoints/timeseries/scheme.py | 199 ++++++ app/api/v1/router.py | 20 +- app/infra/db/postgresql/__init__.py | 2 +- app/infra/db/timescaledb/__init__.py | 1 - app/infra/db/timescaledb/router.py | 619 ------------------ 11 files changed, 648 insertions(+), 633 deletions(-) rename app/{infra/db/postgresql/router.py => api/v1/endpoints/project_data.py} (87%) create mode 100644 app/api/v1/endpoints/timeseries/__init__.py create mode 100644 app/api/v1/endpoints/timeseries/composite.py create mode 100644 app/api/v1/endpoints/timeseries/dependencies.py create mode 100644 app/api/v1/endpoints/timeseries/realtime.py create mode 100644 app/api/v1/endpoints/timeseries/scada.py create mode 100644 app/api/v1/endpoints/timeseries/scheme.py delete mode 100644 app/infra/db/timescaledb/router.py diff --git a/app/infra/db/postgresql/router.py b/app/api/v1/endpoints/project_data.py similarity index 87% rename from app/infra/db/postgresql/router.py rename to app/api/v1/endpoints/project_data.py index 5c569e6..e3ac917 100644 --- a/app/infra/db/postgresql/router.py +++ b/app/api/v1/endpoints/project_data.py @@ -1,14 +1,13 @@ from fastapi import APIRouter, Depends, HTTPException from psycopg import AsyncConnection -from .scada_info import ScadaRepository -from .scheme import SchemeRepository +from app.infra.db.postgresql.scada_info import ScadaRepository +from app.infra.db.postgresql.scheme import SchemeRepository from app.auth.project_dependencies import get_project_pg_connection router = APIRouter() -# 动态项目 PostgreSQL 连接依赖 async def get_database_connection( conn: AsyncConnection = Depends(get_project_pg_connection), ): @@ -23,7 +22,6 @@ async def get_scada_info_with_connection( 使用连接池查询所有SCADA信息 """ try: - # 使用ScadaRepository查询SCADA信息 scada_data = await ScadaRepository.get_scadas(conn) return {"success": True, "data": scada_data, "count": len(scada_data)} except Exception as e: @@ -40,7 +38,6 @@ async def get_scheme_list_with_connection( 使用连接池查询所有方案信息 """ try: - # 使用SchemeRepository查询方案信息 scheme_data = await SchemeRepository.get_schemes(conn) return {"success": True, "data": scheme_data, "count": len(scheme_data)} except Exception as e: @@ -55,7 +52,6 @@ async def get_burst_locate_result_with_connection( 使用连接池查询所有爆管定位结果 """ try: - # 使用SchemeRepository查询爆管定位结果 burst_data = await SchemeRepository.get_burst_locate_results(conn) return {"success": True, "data": burst_data, "count": len(burst_data)} except Exception as e: @@ -73,7 +69,6 @@ async def get_burst_locate_result_by_incident( 根据 burst_incident 查询爆管定位结果 """ try: - # 使用SchemeRepository查询爆管定位结果 return await SchemeRepository.get_burst_locate_result_by_incident( conn, burst_incident ) diff --git a/app/api/v1/endpoints/timeseries/__init__.py b/app/api/v1/endpoints/timeseries/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/app/api/v1/endpoints/timeseries/composite.py b/app/api/v1/endpoints/timeseries/composite.py new file mode 100644 index 0000000..116532b --- /dev/null +++ b/app/api/v1/endpoints/timeseries/composite.py @@ -0,0 +1,205 @@ +from fastapi import APIRouter, Depends, HTTPException, Query +from datetime import datetime +from psycopg import AsyncConnection + +from app.infra.db.timescaledb.composite_queries import CompositeQueries +from .dependencies import get_timescale_connection, get_postgres_connection + +router = APIRouter() + + +@router.get("/composite/scada-simulation") +async def get_scada_associated_simulation_data( + start_time: datetime, + end_time: datetime, + device_ids: str, + scheme_type: str = Query(None, description="指定方案名称,若为空则查询实时数据"), + scheme_name: str = Query(None, description="指定方案名称,若为空则查询实时数据"), + timescale_conn: AsyncConnection = Depends(get_timescale_connection), + postgres_conn: AsyncConnection = Depends(get_postgres_connection), +): + """ + 获取 SCADA 关联的 link/node 模拟值 + + 根据传入的 SCADA device_ids,找到关联的 link/node, + 并根据对应的 type,查询对应的模拟数据 + """ + try: + device_ids_list = ( + [id.strip() for id in device_ids.split(",") if id.strip()] + if device_ids + else [] + ) + + if scheme_type and scheme_name: + result = await CompositeQueries.get_scada_associated_scheme_simulation_data( + timescale_conn, + postgres_conn, + device_ids_list, + start_time, + end_time, + scheme_type, + scheme_name, + ) + else: + result = ( + await CompositeQueries.get_scada_associated_realtime_simulation_data( + timescale_conn, + postgres_conn, + device_ids_list, + start_time, + end_time, + ) + ) + if result is None: + raise HTTPException(status_code=404, detail="No simulation data found") + return result + except ValueError as e: + raise HTTPException(status_code=400, detail=str(e)) + + +@router.get("/composite/element-simulation") +async def get_feature_simulation_data( + start_time: datetime, + end_time: datetime, + feature_infos: str = Query( + ..., description="特征信息,格式: id1:type1,id2:type2,type为pipe或junction" + ), + scheme_type: str = Query(None, description="指定方案类型,若为空则查询实时数据"), + scheme_name: str = Query(None, description="指定方案名称,若为空则查询实时数据"), + timescale_conn: AsyncConnection = Depends(get_timescale_connection), +): + """ + 获取 link/node 模拟值 + + 根据传入的 featureInfos,找到关联的 link/node, + 并根据对应的 type,查询对应的模拟数据 + + Args: + feature_infos: 格式为 "element_id1:type1,element_id2:type2" + 例如: "P1:pipe,J1:junction" + """ + try: + feature_infos_list = [] + if feature_infos: + for item in feature_infos.split(","): + item = item.strip() + if ":" in item: + element_id, element_type = item.split(":", 1) + feature_infos_list.append( + (element_id.strip(), element_type.strip()) + ) + + if not feature_infos_list: + raise HTTPException(status_code=400, detail="feature_infos cannot be empty") + + if scheme_type and scheme_name: + result = await CompositeQueries.get_scheme_simulation_data( + timescale_conn, + feature_infos_list, + start_time, + end_time, + scheme_type, + scheme_name, + ) + else: + result = await CompositeQueries.get_realtime_simulation_data( + timescale_conn, + feature_infos_list, + start_time, + end_time, + ) + + if result is None: + raise HTTPException(status_code=404, detail="No simulation data found") + return result + except ValueError as e: + raise HTTPException(status_code=400, detail=str(e)) + + +@router.get("/composite/element-scada") +async def get_element_associated_scada_data( + element_id: str, + start_time: datetime, + end_time: datetime, + use_cleaned: bool = Query(False, description="是否使用清洗后的数据"), + timescale_conn: AsyncConnection = Depends(get_timescale_connection), + postgres_conn: AsyncConnection = Depends(get_postgres_connection), +): + """ + 获取 link/node 关联的 SCADA 监测值 + + 根据传入的 link/node id,匹配 SCADA 信息, + 如果存在关联的 SCADA device_id,获取实际的监测数据 + """ + try: + result = await CompositeQueries.get_element_associated_scada_data( + timescale_conn, postgres_conn, element_id, start_time, end_time, use_cleaned + ) + if result is None: + raise HTTPException( + status_code=404, detail="No associated SCADA data found" + ) + return result + except ValueError as e: + raise HTTPException(status_code=400, detail=str(e)) + + +@router.post("/composite/clean-scada") +async def clean_scada_data( + device_ids: str, + start_time: datetime = Query(...), + end_time: datetime = Query(...), + timescale_conn: AsyncConnection = Depends(get_timescale_connection), + postgres_conn: AsyncConnection = Depends(get_postgres_connection), +): + """ + 清洗 SCADA 数据 + + 根据 device_ids 查询 monitored_value,清洗后更新 cleaned_value + """ + try: + if device_ids == "all": + device_ids_list = [] + else: + device_ids_list = ( + [id.strip() for id in device_ids.split(",") if id.strip()] + if device_ids + else [] + ) + return await CompositeQueries.clean_scada_data( + timescale_conn, postgres_conn, device_ids_list, start_time, end_time + ) + except ValueError as e: + raise HTTPException(status_code=400, detail=str(e)) + + +@router.get("/composite/pipeline-health-prediction") +async def predict_pipeline_health( + query_time: datetime = Query(..., description="查询时间"), + network_name: str = Query(..., description="管网数据库名称"), + timescale_conn: AsyncConnection = Depends(get_timescale_connection), +): + """ + 预测管道健康状况 + + 根据管网名称和当前时间,查询管道信息和实时数据, + 使用随机生存森林模型预测管道的生存概率 + + Args: + query_time: 查询时间 + db_name: 管网数据库名称 + + Returns: + 预测结果列表,每个元素包含 link_id 和对应的生存函数 + """ + try: + return await CompositeQueries.predict_pipeline_health( + timescale_conn, network_name, query_time + ) + except ValueError as e: + raise HTTPException(status_code=400, detail=str(e)) + except FileNotFoundError as e: + raise HTTPException(status_code=404, detail=str(e)) + except Exception as e: + raise HTTPException(status_code=500, detail=f"内部服务器错误: {str(e)}") diff --git a/app/api/v1/endpoints/timeseries/dependencies.py b/app/api/v1/endpoints/timeseries/dependencies.py new file mode 100644 index 0000000..4e84a14 --- /dev/null +++ b/app/api/v1/endpoints/timeseries/dependencies.py @@ -0,0 +1,19 @@ +from fastapi import Depends +from psycopg import AsyncConnection + +from app.auth.project_dependencies import ( + get_project_pg_connection, + get_project_timescale_connection, +) + + +async def get_timescale_connection( + conn: AsyncConnection = Depends(get_project_timescale_connection), +): + yield conn + + +async def get_postgres_connection( + conn: AsyncConnection = Depends(get_project_pg_connection), +): + yield conn diff --git a/app/api/v1/endpoints/timeseries/realtime.py b/app/api/v1/endpoints/timeseries/realtime.py new file mode 100644 index 0000000..6f12c44 --- /dev/null +++ b/app/api/v1/endpoints/timeseries/realtime.py @@ -0,0 +1,126 @@ +from fastapi import APIRouter, Depends, HTTPException +from typing import List +from datetime import datetime +from psycopg import AsyncConnection + +from app.infra.db.timescaledb.repositories.realtime import RealtimeRepository +from .dependencies import get_timescale_connection + +router = APIRouter() + + +@router.post("/realtime/links/batch", status_code=201) +async def insert_realtime_links( + data: List[dict], conn: AsyncConnection = Depends(get_timescale_connection) +): + await RealtimeRepository.insert_links_batch(conn, data) + return {"message": f"Inserted {len(data)} records"} + + +@router.get("/realtime/links") +async def get_realtime_links( + start_time: datetime, + end_time: datetime, + conn: AsyncConnection = Depends(get_timescale_connection), +): + return await RealtimeRepository.get_links_by_time_range(conn, start_time, end_time) + + +@router.delete("/realtime/links") +async def delete_realtime_links( + start_time: datetime, + end_time: datetime, + conn: AsyncConnection = Depends(get_timescale_connection), +): + await RealtimeRepository.delete_links_by_time_range(conn, start_time, end_time) + return {"message": "Deleted successfully"} + + +@router.patch("/realtime/links/{link_id}/field") +async def update_realtime_link_field( + link_id: str, + time: datetime, + field: str, + value: float, + conn: AsyncConnection = Depends(get_timescale_connection), +): + try: + await RealtimeRepository.update_link_field(conn, time, link_id, field, value) + return {"message": "Updated successfully"} + except ValueError as e: + raise HTTPException(status_code=400, detail=str(e)) + + +@router.post("/realtime/nodes/batch", status_code=201) +async def insert_realtime_nodes( + data: List[dict], conn: AsyncConnection = Depends(get_timescale_connection) +): + await RealtimeRepository.insert_nodes_batch(conn, data) + return {"message": f"Inserted {len(data)} records"} + + +@router.get("/realtime/nodes") +async def get_realtime_nodes( + start_time: datetime, + end_time: datetime, + conn: AsyncConnection = Depends(get_timescale_connection), +): + return await RealtimeRepository.get_nodes_by_time_range(conn, start_time, end_time) + + +@router.delete("/realtime/nodes") +async def delete_realtime_nodes( + start_time: datetime, + end_time: datetime, + conn: AsyncConnection = Depends(get_timescale_connection), +): + await RealtimeRepository.delete_nodes_by_time_range(conn, start_time, end_time) + return {"message": "Deleted successfully"} + + +@router.post("/realtime/simulation/store", status_code=201) +async def store_realtime_simulation_result( + node_result_list: List[dict], + link_result_list: List[dict], + result_start_time: str, + conn: AsyncConnection = Depends(get_timescale_connection), +): + """Store realtime simulation results to TimescaleDB""" + await RealtimeRepository.store_realtime_simulation_result( + conn, node_result_list, link_result_list, result_start_time + ) + return {"message": "Simulation results stored successfully"} + + +@router.get("/realtime/query/by-time-property") +async def query_realtime_records_by_time_property( + query_time: str, + type: str, + property: str, + conn: AsyncConnection = Depends(get_timescale_connection), +): + """Query all realtime records by time and property""" + try: + results = await RealtimeRepository.query_all_record_by_time_property( + conn, query_time, type, property + ) + return {"results": results} + except ValueError as e: + raise HTTPException(status_code=400, detail=str(e)) + + +@router.get("/realtime/query/by-id-time") +async def query_realtime_simulation_by_id_time( + id: str, + type: str, + query_time: str, + conn: AsyncConnection = Depends(get_timescale_connection), +): + """Query realtime simulation results by id and time""" + try: + results = await RealtimeRepository.query_simulation_result_by_id_time( + conn, id, type, query_time + ) + return {"result": results} + except ValueError as e: + raise HTTPException(status_code=400, detail=str(e)) diff --git a/app/api/v1/endpoints/timeseries/scada.py b/app/api/v1/endpoints/timeseries/scada.py new file mode 100644 index 0000000..e36759f --- /dev/null +++ b/app/api/v1/endpoints/timeseries/scada.py @@ -0,0 +1,81 @@ +from fastapi import APIRouter, Depends, HTTPException +from typing import List +from datetime import datetime +from psycopg import AsyncConnection + +from app.infra.db.timescaledb.repositories.scada import ScadaRepository +from .dependencies import get_timescale_connection + +router = APIRouter() + + +@router.post("/scada/batch", status_code=201) +async def insert_scada_data( + data: List[dict], conn: AsyncConnection = Depends(get_timescale_connection) +): + await ScadaRepository.insert_scada_batch(conn, data) + return {"message": f"Inserted {len(data)} records"} + + +@router.get("/scada/by-ids-time-range") +async def get_scada_by_ids_time_range( + start_time: datetime, + end_time: datetime, + device_ids: str, + conn: AsyncConnection = Depends(get_timescale_connection), +): + device_ids_list = ( + [id.strip() for id in device_ids.split(",") if id.strip()] if device_ids else [] + ) + return await ScadaRepository.get_scada_by_ids_time_range( + conn, device_ids_list, start_time, end_time + ) + + +@router.get("/scada/by-ids-field-time-range") +async def get_scada_field_by_ids_time_range( + start_time: datetime, + end_time: datetime, + field: str, + device_ids: str, + conn: AsyncConnection = Depends(get_timescale_connection), +): + try: + device_ids_list = ( + [id.strip() for id in device_ids.split(",") if id.strip()] + if device_ids + else [] + ) + return await ScadaRepository.get_scada_field_by_id_time_range( + conn, device_ids_list, start_time, end_time, field + ) + except ValueError as e: + raise HTTPException(status_code=400, detail=str(e)) + + +@router.patch("/scada/{device_id}/field") +async def update_scada_field( + device_id: str, + time: datetime, + field: str, + value: float, + conn: AsyncConnection = Depends(get_timescale_connection), +): + try: + await ScadaRepository.update_scada_field(conn, time, device_id, field, value) + return {"message": "Updated successfully"} + except ValueError as e: + raise HTTPException(status_code=400, detail=str(e)) + + +@router.delete("/scada/by-id-time-range") +async def delete_scada_data( + device_id: str, + start_time: datetime, + end_time: datetime, + conn: AsyncConnection = Depends(get_timescale_connection), +): + await ScadaRepository.delete_scada_by_id_time_range( + conn, device_id, start_time, end_time + ) + return {"message": "Deleted successfully"} diff --git a/app/api/v1/endpoints/timeseries/scheme.py b/app/api/v1/endpoints/timeseries/scheme.py new file mode 100644 index 0000000..307f7f1 --- /dev/null +++ b/app/api/v1/endpoints/timeseries/scheme.py @@ -0,0 +1,199 @@ +from fastapi import APIRouter, Depends, HTTPException +from typing import List +from datetime import datetime +from psycopg import AsyncConnection + +from app.infra.db.timescaledb.repositories.scheme import SchemeRepository +from .dependencies import get_timescale_connection + +router = APIRouter() + + +@router.post("/scheme/links/batch", status_code=201) +async def insert_scheme_links( + data: List[dict], conn: AsyncConnection = Depends(get_timescale_connection) +): + await SchemeRepository.insert_links_batch(conn, data) + return {"message": f"Inserted {len(data)} records"} + + +@router.get("/scheme/links") +async def get_scheme_links( + scheme_type: str, + scheme_name: str, + start_time: datetime, + end_time: datetime, + conn: AsyncConnection = Depends(get_timescale_connection), +): + return await SchemeRepository.get_links_by_scheme_and_time_range( + conn, scheme_type, scheme_name, start_time, end_time + ) + + +@router.get("/scheme/links/{link_id}/field") +async def get_scheme_link_field( + scheme_type: str, + scheme_name: str, + link_id: str, + start_time: datetime, + end_time: datetime, + field: str, + conn: AsyncConnection = Depends(get_timescale_connection), +): + try: + return await SchemeRepository.get_link_field_by_scheme_and_time_range( + conn, scheme_type, scheme_name, start_time, end_time, link_id, field + ) + except ValueError as e: + raise HTTPException(status_code=400, detail=str(e)) + + +@router.patch("/scheme/links/{link_id}/field") +async def update_scheme_link_field( + scheme_type: str, + scheme_name: str, + link_id: str, + time: datetime, + field: str, + value: float, + conn: AsyncConnection = Depends(get_timescale_connection), +): + try: + await SchemeRepository.update_link_field( + conn, time, scheme_type, scheme_name, link_id, field, value + ) + return {"message": "Updated successfully"} + except ValueError as e: + raise HTTPException(status_code=400, detail=str(e)) + + +@router.delete("/scheme/links") +async def delete_scheme_links( + scheme_type: str, + scheme_name: str, + start_time: datetime, + end_time: datetime, + conn: AsyncConnection = Depends(get_timescale_connection), +): + await SchemeRepository.delete_links_by_scheme_and_time_range( + conn, scheme_type, scheme_name, start_time, end_time + ) + return {"message": "Deleted successfully"} + + +@router.post("/scheme/nodes/batch", status_code=201) +async def insert_scheme_nodes( + data: List[dict], conn: AsyncConnection = Depends(get_timescale_connection) +): + await SchemeRepository.insert_nodes_batch(conn, data) + return {"message": f"Inserted {len(data)} records"} + + +@router.get("/scheme/nodes/{node_id}/field") +async def get_scheme_node_field( + scheme_type: str, + scheme_name: str, + node_id: str, + start_time: datetime, + end_time: datetime, + field: str, + conn: AsyncConnection = Depends(get_timescale_connection), +): + try: + return await SchemeRepository.get_node_field_by_scheme_and_time_range( + conn, scheme_type, scheme_name, start_time, end_time, node_id, field + ) + except ValueError as e: + raise HTTPException(status_code=400, detail=str(e)) + + +@router.patch("/scheme/nodes/{node_id}/field") +async def update_scheme_node_field( + scheme_type: str, + scheme_name: str, + node_id: str, + time: datetime, + field: str, + value: float, + conn: AsyncConnection = Depends(get_timescale_connection), +): + try: + await SchemeRepository.update_node_field( + conn, time, scheme_type, scheme_name, node_id, field, value + ) + return {"message": "Updated successfully"} + except ValueError as e: + raise HTTPException(status_code=400, detail=str(e)) + + +@router.delete("/scheme/nodes") +async def delete_scheme_nodes( + scheme_type: str, + scheme_name: str, + start_time: datetime, + end_time: datetime, + conn: AsyncConnection = Depends(get_timescale_connection), +): + await SchemeRepository.delete_nodes_by_scheme_and_time_range( + conn, scheme_type, scheme_name, start_time, end_time + ) + return {"message": "Deleted successfully"} + + +@router.post("/scheme/simulation/store", status_code=201) +async def store_scheme_simulation_result( + scheme_type: str, + scheme_name: str, + node_result_list: List[dict], + link_result_list: List[dict], + result_start_time: str, + conn: AsyncConnection = Depends(get_timescale_connection), +): + """Store scheme simulation results to TimescaleDB""" + await SchemeRepository.store_scheme_simulation_result( + conn, + scheme_type, + scheme_name, + node_result_list, + link_result_list, + result_start_time, + ) + return {"message": "Scheme simulation results stored successfully"} + + +@router.get("/scheme/query/by-scheme-time-property") +async def query_scheme_records_by_scheme_time_property( + scheme_type: str, + scheme_name: str, + query_time: str, + type: str, + property: str, + conn: AsyncConnection = Depends(get_timescale_connection), +): + """Query all scheme records by scheme, time and property""" + try: + results = await SchemeRepository.query_all_record_by_scheme_time_property( + conn, scheme_type, scheme_name, query_time, type, property + ) + return {"results": results} + except ValueError as e: + raise HTTPException(status_code=400, detail=str(e)) + + +@router.get("/scheme/query/by-id-time") +async def query_scheme_simulation_by_id_time( + scheme_type: str, + scheme_name: str, + id: str, + type: str, + query_time: str, + conn: AsyncConnection = Depends(get_timescale_connection), +): + """Query scheme simulation results by id and time""" + try: + result = await SchemeRepository.query_scheme_simulation_result_by_id_time( + conn, scheme_type, scheme_name, id, type, query_time + ) + return {"result": result} + except ValueError as e: + raise HTTPException(status_code=400, detail=str(e)) diff --git a/app/api/v1/router.py b/app/api/v1/router.py index 42a0d0d..2768142 100644 --- a/app/api/v1/router.py +++ b/app/api/v1/router.py @@ -40,8 +40,13 @@ from app.api.v1.endpoints.components import ( visuals, ) -from app.infra.db.postgresql import router as postgresql_router -from app.infra.db.timescaledb import router as timescaledb_router +from app.api.v1.endpoints import project_data +from app.api.v1.endpoints.timeseries import ( + realtime as ts_realtime, + scheme as ts_scheme, + scada as ts_scada, + composite as ts_composite, +) api_router = APIRouter() @@ -90,9 +95,14 @@ api_router.include_router( burst_location.router, prefix="/burst-location", tags=["Burst Location"] ) -# Database Routers -api_router.include_router(timescaledb_router, tags=["TimescaleDB"]) -api_router.include_router(postgresql_router, tags=["PostgreSQL"]) +# TimescaleDB Data Access +api_router.include_router(ts_realtime.router, tags=["TimescaleDB - Realtime"]) +api_router.include_router(ts_scheme.router, tags=["TimescaleDB - Scheme"]) +api_router.include_router(ts_scada.router, tags=["TimescaleDB - SCADA"]) +api_router.include_router(ts_composite.router, tags=["TimescaleDB - Composite"]) + +# Project Data (PostgreSQL) +api_router.include_router(project_data.router, tags=["Project Data"]) # Extension api_router.include_router(extension.router, tags=["Extension"]) diff --git a/app/infra/db/postgresql/__init__.py b/app/infra/db/postgresql/__init__.py index 2378043..8b13789 100644 --- a/app/infra/db/postgresql/__init__.py +++ b/app/infra/db/postgresql/__init__.py @@ -1 +1 @@ -from .router import router + diff --git a/app/infra/db/timescaledb/__init__.py b/app/infra/db/timescaledb/__init__.py index df7b7e3..7e1e6c2 100644 --- a/app/infra/db/timescaledb/__init__.py +++ b/app/infra/db/timescaledb/__init__.py @@ -1,4 +1,3 @@ -from .router import router from .database import * from .timescaledb_info import * from .composite_queries import CompositeQueries \ No newline at end of file diff --git a/app/infra/db/timescaledb/router.py b/app/infra/db/timescaledb/router.py deleted file mode 100644 index f7e80d7..0000000 --- a/app/infra/db/timescaledb/router.py +++ /dev/null @@ -1,619 +0,0 @@ -from fastapi import APIRouter, Depends, HTTPException, Query -from typing import List -from datetime import datetime -from psycopg import AsyncConnection - -from .repositories.realtime import RealtimeRepository -from .repositories.scheme import SchemeRepository -from .repositories.scada import ScadaRepository -from .composite_queries import CompositeQueries -from app.auth.project_dependencies import ( - get_project_pg_connection, - get_project_timescale_connection, -) - -router = APIRouter() - - -# 动态项目 TimescaleDB 连接依赖 -async def get_database_connection( - conn: AsyncConnection = Depends(get_project_timescale_connection), -): - yield conn - - -# 动态项目 PostgreSQL 连接依赖 -async def get_postgres_connection( - conn: AsyncConnection = Depends(get_project_pg_connection), -): - yield conn - - -# --- Realtime Endpoints --- - - -@router.post("/realtime/links/batch", status_code=201) -async def insert_realtime_links( - data: List[dict], conn: AsyncConnection = Depends(get_database_connection) -): - await RealtimeRepository.insert_links_batch(conn, data) - return {"message": f"Inserted {len(data)} records"} - - -@router.get("/realtime/links") -async def get_realtime_links( - start_time: datetime, - end_time: datetime, - conn: AsyncConnection = Depends(get_database_connection), -): - return await RealtimeRepository.get_links_by_time_range(conn, start_time, end_time) - - -@router.delete("/realtime/links") -async def delete_realtime_links( - start_time: datetime, - end_time: datetime, - conn: AsyncConnection = Depends(get_database_connection), -): - await RealtimeRepository.delete_links_by_time_range(conn, start_time, end_time) - return {"message": "Deleted successfully"} - - -@router.patch("/realtime/links/{link_id}/field") -async def update_realtime_link_field( - link_id: str, - time: datetime, - field: str, - value: float, # Assuming float for now, could be Any but FastAPI needs type - conn: AsyncConnection = Depends(get_database_connection), -): - try: - await RealtimeRepository.update_link_field(conn, time, link_id, field, value) - return {"message": "Updated successfully"} - except ValueError as e: - raise HTTPException(status_code=400, detail=str(e)) - - -@router.post("/realtime/nodes/batch", status_code=201) -async def insert_realtime_nodes( - data: List[dict], conn: AsyncConnection = Depends(get_database_connection) -): - await RealtimeRepository.insert_nodes_batch(conn, data) - return {"message": f"Inserted {len(data)} records"} - - -@router.get("/realtime/nodes") -async def get_realtime_nodes( - start_time: datetime, - end_time: datetime, - conn: AsyncConnection = Depends(get_database_connection), -): - return await RealtimeRepository.get_nodes_by_time_range(conn, start_time, end_time) - - -@router.delete("/realtime/nodes") -async def delete_realtime_nodes( - start_time: datetime, - end_time: datetime, - conn: AsyncConnection = Depends(get_database_connection), -): - await RealtimeRepository.delete_nodes_by_time_range(conn, start_time, end_time) - return {"message": "Deleted successfully"} - - -@router.post("/realtime/simulation/store", status_code=201) -async def store_realtime_simulation_result( - node_result_list: List[dict], - link_result_list: List[dict], - result_start_time: str, - conn: AsyncConnection = Depends(get_database_connection), -): - """Store realtime simulation results to TimescaleDB""" - await RealtimeRepository.store_realtime_simulation_result( - conn, node_result_list, link_result_list, result_start_time - ) - return {"message": "Simulation results stored successfully"} - - -@router.get("/realtime/query/by-time-property") -async def query_realtime_records_by_time_property( - query_time: str, - type: str, - property: str, - conn: AsyncConnection = Depends(get_database_connection), -): - """Query all realtime records by time and property""" - try: - results = await RealtimeRepository.query_all_record_by_time_property( - conn, query_time, type, property - ) - return {"results": results} - except ValueError as e: - raise HTTPException(status_code=400, detail=str(e)) - - -@router.get("/realtime/query/by-id-time") -async def query_realtime_simulation_by_id_time( - id: str, - type: str, - query_time: str, - conn: AsyncConnection = Depends(get_database_connection), -): - """Query realtime simulation results by id and time""" - try: - results = await RealtimeRepository.query_simulation_result_by_id_time( - conn, id, type, query_time - ) - return {"result": results} - except ValueError as e: - raise HTTPException(status_code=400, detail=str(e)) - - -# --- Scheme Endpoints --- - - -@router.post("/scheme/links/batch", status_code=201) -async def insert_scheme_links( - data: List[dict], conn: AsyncConnection = Depends(get_database_connection) -): - await SchemeRepository.insert_links_batch(conn, data) - return {"message": f"Inserted {len(data)} records"} - - -@router.get("/scheme/links") -async def get_scheme_links( - scheme_type: str, - scheme_name: str, - start_time: datetime, - end_time: datetime, - conn: AsyncConnection = Depends(get_database_connection), -): - return await SchemeRepository.get_links_by_scheme_and_time_range( - conn, scheme_type, scheme_name, start_time, end_time - ) - - -@router.get("/scheme/links/{link_id}/field") -async def get_scheme_link_field( - scheme_type: str, - scheme_name: str, - link_id: str, - start_time: datetime, - end_time: datetime, - field: str, - conn: AsyncConnection = Depends(get_database_connection), -): - try: - return await SchemeRepository.get_link_field_by_scheme_and_time_range( - conn, scheme_type, scheme_name, start_time, end_time, link_id, field - ) - except ValueError as e: - raise HTTPException(status_code=400, detail=str(e)) - - -@router.patch("/scheme/links/{link_id}/field") -async def update_scheme_link_field( - scheme_type: str, - scheme_name: str, - link_id: str, - time: datetime, - field: str, - value: float, - conn: AsyncConnection = Depends(get_database_connection), -): - try: - await SchemeRepository.update_link_field( - conn, time, scheme_type, scheme_name, link_id, field, value - ) - return {"message": "Updated successfully"} - except ValueError as e: - raise HTTPException(status_code=400, detail=str(e)) - - -@router.delete("/scheme/links") -async def delete_scheme_links( - scheme_type: str, - scheme_name: str, - start_time: datetime, - end_time: datetime, - conn: AsyncConnection = Depends(get_database_connection), -): - await SchemeRepository.delete_links_by_scheme_and_time_range( - conn, scheme_type, scheme_name, start_time, end_time - ) - return {"message": "Deleted successfully"} - - -@router.post("/scheme/nodes/batch", status_code=201) -async def insert_scheme_nodes( - data: List[dict], conn: AsyncConnection = Depends(get_database_connection) -): - await SchemeRepository.insert_nodes_batch(conn, data) - return {"message": f"Inserted {len(data)} records"} - - -@router.get("/scheme/nodes/{node_id}/field") -async def get_scheme_node_field( - scheme_type: str, - scheme_name: str, - node_id: str, - start_time: datetime, - end_time: datetime, - field: str, - conn: AsyncConnection = Depends(get_database_connection), -): - try: - return await SchemeRepository.get_node_field_by_scheme_and_time_range( - conn, scheme_type, scheme_name, start_time, end_time, node_id, field - ) - except ValueError as e: - raise HTTPException(status_code=400, detail=str(e)) - - -@router.patch("/scheme/nodes/{node_id}/field") -async def update_scheme_node_field( - scheme_type: str, - scheme_name: str, - node_id: str, - time: datetime, - field: str, - value: float, - conn: AsyncConnection = Depends(get_database_connection), -): - try: - await SchemeRepository.update_node_field( - conn, time, scheme_type, scheme_name, node_id, field, value - ) - return {"message": "Updated successfully"} - except ValueError as e: - raise HTTPException(status_code=400, detail=str(e)) - - -@router.delete("/scheme/nodes") -async def delete_scheme_nodes( - scheme_type: str, - scheme_name: str, - start_time: datetime, - end_time: datetime, - conn: AsyncConnection = Depends(get_database_connection), -): - await SchemeRepository.delete_nodes_by_scheme_and_time_range( - conn, scheme_type, scheme_name, start_time, end_time - ) - return {"message": "Deleted successfully"} - - -@router.post("/scheme/simulation/store", status_code=201) -async def store_scheme_simulation_result( - scheme_type: str, - scheme_name: str, - node_result_list: List[dict], - link_result_list: List[dict], - result_start_time: str, - conn: AsyncConnection = Depends(get_database_connection), -): - """Store scheme simulation results to TimescaleDB""" - await SchemeRepository.store_scheme_simulation_result( - conn, - scheme_type, - scheme_name, - node_result_list, - link_result_list, - result_start_time, - ) - return {"message": "Scheme simulation results stored successfully"} - - -@router.get("/scheme/query/by-scheme-time-property") -async def query_scheme_records_by_scheme_time_property( - scheme_type: str, - scheme_name: str, - query_time: str, - type: str, - property: str, - conn: AsyncConnection = Depends(get_database_connection), -): - """Query all scheme records by scheme, time and property""" - try: - results = await SchemeRepository.query_all_record_by_scheme_time_property( - conn, scheme_type, scheme_name, query_time, type, property - ) - return {"results": results} - except ValueError as e: - raise HTTPException(status_code=400, detail=str(e)) - - -@router.get("/scheme/query/by-id-time") -async def query_scheme_simulation_by_id_time( - scheme_type: str, - scheme_name: str, - id: str, - type: str, - query_time: str, - conn: AsyncConnection = Depends(get_database_connection), -): - """Query scheme simulation results by id and time""" - try: - result = await SchemeRepository.query_scheme_simulation_result_by_id_time( - conn, scheme_type, scheme_name, id, type, query_time - ) - return {"result": result} - except ValueError as e: - raise HTTPException(status_code=400, detail=str(e)) - - -# --- SCADA Endpoints --- - - -@router.post("/scada/batch", status_code=201) -async def insert_scada_data( - data: List[dict], conn: AsyncConnection = Depends(get_database_connection) -): - await ScadaRepository.insert_scada_batch(conn, data) - return {"message": f"Inserted {len(data)} records"} - - -@router.get("/scada/by-ids-time-range") -async def get_scada_by_ids_time_range( - start_time: datetime, - end_time: datetime, - device_ids: str, - conn: AsyncConnection = Depends(get_database_connection), -): - device_ids_list = ( - [id.strip() for id in device_ids.split(",") if id.strip()] if device_ids else [] - ) - return await ScadaRepository.get_scada_by_ids_time_range( - conn, device_ids_list, start_time, end_time - ) - - -@router.get("/scada/by-ids-field-time-range") -async def get_scada_field_by_ids_time_range( - start_time: datetime, - end_time: datetime, - field: str, - device_ids: str, - conn: AsyncConnection = Depends(get_database_connection), -): - try: - device_ids_list = ( - [id.strip() for id in device_ids.split(",") if id.strip()] - if device_ids - else [] - ) - return await ScadaRepository.get_scada_field_by_id_time_range( - conn, device_ids_list, start_time, end_time, field - ) - except ValueError as e: - raise HTTPException(status_code=400, detail=str(e)) - - -@router.patch("/scada/{device_id}/field") -async def update_scada_field( - device_id: str, - time: datetime, - field: str, - value: float, - conn: AsyncConnection = Depends(get_database_connection), -): - try: - await ScadaRepository.update_scada_field(conn, time, device_id, field, value) - return {"message": "Updated successfully"} - except ValueError as e: - raise HTTPException(status_code=400, detail=str(e)) - - -@router.delete("/scada/by-id-time-range") -async def delete_scada_data( - device_id: str, - start_time: datetime, - end_time: datetime, - conn: AsyncConnection = Depends(get_database_connection), -): - await ScadaRepository.delete_scada_by_id_time_range( - conn, device_id, start_time, end_time - ) - return {"message": "Deleted successfully"} - - -# --- Composite Query Endpoints --- - - -@router.get("/composite/scada-simulation") -async def get_scada_associated_simulation_data( - start_time: datetime, - end_time: datetime, - device_ids: str, - scheme_type: str = Query(None, description="指定方案名称,若为空则查询实时数据"), - scheme_name: str = Query(None, description="指定方案名称,若为空则查询实时数据"), - timescale_conn: AsyncConnection = Depends(get_database_connection), - postgres_conn: AsyncConnection = Depends(get_postgres_connection), -): - """ - 获取 SCADA 关联的 link/node 模拟值 - - 根据传入的 SCADA device_ids,找到关联的 link/node, - 并根据对应的 type,查询对应的模拟数据 - """ - try: - # 手动解析 device_ids 为 List[str],去除空格 - device_ids_list = ( - [id.strip() for id in device_ids.split(",") if id.strip()] - if device_ids - else [] - ) - - if scheme_type and scheme_name: - result = await CompositeQueries.get_scada_associated_scheme_simulation_data( - timescale_conn, - postgres_conn, - device_ids_list, # 使用解析后的列表 - start_time, - end_time, - scheme_type, - scheme_name, - ) - else: - result = ( - await CompositeQueries.get_scada_associated_realtime_simulation_data( - timescale_conn, - postgres_conn, - device_ids_list, # 使用解析后的列表 - start_time, - end_time, - ) - ) - if result is None: - raise HTTPException(status_code=404, detail="No simulation data found") - return result - except ValueError as e: - raise HTTPException(status_code=400, detail=str(e)) - - -@router.get("/composite/element-simulation") -async def get_feature_simulation_data( - start_time: datetime, - end_time: datetime, - feature_infos: str = Query( - ..., description="特征信息,格式: id1:type1,id2:type2,type为pipe或junction" - ), - scheme_type: str = Query(None, description="指定方案类型,若为空则查询实时数据"), - scheme_name: str = Query(None, description="指定方案名称,若为空则查询实时数据"), - timescale_conn: AsyncConnection = Depends(get_database_connection), -): - """ - 获取 link/node 模拟值 - - 根据传入的 featureInfos,找到关联的 link/node, - 并根据对应的 type,查询对应的模拟数据 - - Args: - feature_infos: 格式为 "element_id1:type1,element_id2:type2" - 例如: "P1:pipe,J1:junction" - """ - try: - # 解析 feature_infos 为 List[Tuple[str, str]] - feature_infos_list = [] - if feature_infos: - for item in feature_infos.split(","): - item = item.strip() - if ":" in item: - element_id, element_type = item.split(":", 1) - feature_infos_list.append( - (element_id.strip(), element_type.strip()) - ) - - if not feature_infos_list: - raise HTTPException(status_code=400, detail="feature_infos cannot be empty") - - if scheme_type and scheme_name: - result = await CompositeQueries.get_scheme_simulation_data( - timescale_conn, - feature_infos_list, - start_time, - end_time, - scheme_type, - scheme_name, - ) - else: - result = await CompositeQueries.get_realtime_simulation_data( - timescale_conn, - feature_infos_list, - start_time, - end_time, - ) - - if result is None: - raise HTTPException(status_code=404, detail="No simulation data found") - return result - except ValueError as e: - raise HTTPException(status_code=400, detail=str(e)) - - -@router.get("/composite/element-scada") -async def get_element_associated_scada_data( - element_id: str, - start_time: datetime, - end_time: datetime, - use_cleaned: bool = Query(False, description="是否使用清洗后的数据"), - timescale_conn: AsyncConnection = Depends(get_database_connection), - postgres_conn: AsyncConnection = Depends(get_postgres_connection), -): - """ - 获取 link/node 关联的 SCADA 监测值 - - 根据传入的 link/node id,匹配 SCADA 信息, - 如果存在关联的 SCADA device_id,获取实际的监测数据 - """ - try: - result = await CompositeQueries.get_element_associated_scada_data( - timescale_conn, postgres_conn, element_id, start_time, end_time, use_cleaned - ) - if result is None: - raise HTTPException( - status_code=404, detail="No associated SCADA data found" - ) - return result - except ValueError as e: - raise HTTPException(status_code=400, detail=str(e)) - - -@router.post("/composite/clean-scada") -async def clean_scada_data( - device_ids: str, - start_time: datetime = Query(...), - end_time: datetime = Query(...), - timescale_conn: AsyncConnection = Depends(get_database_connection), - postgres_conn: AsyncConnection = Depends(get_postgres_connection), -): - """ - 清洗 SCADA 数据 - - 根据 device_ids 查询 monitored_value,清洗后更新 cleaned_value - """ - try: - if device_ids == "all": - device_ids_list = [] - else: - device_ids_list = ( - [id.strip() for id in device_ids.split(",") if id.strip()] - if device_ids - else [] - ) - return await CompositeQueries.clean_scada_data( - timescale_conn, postgres_conn, device_ids_list, start_time, end_time - ) - except ValueError as e: - raise HTTPException(status_code=400, detail=str(e)) - - -@router.get("/composite/pipeline-health-prediction") -async def predict_pipeline_health( - query_time: datetime = Query(..., description="查询时间"), - network_name: str = Query(..., description="管网数据库名称"), - timescale_conn: AsyncConnection = Depends(get_database_connection), -): - """ - 预测管道健康状况 - - 根据管网名称和当前时间,查询管道信息和实时数据, - 使用随机生存森林模型预测管道的生存概率 - - Args: - query_time: 查询时间 - db_name: 管网数据库名称 - - Returns: - 预测结果列表,每个元素包含 link_id 和对应的生存函数 - """ - try: - return await CompositeQueries.predict_pipeline_health( - timescale_conn, network_name, query_time - ) - except ValueError as e: - raise HTTPException(status_code=400, detail=str(e)) - except FileNotFoundError as e: - raise HTTPException(status_code=404, detail=str(e)) - except Exception as e: - raise HTTPException(status_code=500, detail=f"内部服务器错误: {str(e)}")