新增内部查询方法,替换simulation中scada设备查询方法

This commit is contained in:
JIANG
2025-12-09 15:10:10 +08:00
parent 357bacbf8a
commit 1d55bf4992
5 changed files with 102 additions and 42 deletions

Binary file not shown.

View File

@@ -774,8 +774,8 @@ def run_simulation(
if globals.reservoirs_id: if globals.reservoirs_id:
# reservoirs_id = {'ZBBDJSCP000002': '2497', 'R00003': '2571'} # reservoirs_id = {'ZBBDJSCP000002': '2497', 'R00003': '2571'}
# 1.获取reservoir的SCADA数据,形式如{'2497': '3.1231', '2571': '2.7387'} # 1.获取reservoir的SCADA数据,形式如{'2497': '3.1231', '2571': '2.7387'}
reservoir_SCADA_data_dict = influxdb_api.query_SCADA_data_by_device_ID_and_time( reservoir_SCADA_data_dict = TimescaleInternalStorage.query_scada_by_ids_time(
query_ids_list=list(globals.reservoirs_id.values()), device_ids=list(globals.reservoirs_id.values()),
query_time=modify_pattern_start_time, query_time=modify_pattern_start_time,
) )
# 2.构建出新字典,形式如{'ZBBDJSCP000002': '3.1231', 'R00003': '2.7387'} # 2.构建出新字典,形式如{'ZBBDJSCP000002': '3.1231', 'R00003': '2.7387'}
@@ -798,8 +798,8 @@ def run_simulation(
set_pattern(name_c, cs) set_pattern(name_c, cs)
if globals.tanks_id: if globals.tanks_id:
# 修改tank初始液位 # 修改tank初始液位
tank_SCADA_data_dict = influxdb_api.query_SCADA_data_by_device_ID_and_time( tank_SCADA_data_dict = TimescaleInternalStorage.query_scada_by_ids_time(
query_ids_list=list(globals.tanks_id.values()), device_ids=list(globals.tanks_id.values()),
query_time=modify_pattern_start_time, query_time=modify_pattern_start_time,
) )
tank_dict = { tank_dict = {
@@ -814,11 +814,9 @@ def run_simulation(
set_tank(name_c, cs) set_tank(name_c, cs)
if globals.fixed_pumps_id: if globals.fixed_pumps_id:
# 修改工频泵的pattern # 修改工频泵的pattern
fixed_pump_SCADA_data_dict = ( fixed_pump_SCADA_data_dict = TimescaleInternalStorage.query_scada_by_ids_time(
influxdb_api.query_SCADA_data_by_device_ID_and_time( device_ids=list(globals.fixed_pumps_id.values()),
query_ids_list=list(globals.fixed_pumps_id.values()), query_time=modify_pattern_start_time,
query_time=modify_pattern_start_time,
)
) )
# print(fixed_pump_SCADA_data_dict) # print(fixed_pump_SCADA_data_dict)
fixed_pump_dict = { fixed_pump_dict = {
@@ -840,8 +838,8 @@ def run_simulation(
if globals.variable_pumps_id: if globals.variable_pumps_id:
# 修改变频泵的pattern # 修改变频泵的pattern
variable_pump_SCADA_data_dict = ( variable_pump_SCADA_data_dict = (
influxdb_api.query_SCADA_data_by_device_ID_and_time( TimescaleInternalStorage.query_scada_by_ids_time(
query_ids_list=list(globals.variable_pumps_id.values()), device_ids=list(globals.variable_pumps_id.values()),
query_time=modify_pattern_start_time, query_time=modify_pattern_start_time,
) )
) )
@@ -860,8 +858,8 @@ def run_simulation(
set_pattern(name_c, cs) set_pattern(name_c, cs)
if globals.demand_id: if globals.demand_id:
# 基于实时数据修改大用户节点的pattern # 基于实时数据修改大用户节点的pattern
demand_SCADA_data_dict = influxdb_api.query_SCADA_data_by_device_ID_and_time( demand_SCADA_data_dict = TimescaleInternalStorage.query_scada_by_ids_time(
query_ids_list=list(globals.demand_id.values()), device_ids=list(globals.demand_id.values()),
query_time=modify_pattern_start_time, query_time=modify_pattern_start_time,
) )
demand_dict = { demand_dict = {
@@ -887,8 +885,8 @@ def run_simulation(
if globals.source_outflow_pattern_id: if globals.source_outflow_pattern_id:
# 基于实时的出厂流量计数据修改出厂流量计绑定的pattern # 基于实时的出厂流量计数据修改出厂流量计绑定的pattern
source_outflow_SCADA_data_dict = ( source_outflow_SCADA_data_dict = (
influxdb_api.query_SCADA_data_by_device_ID_and_time( TimescaleInternalStorage.query_scada_by_ids_time(
query_ids_list=list(globals.source_outflow_pattern_id.values()), device_ids=list(globals.source_outflow_pattern_id.values()),
query_time=modify_pattern_start_time, query_time=modify_pattern_start_time,
) )
) )
@@ -925,8 +923,8 @@ def run_simulation(
if globals.realtime_pipe_flow_pattern_id: if globals.realtime_pipe_flow_pattern_id:
# 基于实时的pipe_flow类数据修改pipe_flow类绑定的pattern # 基于实时的pipe_flow类数据修改pipe_flow类绑定的pattern
realtime_pipe_flow_SCADA_data_dict = ( realtime_pipe_flow_SCADA_data_dict = (
influxdb_api.query_SCADA_data_by_device_ID_and_time( TimescaleInternalStorage.query_scada_by_ids_time(
query_ids_list=list(globals.realtime_pipe_flow_pattern_id.values()), device_ids=list(globals.realtime_pipe_flow_pattern_id.values()),
query_time=modify_pattern_start_time, query_time=modify_pattern_start_time,
) )
) )
@@ -962,8 +960,8 @@ def run_simulation(
query_api_id = globals.realtime_pipe_flow_pattern_id.get(pipe_flow_region) query_api_id = globals.realtime_pipe_flow_pattern_id.get(pipe_flow_region)
temp_realtime_pipe_flow_pattern_id[pipe_flow_region] = query_api_id temp_realtime_pipe_flow_pattern_id[pipe_flow_region] = query_api_id
temp_realtime_pipe_flow_SCADA_data_dict = ( temp_realtime_pipe_flow_SCADA_data_dict = (
influxdb_api.query_SCADA_data_by_device_ID_and_time( TimescaleInternalStorage.query_scada_by_ids_time(
query_ids_list=list(temp_realtime_pipe_flow_pattern_id.values()), device_ids=list(temp_realtime_pipe_flow_pattern_id.values()),
query_time=modify_pattern_start_time, query_time=modify_pattern_start_time,
) )
) )
@@ -1020,14 +1018,14 @@ def run_simulation(
globals.non_realtime_region_patterns.get(region, []) globals.non_realtime_region_patterns.get(region, [])
) )
region_source_outflow_data_dict = ( region_source_outflow_data_dict = (
influxdb_api.query_SCADA_data_by_device_ID_and_time( TimescaleInternalStorage.query_scada_by_ids_time(
query_ids_list=temp_source_outflow_region_id, device_ids=temp_source_outflow_region_id,
query_time=modify_pattern_start_time, query_time=modify_pattern_start_time,
) )
) )
region_realtime_region_pipe_flow_and_demand_data_dict = ( region_realtime_region_pipe_flow_and_demand_data_dict = (
influxdb_api.query_SCADA_data_by_device_ID_and_time( TimescaleInternalStorage.query_scada_by_ids_time(
query_ids_list=temp_realtime_region_pipe_flow_and_demand_id, device_ids=temp_realtime_region_pipe_flow_and_demand_id,
query_time=modify_pattern_start_time, query_time=modify_pattern_start_time,
) )
) )

View File

@@ -4,6 +4,8 @@ from fastapi.logger import logger
from timescaledb.schemas.scheme import SchemeRepository from timescaledb.schemas.scheme import SchemeRepository
from timescaledb.schemas.realtime import RealtimeRepository from timescaledb.schemas.realtime import RealtimeRepository
import timescaledb.timescaledb_info as timescaledb_info import timescaledb.timescaledb_info as timescaledb_info
from datetime import datetime, timedelta
from timescaledb.schemas.scada import ScadaRepository
import psycopg import psycopg
import time import time
@@ -75,3 +77,46 @@ class InternalStorage:
time.sleep(1) # 重试前等待 time.sleep(1) # 重试前等待
else: else:
raise # 达到最大重试次数后抛出异常 raise # 达到最大重试次数后抛出异常
@staticmethod
def query_scada_by_ids_time(
device_ids: List[str],
query_time: str,
db_name: str = None,
max_retries: int = 3,
) -> dict:
"""查询指定时间点的 SCADA 数据"""
# 解析时间,假设是北京时间
beijing_time = datetime.fromisoformat(query_time)
start_time = beijing_time - timedelta(seconds=1)
end_time = beijing_time + timedelta(seconds=1)
for attempt in range(max_retries):
try:
conn_string = (
timescaledb_info.get_pgconn_string(db_name=db_name)
if db_name
else timescaledb_info.get_pgconn_string()
)
with psycopg.Connection.connect(conn_string) as conn:
rows = ScadaRepository.get_scada_by_ids_time_range_sync(
conn, device_ids, start_time, end_time
)
# 处理结果,返回每个 device_id 的第一个值
result = {}
for device_id in device_ids:
device_rows = [
row for row in rows if row["device_id"] == device_id
]
if device_rows:
result[device_id] = device_rows[0]["monitored_value"]
else:
result[device_id] = None
return result
except Exception as e:
logger.error(f"查询尝试 {attempt + 1} 失败: {e}")
if attempt < max_retries - 1:
time.sleep(1)
else:
raise

View File

@@ -358,20 +358,20 @@ async def insert_scada_data(
@router.get("/scada") @router.get("/scada")
async def get_scada_by_id_time_range( async def get_scada_by_ids_time_range(
device_id: str, device_ids: List[str],
start_time: datetime, start_time: datetime,
end_time: datetime, end_time: datetime,
conn: AsyncConnection = Depends(get_database_connection), conn: AsyncConnection = Depends(get_database_connection),
): ):
return await ScadaRepository.get_scada_by_id_time_range( return await ScadaRepository.get_scada_by_ids_time_range(
conn, device_id, start_time, end_time conn, device_ids, start_time, end_time
) )
@router.get("/scada/{device_id}/field") @router.get("/scada/field")
async def get_scada_field_by_id_time_range( async def get_scada_field_by_ids_time_range(
device_id: str, device_ids: List[str],
start_time: datetime, start_time: datetime,
end_time: datetime, end_time: datetime,
field: str, field: str,
@@ -379,7 +379,7 @@ async def get_scada_field_by_id_time_range(
): ):
try: try:
return await ScadaRepository.get_scada_field_by_id_time_range( return await ScadaRepository.get_scada_field_by_id_time_range(
conn, device_id, start_time, end_time, field conn, device_ids, start_time, end_time, field
) )
except ValueError as e: except ValueError as e:
raise HTTPException(status_code=400, detail=str(e)) raise HTTPException(status_code=400, detail=str(e))

View File

@@ -1,6 +1,6 @@
from typing import List, Any from typing import List, Any
from datetime import datetime from datetime import datetime
from psycopg import AsyncConnection, sql from psycopg import AsyncConnection, Connection, sql
class ScadaRepository: class ScadaRepository:
@@ -25,36 +25,53 @@ class ScadaRepository:
) )
@staticmethod @staticmethod
async def get_scada_by_id_time_range( async def get_scada_by_ids_time_range(
conn: AsyncConnection, device_id: str, start_time: datetime, end_time: datetime conn: AsyncConnection,
device_ids: List[str],
start_time: datetime,
end_time: datetime,
) -> List[dict]: ) -> List[dict]:
async with conn.cursor() as cur: async with conn.cursor() as cur:
await cur.execute( await cur.execute(
"SELECT * FROM scada.scada_data WHERE device_id = %s AND time >= %s AND time <= %s", "SELECT * FROM scada.scada_data WHERE device_id = ANY(%s) AND time >= %s AND time <= %s",
(device_id, start_time, end_time), (device_ids, start_time, end_time),
) )
return await cur.fetchall() return await cur.fetchall()
@staticmethod
def get_scada_by_ids_time_range_sync(
conn: Connection,
device_ids: List[str],
start_time: datetime,
end_time: datetime,
) -> List[dict]:
with conn.cursor() as cur:
cur.execute(
"SELECT * FROM scada.scada_data WHERE device_id = ANY(%s) AND time >= %s AND time <= %s",
(device_ids, start_time, end_time),
)
return cur.fetchall()
@staticmethod @staticmethod
async def get_scada_field_by_id_time_range( async def get_scada_field_by_id_time_range(
conn: AsyncConnection, conn: AsyncConnection,
device_id: str, device_ids: List[str],
start_time: datetime, start_time: datetime,
end_time: datetime, end_time: datetime,
field: str, field: str,
) -> Any: ) -> List[dict]:
valid_fields = {"monitored_value", "cleaned_value"} valid_fields = {"monitored_value", "cleaned_value"}
if field not in valid_fields: if field not in valid_fields:
raise ValueError(f"Invalid field: {field}") raise ValueError(f"Invalid field: {field}")
query = sql.SQL( query = sql.SQL(
"SELECT {} FROM scada.scada_data WHERE time >= %s AND time <= %s AND device_id = %s" "SELECT device_id, {} FROM scada.scada_data WHERE time >= %s AND time <= %s AND device_id = ANY(%s)"
).format(sql.Identifier(field)) ).format(sql.Identifier(field))
async with conn.cursor() as cur: async with conn.cursor() as cur:
await cur.execute(query, (start_time, end_time, device_id)) await cur.execute(query, (start_time, end_time, device_ids))
row = await cur.fetchone() rows = await cur.fetchall()
return row[field] if row else None return [{"device_id": row["device_id"], field: row[field]} for row in rows]
@staticmethod @staticmethod
async def update_scada_field( async def update_scada_field(