更新方法以支持多id查询

This commit is contained in:
JIANG
2025-12-10 15:28:38 +08:00
parent 8a9345dfcc
commit 6582bf8879
2 changed files with 103 additions and 78 deletions

View File

@@ -17,133 +17,140 @@ class CompositeQueries:
async def get_scada_associated_realtime_simulation_data(
timescale_conn: AsyncConnection,
postgres_conn: AsyncConnection,
device_id: str,
device_ids: List[str],
start_time: datetime,
end_time: datetime,
field: str,
) -> Optional[Any]:
) -> List[Optional[Any]]:
"""
获取 SCADA 关联的 link/node 模拟值
根据传入的 SCADA device_id找到关联的 link/node
根据传入的 SCADA device_ids,找到关联的 link/node
并根据对应的 type查询对应的模拟数据
Args:
timescale_conn: TimescaleDB 异步连接
postgres_conn: PostgreSQL 异步连接
device_id: SCADA 设备ID
device_ids: SCADA 设备ID列表
start_time: 开始时间
end_time: 结束时间
field: 要查询的字段名
Returns:
模拟数据值,如果没有找到则返回 None
模拟数据值列表,如果没有找到则对应位置返回 None
Raises:
ValueError: 当 SCADA 设备未找到或字段无效时
"""
results = []
# 1. 查询所有 SCADA 信息
scada_infos = await PostgreScadaRepository.get_scadas(postgres_conn)
# 2. 根据 device_id 找到对应的 SCADA 信息
target_scada = None
for scada in scada_infos:
if scada["id"] == device_id:
target_scada = scada
break
for device_id in device_ids:
# 2. 根据 device_id 找到对应的 SCADA 信息
target_scada = None
for scada in scada_infos:
if scada["id"] == device_id:
target_scada = scada
break
if not target_scada:
raise ValueError(f"SCADA device {device_id} not found")
if not target_scada:
raise ValueError(f"SCADA device {device_id} not found")
# 3. 根据 type 和 associated_element_id 查询对应的模拟数据
element_id = target_scada["associated_element_id"]
scada_type = target_scada["type"]
# 3. 根据 type 和 associated_element_id 查询对应的模拟数据
element_id = target_scada["associated_element_id"]
scada_type = target_scada["type"]
if scada_type.lower() == "pipe_flow":
# 查询 link 模拟数据
return await RealtimeRepository.get_link_field_by_time_range(
timescale_conn, start_time, end_time, element_id, field
)
elif scada_type.lower() == "pressure":
# 查询 node 模拟数据
return await RealtimeRepository.get_node_field_by_time_range(
timescale_conn, start_time, end_time, element_id, field
)
else:
raise ValueError(f"Unknown SCADA type: {scada_type}")
if scada_type.lower() == "pipe_flow":
# 查询 link 模拟数据
res = await RealtimeRepository.get_link_field_by_time_range(
timescale_conn, start_time, end_time, element_id, field
)
elif scada_type.lower() == "pressure":
# 查询 node 模拟数据
res = await RealtimeRepository.get_node_field_by_time_range(
timescale_conn, start_time, end_time, element_id, field
)
else:
raise ValueError(f"Unknown SCADA type: {scada_type}")
results.append(res)
return results
@staticmethod
async def get_scada_associated_scheme_simulation_data(
timescale_conn: AsyncConnection,
postgres_conn: AsyncConnection,
device_id: str,
device_ids: List[str],
start_time: datetime,
end_time: datetime,
scheme_type: str,
scheme_name: str,
field: str,
) -> Optional[Any]:
) -> List[Optional[Any]]:
"""
获取 SCADA 关联的 link/node 模拟值
根据传入的 SCADA device_id找到关联的 link/node
根据传入的 SCADA device_ids,找到关联的 link/node
并根据对应的 type查询对应的模拟数据
Args:
timescale_conn: TimescaleDB 异步连接
postgres_conn: PostgreSQL 异步连接
device_id: SCADA 设备ID
device_ids: SCADA 设备ID列表
start_time: 开始时间
end_time: 结束时间
field: 要查询的字段名
Returns:
模拟数据值,如果没有找到则返回 None
模拟数据值列表,如果没有找到则对应位置返回 None
Raises:
ValueError: 当 SCADA 设备未找到或字段无效时
"""
results = []
# 1. 查询所有 SCADA 信息
scada_infos = await PostgreScadaRepository.get_scadas(postgres_conn)
# 2. 根据 device_id 找到对应的 SCADA 信息
target_scada = None
for scada in scada_infos:
if scada["id"] == device_id:
target_scada = scada
break
for device_id in device_ids:
# 2. 根据 device_id 找到对应的 SCADA 信息
target_scada = None
for scada in scada_infos:
if scada["id"] == device_id:
target_scada = scada
break
if not target_scada:
raise ValueError(f"SCADA device {device_id} not found")
if not target_scada:
raise ValueError(f"SCADA device {device_id} not found")
# 3. 根据 type 和 associated_element_id 查询对应的模拟数据
element_id = target_scada["associated_element_id"]
scada_type = target_scada["type"]
# 3. 根据 type 和 associated_element_id 查询对应的模拟数据
element_id = target_scada["associated_element_id"]
scada_type = target_scada["type"]
if scada_type.lower() == "pipe_flow":
# 查询 link 模拟数据
return await SchemeRepository.get_link_field_by_scheme_and_time_range(
timescale_conn,
scheme_type,
scheme_name,
start_time,
end_time,
element_id,
field,
)
elif scada_type.lower() == "pressure":
# 查询 node 模拟数据
return await SchemeRepository.get_node_field_by_scheme_and_time_range(
timescale_conn,
scheme_type,
scheme_name,
start_time,
end_time,
element_id,
field,
)
else:
raise ValueError(f"Unknown SCADA type: {scada_type}")
if scada_type.lower() == "pipe_flow":
# 查询 link 模拟数据
res = await SchemeRepository.get_link_field_by_scheme_and_time_range(
timescale_conn,
scheme_type,
scheme_name,
start_time,
end_time,
element_id,
"flow",
)
elif scada_type.lower() == "pressure":
# 查询 node 模拟数据
res = await SchemeRepository.get_node_field_by_scheme_and_time_range(
timescale_conn,
scheme_type,
scheme_name,
start_time,
end_time,
element_id,
"pressure",
)
else:
raise ValueError(f"Unknown SCADA type: {scada_type}")
results.append(res)
return results
@staticmethod
async def get_element_associated_scada_data(

View File

@@ -361,7 +361,7 @@ async def insert_scada_data(
return {"message": f"Inserted {len(data)} records"}
@router.get("/scada")
@router.get("/scada/by-ids-time-range")
async def get_scada_by_ids_time_range(
device_ids: List[str],
start_time: datetime,
@@ -373,7 +373,7 @@ async def get_scada_by_ids_time_range(
)
@router.get("/scada/field")
@router.get("/scada/by-ids-field-time-range")
async def get_scada_field_by_ids_time_range(
device_ids: List[str],
start_time: datetime,
@@ -404,7 +404,7 @@ async def update_scada_field(
raise HTTPException(status_code=400, detail=str(e))
@router.delete("/scada")
@router.delete("/scada/by-id-time-range")
async def delete_scada_data(
device_id: str,
start_time: datetime,
@@ -422,23 +422,41 @@ async def delete_scada_data(
@router.get("/composite/scada-simulation")
async def get_scada_associated_simulation_data(
device_id: str,
device_ids: List[str],
start_time: datetime,
end_time: datetime,
field: str,
scheme_type: str = Query(None, description="指定方案名称,若为空则查询实时数据"),
scheme_name: str = Query(None, description="指定方案名称,若为空则查询实时数据"),
timescale_conn: AsyncConnection = Depends(get_database_connection),
postgres_conn: AsyncConnection = Depends(get_postgres_connection),
):
"""
获取 SCADA 关联的 link/node 模拟值
根据传入的 SCADA device_id找到关联的 link/node
根据传入的 SCADA device_ids,找到关联的 link/node
并根据对应的 type查询对应的模拟数据
"""
try:
result = await CompositeQueries.get_scada_associated_simulation_data(
timescale_conn, postgres_conn, device_id, start_time, end_time, field
)
if scheme_type and scheme_name:
result = await CompositeQueries.get_scada_associated_scheme_simulation_data(
timescale_conn,
postgres_conn,
device_ids,
start_time,
end_time,
scheme_type,
scheme_name,
)
else:
result = (
await CompositeQueries.get_scada_associated_realtime_simulation_data(
timescale_conn,
postgres_conn,
device_ids,
start_time,
end_time,
)
)
if result is None:
raise HTTPException(status_code=404, detail="No simulation data found")
return result