移除旧的InternalQueries类,更新管道查询逻辑
This commit is contained in:
@@ -1,83 +0,0 @@
|
||||
import time
|
||||
from typing import List, Optional
|
||||
|
||||
from fastapi.logger import logger
|
||||
import app.core.config as postgresql_info
|
||||
import psycopg
|
||||
|
||||
|
||||
class InternalQueries:
|
||||
@staticmethod
|
||||
def get_links_by_property(
|
||||
fields: Optional[List[str]] = None,
|
||||
property_conditions: Optional[dict] = None,
|
||||
db_name: str = None,
|
||||
max_retries: int = 3,
|
||||
) -> List[dict]:
|
||||
"""
|
||||
查询pg数据库中,pipes 的指定字段记录或根据属性筛选
|
||||
:param fields: 要查询的字段列表,如 ["id", "diameter", "status"],默认查询所有字段
|
||||
:param property: 可选的筛选条件字典,如 {"status": "Open"} 或 {"diameter": 300}
|
||||
:param db_name: 数据库名称
|
||||
:param max_retries: 最大重试次数
|
||||
:return: 包含所有记录的列表,每条记录为一个字典
|
||||
"""
|
||||
# 如果未指定字段,查询所有字段
|
||||
if not fields:
|
||||
fields = [
|
||||
"id",
|
||||
"node1",
|
||||
"node2",
|
||||
"length",
|
||||
"diameter",
|
||||
"roughness",
|
||||
"minor_loss",
|
||||
"status",
|
||||
]
|
||||
|
||||
for attempt in range(max_retries):
|
||||
try:
|
||||
conn_string = (
|
||||
postgresql_info.get_pgconn_string(db_name=db_name)
|
||||
if db_name
|
||||
else postgresql_info.get_pgconn_string()
|
||||
)
|
||||
with psycopg.Connection.connect(conn_string) as conn:
|
||||
with conn.cursor() as cur:
|
||||
# 构建SELECT子句
|
||||
select_fields = ", ".join(fields)
|
||||
base_query = f"""
|
||||
SELECT {select_fields}
|
||||
FROM public.pipes
|
||||
"""
|
||||
|
||||
# 如果提供了筛选条件,构建WHERE子句
|
||||
if property_conditions:
|
||||
conditions = []
|
||||
params = []
|
||||
for key, value in property_conditions.items():
|
||||
conditions.append(f"{key} = %s")
|
||||
params.append(value)
|
||||
|
||||
query = base_query + " WHERE " + " AND ".join(conditions)
|
||||
cur.execute(query, params)
|
||||
else:
|
||||
cur.execute(base_query)
|
||||
|
||||
records = cur.fetchall()
|
||||
# 将查询结果转换为字典列表
|
||||
pipes = []
|
||||
for record in records:
|
||||
pipe_dict = {}
|
||||
for idx, field in enumerate(fields):
|
||||
pipe_dict[field] = record[idx]
|
||||
pipes.append(pipe_dict)
|
||||
|
||||
return pipes
|
||||
break # 成功
|
||||
except Exception as e:
|
||||
logger.error(f"查询尝试 {attempt + 1} 失败: {e}")
|
||||
if attempt < max_retries - 1:
|
||||
time.sleep(1)
|
||||
else:
|
||||
raise
|
||||
@@ -9,7 +9,6 @@ from app.algorithms.cleaning.pressure import clean_pressure_data_df_km
|
||||
from app.algorithms.health.analyzer import PipelineHealthAnalyzer
|
||||
import app.native.wndb as wndb
|
||||
|
||||
from app.infra.db.postgresql.internal_queries import InternalQueries
|
||||
from app.infra.db.timescaledb.repositories.realtime import RealtimeRepository
|
||||
from app.infra.db.timescaledb.repositories.scheme import SchemeRepository
|
||||
from app.infra.db.timescaledb.repositories.scada import ScadaRepository
|
||||
@@ -498,10 +497,7 @@ class CompositeQueries:
|
||||
|
||||
# 批量查询这些管道的详细信息
|
||||
fields = ["id", "diameter", "node1", "node2"]
|
||||
all_links = InternalQueries.get_links_by_property(
|
||||
fields=fields,
|
||||
db_name=network_name,
|
||||
)
|
||||
all_links = wndb.get_pipes_by_property(network_name, fields=fields)
|
||||
|
||||
# 转换为字典以快速查找
|
||||
links_dict = {link["id"]: link for link in all_links}
|
||||
|
||||
@@ -130,7 +130,14 @@ from .s4_tanks import get_tank_schema, add_tank, get_tank, set_tank, get_all_tan
|
||||
from .batch_api import delete_tank_cascade
|
||||
|
||||
from .s5_pipes import PIPE_STATUS_OPEN, PIPE_STATUS_CLOSED, PIPE_STATUS_CV
|
||||
from .s5_pipes import get_pipe_schema, add_pipe, get_pipe, set_pipe, get_all_pipes
|
||||
from .s5_pipes import (
|
||||
get_pipe_schema,
|
||||
add_pipe,
|
||||
get_pipe,
|
||||
set_pipe,
|
||||
get_all_pipes,
|
||||
get_pipes_by_property,
|
||||
)
|
||||
from .batch_api import delete_pipe_cascade
|
||||
|
||||
from .s6_pumps import get_pump_schema, add_pump, get_pump, set_pump, get_all_pumps
|
||||
|
||||
@@ -54,6 +54,52 @@ def get_all_pipes(name: str) -> list[dict[str, Any]]:
|
||||
|
||||
return result
|
||||
|
||||
|
||||
def get_pipes_by_property(
|
||||
name: str,
|
||||
fields: list[str] | None = None,
|
||||
property_conditions: dict[str, Any] | None = None,
|
||||
) -> list[dict[str, Any]]:
|
||||
if not fields:
|
||||
fields = [
|
||||
'id',
|
||||
'node1',
|
||||
'node2',
|
||||
'length',
|
||||
'diameter',
|
||||
'roughness',
|
||||
'minor_loss',
|
||||
'status',
|
||||
]
|
||||
|
||||
rows = read_all(name, "select * from pipes")
|
||||
if rows == None:
|
||||
return []
|
||||
|
||||
result = []
|
||||
for row in rows:
|
||||
if property_conditions:
|
||||
matched = True
|
||||
for key, value in property_conditions.items():
|
||||
if row[key] != value:
|
||||
matched = False
|
||||
break
|
||||
if not matched:
|
||||
continue
|
||||
|
||||
d = {}
|
||||
for field in fields:
|
||||
value = row[field]
|
||||
if field in ('length', 'diameter', 'roughness', 'minor_loss') and value is not None:
|
||||
d[field] = float(value)
|
||||
elif field in ('id', 'node1', 'node2', 'status') and value is not None:
|
||||
d[field] = str(value)
|
||||
else:
|
||||
d[field] = value
|
||||
result.append(d)
|
||||
|
||||
return result
|
||||
|
||||
class Pipe(object):
|
||||
def __init__(self, input: dict[str, Any]) -> None:
|
||||
self.type = 'pipe'
|
||||
|
||||
Reference in New Issue
Block a user