Files
TJWaterServerBinary/app/api/v1/endpoints/project_data.py
T

93 lines
3.2 KiB
Python

from fastapi import APIRouter, Depends, HTTPException, Path, Query
from psycopg import AsyncConnection
import app.native.wndb as wndb
from app.infra.db.postgresql.scheme import SchemeRepository
from app.auth.project_dependencies import get_project_pg_connection
from app.services import project_info
router = APIRouter()
async def get_database_connection(
conn: AsyncConnection = Depends(get_project_pg_connection),
):
"""获取数据库连接"""
yield conn
@router.get("/scada-info", summary="获取SCADA信息", description="使用连接池查询所有SCADA信息")
async def get_scada_info_with_connection(
conn: AsyncConnection = Depends(get_database_connection),
):
"""
获取所有SCADA信息
返回项目中所有的SCADA设备信息
"""
try:
_ = conn
network_name = project_info.name
scada_data = wndb.get_all_scada_info(network_name) if network_name else []
return {"success": True, "data": scada_data, "count": len(scada_data)}
except Exception as e:
raise HTTPException(
status_code=500, detail=f"查询SCADA信息时发生错误: {str(e)}"
)
@router.get("/scheme-list", summary="获取方案列表", description="使用连接池查询所有方案信息")
async def get_scheme_list_with_connection(
conn: AsyncConnection = Depends(get_database_connection),
):
"""
获取所有方案信息
返回项目中所有方案的详细信息
"""
try:
scheme_data = await SchemeRepository.get_schemes(conn)
return {"success": True, "data": scheme_data, "count": len(scheme_data)}
except Exception as e:
raise HTTPException(status_code=500, detail=f"查询方案信息时发生错误: {str(e)}")
@router.get("/burst-locate-result", summary="获取爆管定位结果", description="使用连接池查询所有爆管定位结果")
async def get_burst_locate_result_with_connection(
conn: AsyncConnection = Depends(get_database_connection),
):
"""
获取所有爆管定位结果
返回项目中所有的爆管定位分析结果
"""
try:
burst_data = await SchemeRepository.get_burst_locate_results(conn)
return {"success": True, "data": burst_data, "count": len(burst_data)}
except Exception as e:
raise HTTPException(
status_code=500, detail=f"查询爆管定位结果时发生错误: {str(e)}"
)
@router.get("/burst-locate-result/{burst_incident}", summary="按事件查询爆管定位结果", description="根据爆管事件ID查询对应的爆管定位结果")
async def get_burst_locate_result_by_incident(
burst_incident: str = Path(..., description="爆管事件ID"),
conn: AsyncConnection = Depends(get_database_connection),
):
"""
根据爆管事件ID查询爆管定位结果
参数:
burst_incident: 爆管事件的唯一标识符
"""
try:
return await SchemeRepository.get_burst_locate_result_by_incident(
conn, burst_incident
)
except Exception as e:
raise HTTPException(
status_code=500,
detail=f"根据 burst_incident 查询爆管定位结果时发生错误: {str(e)}",
)