diff --git a/app/infra/db/postgresql/internal_queries.py b/app/infra/db/postgresql/internal_queries.py deleted file mode 100644 index 07cc254..0000000 --- a/app/infra/db/postgresql/internal_queries.py +++ /dev/null @@ -1,83 +0,0 @@ -import time -from typing import List, Optional - -from fastapi.logger import logger -import app.core.config as postgresql_info -import psycopg - - -class InternalQueries: - @staticmethod - def get_links_by_property( - fields: Optional[List[str]] = None, - property_conditions: Optional[dict] = None, - db_name: str = None, - max_retries: int = 3, - ) -> List[dict]: - """ - 查询pg数据库中,pipes 的指定字段记录或根据属性筛选 - :param fields: 要查询的字段列表,如 ["id", "diameter", "status"],默认查询所有字段 - :param property: 可选的筛选条件字典,如 {"status": "Open"} 或 {"diameter": 300} - :param db_name: 数据库名称 - :param max_retries: 最大重试次数 - :return: 包含所有记录的列表,每条记录为一个字典 - """ - # 如果未指定字段,查询所有字段 - if not fields: - fields = [ - "id", - "node1", - "node2", - "length", - "diameter", - "roughness", - "minor_loss", - "status", - ] - - for attempt in range(max_retries): - try: - conn_string = ( - postgresql_info.get_pgconn_string(db_name=db_name) - if db_name - else postgresql_info.get_pgconn_string() - ) - with psycopg.Connection.connect(conn_string) as conn: - with conn.cursor() as cur: - # 构建SELECT子句 - select_fields = ", ".join(fields) - base_query = f""" - SELECT {select_fields} - FROM public.pipes - """ - - # 如果提供了筛选条件,构建WHERE子句 - if property_conditions: - conditions = [] - params = [] - for key, value in property_conditions.items(): - conditions.append(f"{key} = %s") - params.append(value) - - query = base_query + " WHERE " + " AND ".join(conditions) - cur.execute(query, params) - else: - cur.execute(base_query) - - records = cur.fetchall() - # 将查询结果转换为字典列表 - pipes = [] - for record in records: - pipe_dict = {} - for idx, field in enumerate(fields): - pipe_dict[field] = record[idx] - pipes.append(pipe_dict) - - return pipes - break # 成功 - except Exception as e: - logger.error(f"查询尝试 {attempt + 1} 失败: {e}") - if attempt < max_retries - 1: - time.sleep(1) - else: - raise diff --git a/app/infra/db/timescaledb/composite_queries.py b/app/infra/db/timescaledb/composite_queries.py index f961377..a57479d 100644 --- a/app/infra/db/timescaledb/composite_queries.py +++ b/app/infra/db/timescaledb/composite_queries.py @@ -9,7 +9,6 @@ from app.algorithms.cleaning.pressure import clean_pressure_data_df_km from app.algorithms.health.analyzer import PipelineHealthAnalyzer import app.native.wndb as wndb -from app.infra.db.postgresql.internal_queries import InternalQueries from app.infra.db.timescaledb.repositories.realtime import RealtimeRepository from app.infra.db.timescaledb.repositories.scheme import SchemeRepository from app.infra.db.timescaledb.repositories.scada import ScadaRepository @@ -498,10 +497,7 @@ class CompositeQueries: # 批量查询这些管道的详细信息 fields = ["id", "diameter", "node1", "node2"] - all_links = InternalQueries.get_links_by_property( - fields=fields, - db_name=network_name, - ) + all_links = wndb.get_pipes_by_property(network_name, fields=fields) # 转换为字典以快速查找 links_dict = {link["id"]: link for link in all_links} diff --git a/app/native/wndb/__init__.py b/app/native/wndb/__init__.py index 8a5978b..57d230b 100644 --- a/app/native/wndb/__init__.py +++ b/app/native/wndb/__init__.py @@ -130,7 +130,14 @@ from .s4_tanks import get_tank_schema, add_tank, get_tank, set_tank, get_all_tan from .batch_api import delete_tank_cascade from .s5_pipes import PIPE_STATUS_OPEN, PIPE_STATUS_CLOSED, PIPE_STATUS_CV -from .s5_pipes import get_pipe_schema, add_pipe, get_pipe, set_pipe, get_all_pipes +from .s5_pipes import ( + get_pipe_schema, + add_pipe, + get_pipe, + set_pipe, + get_all_pipes, + get_pipes_by_property, +) from .batch_api import delete_pipe_cascade from .s6_pumps import get_pump_schema, add_pump, get_pump, set_pump, get_all_pumps diff --git a/app/native/wndb/s5_pipes.py b/app/native/wndb/s5_pipes.py index 2930c3a..91b50cc 100644 --- a/app/native/wndb/s5_pipes.py +++ b/app/native/wndb/s5_pipes.py @@ -54,6 +54,52 @@ def get_all_pipes(name: str) -> list[dict[str, Any]]: return result + +def get_pipes_by_property( + name: str, + fields: list[str] | None = None, + property_conditions: dict[str, Any] | None = None, +) -> list[dict[str, Any]]: + if not fields: + fields = [ + 'id', + 'node1', + 'node2', + 'length', + 'diameter', + 'roughness', + 'minor_loss', + 'status', + ] + + rows = read_all(name, "select * from pipes") + if rows == None: + return [] + + result = [] + for row in rows: + if property_conditions: + matched = True + for key, value in property_conditions.items(): + if row[key] != value: + matched = False + break + if not matched: + continue + + d = {} + for field in fields: + value = row[field] + if field in ('length', 'diameter', 'roughness', 'minor_loss') and value is not None: + d[field] = float(value) + elif field in ('id', 'node1', 'node2', 'status') and value is not None: + d[field] = str(value) + else: + d[field] = value + result.append(d) + + return result + class Pipe(object): def __init__(self, input: dict[str, Any]) -> None: self.type = 'pipe'