统一 pg 数据库的连接

This commit is contained in:
JIANG
2025-11-26 10:10:00 +08:00
parent 15d9476858
commit 5ab10886a0
5 changed files with 44 additions and 24 deletions

1
.gitignore vendored
View File

@@ -1 +1,2 @@
*.pyc
.env

View File

@@ -29,6 +29,7 @@ import pytz
import influxdb_info
import project_info
import time_api
from postgresql_info import get_pgconn_string
# influxdb数据库连接信息
url = influxdb_info.url
@@ -46,7 +47,7 @@ def query_pg_scada_info_realtime(name: str) -> None:
:return:
"""
# 连接数据库
conn_string = f"dbname={name} host=127.0.0.1"
conn_string = get_pgconn_string(db_name=name)
try:
with psycopg.connect(conn_string) as conn:
with conn.cursor() as cur:
@@ -123,7 +124,7 @@ def query_pg_scada_info_non_realtime(name: str) -> None:
# DingZQ, 2025-03-21
# close_project(name)
# 连接数据库
conn_string = f"dbname={name} host=127.0.0.1"
conn_string = get_pgconn_string(db_name=name)
try:
with psycopg.connect(conn_string) as conn:
with conn.cursor() as cur:
@@ -199,7 +200,7 @@ def query_pg_scada_info(name: str) -> list[dict]:
:return: 包含所有记录的列表,每条记录为一个字典
"""
# 连接数据库
conn_string = f"dbname={name} host=127.0.0.1"
conn_string = get_pgconn_string(db_name=name)
records_list = []
try:
@@ -3360,7 +3361,8 @@ def query_corresponding_query_id_and_element_id(name: str) -> None:
:return:
"""
# 连接数据库
conn_string = f"dbname={name} host=127.0.0.1"
conn_string = f"dbname={db_name} host={db_host} user={db_user} password={db_password}"
conn_string = get_pgconn_string(db_name=name)
try:
with psycopg.connect(conn_string) as conn:
with conn.cursor() as cur:

View File

@@ -22,6 +22,7 @@ import project_info
import api_ex.kmeans_sensor
import api_ex.Fdataclean
import api_ex.Pdataclean
from postgresql_info import get_pgconn_string
############################################################
# burst analysis 01
@@ -765,7 +766,7 @@ def submit_scada_info(name: str, coord_id: str) -> None:
print(f"检测到的文件编码:{file_encoding}")
try:
# 动态替换数据库名称
conn_string = f"dbname={name} host=127.0.0.1"
conn_string = get_pgconn_string(db_name=name)
# 连接到 PostgreSQL 数据库(这里是数据库 "bb"
with psycopg.connect(conn_string) as conn:
@@ -841,7 +842,7 @@ def create_user(name: str, username: str, password: str):
"""
try:
# 动态替换数据库名称
conn_string = f"dbname={name} host=127.0.0.1"
conn_string = get_pgconn_string(db_name=name)
# 连接到 PostgreSQL 数据库(这里是数据库 "bb"
with psycopg.connect(conn_string) as conn:
with conn.cursor() as cur:
@@ -866,7 +867,7 @@ def delete_user(name: str, username: str):
"""
try:
# 动态替换数据库名称
conn_string = f"dbname={name} host=127.0.0.1"
conn_string = get_pgconn_string(db_name=name)
# 连接到 PostgreSQL 数据库(这里是数据库 "bb"
with psycopg.connect(conn_string) as conn:
with conn.cursor() as cur:
@@ -886,7 +887,7 @@ def scheme_name_exists(name: str, scheme_name: str) -> bool:
:return: 如果存在返回 True否则返回 False
"""
try:
conn_string = f"dbname={name} host=127.0.0.1"
conn_string = get_pgconn_string(db_name=name)
with psycopg.connect(conn_string) as conn:
with conn.cursor() as cur:
cur.execute("SELECT COUNT(*) FROM scheme_list WHERE scheme_name = %s", (scheme_name,))
@@ -913,7 +914,7 @@ def store_scheme_info(name: str, scheme_name: str, scheme_type: str, username: s
:return:
"""
try:
conn_string = f"dbname={name} host=127.0.0.1"
conn_string = get_pgconn_string(db_name=name)
with psycopg.connect(conn_string) as conn:
with conn.cursor() as cur:
sql = """
@@ -937,7 +938,7 @@ def delete_scheme_info(name: str, scheme_name: str) -> None:
:param scheme_name: 要删除的方案名称
"""
try:
conn_string = f"dbname={name} host=127.0.0.1"
conn_string = get_pgconn_string(db_name=name)
with psycopg.connect(conn_string) as conn:
with conn.cursor() as cur:
# 使用参数化查询删除方案记录
@@ -957,7 +958,7 @@ def query_scheme_list(name: str) -> list:
"""
try:
# 动态替换数据库名称
conn_string = f"dbname={name} host=127.0.0.1"
conn_string = get_pgconn_string(db_name=name)
# 连接到 PostgreSQL 数据库(这里是数据库 "bb"
with psycopg.connect(conn_string) as conn:
with conn.cursor() as cur:
@@ -982,7 +983,7 @@ def upload_shp_to_pg(name: str, table_name: str, role: str, shp_file_path: str):
"""
try:
# 动态连接到指定的数据库
conn_string = f"dbname={name} host=127.0.0.1"
conn_string = get_pgconn_string(db_name=name)
with psycopg.connect(conn_string) as conn:
# 读取 Shapefile 文件
gdf = gpd.read_file(shp_file_path)
@@ -1018,7 +1019,7 @@ def submit_risk_probability_result(name: str, result_file_path: str) -> None:
try:
# 动态替换数据库名称
conn_string = f"dbname={name} host=127.0.0.1"
conn_string = get_pgconn_string(db_name=name)
# 连接到 PostgreSQL 数据库
with psycopg.connect(conn_string) as conn:
@@ -1072,7 +1073,7 @@ def pressure_sensor_placement_sensitivity(name: str, scheme_name: str, sensor_nu
"""
sensor_location = sensitivity.get_ID(name=name, sensor_num=sensor_number, min_diameter=min_diameter)
try:
conn_string = f"dbname={name} host=127.0.0.1"
conn_string = get_pgconn_string(db_name=name)
with psycopg.connect(conn_string) as conn:
with conn.cursor() as cur:
sql = """
@@ -1104,7 +1105,7 @@ def pressure_sensor_placement_kmeans(name: str, scheme_name: str, sensor_number:
dump_inp(name,inp_name,'2')
sensor_location = api_ex.kmeans_sensor.kmeans_sensor_placement(name=name, sensor_num=sensor_number, min_diameter=min_diameter)
try:
conn_string = f"dbname={name} host=127.0.0.1"
conn_string = get_pgconn_string(db_name=name)
with psycopg.connect(conn_string) as conn:
with conn.cursor() as cur:
sql = """

15
postgresql_info.py Normal file
View File

@@ -0,0 +1,15 @@
from dotenv import load_dotenv
import os
load_dotenv()
pg_name = os.getenv("DB_NAME") or "TJWater"
pg_host = os.getenv("DB_HOST") or "127.0.0.1"
pg_user = os.getenv("DB_USER") or "tjwater"
pg_password = os.getenv("DB_PASSWORD") or "Tjwater@123456"
def get_pgconn_string(
db_name=pg_name, db_host=pg_host, db_user=pg_user, db_password=pg_password
):
return f"dbname={db_name} host={db_host} user={db_user} password={db_password}"

View File

@@ -19,6 +19,7 @@ import logging
import globals
import uuid
import project_info
from postgresql_info import get_pgconn_string
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')
@@ -30,7 +31,7 @@ def query_corresponding_element_id_and_query_id(name: str) -> None:
:return:
"""
# 连接数据库
conn_string = f"dbname={name} host=127.0.0.1"
conn_string = get_pgconn_string(db_name=name)
try:
with psycopg.connect(conn_string) as conn:
with conn.cursor() as cur:
@@ -73,7 +74,7 @@ def query_corresponding_pattern_id_and_query_id(name: str) -> None:
:return:
"""
# 连接数据库
conn_string = f"dbname={name} host=127.0.0.1"
conn_string = get_pgconn_string(db_name=name)
try:
with psycopg.connect(conn_string) as conn:
with conn.cursor() as cur:
@@ -107,7 +108,7 @@ def query_non_realtime_region(name: str) -> dict:
"""
source_outflow_regions = [] # 用于存储所有 region包含重复的
# 构建连接字符串
conn_string = f"dbname={name} host=127.0.0.1"
conn_string = get_pgconn_string(db_name=name)
try:
# 连接到数据库
with psycopg.connect(conn_string) as conn:
@@ -165,7 +166,7 @@ def query_non_realtime_region_patterns(name: str, source_outflow_region: dict, c
"""
globals.non_realtime_region_patterns = {region: [] for region in globals.source_outflow_region.keys()}
region_tuple_to_key = {frozenset(ids): region for region, ids in globals.source_outflow_region.items()}
conn_string = f"dbname={name} host=127.0.0.1"
conn_string = get_pgconn_string(db_name=name)
try:
with psycopg.connect(conn_string) as conn:
with conn.cursor() as cur:
@@ -226,7 +227,7 @@ def query_realtime_region_pipe_flow_and_demand_id(name: str, source_outflow_regi
globals.realtime_region_pipe_flow_and_demand_id = {region: [] for region in globals.source_outflow_region.keys()}
# 创建一个映射,从 frozenset(ids) 到 region_key
region_tuple_to_key = {frozenset(ids): region for region, ids in globals.source_outflow_region.items()}
conn_string = f"dbname={name} host=127.0.0.1"
conn_string = get_pgconn_string(db_name=name)
try:
with psycopg.connect(conn_string) as conn:
with conn.cursor() as cur:
@@ -283,7 +284,7 @@ def query_pipe_flow_region_patterns(name: str, column_prefix: str = 'associated_
:param column_prefix: 需要提取的列的前缀
:return: pipe_flow_region_patterns 字典
"""
conn_string = f"dbname={name} host=127.0.0.1"
conn_string = get_pgconn_string(db_name=name)
try:
with psycopg.connect(conn_string) as conn:
with conn.cursor() as cur:
@@ -334,7 +335,7 @@ def query_SCADA_ID_corresponding_info(name: str, SCADA_ID: str) -> dict:
:param SCADA_ID: SCADA设备的ID
:return: 包含associated_element_id和api_query_id的字典
"""
conn_string = f"dbname={name} host=127.0.0.1"
conn_string = get_pgconn_string(db_name=name)
try:
# 使用 psycopg.connect 创建连接
with psycopg.connect(conn_string) as conn:
@@ -383,7 +384,7 @@ def get_source_outflow_region_id(name: str, source_outflow_region: dict,
if not all_ids:
logging.warning("No associated_source_outflow_id found in source_outflow_region.")
return globals.source_outflow_region_id
conn_string = f"dbname={name} host=127.0.0.1"
conn_string = get_pgconn_string(db_name=name)
try:
with psycopg.connect(conn_string) as conn:
with conn.cursor() as cur:
@@ -431,7 +432,7 @@ def get_realtime_region_patterns(name: str, source_outflow_region_id: dict, real
globals.source_outflow_region_patterns = {region: [] for region in globals.source_outflow_region_id.keys()}
globals.realtime_region_pipe_flow_and_demand_patterns = {region: [] for region in
globals.realtime_region_pipe_flow_and_demand_id.keys()}
conn_string = f"dbname={name} host=127.0.0.1"
conn_string = get_pgconn_string(db_name=name)
try:
with psycopg.connect(conn_string) as conn:
with conn.cursor() as cur: