Files
TJWaterServerBinary/app/algorithms/valve_isolation.py
2026-02-03 11:53:16 +08:00

100 lines
3.3 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
from collections import defaultdict, deque
from typing import Any
from app.services.tjnetwork import (
get_network_link_nodes,
is_node,
is_link,
get_link_properties,
)
# Link type name that marks a link as a valve. Link types read from
# get_network_link_nodes() entries are lower-cased before being compared to it.
VALVE_LINK_TYPE = "valve"
def _parse_link_entry(link_entry: str) -> tuple[str, str, str, str]:
parts = link_entry.split(":", 3)
if len(parts) != 4:
raise ValueError(f"Invalid link entry format: {link_entry}")
return parts[0], parts[1], parts[2], parts[3]
def _accident_start_nodes(network: str, elements: list[str]) -> set[str]:
    """Return the nodes at which the isolation search starts.

    A node element contributes itself; a link element contributes both of its
    end nodes.

    :raises ValueError: if an element is neither a node nor a link, or if a
        link has no usable ``node1``/``node2`` property
    """
    start_nodes: set[str] = set()
    for element in elements:
        if is_node(network, element):
            start_nodes.add(element)
        elif is_link(network, element):
            # Treat a falsy result (e.g. None) like missing endpoints so the
            # failure surfaces as the ValueError below rather than an
            # AttributeError.
            link_props = get_link_properties(network, element) or {}
            node1 = link_props.get("node1")
            node2 = link_props.get("node2")
            if not node1 or not node2:
                # For batch processing one could skip or record the error
                # instead; for now stay strict and raise.
                raise ValueError(f"Accident link {element} missing node endpoints")
            start_nodes.update((node1, node2))
        else:
            raise ValueError(f"Accident element {element} not found")
    return start_nodes


def _split_topology(
    network: str,
) -> tuple[dict[str, set[str]], dict[str, tuple[str, str]]]:
    """Return ``(adjacency, valve_links)`` for the network.

    ``adjacency`` is an undirected node graph built from every non-valve link;
    ``valve_links`` maps each valve id to its two end nodes. Valves are left
    out of ``adjacency`` so that a traversal stops at them.
    """
    adjacency: dict[str, set[str]] = defaultdict(set)
    valve_links: dict[str, tuple[str, str]] = {}
    for link_entry in get_network_link_nodes(network):
        link_id, link_type, node1, node2 = _parse_link_entry(link_entry)
        if str(link_type).lower() == VALVE_LINK_TYPE:
            valve_links[link_id] = (node1, node2)
        else:
            adjacency[node1].add(node2)
            adjacency[node2].add(node1)
    return adjacency, valve_links


def _reachable_nodes(
    start_nodes: set[str], adjacency: dict[str, set[str]]
) -> set[str]:
    """Return every node reachable from ``start_nodes`` through ``adjacency``."""
    # Mark nodes as visited when they are enqueued (not when dequeued) so that
    # each node enters the queue at most once.
    visited: set[str] = set(start_nodes)
    queue = deque(visited)
    while queue:
        node = queue.popleft()
        # .get() avoids inserting empty entries into a defaultdict.
        for neighbor in adjacency.get(node, ()):
            if neighbor not in visited:
                visited.add(neighbor)
                queue.append(neighbor)
    return visited


def valve_isolation_analysis(
    network: str, accident_elements: str | list[str]
) -> dict[str, Any]:
    """
    Valve-closing search: determine from the network topology which valves
    have to be closed to isolate an accident.

    The affected region is the set of nodes reachable from the accident
    elements without crossing any valve link. A valve with exactly one end
    node inside that region is reported as "must close"; a valve with both
    end nodes inside it is reported as "optional".

    :param network: model name
    :param accident_elements: accident location(s) - a node id or a link id
        (pipe/pump/valve), given either as a single id string or as a
        collection of ids
    :return: dict with the keys ``accident_elements`` (list of the requested
        ids), ``affected_nodes`` (sorted), ``must_close_valves`` (sorted),
        ``optional_valves`` (sorted) and ``isolatable`` (True when at least
        one must-close valve was found); ``accident_element`` is added when
        exactly one element was requested
    :raises ValueError: if an accident element does not exist in the network,
        an accident link has no end nodes, or a link entry is malformed
    """
    if isinstance(accident_elements, str):
        target_elements = [accident_elements]
    else:
        # Copy so the result never aliases the caller's list and so that any
        # iterable (tuple, set, generator) is accepted.
        target_elements = list(accident_elements)

    start_nodes = _accident_start_nodes(network, target_elements)
    adjacency, valve_links = _split_topology(network)
    affected_nodes = _reachable_nodes(start_nodes, adjacency)

    must_close_valves: list[str] = []
    optional_valves: list[str] = []
    for valve_id, (node1, node2) in valve_links.items():
        ends_inside = (node1 in affected_nodes) + (node2 in affected_nodes)
        if ends_inside == 2:
            optional_valves.append(valve_id)
        elif ends_inside == 1:
            must_close_valves.append(valve_id)
    must_close_valves.sort()
    optional_valves.sort()

    result: dict[str, Any] = {
        "accident_elements": target_elements,
        "affected_nodes": sorted(affected_nodes),
        "must_close_valves": must_close_valves,
        "optional_valves": optional_valves,
        "isolatable": bool(must_close_valves),
    }
    # Convenience key for callers that passed a single element.
    if len(target_elements) == 1:
        result["accident_element"] = target_elements[0]
    return result