From e8a883bcb7961b23037d74577db2c88b2b3b7320 Mon Sep 17 00:00:00 2001 From: JIANG Date: Mon, 22 Dec 2025 16:12:37 +0800 Subject: [PATCH] =?UTF-8?q?=E6=96=B0=E5=A2=9E=E5=AE=9A=E4=BD=8D=E7=BB=93?= =?UTF-8?q?=E6=9E=9C=E6=9F=A5=E8=AF=A2=EF=BC=9B?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- main.py | 2 + postgresql/__init__.py | 1 + postgresql/router.py | 55 +++++++++++++++++++++- postgresql/scheme.py | 104 +++++++++++++++++++++++++++++++++++++++++ 4 files changed, 161 insertions(+), 1 deletion(-) create mode 100644 postgresql/__init__.py create mode 100644 postgresql/scheme.py diff --git a/main.py b/main.py index bc8d49e..20a611b 100644 --- a/main.py +++ b/main.py @@ -42,6 +42,7 @@ from datetime import datetime, timedelta, timezone # 第三方/自定义模块 import influxdb_api import timescaledb +import postgresql import py_linq import time_api import simulation @@ -136,6 +137,7 @@ async def lifespan(app: FastAPI): app = FastAPI(lifespan=lifespan) app.include_router(timescaledb.router) +app.include_router(postgresql.router) access_tokens = [] diff --git a/postgresql/__init__.py b/postgresql/__init__.py new file mode 100644 index 0000000..2378043 --- /dev/null +++ b/postgresql/__init__.py @@ -0,0 +1 @@ +from .router import router diff --git a/postgresql/router.py b/postgresql/router.py index 1b36ba7..28df3ea 100644 --- a/postgresql/router.py +++ b/postgresql/router.py @@ -4,6 +4,7 @@ from psycopg import AsyncConnection from .database import get_database_instance from .scada_info import ScadaRepository +from .scheme import SchemeRepository router = APIRouter(prefix="/postgresql", tags=["postgresql"]) @@ -29,9 +30,61 @@ async def get_scada_info_with_connection( """ try: # 使用ScadaRepository查询SCADA信息 - scada_data = await ScadaRepository.get_scadas_info(conn) + scada_data = await ScadaRepository.get_scadas(conn) return {"success": True, "data": scada_data, "count": len(scada_data)} except Exception as e: raise HTTPException( status_code=500, detail=f"查询SCADA信息时发生错误: {str(e)}" ) + + +@router.get("/scheme-list") +async def get_scheme_list_with_connection( + conn: AsyncConnection = Depends(get_database_connection), +): + """ + 使用连接池查询所有方案信息 + """ + try: + # 使用SchemeRepository查询方案信息 + scheme_data = await SchemeRepository.get_schemes(conn) + return {"success": True, "data": scheme_data, "count": len(scheme_data)} + except Exception as e: + raise HTTPException(status_code=500, detail=f"查询方案信息时发生错误: {str(e)}") + + +@router.get("/burst-locate-result") +async def get_burst_locate_result_with_connection( + conn: AsyncConnection = Depends(get_database_connection), +): + """ + 使用连接池查询所有爆管定位结果 + """ + try: + # 使用SchemeRepository查询爆管定位结果 + burst_data = await SchemeRepository.get_burst_locate_results(conn) + return {"success": True, "data": burst_data, "count": len(burst_data)} + except Exception as e: + raise HTTPException( + status_code=500, detail=f"查询爆管定位结果时发生错误: {str(e)}" + ) + + +@router.get("/burst-locate-result/{burst_incident}") +async def get_burst_locate_result_by_incident( + burst_incident: str, + conn: AsyncConnection = Depends(get_database_connection), +): + """ + 根据 burst_incident 查询爆管定位结果 + """ + try: + # 使用SchemeRepository查询爆管定位结果 + return await SchemeRepository.get_burst_locate_result_by_incident( + conn, burst_incident + ) + except Exception as e: + raise HTTPException( + status_code=500, + detail=f"根据 burst_incident 查询爆管定位结果时发生错误: {str(e)}", + ) diff --git a/postgresql/scheme.py b/postgresql/scheme.py new file mode 100644 index 0000000..ed9c240 --- /dev/null +++ b/postgresql/scheme.py @@ -0,0 +1,104 @@ +from typing import List, Optional, Any +from psycopg import AsyncConnection + + +class SchemeRepository: + + @staticmethod + async def get_schemes(conn: AsyncConnection) -> List[dict]: + """ + 查询pg数据库中, scheme_list 的所有记录 + :param conn: 异步数据库连接 + :return: 包含所有记录的列表, 每条记录为一个字典 + """ + async with conn.cursor() as cur: + await cur.execute( + """ + SELECT scheme_id, scheme_name, scheme_type, username, create_time, scheme_start_time, scheme_detail + FROM public.scheme_list + """ + ) + records = await cur.fetchall() + + scheme_list = [] + for record in records: + scheme_list.append( + { + "scheme_id": record["scheme_id"], + "scheme_name": record["scheme_name"], + "scheme_type": record["scheme_type"], + "username": record["username"], + "create_time": record["create_time"], + "scheme_start_time": record["scheme_start_time"], + "scheme_detail": record["scheme_detail"], + } + ) + + return scheme_list + + @staticmethod + async def get_burst_locate_results(conn: AsyncConnection) -> List[dict]: + """ + 查询pg数据库中, burst_locate_result 的所有记录 + :param conn: 异步数据库连接 + :return: 包含所有记录的列表, 每条记录为一个字典 + """ + async with conn.cursor() as cur: + await cur.execute( + """ + SELECT id, type, burst_incident, leakage, detect_time, locate_result + FROM public.burst_locate_result + """ + ) + records = await cur.fetchall() + + results = [] + for record in records: + results.append( + { + "id": record["id"], + "type": record["type"], + "burst_incident": record["burst_incident"], + "leakage": record["leakage"], + "detect_time": record["detect_time"], + "locate_result": record["locate_result"], + } + ) + + return results + + @staticmethod + async def get_burst_locate_result_by_incident( + conn: AsyncConnection, burst_incident: str + ) -> List[dict]: + """ + 根据 burst_incident 查询爆管定位结果 + :param conn: 异步数据库连接 + :param burst_incident: 爆管事件标识 + :return: 包含匹配记录的列表 + """ + async with conn.cursor() as cur: + await cur.execute( + """ + SELECT id, type, burst_incident, leakage, detect_time, locate_result + FROM public.burst_locate_result + WHERE burst_incident = %s + """, + (burst_incident,), + ) + records = await cur.fetchall() + + results = [] + for record in records: + results.append( + { + "id": record["id"], + "type": record["type"], + "burst_incident": record["burst_incident"], + "leakage": record["leakage"], + "detect_time": record["detect_time"], + "locate_result": record["locate_result"], + } + ) + + return results