Files
TJWaterServerBinary/app/algorithms/valve_isolation.py

87 lines
2.9 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
from collections import defaultdict, deque
from typing import Any
from app.services.tjnetwork import (
get_link_properties,
get_link_type,
get_network_link_nodes,
is_link,
is_node,
)
VALVE_LINK_TYPE = "valve"
def _parse_link_entry(link_entry: str) -> tuple[str, str, str, str]:
parts = link_entry.split(":", 3)
if len(parts) != 4:
raise ValueError(f"Invalid link entry format: {link_entry}")
return parts[0], parts[1], parts[2], parts[3]
def valve_isolation_analysis(network: str, accident_element: str) -> dict[str, Any]:
"""
关阀搜索/分析:基于拓扑结构确定事故隔离所需关阀。
:param network: 模型名称
:param accident_element: 事故点(节点或管道/泵/阀门ID
:return: dict包含受影响节点、必须关闭阀门、可选阀门等信息
"""
if is_node(network, accident_element):
start_nodes = {accident_element}
accident_type = "node"
elif is_link(network, accident_element):
accident_type = get_link_type(network, accident_element)
link_props = get_link_properties(network, accident_element)
node1 = link_props.get("node1")
node2 = link_props.get("node2")
if not node1 or not node2:
raise ValueError("Accident link missing node endpoints")
start_nodes = {node1, node2}
else:
raise ValueError("Accident element not found")
adjacency: dict[str, set[str]] = defaultdict(set)
valve_links: dict[str, tuple[str, str]] = {}
for link_entry in get_network_link_nodes(network):
link_id, link_type, node1, node2 = _parse_link_entry(link_entry)
link_type_name = str(link_type).lower()
if link_type_name == VALVE_LINK_TYPE:
valve_links[link_id] = (node1, node2)
continue
adjacency[node1].add(node2)
adjacency[node2].add(node1)
affected_nodes: set[str] = set()
queue = deque(start_nodes)
while queue:
node = queue.popleft()
if node in affected_nodes:
continue
affected_nodes.add(node)
for neighbor in adjacency.get(node, []):
if neighbor not in affected_nodes:
queue.append(neighbor)
must_close_valves: list[str] = []
optional_valves: list[str] = []
for valve_id, (node1, node2) in valve_links.items():
in_node1 = node1 in affected_nodes
in_node2 = node2 in affected_nodes
if in_node1 and in_node2:
optional_valves.append(valve_id)
elif in_node1 or in_node2:
must_close_valves.append(valve_id)
must_close_valves.sort()
optional_valves.sort()
return {
"accident_element": accident_element,
"accident_type": accident_type,
"affected_nodes": sorted(affected_nodes),
"must_close_valves": must_close_valves,
"optional_valves": optional_valves,
"isolatable": len(must_close_valves) > 0,
}