From 5ab10886a06ae4d5807644e2adbb3e5f54445577 Mon Sep 17 00:00:00 2001 From: JIANG Date: Wed, 26 Nov 2025 10:10:00 +0800 Subject: [PATCH] =?UTF-8?q?=E7=BB=9F=E4=B8=80=20pg=20=E6=95=B0=E6=8D=AE?= =?UTF-8?q?=E5=BA=93=E7=9A=84=E8=BF=9E=E6=8E=A5?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .gitignore | 1 + influxdb_api.py | 10 ++++++---- online_Analysis.py | 23 ++++++++++++----------- postgresql_info.py | 15 +++++++++++++++ simulation.py | 19 ++++++++++--------- 5 files changed, 44 insertions(+), 24 deletions(-) create mode 100644 postgresql_info.py diff --git a/.gitignore b/.gitignore index 0d20b64..29f322e 100644 --- a/.gitignore +++ b/.gitignore @@ -1 +1,2 @@ *.pyc +.env diff --git a/influxdb_api.py b/influxdb_api.py index bd74fdf..b964f60 100644 --- a/influxdb_api.py +++ b/influxdb_api.py @@ -29,6 +29,7 @@ import pytz import influxdb_info import project_info import time_api +from postgresql_info import get_pgconn_string # influxdb数据库连接信息 url = influxdb_info.url @@ -46,7 +47,7 @@ def query_pg_scada_info_realtime(name: str) -> None: :return: """ # 连接数据库 - conn_string = f"dbname={name} host=127.0.0.1" + conn_string = get_pgconn_string(db_name=name) try: with psycopg.connect(conn_string) as conn: with conn.cursor() as cur: @@ -123,7 +124,7 @@ def query_pg_scada_info_non_realtime(name: str) -> None: # DingZQ, 2025-03-21 # close_project(name) # 连接数据库 - conn_string = f"dbname={name} host=127.0.0.1" + conn_string = get_pgconn_string(db_name=name) try: with psycopg.connect(conn_string) as conn: with conn.cursor() as cur: @@ -199,7 +200,7 @@ def query_pg_scada_info(name: str) -> list[dict]: :return: 包含所有记录的列表,每条记录为一个字典 """ # 连接数据库 - conn_string = f"dbname={name} host=127.0.0.1" + conn_string = get_pgconn_string(db_name=name) records_list = [] try: @@ -3360,7 +3361,8 @@ def query_corresponding_query_id_and_element_id(name: str) -> None: :return: """ # 连接数据库 - conn_string = f"dbname={name} host=127.0.0.1" + conn_string = f"dbname={db_name} host={db_host} user={db_user} password={db_password}" + conn_string = get_pgconn_string(db_name=name) try: with psycopg.connect(conn_string) as conn: with conn.cursor() as cur: diff --git a/online_Analysis.py b/online_Analysis.py index d07a883..4a6b7d1 100644 --- a/online_Analysis.py +++ b/online_Analysis.py @@ -22,6 +22,7 @@ import project_info import api_ex.kmeans_sensor import api_ex.Fdataclean import api_ex.Pdataclean +from postgresql_info import get_pgconn_string ############################################################ # burst analysis 01 @@ -765,7 +766,7 @@ def submit_scada_info(name: str, coord_id: str) -> None: print(f"检测到的文件编码:{file_encoding}") try: # 动态替换数据库名称 - conn_string = f"dbname={name} host=127.0.0.1" + conn_string = get_pgconn_string(db_name=name) # 连接到 PostgreSQL 数据库(这里是数据库 "bb") with psycopg.connect(conn_string) as conn: @@ -841,7 +842,7 @@ def create_user(name: str, username: str, password: str): """ try: # 动态替换数据库名称 - conn_string = f"dbname={name} host=127.0.0.1" + conn_string = get_pgconn_string(db_name=name) # 连接到 PostgreSQL 数据库(这里是数据库 "bb") with psycopg.connect(conn_string) as conn: with conn.cursor() as cur: @@ -866,7 +867,7 @@ def delete_user(name: str, username: str): """ try: # 动态替换数据库名称 - conn_string = f"dbname={name} host=127.0.0.1" + conn_string = get_pgconn_string(db_name=name) # 连接到 PostgreSQL 数据库(这里是数据库 "bb") with psycopg.connect(conn_string) as conn: with conn.cursor() as cur: @@ -886,7 +887,7 @@ def scheme_name_exists(name: str, scheme_name: str) -> bool: :return: 如果存在返回 True,否则返回 False """ try: - conn_string = f"dbname={name} host=127.0.0.1" + conn_string = get_pgconn_string(db_name=name) with psycopg.connect(conn_string) as conn: with conn.cursor() as cur: cur.execute("SELECT COUNT(*) FROM scheme_list WHERE scheme_name = %s", (scheme_name,)) @@ -913,7 +914,7 @@ def store_scheme_info(name: str, scheme_name: str, scheme_type: str, username: s :return: """ try: - conn_string = f"dbname={name} host=127.0.0.1" + conn_string = get_pgconn_string(db_name=name) with psycopg.connect(conn_string) as conn: with conn.cursor() as cur: sql = """ @@ -937,7 +938,7 @@ def delete_scheme_info(name: str, scheme_name: str) -> None: :param scheme_name: 要删除的方案名称 """ try: - conn_string = f"dbname={name} host=127.0.0.1" + conn_string = get_pgconn_string(db_name=name) with psycopg.connect(conn_string) as conn: with conn.cursor() as cur: # 使用参数化查询删除方案记录 @@ -957,7 +958,7 @@ def query_scheme_list(name: str) -> list: """ try: # 动态替换数据库名称 - conn_string = f"dbname={name} host=127.0.0.1" + conn_string = get_pgconn_string(db_name=name) # 连接到 PostgreSQL 数据库(这里是数据库 "bb") with psycopg.connect(conn_string) as conn: with conn.cursor() as cur: @@ -982,7 +983,7 @@ def upload_shp_to_pg(name: str, table_name: str, role: str, shp_file_path: str): """ try: # 动态连接到指定的数据库 - conn_string = f"dbname={name} host=127.0.0.1" + conn_string = get_pgconn_string(db_name=name) with psycopg.connect(conn_string) as conn: # 读取 Shapefile 文件 gdf = gpd.read_file(shp_file_path) @@ -1018,7 +1019,7 @@ def submit_risk_probability_result(name: str, result_file_path: str) -> None: try: # 动态替换数据库名称 - conn_string = f"dbname={name} host=127.0.0.1" + conn_string = get_pgconn_string(db_name=name) # 连接到 PostgreSQL 数据库 with psycopg.connect(conn_string) as conn: @@ -1072,7 +1073,7 @@ def pressure_sensor_placement_sensitivity(name: str, scheme_name: str, sensor_nu """ sensor_location = sensitivity.get_ID(name=name, sensor_num=sensor_number, min_diameter=min_diameter) try: - conn_string = f"dbname={name} host=127.0.0.1" + conn_string = get_pgconn_string(db_name=name) with psycopg.connect(conn_string) as conn: with conn.cursor() as cur: sql = """ @@ -1104,7 +1105,7 @@ def pressure_sensor_placement_kmeans(name: str, scheme_name: str, sensor_number: dump_inp(name,inp_name,'2') sensor_location = api_ex.kmeans_sensor.kmeans_sensor_placement(name=name, sensor_num=sensor_number, min_diameter=min_diameter) try: - conn_string = f"dbname={name} host=127.0.0.1" + conn_string = get_pgconn_string(db_name=name) with psycopg.connect(conn_string) as conn: with conn.cursor() as cur: sql = """ diff --git a/postgresql_info.py b/postgresql_info.py new file mode 100644 index 0000000..5da5122 --- /dev/null +++ b/postgresql_info.py @@ -0,0 +1,15 @@ +from dotenv import load_dotenv +import os + +load_dotenv() + +pg_name = os.getenv("DB_NAME") or "TJWater" +pg_host = os.getenv("DB_HOST") or "127.0.0.1" +pg_user = os.getenv("DB_USER") or "tjwater" +pg_password = os.getenv("DB_PASSWORD") or "Tjwater@123456" + + +def get_pgconn_string( + db_name=pg_name, db_host=pg_host, db_user=pg_user, db_password=pg_password +): + return f"dbname={db_name} host={db_host} user={db_user} password={db_password}" diff --git a/simulation.py b/simulation.py index 6d6efff..ff92ea6 100644 --- a/simulation.py +++ b/simulation.py @@ -19,6 +19,7 @@ import logging import globals import uuid import project_info +from postgresql_info import get_pgconn_string logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s') @@ -30,7 +31,7 @@ def query_corresponding_element_id_and_query_id(name: str) -> None: :return: """ # 连接数据库 - conn_string = f"dbname={name} host=127.0.0.1" + conn_string = get_pgconn_string(db_name=name) try: with psycopg.connect(conn_string) as conn: with conn.cursor() as cur: @@ -73,7 +74,7 @@ def query_corresponding_pattern_id_and_query_id(name: str) -> None: :return: """ # 连接数据库 - conn_string = f"dbname={name} host=127.0.0.1" + conn_string = get_pgconn_string(db_name=name) try: with psycopg.connect(conn_string) as conn: with conn.cursor() as cur: @@ -107,7 +108,7 @@ def query_non_realtime_region(name: str) -> dict: """ source_outflow_regions = [] # 用于存储所有 region(包含重复的) # 构建连接字符串 - conn_string = f"dbname={name} host=127.0.0.1" + conn_string = get_pgconn_string(db_name=name) try: # 连接到数据库 with psycopg.connect(conn_string) as conn: @@ -165,7 +166,7 @@ def query_non_realtime_region_patterns(name: str, source_outflow_region: dict, c """ globals.non_realtime_region_patterns = {region: [] for region in globals.source_outflow_region.keys()} region_tuple_to_key = {frozenset(ids): region for region, ids in globals.source_outflow_region.items()} - conn_string = f"dbname={name} host=127.0.0.1" + conn_string = get_pgconn_string(db_name=name) try: with psycopg.connect(conn_string) as conn: with conn.cursor() as cur: @@ -226,7 +227,7 @@ def query_realtime_region_pipe_flow_and_demand_id(name: str, source_outflow_regi globals.realtime_region_pipe_flow_and_demand_id = {region: [] for region in globals.source_outflow_region.keys()} # 创建一个映射,从 frozenset(ids) 到 region_key region_tuple_to_key = {frozenset(ids): region for region, ids in globals.source_outflow_region.items()} - conn_string = f"dbname={name} host=127.0.0.1" + conn_string = get_pgconn_string(db_name=name) try: with psycopg.connect(conn_string) as conn: with conn.cursor() as cur: @@ -283,7 +284,7 @@ def query_pipe_flow_region_patterns(name: str, column_prefix: str = 'associated_ :param column_prefix: 需要提取的列的前缀 :return: pipe_flow_region_patterns 字典 """ - conn_string = f"dbname={name} host=127.0.0.1" + conn_string = get_pgconn_string(db_name=name) try: with psycopg.connect(conn_string) as conn: with conn.cursor() as cur: @@ -334,7 +335,7 @@ def query_SCADA_ID_corresponding_info(name: str, SCADA_ID: str) -> dict: :param SCADA_ID: SCADA设备的ID :return: 包含associated_element_id和api_query_id的字典 """ - conn_string = f"dbname={name} host=127.0.0.1" + conn_string = get_pgconn_string(db_name=name) try: # 使用 psycopg.connect 创建连接 with psycopg.connect(conn_string) as conn: @@ -383,7 +384,7 @@ def get_source_outflow_region_id(name: str, source_outflow_region: dict, if not all_ids: logging.warning("No associated_source_outflow_id found in source_outflow_region.") return globals.source_outflow_region_id - conn_string = f"dbname={name} host=127.0.0.1" + conn_string = get_pgconn_string(db_name=name) try: with psycopg.connect(conn_string) as conn: with conn.cursor() as cur: @@ -431,7 +432,7 @@ def get_realtime_region_patterns(name: str, source_outflow_region_id: dict, real globals.source_outflow_region_patterns = {region: [] for region in globals.source_outflow_region_id.keys()} globals.realtime_region_pipe_flow_and_demand_patterns = {region: [] for region in globals.realtime_region_pipe_flow_and_demand_id.keys()} - conn_string = f"dbname={name} host=127.0.0.1" + conn_string = get_pgconn_string(db_name=name) try: with psycopg.connect(conn_string) as conn: with conn.cursor() as cur: