Files
TJWaterServerBinary/postgresql/internal_queries.py
2025-12-17 17:02:22 +08:00

84 lines
3.1 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
import time
from typing import List, Optional
from fastapi.logger import logger
import postgresql_info
import psycopg
class InternalQueries:
    """Helper queries that read directly from the PostgreSQL database."""

    # Columns selected from public.pipes when the caller names none.
    _DEFAULT_PIPE_FIELDS = (
        "id",
        "node1",
        "node2",
        "length",
        "diameter",
        "roughness",
        "minor_loss",
        "status",
    )

    @staticmethod
    def _check_identifier(name: object, role: str) -> str:
        """Return *name* unchanged if it is a plain ASCII SQL identifier.

        Column names cannot be sent as bound parameters, so they are placed
        in the SQL text itself. Anything that is not letters, digits and
        underscores (not starting with a digit) is rejected so that a caller
        cannot smuggle SQL in through a field name or a filter key.

        :raises ValueError: if *name* is not a str or is not a plain identifier
        """
        if not (isinstance(name, str) and name.isascii() and name.isidentifier()):
            raise ValueError(f"invalid {role} name: {name!r}")
        return name

    @staticmethod
    def get_links_by_property(
        fields: Optional[List[str]] = None,
        property_conditions: Optional[dict] = None,
        db_name: Optional[str] = None,
        max_retries: int = 3,
    ) -> List[dict]:
        """
        Query rows of ``public.pipes``, optionally filtered by column equality.

        :param fields: column names to select, e.g. ["id", "diameter", "status"];
            when empty or None the default pipe columns are selected
        :param property_conditions: optional mapping of column name to required
            value, e.g. {"status": "Open"} or {"diameter": 300}; all conditions
            are combined with AND
        :param db_name: database name; when falsy the default connection
            string from ``postgresql_info`` is used
        :param max_retries: maximum number of attempts; values below 1 are
            treated as 1 so the query is always tried once
        :return: one dict per row, keyed by the selected column names
        :raises ValueError: if a field name or condition key is not a plain
            SQL identifier
        :raises Exception: the error of the final attempt is re-raised once
            all attempts have failed
        """
        check = InternalQueries._check_identifier

        # Fall back to the default column set when no fields were requested.
        columns = list(fields) if fields else list(InternalQueries._DEFAULT_PIPE_FIELDS)
        for column in columns:
            check(column, "field")

        filters = property_conditions or {}
        for key in filters:
            check(key, "condition")

        # The statement does not change between attempts, so build it once.
        # Names are validated above and left unquoted on purpose: quoting
        # would make them case-sensitive and change which columns match.
        query = f"SELECT {', '.join(columns)} FROM public.pipes"
        params = list(filters.values())
        if filters:
            query += " WHERE " + " AND ".join(f"{key} = %s" for key in filters)

        # Always attempt at least once; otherwise the loop would be skipped
        # and the function would silently return None.
        attempts = max(1, max_retries)
        for attempt in range(attempts):
            try:
                conn_string = (
                    postgresql_info.get_pgconn_string(db_name=db_name)
                    if db_name
                    else postgresql_info.get_pgconn_string()
                )
                with psycopg.Connection.connect(conn_string) as conn:
                    with conn.cursor() as cur:
                        # Pass no parameter sequence at all when unfiltered.
                        cur.execute(query, params or None)
                        records = cur.fetchall()
                # Convert each row tuple into a dict keyed by column name.
                return [dict(zip(columns, record)) for record in records]
            except Exception as e:
                logger.error("查询尝试 %d 失败: %s", attempt + 1, e)
                if attempt >= attempts - 1:
                    raise
                # Brief pause before the next attempt.
                time.sleep(1)