重构时序数据库连接逻辑,移除冗余代码

This commit is contained in:
2026-03-13 16:18:11 +08:00
parent c137adedad
commit 1673396e1a
5 changed files with 50 additions and 53 deletions
+34
View File
@@ -116,3 +116,37 @@ def get_pg_config() -> dict:
def get_pg_password() -> str:
"""Return PostgreSQL password (use with care)."""
return settings.DB_PASSWORD
def get_timescaledb_pgconn_string(
db_name: Optional[str] = None,
db_host: Optional[str] = None,
db_port: Optional[str] = None,
db_user: Optional[str] = None,
db_password: Optional[str] = None,
) -> str:
"""Return TimescaleDB connection string in psycopg conninfo format."""
resolved_db_name = db_name or settings.TIMESCALEDB_DB_NAME
resolved_db_host = db_host or settings.TIMESCALEDB_DB_HOST
resolved_db_port = db_port or settings.TIMESCALEDB_DB_PORT
resolved_db_user = db_user or settings.TIMESCALEDB_DB_USER
resolved_db_password = db_password or settings.TIMESCALEDB_DB_PASSWORD
return (
f"dbname={resolved_db_name} host={resolved_db_host} port={resolved_db_port} "
f"user={resolved_db_user} password={resolved_db_password}"
)
def get_timescaledb_pg_config() -> dict:
"""Return TimescaleDB configuration except password."""
return {
"name": settings.TIMESCALEDB_DB_NAME,
"host": settings.TIMESCALEDB_DB_HOST,
"port": settings.TIMESCALEDB_DB_PORT,
"user": settings.TIMESCALEDB_DB_USER,
}
def get_timescaledb_pg_password() -> str:
"""Return TimescaleDB password (use with care)."""
return settings.TIMESCALEDB_DB_PASSWORD
-1
View File
@@ -1,3 +1,2 @@
from .database import *
from .timescaledb_info import *
from .composite_queries import CompositeQueries
+5 -5
View File
@@ -3,7 +3,7 @@ from contextlib import asynccontextmanager
from typing import AsyncGenerator, Dict, Optional
import psycopg_pool
from psycopg.rows import dict_row
import app.infra.db.timescaledb.timescaledb_info as timescaledb_info
from app.core.config import get_timescaledb_pgconn_string
# Configure logging
logger = logging.getLogger(__name__)
@@ -21,9 +21,9 @@ class Database:
# Get connection string, handling default case where target_db_name might be None
if target_db_name:
conn_string = timescaledb_info.get_pgconn_string(db_name=target_db_name)
conn_string = get_timescaledb_pgconn_string(db_name=target_db_name)
else:
conn_string = timescaledb_info.get_pgconn_string()
conn_string = get_timescaledb_pgconn_string()
try:
self.pool = psycopg_pool.AsyncConnectionPool(
@@ -54,8 +54,8 @@ class Database:
"""Get the TimescaleDB connection string."""
target_db_name = db_name or self.db_name
if target_db_name:
return timescaledb_info.get_pgconn_string(db_name=target_db_name)
return timescaledb_info.get_pgconn_string()
return get_timescaledb_pgconn_string(db_name=target_db_name)
return get_timescaledb_pgconn_string()
@asynccontextmanager
async def get_connection(self) -> AsyncGenerator:
+11 -11
View File
@@ -6,9 +6,9 @@ import psycopg
from psycopg import sql
from psycopg.rows import dict_row
import time
from app.core.config import get_timescaledb_pgconn_string
from app.infra.db.timescaledb.repositories.scheme import SchemeRepository
from app.infra.db.timescaledb.repositories.realtime import RealtimeRepository
import app.infra.db.timescaledb.timescaledb_info as timescaledb_info
from app.infra.db.timescaledb.repositories.scada import ScadaRepository
@@ -25,9 +25,9 @@ class InternalStorage:
for attempt in range(max_retries):
try:
conn_string = (
timescaledb_info.get_pgconn_string(db_name=db_name)
get_timescaledb_pgconn_string(db_name=db_name)
if db_name
else timescaledb_info.get_pgconn_string()
else get_timescaledb_pgconn_string()
)
with psycopg.Connection.connect(conn_string) as conn:
RealtimeRepository.store_realtime_simulation_result_sync(
@@ -56,9 +56,9 @@ class InternalStorage:
for attempt in range(max_retries):
try:
conn_string = (
timescaledb_info.get_pgconn_string(db_name=db_name)
get_timescaledb_pgconn_string(db_name=db_name)
if db_name
else timescaledb_info.get_pgconn_string()
else get_timescaledb_pgconn_string()
)
with psycopg.Connection.connect(conn_string) as conn:
SchemeRepository.store_scheme_simulation_result_sync(
@@ -97,9 +97,9 @@ class InternalQueries:
for attempt in range(max_retries):
try:
conn_string = (
timescaledb_info.get_pgconn_string(db_name=db_name)
get_timescaledb_pgconn_string(db_name=db_name)
if db_name
else timescaledb_info.get_pgconn_string()
else get_timescaledb_pgconn_string()
)
with psycopg.Connection.connect(conn_string) as conn:
rows = ScadaRepository.get_scada_by_ids_time_range_sync(
@@ -144,9 +144,9 @@ class InternalQueries:
for attempt in range(max_retries):
try:
conn_string = (
timescaledb_info.get_pgconn_string(db_name=db_name)
get_timescaledb_pgconn_string(db_name=db_name)
if db_name
else timescaledb_info.get_pgconn_string()
else get_timescaledb_pgconn_string()
)
with psycopg.Connection.connect(conn_string) as conn:
rows = ScadaRepository.get_scada_by_ids_time_range_sync(
@@ -257,9 +257,9 @@ class InternalQueries:
for attempt in range(max_retries):
try:
conn_string = (
timescaledb_info.get_pgconn_string(db_name=db_name)
get_timescaledb_pgconn_string(db_name=db_name)
if db_name
else timescaledb_info.get_pgconn_string()
else get_timescaledb_pgconn_string()
)
with psycopg.Connection.connect(conn_string) as conn:
with conn.cursor(row_factory=dict_row) as cur:
@@ -1,36 +0,0 @@
from dotenv import load_dotenv
import os
load_dotenv()
pg_name = os.getenv("TIMESCALEDB_DB_NAME")
pg_host = os.getenv("TIMESCALEDB_DB_HOST")
pg_port = os.getenv("TIMESCALEDB_DB_PORT")
pg_user = os.getenv("TIMESCALEDB_DB_USER")
pg_password = os.getenv("TIMESCALEDB_DB_PASSWORD")
def get_pgconn_string(
db_name=pg_name,
db_host=pg_host,
db_port=pg_port,
db_user=pg_user,
db_password=pg_password,
):
"""返回 PostgreSQL 连接字符串"""
return f"dbname={db_name} host={db_host} port={db_port} user={db_user} password={db_password}"
def get_pg_config():
"""返回 PostgreSQL 配置变量的字典"""
return {
"name": pg_name,
"host": pg_host,
"port": pg_port,
"user": pg_user,
}
def get_pg_password():
"""返回密码(谨慎使用)"""
return pg_password