app/infra/db中 router 迁移并更新,清理 infra 层的旧 router

This commit is contained in:
2026-03-09 18:20:46 +08:00
parent 7345210bdd
commit 559d5bb8e3
11 changed files with 648 additions and 633 deletions
@@ -1,14 +1,13 @@
from fastapi import APIRouter, Depends, HTTPException
from psycopg import AsyncConnection
from .scada_info import ScadaRepository
from .scheme import SchemeRepository
from app.infra.db.postgresql.scada_info import ScadaRepository
from app.infra.db.postgresql.scheme import SchemeRepository
from app.auth.project_dependencies import get_project_pg_connection
router = APIRouter()
# 动态项目 PostgreSQL 连接依赖
async def get_database_connection(
conn: AsyncConnection = Depends(get_project_pg_connection),
):
@@ -23,7 +22,6 @@ async def get_scada_info_with_connection(
使用连接池查询所有SCADA信息
"""
try:
# 使用ScadaRepository查询SCADA信息
scada_data = await ScadaRepository.get_scadas(conn)
return {"success": True, "data": scada_data, "count": len(scada_data)}
except Exception as e:
@@ -40,7 +38,6 @@ async def get_scheme_list_with_connection(
使用连接池查询所有方案信息
"""
try:
# 使用SchemeRepository查询方案信息
scheme_data = await SchemeRepository.get_schemes(conn)
return {"success": True, "data": scheme_data, "count": len(scheme_data)}
except Exception as e:
@@ -55,7 +52,6 @@ async def get_burst_locate_result_with_connection(
使用连接池查询所有爆管定位结果
"""
try:
# 使用SchemeRepository查询爆管定位结果
burst_data = await SchemeRepository.get_burst_locate_results(conn)
return {"success": True, "data": burst_data, "count": len(burst_data)}
except Exception as e:
@@ -73,7 +69,6 @@ async def get_burst_locate_result_by_incident(
根据 burst_incident 查询爆管定位结果
"""
try:
# 使用SchemeRepository查询爆管定位结果
return await SchemeRepository.get_burst_locate_result_by_incident(
conn, burst_incident
)
@@ -0,0 +1,205 @@
from fastapi import APIRouter, Depends, HTTPException, Query
from datetime import datetime
from psycopg import AsyncConnection
from app.infra.db.timescaledb.composite_queries import CompositeQueries
from .dependencies import get_timescale_connection, get_postgres_connection
router = APIRouter()
@router.get("/composite/scada-simulation")
async def get_scada_associated_simulation_data(
start_time: datetime,
end_time: datetime,
device_ids: str,
scheme_type: str = Query(None, description="指定方案名称,若为空则查询实时数据"),
scheme_name: str = Query(None, description="指定方案名称,若为空则查询实时数据"),
timescale_conn: AsyncConnection = Depends(get_timescale_connection),
postgres_conn: AsyncConnection = Depends(get_postgres_connection),
):
"""
获取 SCADA 关联的 link/node 模拟值
根据传入的 SCADA device_ids,找到关联的 link/node
并根据对应的 type,查询对应的模拟数据
"""
try:
device_ids_list = (
[id.strip() for id in device_ids.split(",") if id.strip()]
if device_ids
else []
)
if scheme_type and scheme_name:
result = await CompositeQueries.get_scada_associated_scheme_simulation_data(
timescale_conn,
postgres_conn,
device_ids_list,
start_time,
end_time,
scheme_type,
scheme_name,
)
else:
result = (
await CompositeQueries.get_scada_associated_realtime_simulation_data(
timescale_conn,
postgres_conn,
device_ids_list,
start_time,
end_time,
)
)
if result is None:
raise HTTPException(status_code=404, detail="No simulation data found")
return result
except ValueError as e:
raise HTTPException(status_code=400, detail=str(e))
@router.get("/composite/element-simulation")
async def get_feature_simulation_data(
start_time: datetime,
end_time: datetime,
feature_infos: str = Query(
..., description="特征信息,格式: id1:type1,id2:type2type为pipe或junction"
),
scheme_type: str = Query(None, description="指定方案类型,若为空则查询实时数据"),
scheme_name: str = Query(None, description="指定方案名称,若为空则查询实时数据"),
timescale_conn: AsyncConnection = Depends(get_timescale_connection),
):
"""
获取 link/node 模拟值
根据传入的 featureInfos,找到关联的 link/node
并根据对应的 type,查询对应的模拟数据
Args:
feature_infos: 格式为 "element_id1:type1,element_id2:type2"
例如: "P1:pipe,J1:junction"
"""
try:
feature_infos_list = []
if feature_infos:
for item in feature_infos.split(","):
item = item.strip()
if ":" in item:
element_id, element_type = item.split(":", 1)
feature_infos_list.append(
(element_id.strip(), element_type.strip())
)
if not feature_infos_list:
raise HTTPException(status_code=400, detail="feature_infos cannot be empty")
if scheme_type and scheme_name:
result = await CompositeQueries.get_scheme_simulation_data(
timescale_conn,
feature_infos_list,
start_time,
end_time,
scheme_type,
scheme_name,
)
else:
result = await CompositeQueries.get_realtime_simulation_data(
timescale_conn,
feature_infos_list,
start_time,
end_time,
)
if result is None:
raise HTTPException(status_code=404, detail="No simulation data found")
return result
except ValueError as e:
raise HTTPException(status_code=400, detail=str(e))
@router.get("/composite/element-scada")
async def get_element_associated_scada_data(
element_id: str,
start_time: datetime,
end_time: datetime,
use_cleaned: bool = Query(False, description="是否使用清洗后的数据"),
timescale_conn: AsyncConnection = Depends(get_timescale_connection),
postgres_conn: AsyncConnection = Depends(get_postgres_connection),
):
"""
获取 link/node 关联的 SCADA 监测值
根据传入的 link/node id,匹配 SCADA 信息,
如果存在关联的 SCADA device_id,获取实际的监测数据
"""
try:
result = await CompositeQueries.get_element_associated_scada_data(
timescale_conn, postgres_conn, element_id, start_time, end_time, use_cleaned
)
if result is None:
raise HTTPException(
status_code=404, detail="No associated SCADA data found"
)
return result
except ValueError as e:
raise HTTPException(status_code=400, detail=str(e))
@router.post("/composite/clean-scada")
async def clean_scada_data(
device_ids: str,
start_time: datetime = Query(...),
end_time: datetime = Query(...),
timescale_conn: AsyncConnection = Depends(get_timescale_connection),
postgres_conn: AsyncConnection = Depends(get_postgres_connection),
):
"""
清洗 SCADA 数据
根据 device_ids 查询 monitored_value,清洗后更新 cleaned_value
"""
try:
if device_ids == "all":
device_ids_list = []
else:
device_ids_list = (
[id.strip() for id in device_ids.split(",") if id.strip()]
if device_ids
else []
)
return await CompositeQueries.clean_scada_data(
timescale_conn, postgres_conn, device_ids_list, start_time, end_time
)
except ValueError as e:
raise HTTPException(status_code=400, detail=str(e))
@router.get("/composite/pipeline-health-prediction")
async def predict_pipeline_health(
query_time: datetime = Query(..., description="查询时间"),
network_name: str = Query(..., description="管网数据库名称"),
timescale_conn: AsyncConnection = Depends(get_timescale_connection),
):
"""
预测管道健康状况
根据管网名称和当前时间,查询管道信息和实时数据,
使用随机生存森林模型预测管道的生存概率
Args:
query_time: 查询时间
db_name: 管网数据库名称
Returns:
预测结果列表,每个元素包含 link_id 和对应的生存函数
"""
try:
return await CompositeQueries.predict_pipeline_health(
timescale_conn, network_name, query_time
)
except ValueError as e:
raise HTTPException(status_code=400, detail=str(e))
except FileNotFoundError as e:
raise HTTPException(status_code=404, detail=str(e))
except Exception as e:
raise HTTPException(status_code=500, detail=f"内部服务器错误: {str(e)}")
@@ -0,0 +1,19 @@
from fastapi import Depends
from psycopg import AsyncConnection
from app.auth.project_dependencies import (
get_project_pg_connection,
get_project_timescale_connection,
)
async def get_timescale_connection(
conn: AsyncConnection = Depends(get_project_timescale_connection),
):
yield conn
async def get_postgres_connection(
conn: AsyncConnection = Depends(get_project_pg_connection),
):
yield conn
+126
View File
@@ -0,0 +1,126 @@
from fastapi import APIRouter, Depends, HTTPException
from typing import List
from datetime import datetime
from psycopg import AsyncConnection
from app.infra.db.timescaledb.repositories.realtime import RealtimeRepository
from .dependencies import get_timescale_connection
router = APIRouter()
@router.post("/realtime/links/batch", status_code=201)
async def insert_realtime_links(
data: List[dict], conn: AsyncConnection = Depends(get_timescale_connection)
):
await RealtimeRepository.insert_links_batch(conn, data)
return {"message": f"Inserted {len(data)} records"}
@router.get("/realtime/links")
async def get_realtime_links(
start_time: datetime,
end_time: datetime,
conn: AsyncConnection = Depends(get_timescale_connection),
):
return await RealtimeRepository.get_links_by_time_range(conn, start_time, end_time)
@router.delete("/realtime/links")
async def delete_realtime_links(
start_time: datetime,
end_time: datetime,
conn: AsyncConnection = Depends(get_timescale_connection),
):
await RealtimeRepository.delete_links_by_time_range(conn, start_time, end_time)
return {"message": "Deleted successfully"}
@router.patch("/realtime/links/{link_id}/field")
async def update_realtime_link_field(
link_id: str,
time: datetime,
field: str,
value: float,
conn: AsyncConnection = Depends(get_timescale_connection),
):
try:
await RealtimeRepository.update_link_field(conn, time, link_id, field, value)
return {"message": "Updated successfully"}
except ValueError as e:
raise HTTPException(status_code=400, detail=str(e))
@router.post("/realtime/nodes/batch", status_code=201)
async def insert_realtime_nodes(
data: List[dict], conn: AsyncConnection = Depends(get_timescale_connection)
):
await RealtimeRepository.insert_nodes_batch(conn, data)
return {"message": f"Inserted {len(data)} records"}
@router.get("/realtime/nodes")
async def get_realtime_nodes(
start_time: datetime,
end_time: datetime,
conn: AsyncConnection = Depends(get_timescale_connection),
):
return await RealtimeRepository.get_nodes_by_time_range(conn, start_time, end_time)
@router.delete("/realtime/nodes")
async def delete_realtime_nodes(
start_time: datetime,
end_time: datetime,
conn: AsyncConnection = Depends(get_timescale_connection),
):
await RealtimeRepository.delete_nodes_by_time_range(conn, start_time, end_time)
return {"message": "Deleted successfully"}
@router.post("/realtime/simulation/store", status_code=201)
async def store_realtime_simulation_result(
node_result_list: List[dict],
link_result_list: List[dict],
result_start_time: str,
conn: AsyncConnection = Depends(get_timescale_connection),
):
"""Store realtime simulation results to TimescaleDB"""
await RealtimeRepository.store_realtime_simulation_result(
conn, node_result_list, link_result_list, result_start_time
)
return {"message": "Simulation results stored successfully"}
@router.get("/realtime/query/by-time-property")
async def query_realtime_records_by_time_property(
query_time: str,
type: str,
property: str,
conn: AsyncConnection = Depends(get_timescale_connection),
):
"""Query all realtime records by time and property"""
try:
results = await RealtimeRepository.query_all_record_by_time_property(
conn, query_time, type, property
)
return {"results": results}
except ValueError as e:
raise HTTPException(status_code=400, detail=str(e))
@router.get("/realtime/query/by-id-time")
async def query_realtime_simulation_by_id_time(
id: str,
type: str,
query_time: str,
conn: AsyncConnection = Depends(get_timescale_connection),
):
"""Query realtime simulation results by id and time"""
try:
results = await RealtimeRepository.query_simulation_result_by_id_time(
conn, id, type, query_time
)
return {"result": results}
except ValueError as e:
raise HTTPException(status_code=400, detail=str(e))
+81
View File
@@ -0,0 +1,81 @@
from fastapi import APIRouter, Depends, HTTPException
from typing import List
from datetime import datetime
from psycopg import AsyncConnection
from app.infra.db.timescaledb.repositories.scada import ScadaRepository
from .dependencies import get_timescale_connection
router = APIRouter()
@router.post("/scada/batch", status_code=201)
async def insert_scada_data(
data: List[dict], conn: AsyncConnection = Depends(get_timescale_connection)
):
await ScadaRepository.insert_scada_batch(conn, data)
return {"message": f"Inserted {len(data)} records"}
@router.get("/scada/by-ids-time-range")
async def get_scada_by_ids_time_range(
start_time: datetime,
end_time: datetime,
device_ids: str,
conn: AsyncConnection = Depends(get_timescale_connection),
):
device_ids_list = (
[id.strip() for id in device_ids.split(",") if id.strip()] if device_ids else []
)
return await ScadaRepository.get_scada_by_ids_time_range(
conn, device_ids_list, start_time, end_time
)
@router.get("/scada/by-ids-field-time-range")
async def get_scada_field_by_ids_time_range(
start_time: datetime,
end_time: datetime,
field: str,
device_ids: str,
conn: AsyncConnection = Depends(get_timescale_connection),
):
try:
device_ids_list = (
[id.strip() for id in device_ids.split(",") if id.strip()]
if device_ids
else []
)
return await ScadaRepository.get_scada_field_by_id_time_range(
conn, device_ids_list, start_time, end_time, field
)
except ValueError as e:
raise HTTPException(status_code=400, detail=str(e))
@router.patch("/scada/{device_id}/field")
async def update_scada_field(
device_id: str,
time: datetime,
field: str,
value: float,
conn: AsyncConnection = Depends(get_timescale_connection),
):
try:
await ScadaRepository.update_scada_field(conn, time, device_id, field, value)
return {"message": "Updated successfully"}
except ValueError as e:
raise HTTPException(status_code=400, detail=str(e))
@router.delete("/scada/by-id-time-range")
async def delete_scada_data(
device_id: str,
start_time: datetime,
end_time: datetime,
conn: AsyncConnection = Depends(get_timescale_connection),
):
await ScadaRepository.delete_scada_by_id_time_range(
conn, device_id, start_time, end_time
)
return {"message": "Deleted successfully"}
+199
View File
@@ -0,0 +1,199 @@
from fastapi import APIRouter, Depends, HTTPException
from typing import List
from datetime import datetime
from psycopg import AsyncConnection
from app.infra.db.timescaledb.repositories.scheme import SchemeRepository
from .dependencies import get_timescale_connection
router = APIRouter()
@router.post("/scheme/links/batch", status_code=201)
async def insert_scheme_links(
data: List[dict], conn: AsyncConnection = Depends(get_timescale_connection)
):
await SchemeRepository.insert_links_batch(conn, data)
return {"message": f"Inserted {len(data)} records"}
@router.get("/scheme/links")
async def get_scheme_links(
scheme_type: str,
scheme_name: str,
start_time: datetime,
end_time: datetime,
conn: AsyncConnection = Depends(get_timescale_connection),
):
return await SchemeRepository.get_links_by_scheme_and_time_range(
conn, scheme_type, scheme_name, start_time, end_time
)
@router.get("/scheme/links/{link_id}/field")
async def get_scheme_link_field(
scheme_type: str,
scheme_name: str,
link_id: str,
start_time: datetime,
end_time: datetime,
field: str,
conn: AsyncConnection = Depends(get_timescale_connection),
):
try:
return await SchemeRepository.get_link_field_by_scheme_and_time_range(
conn, scheme_type, scheme_name, start_time, end_time, link_id, field
)
except ValueError as e:
raise HTTPException(status_code=400, detail=str(e))
@router.patch("/scheme/links/{link_id}/field")
async def update_scheme_link_field(
scheme_type: str,
scheme_name: str,
link_id: str,
time: datetime,
field: str,
value: float,
conn: AsyncConnection = Depends(get_timescale_connection),
):
try:
await SchemeRepository.update_link_field(
conn, time, scheme_type, scheme_name, link_id, field, value
)
return {"message": "Updated successfully"}
except ValueError as e:
raise HTTPException(status_code=400, detail=str(e))
@router.delete("/scheme/links")
async def delete_scheme_links(
scheme_type: str,
scheme_name: str,
start_time: datetime,
end_time: datetime,
conn: AsyncConnection = Depends(get_timescale_connection),
):
await SchemeRepository.delete_links_by_scheme_and_time_range(
conn, scheme_type, scheme_name, start_time, end_time
)
return {"message": "Deleted successfully"}
@router.post("/scheme/nodes/batch", status_code=201)
async def insert_scheme_nodes(
data: List[dict], conn: AsyncConnection = Depends(get_timescale_connection)
):
await SchemeRepository.insert_nodes_batch(conn, data)
return {"message": f"Inserted {len(data)} records"}
@router.get("/scheme/nodes/{node_id}/field")
async def get_scheme_node_field(
scheme_type: str,
scheme_name: str,
node_id: str,
start_time: datetime,
end_time: datetime,
field: str,
conn: AsyncConnection = Depends(get_timescale_connection),
):
try:
return await SchemeRepository.get_node_field_by_scheme_and_time_range(
conn, scheme_type, scheme_name, start_time, end_time, node_id, field
)
except ValueError as e:
raise HTTPException(status_code=400, detail=str(e))
@router.patch("/scheme/nodes/{node_id}/field")
async def update_scheme_node_field(
scheme_type: str,
scheme_name: str,
node_id: str,
time: datetime,
field: str,
value: float,
conn: AsyncConnection = Depends(get_timescale_connection),
):
try:
await SchemeRepository.update_node_field(
conn, time, scheme_type, scheme_name, node_id, field, value
)
return {"message": "Updated successfully"}
except ValueError as e:
raise HTTPException(status_code=400, detail=str(e))
@router.delete("/scheme/nodes")
async def delete_scheme_nodes(
scheme_type: str,
scheme_name: str,
start_time: datetime,
end_time: datetime,
conn: AsyncConnection = Depends(get_timescale_connection),
):
await SchemeRepository.delete_nodes_by_scheme_and_time_range(
conn, scheme_type, scheme_name, start_time, end_time
)
return {"message": "Deleted successfully"}
@router.post("/scheme/simulation/store", status_code=201)
async def store_scheme_simulation_result(
scheme_type: str,
scheme_name: str,
node_result_list: List[dict],
link_result_list: List[dict],
result_start_time: str,
conn: AsyncConnection = Depends(get_timescale_connection),
):
"""Store scheme simulation results to TimescaleDB"""
await SchemeRepository.store_scheme_simulation_result(
conn,
scheme_type,
scheme_name,
node_result_list,
link_result_list,
result_start_time,
)
return {"message": "Scheme simulation results stored successfully"}
@router.get("/scheme/query/by-scheme-time-property")
async def query_scheme_records_by_scheme_time_property(
scheme_type: str,
scheme_name: str,
query_time: str,
type: str,
property: str,
conn: AsyncConnection = Depends(get_timescale_connection),
):
"""Query all scheme records by scheme, time and property"""
try:
results = await SchemeRepository.query_all_record_by_scheme_time_property(
conn, scheme_type, scheme_name, query_time, type, property
)
return {"results": results}
except ValueError as e:
raise HTTPException(status_code=400, detail=str(e))
@router.get("/scheme/query/by-id-time")
async def query_scheme_simulation_by_id_time(
scheme_type: str,
scheme_name: str,
id: str,
type: str,
query_time: str,
conn: AsyncConnection = Depends(get_timescale_connection),
):
"""Query scheme simulation results by id and time"""
try:
result = await SchemeRepository.query_scheme_simulation_result_by_id_time(
conn, scheme_type, scheme_name, id, type, query_time
)
return {"result": result}
except ValueError as e:
raise HTTPException(status_code=400, detail=str(e))
+15 -5
View File
@@ -40,8 +40,13 @@ from app.api.v1.endpoints.components import (
visuals,
)
from app.infra.db.postgresql import router as postgresql_router
from app.infra.db.timescaledb import router as timescaledb_router
from app.api.v1.endpoints import project_data
from app.api.v1.endpoints.timeseries import (
realtime as ts_realtime,
scheme as ts_scheme,
scada as ts_scada,
composite as ts_composite,
)
api_router = APIRouter()
@@ -90,9 +95,14 @@ api_router.include_router(
burst_location.router, prefix="/burst-location", tags=["Burst Location"]
)
# Database Routers
api_router.include_router(timescaledb_router, tags=["TimescaleDB"])
api_router.include_router(postgresql_router, tags=["PostgreSQL"])
# TimescaleDB Data Access
api_router.include_router(ts_realtime.router, tags=["TimescaleDB - Realtime"])
api_router.include_router(ts_scheme.router, tags=["TimescaleDB - Scheme"])
api_router.include_router(ts_scada.router, tags=["TimescaleDB - SCADA"])
api_router.include_router(ts_composite.router, tags=["TimescaleDB - Composite"])
# Project Data (PostgreSQL)
api_router.include_router(project_data.router, tags=["Project Data"])
# Extension
api_router.include_router(extension.router, tags=["Extension"])
+1 -1
View File
@@ -1 +1 @@
from .router import router
-1
View File
@@ -1,4 +1,3 @@
from .router import router
from .database import *
from .timescaledb_info import *
from .composite_queries import CompositeQueries
-619
View File
@@ -1,619 +0,0 @@
from fastapi import APIRouter, Depends, HTTPException, Query
from typing import List
from datetime import datetime
from psycopg import AsyncConnection
from .repositories.realtime import RealtimeRepository
from .repositories.scheme import SchemeRepository
from .repositories.scada import ScadaRepository
from .composite_queries import CompositeQueries
from app.auth.project_dependencies import (
get_project_pg_connection,
get_project_timescale_connection,
)
router = APIRouter()
# 动态项目 TimescaleDB 连接依赖
async def get_database_connection(
conn: AsyncConnection = Depends(get_project_timescale_connection),
):
yield conn
# 动态项目 PostgreSQL 连接依赖
async def get_postgres_connection(
conn: AsyncConnection = Depends(get_project_pg_connection),
):
yield conn
# --- Realtime Endpoints ---
@router.post("/realtime/links/batch", status_code=201)
async def insert_realtime_links(
data: List[dict], conn: AsyncConnection = Depends(get_database_connection)
):
await RealtimeRepository.insert_links_batch(conn, data)
return {"message": f"Inserted {len(data)} records"}
@router.get("/realtime/links")
async def get_realtime_links(
start_time: datetime,
end_time: datetime,
conn: AsyncConnection = Depends(get_database_connection),
):
return await RealtimeRepository.get_links_by_time_range(conn, start_time, end_time)
@router.delete("/realtime/links")
async def delete_realtime_links(
start_time: datetime,
end_time: datetime,
conn: AsyncConnection = Depends(get_database_connection),
):
await RealtimeRepository.delete_links_by_time_range(conn, start_time, end_time)
return {"message": "Deleted successfully"}
@router.patch("/realtime/links/{link_id}/field")
async def update_realtime_link_field(
link_id: str,
time: datetime,
field: str,
value: float, # Assuming float for now, could be Any but FastAPI needs type
conn: AsyncConnection = Depends(get_database_connection),
):
try:
await RealtimeRepository.update_link_field(conn, time, link_id, field, value)
return {"message": "Updated successfully"}
except ValueError as e:
raise HTTPException(status_code=400, detail=str(e))
@router.post("/realtime/nodes/batch", status_code=201)
async def insert_realtime_nodes(
data: List[dict], conn: AsyncConnection = Depends(get_database_connection)
):
await RealtimeRepository.insert_nodes_batch(conn, data)
return {"message": f"Inserted {len(data)} records"}
@router.get("/realtime/nodes")
async def get_realtime_nodes(
start_time: datetime,
end_time: datetime,
conn: AsyncConnection = Depends(get_database_connection),
):
return await RealtimeRepository.get_nodes_by_time_range(conn, start_time, end_time)
@router.delete("/realtime/nodes")
async def delete_realtime_nodes(
start_time: datetime,
end_time: datetime,
conn: AsyncConnection = Depends(get_database_connection),
):
await RealtimeRepository.delete_nodes_by_time_range(conn, start_time, end_time)
return {"message": "Deleted successfully"}
@router.post("/realtime/simulation/store", status_code=201)
async def store_realtime_simulation_result(
node_result_list: List[dict],
link_result_list: List[dict],
result_start_time: str,
conn: AsyncConnection = Depends(get_database_connection),
):
"""Store realtime simulation results to TimescaleDB"""
await RealtimeRepository.store_realtime_simulation_result(
conn, node_result_list, link_result_list, result_start_time
)
return {"message": "Simulation results stored successfully"}
@router.get("/realtime/query/by-time-property")
async def query_realtime_records_by_time_property(
query_time: str,
type: str,
property: str,
conn: AsyncConnection = Depends(get_database_connection),
):
"""Query all realtime records by time and property"""
try:
results = await RealtimeRepository.query_all_record_by_time_property(
conn, query_time, type, property
)
return {"results": results}
except ValueError as e:
raise HTTPException(status_code=400, detail=str(e))
@router.get("/realtime/query/by-id-time")
async def query_realtime_simulation_by_id_time(
id: str,
type: str,
query_time: str,
conn: AsyncConnection = Depends(get_database_connection),
):
"""Query realtime simulation results by id and time"""
try:
results = await RealtimeRepository.query_simulation_result_by_id_time(
conn, id, type, query_time
)
return {"result": results}
except ValueError as e:
raise HTTPException(status_code=400, detail=str(e))
# --- Scheme Endpoints ---
@router.post("/scheme/links/batch", status_code=201)
async def insert_scheme_links(
data: List[dict], conn: AsyncConnection = Depends(get_database_connection)
):
await SchemeRepository.insert_links_batch(conn, data)
return {"message": f"Inserted {len(data)} records"}
@router.get("/scheme/links")
async def get_scheme_links(
scheme_type: str,
scheme_name: str,
start_time: datetime,
end_time: datetime,
conn: AsyncConnection = Depends(get_database_connection),
):
return await SchemeRepository.get_links_by_scheme_and_time_range(
conn, scheme_type, scheme_name, start_time, end_time
)
@router.get("/scheme/links/{link_id}/field")
async def get_scheme_link_field(
scheme_type: str,
scheme_name: str,
link_id: str,
start_time: datetime,
end_time: datetime,
field: str,
conn: AsyncConnection = Depends(get_database_connection),
):
try:
return await SchemeRepository.get_link_field_by_scheme_and_time_range(
conn, scheme_type, scheme_name, start_time, end_time, link_id, field
)
except ValueError as e:
raise HTTPException(status_code=400, detail=str(e))
@router.patch("/scheme/links/{link_id}/field")
async def update_scheme_link_field(
scheme_type: str,
scheme_name: str,
link_id: str,
time: datetime,
field: str,
value: float,
conn: AsyncConnection = Depends(get_database_connection),
):
try:
await SchemeRepository.update_link_field(
conn, time, scheme_type, scheme_name, link_id, field, value
)
return {"message": "Updated successfully"}
except ValueError as e:
raise HTTPException(status_code=400, detail=str(e))
@router.delete("/scheme/links")
async def delete_scheme_links(
scheme_type: str,
scheme_name: str,
start_time: datetime,
end_time: datetime,
conn: AsyncConnection = Depends(get_database_connection),
):
await SchemeRepository.delete_links_by_scheme_and_time_range(
conn, scheme_type, scheme_name, start_time, end_time
)
return {"message": "Deleted successfully"}
@router.post("/scheme/nodes/batch", status_code=201)
async def insert_scheme_nodes(
data: List[dict], conn: AsyncConnection = Depends(get_database_connection)
):
await SchemeRepository.insert_nodes_batch(conn, data)
return {"message": f"Inserted {len(data)} records"}
@router.get("/scheme/nodes/{node_id}/field")
async def get_scheme_node_field(
scheme_type: str,
scheme_name: str,
node_id: str,
start_time: datetime,
end_time: datetime,
field: str,
conn: AsyncConnection = Depends(get_database_connection),
):
try:
return await SchemeRepository.get_node_field_by_scheme_and_time_range(
conn, scheme_type, scheme_name, start_time, end_time, node_id, field
)
except ValueError as e:
raise HTTPException(status_code=400, detail=str(e))
@router.patch("/scheme/nodes/{node_id}/field")
async def update_scheme_node_field(
scheme_type: str,
scheme_name: str,
node_id: str,
time: datetime,
field: str,
value: float,
conn: AsyncConnection = Depends(get_database_connection),
):
try:
await SchemeRepository.update_node_field(
conn, time, scheme_type, scheme_name, node_id, field, value
)
return {"message": "Updated successfully"}
except ValueError as e:
raise HTTPException(status_code=400, detail=str(e))
@router.delete("/scheme/nodes")
async def delete_scheme_nodes(
scheme_type: str,
scheme_name: str,
start_time: datetime,
end_time: datetime,
conn: AsyncConnection = Depends(get_database_connection),
):
await SchemeRepository.delete_nodes_by_scheme_and_time_range(
conn, scheme_type, scheme_name, start_time, end_time
)
return {"message": "Deleted successfully"}
@router.post("/scheme/simulation/store", status_code=201)
async def store_scheme_simulation_result(
scheme_type: str,
scheme_name: str,
node_result_list: List[dict],
link_result_list: List[dict],
result_start_time: str,
conn: AsyncConnection = Depends(get_database_connection),
):
"""Store scheme simulation results to TimescaleDB"""
await SchemeRepository.store_scheme_simulation_result(
conn,
scheme_type,
scheme_name,
node_result_list,
link_result_list,
result_start_time,
)
return {"message": "Scheme simulation results stored successfully"}
@router.get("/scheme/query/by-scheme-time-property")
async def query_scheme_records_by_scheme_time_property(
scheme_type: str,
scheme_name: str,
query_time: str,
type: str,
property: str,
conn: AsyncConnection = Depends(get_database_connection),
):
"""Query all scheme records by scheme, time and property"""
try:
results = await SchemeRepository.query_all_record_by_scheme_time_property(
conn, scheme_type, scheme_name, query_time, type, property
)
return {"results": results}
except ValueError as e:
raise HTTPException(status_code=400, detail=str(e))
@router.get("/scheme/query/by-id-time")
async def query_scheme_simulation_by_id_time(
scheme_type: str,
scheme_name: str,
id: str,
type: str,
query_time: str,
conn: AsyncConnection = Depends(get_database_connection),
):
"""Query scheme simulation results by id and time"""
try:
result = await SchemeRepository.query_scheme_simulation_result_by_id_time(
conn, scheme_type, scheme_name, id, type, query_time
)
return {"result": result}
except ValueError as e:
raise HTTPException(status_code=400, detail=str(e))
# --- SCADA Endpoints ---
@router.post("/scada/batch", status_code=201)
async def insert_scada_data(
data: List[dict], conn: AsyncConnection = Depends(get_database_connection)
):
await ScadaRepository.insert_scada_batch(conn, data)
return {"message": f"Inserted {len(data)} records"}
@router.get("/scada/by-ids-time-range")
async def get_scada_by_ids_time_range(
start_time: datetime,
end_time: datetime,
device_ids: str,
conn: AsyncConnection = Depends(get_database_connection),
):
device_ids_list = (
[id.strip() for id in device_ids.split(",") if id.strip()] if device_ids else []
)
return await ScadaRepository.get_scada_by_ids_time_range(
conn, device_ids_list, start_time, end_time
)
@router.get("/scada/by-ids-field-time-range")
async def get_scada_field_by_ids_time_range(
start_time: datetime,
end_time: datetime,
field: str,
device_ids: str,
conn: AsyncConnection = Depends(get_database_connection),
):
try:
device_ids_list = (
[id.strip() for id in device_ids.split(",") if id.strip()]
if device_ids
else []
)
return await ScadaRepository.get_scada_field_by_id_time_range(
conn, device_ids_list, start_time, end_time, field
)
except ValueError as e:
raise HTTPException(status_code=400, detail=str(e))
@router.patch("/scada/{device_id}/field")
async def update_scada_field(
device_id: str,
time: datetime,
field: str,
value: float,
conn: AsyncConnection = Depends(get_database_connection),
):
try:
await ScadaRepository.update_scada_field(conn, time, device_id, field, value)
return {"message": "Updated successfully"}
except ValueError as e:
raise HTTPException(status_code=400, detail=str(e))
@router.delete("/scada/by-id-time-range")
async def delete_scada_data(
device_id: str,
start_time: datetime,
end_time: datetime,
conn: AsyncConnection = Depends(get_database_connection),
):
await ScadaRepository.delete_scada_by_id_time_range(
conn, device_id, start_time, end_time
)
return {"message": "Deleted successfully"}
# --- Composite Query Endpoints ---
@router.get("/composite/scada-simulation")
async def get_scada_associated_simulation_data(
start_time: datetime,
end_time: datetime,
device_ids: str,
scheme_type: str = Query(None, description="指定方案名称,若为空则查询实时数据"),
scheme_name: str = Query(None, description="指定方案名称,若为空则查询实时数据"),
timescale_conn: AsyncConnection = Depends(get_database_connection),
postgres_conn: AsyncConnection = Depends(get_postgres_connection),
):
"""
获取 SCADA 关联的 link/node 模拟值
根据传入的 SCADA device_ids,找到关联的 link/node
并根据对应的 type,查询对应的模拟数据
"""
try:
# 手动解析 device_ids 为 List[str],去除空格
device_ids_list = (
[id.strip() for id in device_ids.split(",") if id.strip()]
if device_ids
else []
)
if scheme_type and scheme_name:
result = await CompositeQueries.get_scada_associated_scheme_simulation_data(
timescale_conn,
postgres_conn,
device_ids_list, # 使用解析后的列表
start_time,
end_time,
scheme_type,
scheme_name,
)
else:
result = (
await CompositeQueries.get_scada_associated_realtime_simulation_data(
timescale_conn,
postgres_conn,
device_ids_list, # 使用解析后的列表
start_time,
end_time,
)
)
if result is None:
raise HTTPException(status_code=404, detail="No simulation data found")
return result
except ValueError as e:
raise HTTPException(status_code=400, detail=str(e))
@router.get("/composite/element-simulation")
async def get_feature_simulation_data(
start_time: datetime,
end_time: datetime,
feature_infos: str = Query(
..., description="特征信息,格式: id1:type1,id2:type2type为pipe或junction"
),
scheme_type: str = Query(None, description="指定方案类型,若为空则查询实时数据"),
scheme_name: str = Query(None, description="指定方案名称,若为空则查询实时数据"),
timescale_conn: AsyncConnection = Depends(get_database_connection),
):
"""
获取 link/node 模拟值
根据传入的 featureInfos,找到关联的 link/node
并根据对应的 type,查询对应的模拟数据
Args:
feature_infos: 格式为 "element_id1:type1,element_id2:type2"
例如: "P1:pipe,J1:junction"
"""
try:
# 解析 feature_infos 为 List[Tuple[str, str]]
feature_infos_list = []
if feature_infos:
for item in feature_infos.split(","):
item = item.strip()
if ":" in item:
element_id, element_type = item.split(":", 1)
feature_infos_list.append(
(element_id.strip(), element_type.strip())
)
if not feature_infos_list:
raise HTTPException(status_code=400, detail="feature_infos cannot be empty")
if scheme_type and scheme_name:
result = await CompositeQueries.get_scheme_simulation_data(
timescale_conn,
feature_infos_list,
start_time,
end_time,
scheme_type,
scheme_name,
)
else:
result = await CompositeQueries.get_realtime_simulation_data(
timescale_conn,
feature_infos_list,
start_time,
end_time,
)
if result is None:
raise HTTPException(status_code=404, detail="No simulation data found")
return result
except ValueError as e:
raise HTTPException(status_code=400, detail=str(e))
@router.get("/composite/element-scada")
async def get_element_associated_scada_data(
element_id: str,
start_time: datetime,
end_time: datetime,
use_cleaned: bool = Query(False, description="是否使用清洗后的数据"),
timescale_conn: AsyncConnection = Depends(get_database_connection),
postgres_conn: AsyncConnection = Depends(get_postgres_connection),
):
"""
获取 link/node 关联的 SCADA 监测值
根据传入的 link/node id,匹配 SCADA 信息,
如果存在关联的 SCADA device_id,获取实际的监测数据
"""
try:
result = await CompositeQueries.get_element_associated_scada_data(
timescale_conn, postgres_conn, element_id, start_time, end_time, use_cleaned
)
if result is None:
raise HTTPException(
status_code=404, detail="No associated SCADA data found"
)
return result
except ValueError as e:
raise HTTPException(status_code=400, detail=str(e))
@router.post("/composite/clean-scada")
async def clean_scada_data(
device_ids: str,
start_time: datetime = Query(...),
end_time: datetime = Query(...),
timescale_conn: AsyncConnection = Depends(get_database_connection),
postgres_conn: AsyncConnection = Depends(get_postgres_connection),
):
"""
清洗 SCADA 数据
根据 device_ids 查询 monitored_value,清洗后更新 cleaned_value
"""
try:
if device_ids == "all":
device_ids_list = []
else:
device_ids_list = (
[id.strip() for id in device_ids.split(",") if id.strip()]
if device_ids
else []
)
return await CompositeQueries.clean_scada_data(
timescale_conn, postgres_conn, device_ids_list, start_time, end_time
)
except ValueError as e:
raise HTTPException(status_code=400, detail=str(e))
@router.get("/composite/pipeline-health-prediction")
async def predict_pipeline_health(
query_time: datetime = Query(..., description="查询时间"),
network_name: str = Query(..., description="管网数据库名称"),
timescale_conn: AsyncConnection = Depends(get_database_connection),
):
"""
预测管道健康状况
根据管网名称和当前时间,查询管道信息和实时数据,
使用随机生存森林模型预测管道的生存概率
Args:
query_time: 查询时间
db_name: 管网数据库名称
Returns:
预测结果列表,每个元素包含 link_id 和对应的生存函数
"""
try:
return await CompositeQueries.predict_pipeline_health(
timescale_conn, network_name, query_time
)
except ValueError as e:
raise HTTPException(status_code=400, detail=str(e))
except FileNotFoundError as e:
raise HTTPException(status_code=404, detail=str(e))
except Exception as e:
raise HTTPException(status_code=500, detail=f"内部服务器错误: {str(e)}")