补全 realtime scheme 中的复合 存储、查询方法

This commit is contained in:
JIANG
2025-12-05 10:52:04 +08:00
parent ef2ad7e107
commit 03e2fb9fd8
7 changed files with 910 additions and 275 deletions

View File

@@ -3,7 +3,7 @@ from contextlib import asynccontextmanager
from typing import AsyncGenerator, Dict, Optional from typing import AsyncGenerator, Dict, Optional
import psycopg_pool import psycopg_pool
from psycopg.rows import dict_row from psycopg.rows import dict_row
import postgresql_info import api.postgresql_info as postgresql_info
# Configure logging # Configure logging
logger = logging.getLogger(__name__) logger = logging.getLogger(__name__)
@@ -27,7 +27,9 @@ class Database:
open=False, # Don't open immediately, wait for startup open=False, # Don't open immediately, wait for startup
kwargs={"row_factory": dict_row}, # Return rows as dictionaries kwargs={"row_factory": dict_row}, # Return rows as dictionaries
) )
logger.info(f"PostgreSQL connection pool initialized for database: {target_db_name or 'default'}") logger.info(
f"PostgreSQL connection pool initialized for database: {target_db_name or 'default'}"
)
except Exception as e: except Exception as e:
logger.error(f"Failed to initialize postgresql connection pool: {e}") logger.error(f"Failed to initialize postgresql connection pool: {e}")
raise raise
@@ -58,15 +60,17 @@ db = Database()
# 缓存不同数据库的实例 - 避免重复创建连接池 # 缓存不同数据库的实例 - 避免重复创建连接池
_database_instances: Dict[str, Database] = {} _database_instances: Dict[str, Database] = {}
def create_database_instance(db_name): def create_database_instance(db_name):
"""Create a new Database instance for a specific database.""" """Create a new Database instance for a specific database."""
return Database(db_name=db_name) return Database(db_name=db_name)
async def get_database_instance(db_name: Optional[str] = None) -> Database: async def get_database_instance(db_name: Optional[str] = None) -> Database:
"""Get or create a database instance for the specified database name.""" """Get or create a database instance for the specified database name."""
if not db_name: if not db_name:
return db # 返回默认数据库实例 return db # 返回默认数据库实例
if db_name not in _database_instances: if db_name not in _database_instances:
# 创建新的数据库实例 # 创建新的数据库实例
instance = create_database_instance(db_name) instance = create_database_instance(db_name)
@@ -74,14 +78,16 @@ async def get_database_instance(db_name: Optional[str] = None) -> Database:
await instance.open() await instance.open()
_database_instances[db_name] = instance _database_instances[db_name] = instance
logger.info(f"Created new database instance for: {db_name}") logger.info(f"Created new database instance for: {db_name}")
return _database_instances[db_name] return _database_instances[db_name]
async def get_db_connection(): async def get_db_connection():
"""Dependency for FastAPI to get a database connection.""" """Dependency for FastAPI to get a database connection."""
async with db.get_connection() as conn: async with db.get_connection() as conn:
yield conn yield conn
async def get_database_connection(db_name: Optional[str] = None): async def get_database_connection(db_name: Optional[str] = None):
""" """
FastAPI dependency to get database connection with optional database name. FastAPI dependency to get database connection with optional database name.
@@ -92,13 +98,14 @@ async def get_database_connection(db_name: Optional[str] = None):
async with instance.get_connection() as conn: async with instance.get_connection() as conn:
yield conn yield conn
async def cleanup_database_instances(): async def cleanup_database_instances():
"""Clean up all database instances (call this on application shutdown).""" """Clean up all database instances (call this on application shutdown)."""
for db_name, instance in _database_instances.items(): for db_name, instance in _database_instances.items():
await instance.close() await instance.close()
logger.info(f"Closed database instance for: {db_name}") logger.info(f"Closed database instance for: {db_name}")
_database_instances.clear() _database_instances.clear()
# 关闭默认数据库 # 关闭默认数据库
await db.close() await db.close()
logger.info("All database instances cleaned up.") logger.info("All database instances cleaned up.")

View File

@@ -1,10 +1,9 @@
from fastapi import APIRouter, Depends, HTTPException, Query from fastapi import APIRouter, Depends, HTTPException, Query
from typing import List, Optional from typing import Optional
from datetime import datetime
from psycopg import AsyncConnection from psycopg import AsyncConnection
from .database import get_database_instance from .database import get_database_instance
from .scada_info import query_pg_scada_info from .scada_info import ScadaRepository
router = APIRouter(prefix="/postgresql", tags=["postgresql"]) router = APIRouter(prefix="/postgresql", tags=["postgresql"])
@@ -26,15 +25,13 @@ async def get_scada_info_with_connection(
conn: AsyncConnection = Depends(get_database_connection), conn: AsyncConnection = Depends(get_database_connection),
): ):
""" """
使用连接池查询SCADA信息 使用连接池查询所有SCADA信息
""" """
try: try:
# 使用连接查询SCADA信息 # 使用ScadaRepository查询SCADA信息
async with conn.cursor() as cur: scada_data = await ScadaRepository.get_scadas_info(conn)
await cur.execute("SELECT * FROM scada_info")
scada_data = await cur.fetchall()
return {"success": True, "data": scada_data, "count": len(scada_data)} return {"success": True, "data": scada_data, "count": len(scada_data)}
except Exception as e: except Exception as e:
raise HTTPException( raise HTTPException(
status_code=500, detail=f"查询SCADA信息时发生错误: {str(e)}" status_code=500, detail=f"查询SCADA信息时发生错误: {str(e)}"
) )

File diff suppressed because it is too large Load Diff

View File

@@ -227,7 +227,7 @@ async def delete_scheme_nodes(
async def insert_scada_data( async def insert_scada_data(
data: List[dict], conn: AsyncConnection = Depends(get_database_connection) data: List[dict], conn: AsyncConnection = Depends(get_database_connection)
): ):
await ScadaRepository.insert_batch(conn, data) await ScadaRepository.insert_scada_batch(conn, data)
return {"message": f"Inserted {len(data)} records"} return {"message": f"Inserted {len(data)} records"}
@@ -238,18 +238,23 @@ async def get_scada_data(
end_time: datetime, end_time: datetime,
conn: AsyncConnection = Depends(get_database_connection), conn: AsyncConnection = Depends(get_database_connection),
): ):
return await ScadaRepository.get_data_by_time(conn, device_id, start_time, end_time) return await ScadaRepository.get_scada_by_id_time_range(
conn, device_id, start_time, end_time
)
@router.get("/scada/{device_id}/field") @router.get("/scada/{device_id}/field")
async def get_scada_field( async def get_scada_field(
device_id: str, device_id: str,
time: datetime, start_time: datetime,
end_time: datetime,
field: str, field: str,
conn: AsyncConnection = Depends(get_database_connection), conn: AsyncConnection = Depends(get_database_connection),
): ):
try: try:
return await ScadaRepository.get_field(conn, time, device_id, field) return await ScadaRepository.get_scada_field_by_id_time_range(
conn, device_id, start_time, end_time, field
)
except ValueError as e: except ValueError as e:
raise HTTPException(status_code=400, detail=str(e)) raise HTTPException(status_code=400, detail=str(e))
@@ -263,7 +268,7 @@ async def update_scada_field(
conn: AsyncConnection = Depends(get_database_connection), conn: AsyncConnection = Depends(get_database_connection),
): ):
try: try:
await ScadaRepository.update_field(conn, time, device_id, field, value) await ScadaRepository.update_scada_field(conn, time, device_id, field, value)
return {"message": "Updated successfully"} return {"message": "Updated successfully"}
except ValueError as e: except ValueError as e:
raise HTTPException(status_code=400, detail=str(e)) raise HTTPException(status_code=400, detail=str(e))
@@ -276,5 +281,5 @@ async def delete_scada_data(
end_time: datetime, end_time: datetime,
conn: AsyncConnection = Depends(get_database_connection), conn: AsyncConnection = Depends(get_database_connection),
): ):
await ScadaRepository.delete_data_by_time(conn, device_id, start_time, end_time) await ScadaRepository.delete_scada_by_id_time(conn, device_id, start_time, end_time)
return {"message": "Deleted successfully"} return {"message": "Deleted successfully"}

View File

@@ -1,5 +1,5 @@
from typing import List, Any, Optional from typing import List, Any, Dict
from datetime import datetime from datetime import datetime, timedelta
from psycopg import AsyncConnection, sql from psycopg import AsyncConnection, sql
@@ -266,3 +266,150 @@ class RealtimeRepository:
"DELETE FROM realtime.node_simulation WHERE time >= %s AND time <= %s", "DELETE FROM realtime.node_simulation WHERE time >= %s AND time <= %s",
(start_time, end_time), (start_time, end_time),
) )
# --- 复合查询 ---
@staticmethod
async def store_realtime_simulation_result(
conn: AsyncConnection,
node_result_list: List[Dict[str, any]],
link_result_list: List[Dict[str, any]],
result_start_time: str,
):
"""
Store realtime simulation results to TimescaleDB.
Args:
conn: Database connection
node_result_list: List of node simulation results
link_result_list: List of link simulation results
result_start_time: Start time for the results (ISO format string)
"""
# Convert result_start_time string to datetime if needed
if isinstance(result_start_time, str):
simulation_time = datetime.fromisoformat(
result_start_time.replace("Z", "+00:00")
)
else:
simulation_time = result_start_time
# Prepare node data for batch insert
node_data = []
for node_result in node_result_list:
node_data.append(
{
"time": simulation_time,
"id": node_result.get("id"),
"actual_demand": node_result.get("actual_demand"),
"total_head": node_result.get("total_head"),
"pressure": node_result.get("pressure"),
"quality": node_result.get("quality"),
}
)
# Prepare link data for batch insert
link_data = []
for link_result in link_result_list:
link_data.append(
{
"time": simulation_time,
"id": link_result.get("id"),
"flow": link_result.get("flow"),
"friction": link_result.get("friction"),
"headloss": link_result.get("headloss"),
"quality": link_result.get("quality"),
"reaction": link_result.get("reaction"),
"setting": link_result.get("setting"),
"status": link_result.get("status"),
"velocity": link_result.get("velocity"),
}
)
# Insert data using batch methods
if node_data:
await RealtimeRepository.insert_nodes_batch(conn, node_data)
if link_data:
await RealtimeRepository.insert_links_batch(conn, link_data)
@staticmethod
async def query_all_record_by_time_property(
conn: AsyncConnection,
query_time: str,
type: str,
property: str,
) -> list:
"""
Query all records by time and property from TimescaleDB.
Args:
conn: Database connection
query_time: Time to query (ISO format string)
type: Type of data ("node" or "link")
property: Property/field to query
Returns:
List of records matching the criteria
"""
# Convert query_time string to datetime
if isinstance(query_time, str):
target_time = datetime.fromisoformat(query_time.replace("Z", "+00:00"))
else:
target_time = query_time
# Create time range: query_time ± 1 second
start_time = target_time - timedelta(seconds=1)
end_time = target_time + timedelta(seconds=1)
# Query based on type
if type.lower() == "node":
return await RealtimeRepository.get_nodes_field_by_time_range(
conn, start_time, end_time, property
)
elif type.lower() == "link":
return await RealtimeRepository.get_links_field_by_time_range(
conn, start_time, end_time, property
)
else:
raise ValueError(f"Invalid type: {type}. Must be 'node' or 'link'")
@staticmethod
async def query_simulation_result_by_ID_time(
conn: AsyncConnection,
ID: str,
type: str,
query_time: str,
) -> list[dict]:
"""
Query simulation results by ID and time from TimescaleDB.
Args:
conn: Database connection
ID: The ID of the node or link
type: Type of data ("node" or "link")
query_time: Time to query (ISO format string)
Returns:
List of records matching the criteria
"""
# Convert query_time string to datetime
if isinstance(query_time, str):
target_time = datetime.fromisoformat(query_time.replace("Z", "+00:00"))
else:
target_time = query_time
# Create time range: query_time ± 1 second
start_time = target_time - timedelta(seconds=1)
end_time = target_time + timedelta(seconds=1)
# Query based on type
if type.lower() == "node":
return await RealtimeRepository.get_node_by_time_range(
conn, start_time, end_time, ID
)
elif type.lower() == "link":
return await RealtimeRepository.get_link_by_time_range(
conn, start_time, end_time, ID
)
else:
raise ValueError(f"Invalid type: {type}. Must be 'node' or 'link'")

View File

@@ -25,7 +25,7 @@ class ScadaRepository:
) )
@staticmethod @staticmethod
async def get_scada_by_id_time( async def get_scada_by_id_time_range(
conn: AsyncConnection, device_id: str, start_time: datetime, end_time: datetime conn: AsyncConnection, device_id: str, start_time: datetime, end_time: datetime
) -> List[dict]: ) -> List[dict]:
async with conn.cursor() as cur: async with conn.cursor() as cur:
@@ -36,19 +36,23 @@ class ScadaRepository:
return await cur.fetchall() return await cur.fetchall()
@staticmethod @staticmethod
async def get_scada_field_by_id_time( async def get_scada_field_by_id_time_range(
conn: AsyncConnection, time: datetime, device_id: str, field: str conn: AsyncConnection,
device_id: str,
start_time: datetime,
end_time: datetime,
field: str,
) -> Any: ) -> Any:
valid_fields = {"monitored_value", "cleaned_value"} valid_fields = {"monitored_value", "cleaned_value"}
if field not in valid_fields: if field not in valid_fields:
raise ValueError(f"Invalid field: {field}") raise ValueError(f"Invalid field: {field}")
query = sql.SQL( query = sql.SQL(
"SELECT {} FROM scada.scada_data WHERE time = %s AND device_id = %s" "SELECT {} FROM scada.scada_data WHERE time >= %s AND time <= %s AND device_id = %s"
).format(sql.Identifier(field)) ).format(sql.Identifier(field))
async with conn.cursor() as cur: async with conn.cursor() as cur:
await cur.execute(query, (time, device_id)) await cur.execute(query, (start_time, end_time, device_id))
row = await cur.fetchone() row = await cur.fetchone()
return row[field] if row else None return row[field] if row else None
@@ -68,7 +72,7 @@ class ScadaRepository:
await cur.execute(query, (value, time, device_id)) await cur.execute(query, (value, time, device_id))
@staticmethod @staticmethod
async def delete_scada_by_id_time( async def delete_scada_by_id_time_range(
conn: AsyncConnection, device_id: str, start_time: datetime, end_time: datetime conn: AsyncConnection, device_id: str, start_time: datetime, end_time: datetime
): ):
async with conn.cursor() as cur: async with conn.cursor() as cur:

View File

@@ -1,5 +1,5 @@
from typing import List, Any, Optional from typing import List, Any, Dict
from datetime import datetime from datetime import datetime, timedelta
from psycopg import AsyncConnection, sql from psycopg import AsyncConnection, sql
@@ -286,3 +286,158 @@ class SchemeRepository:
"DELETE FROM scheme.node_simulation WHERE scheme = %s AND time >= %s AND time <= %s", "DELETE FROM scheme.node_simulation WHERE scheme = %s AND time >= %s AND time <= %s",
(scheme, start_time, end_time), (scheme, start_time, end_time),
) )
# --- 复合查询 ---
@staticmethod
async def store_scheme_simulation_result(
conn: AsyncConnection,
scheme: str,
node_result_list: List[Dict[str, any]],
link_result_list: List[Dict[str, any]],
result_start_time: str,
):
"""
Store scheme simulation results to TimescaleDB.
Args:
conn: Database connection
scheme: Scheme name
node_result_list: List of node simulation results
link_result_list: List of link simulation results
result_start_time: Start time for the results (ISO format string)
"""
# Convert result_start_time string to datetime if needed
if isinstance(result_start_time, str):
simulation_time = datetime.fromisoformat(
result_start_time.replace("Z", "+00:00")
)
else:
simulation_time = result_start_time
# Prepare node data for batch insert
node_data = []
for node_result in node_result_list:
node_data.append(
{
"time": simulation_time,
"scheme": scheme,
"id": node_result.get("id"),
"actual_demand": node_result.get("actual_demand"),
"total_head": node_result.get("total_head"),
"pressure": node_result.get("pressure"),
"quality": node_result.get("quality"),
}
)
# Prepare link data for batch insert
link_data = []
for link_result in link_result_list:
link_data.append(
{
"time": simulation_time,
"scheme": scheme,
"id": link_result.get("id"),
"flow": link_result.get("flow"),
"friction": link_result.get("friction"),
"headloss": link_result.get("headloss"),
"quality": link_result.get("quality"),
"reaction": link_result.get("reaction"),
"setting": link_result.get("setting"),
"status": link_result.get("status"),
"velocity": link_result.get("velocity"),
}
)
# Insert data using batch methods
if node_data:
await SchemeRepository.insert_nodes_batch(conn, node_data)
if link_data:
await SchemeRepository.insert_links_batch(conn, link_data)
@staticmethod
async def query_all_record_by_scheme_time_property(
conn: AsyncConnection,
scheme: str,
query_time: str,
type: str,
property: str,
) -> list:
"""
Query all records by scheme, time and property from TimescaleDB.
Args:
conn: Database connection
scheme: Scheme name
query_time: Time to query (ISO format string)
type: Type of data ("node" or "link")
property: Property/field to query
Returns:
List of records matching the criteria
"""
# Convert query_time string to datetime
if isinstance(query_time, str):
target_time = datetime.fromisoformat(query_time.replace("Z", "+00:00"))
else:
target_time = query_time
# Create time range: query_time ± 1 second
start_time = target_time - timedelta(seconds=1)
end_time = target_time + timedelta(seconds=1)
# Query based on type
if type.lower() == "node":
return await SchemeRepository.get_nodes_field_by_scheme_and_time_range(
conn, scheme, start_time, end_time, property
)
elif type.lower() == "link":
return await SchemeRepository.get_links_field_by_scheme_and_time_range(
conn, scheme, start_time, end_time, property
)
else:
raise ValueError(f"Invalid type: {type}. Must be 'node' or 'link'")
@staticmethod
async def query_scheme_simulation_result_by_ID_time(
conn: AsyncConnection,
scheme: str,
ID: str,
type: str,
query_time: str,
) -> list[dict]:
"""
Query scheme simulation results by ID and time from TimescaleDB.
Args:
conn: Database connection
scheme: Scheme name
ID: The ID of the node or link
type: Type of data ("node" or "link")
query_time: Time to query (ISO format string)
Returns:
List of records matching the criteria
"""
# Convert query_time string to datetime
if isinstance(query_time, str):
target_time = datetime.fromisoformat(query_time.replace("Z", "+00:00"))
else:
target_time = query_time
# Create time range: query_time ± 1 second
start_time = target_time - timedelta(seconds=1)
end_time = target_time + timedelta(seconds=1)
# Query based on type
if type.lower() == "node":
return await SchemeRepository.get_node_by_scheme_and_time_range(
conn, scheme, start_time, end_time, ID
)
elif type.lower() == "link":
return await SchemeRepository.get_link_by_scheme_and_time_range(
conn, scheme, start_time, end_time, ID
)
else:
raise ValueError(f"Invalid type: {type}. Must be 'node' or 'link'")