重构爆管定位算法,增加多进程支持与可视化功能

This commit is contained in:
2026-03-08 20:01:21 +08:00
parent a7e3b6aff9
commit 9a4a91c328
5 changed files with 311 additions and 67 deletions
+89 -40
View File
@@ -1,11 +1,14 @@
import argparse
import json
import logging
from multiprocessing import cpu_count
from pathlib import Path
from typing import Any, Iterable
import pandas as pd
from app.algorithms.burst_location import leak_simulator
from .burst_locator import (
DN_search_multi_simple_add_flow_count_new,
)
@@ -18,7 +21,9 @@ from .network_model import (
read_inf_inp_other,
)
DEFAULT_N_WORKERS = max(1, min(cpu_count() - 1, 4))
# DEFAULT_N_WORKERS = max(1, min(cpu_count() - 1, 4))
DEFAULT_N_WORKERS = max(1, cpu_count() - 1)
logger = logging.getLogger(__name__)
def _read_id_list_json(path):
@@ -115,6 +120,10 @@ def run_burst_location(
min_dpressure: float = 2.0,
basic_pressure: float = 10.0,
n_workers: int = DEFAULT_N_WORKERS,
partition_on_full_graph: bool = True,
visualize_partition: bool = True,
visualize_pause_seconds: float = 0.3,
final_candidates_csv_path: str | None = "temp/burst_location/final_round_candidates.csv",
) -> dict[str, Any]:
if pressure_scada_ids is None or len(pressure_scada_ids) == 0:
raise ValueError("pressure_scada_ids cannot be empty.")
@@ -137,14 +146,30 @@ def run_burst_location(
minimum_p=0.0,
)
all_node, _, node_coordinates, candidate_pipe, _, _, pipe_length, _ = read_inf_inp(
wn
(
all_node,
_,
node_coordinates,
all_pipe,
_,
_,
pipe_length,
pipe_diameter,
) = read_inf_inp(wn)
candidate_pipe, _ = leak_simulator.cal_possible_pipe(
burst_leakage, all_pipe, pipe_diameter
)
_, pipe_start_node_all, pipe_end_node_all = read_inf_inp_other(wn)
node_x, node_y = cal_node_coordinate(all_node, node_coordinates)
G0 = construct_graph(wn)
node_pipe_dic, couple_node_length = _build_node_pipe_maps(
all_node, candidate_pipe, pipe_start_node_all, pipe_end_node_all, pipe_length
all_node,
all_pipe,
pipe_start_node_all,
pipe_end_node_all,
pipe_length,
)
all_node_series = pd.Series(range(len(all_node)), index=all_node)
@@ -176,43 +201,59 @@ def run_burst_location(
max_flow = pd.Series(dtype=float)
stage_timing: dict[str, Any] = {}
located_pipe, elapsed_seconds, simulation_times, _, similarity_series = (
DN_search_multi_simple_add_flow_count_new(
wn=wn,
wn_inp_path=str(inp_path),
G0=G0,
all_node=all_node,
node_x=node_x,
node_y=node_y,
pipe_start_node_all=pipe_start_node_all,
pipe_end_node_all=pipe_end_node_all,
couple_node_length=couple_node_length,
node_pipe_dic=node_pipe_dic,
all_node_series=all_node_series,
top_group_ratio=0.3,
top_pipe_num_max=80,
top_pipe_num_min=10,
candidate_pipe_input_initial=candidate_pipe,
similarity_mode=similarity_mode,
pressure_monitor=pressure_monitor,
pressure_predict=pressure_predict,
pressure_normal=pressure_normal,
pressure_leak_all=None,
flow_monitor=flow_monitor,
flow_predict=flow_predict,
flow_normal=flow_normal,
flow_leak_all=None,
timestep_list=timestep_list,
max_flow=max_flow,
group_basic_num=30,
Top_sensor_num=min(5, len(pressure_ids)),
if_gy=0,
pressure_threshold=float(min_dpressure),
leak_mag=float(burst_leakage),
n_workers=max(1, int(n_workers)),
stage_timing=stage_timing,
try:
(
located_pipe,
elapsed_seconds,
simulation_times,
_,
similarity_series,
exit_condition,
final_candidates_csv,
) = (
DN_search_multi_simple_add_flow_count_new(
wn=wn,
wn_inp_path=str(inp_path),
G0=G0,
all_node=all_node,
node_x=node_x,
node_y=node_y,
pipe_start_node_all=pipe_start_node_all,
pipe_end_node_all=pipe_end_node_all,
couple_node_length=couple_node_length,
node_pipe_dic=node_pipe_dic,
all_node_series=all_node_series,
top_group_ratio=0.3,
top_pipe_num_max=80,
top_pipe_num_min=10,
candidate_pipe_input_initial=candidate_pipe,
similarity_mode=similarity_mode,
pressure_monitor=pressure_monitor,
pressure_predict=pressure_predict,
pressure_normal=pressure_normal,
pressure_leak_all=None,
flow_monitor=flow_monitor,
flow_predict=flow_predict,
flow_normal=flow_normal,
flow_leak_all=None,
timestep_list=timestep_list,
max_flow=max_flow,
group_basic_num=30,
Top_sensor_num=min(5, len(pressure_ids)),
if_gy=0,
pressure_threshold=float(min_dpressure),
leak_mag=float(burst_leakage),
n_workers=max(1, int(n_workers)),
stage_timing=stage_timing,
partition_on_full_graph=partition_on_full_graph,
visualize_partition=visualize_partition,
visualize_pause_seconds=visualize_pause_seconds,
final_candidates_csv_path=final_candidates_csv_path,
)
)
)
except Exception as exc:
logger.exception("Burst location algorithm execution failed.")
raise RuntimeError(f"Failed to run burst location algorithm: {exc}") from exc
return {
"located_pipe": located_pipe,
@@ -221,6 +262,8 @@ def run_burst_location(
"simulation_times": int(simulation_times),
"top_candidates": _build_top_candidates(similarity_series),
"similarity_mode": similarity_mode,
"exit_condition": exit_condition,
"final_candidates_csv": final_candidates_csv,
"stage_timing_seconds": stage_timing,
}
@@ -267,6 +310,11 @@ def _parse_args():
default=DEFAULT_N_WORKERS,
help="(可选)特征中心模拟进程数,默认 max(1, min(cpu_count()-1, 4))",
)
parser.add_argument(
"--final-candidates-csv-path",
default="temp/burst_location/final_round_candidates.csv",
help="(可选)最后一轮候选管道明细 CSV 输出路径",
)
return parser.parse_args()
@@ -284,6 +332,7 @@ def main():
min_dpressure=args.min_dpressure,
basic_pressure=args.basic_pressure,
n_workers=args.n_workers,
final_candidates_csv_path=args.final_candidates_csv_path,
)
print(json.dumps(result, ensure_ascii=False))
+117 -9
View File
@@ -2,6 +2,7 @@
import copy
import math
import os
import sys
from datetime import datetime
from time import perf_counter
@@ -11,7 +12,11 @@ import numpy as np
import pandas as pd
from .leak_simulator import cal_signature_pipe_multi_pf
from .network_partitioner import cal_group_num, metis_grouping_pipe_weight
from .network_partitioner import (
cal_group_num,
metis_grouping_pipe_weight,
visualize_metis_partition,
)
from .similarity_calculator import (
adjust_ratio,
cal_similarity_all_multi_new_sq_improve_double_lzr,
@@ -30,7 +35,7 @@ def _ensure_signatures_for_centers(
timestep_list, # 你现有的时序列表
pressure_monitor,
flow_monitor, # 用来推断传感器列名
leak_mag, # 泄漏量,比如 400/3600
leak_mag,
n_workers=1,
):
"""
@@ -221,6 +226,41 @@ def _accumulate_stage(stage_timing, stage_name, started_at):
)
def _write_last_round_candidates_csv(
csv_path,
exit_condition,
iteration_count,
similarity_mode,
candidate_details,
fallback_similarity,
):
if not csv_path:
return None
timestamp_suffix = datetime.now().strftime("%Y%m%d_%H%M%S")
base_path, ext = os.path.splitext(csv_path)
ext = ext or ".csv"
output_path = f"{base_path}_{timestamp_suffix}{ext}"
if candidate_details is not None and len(candidate_details) > 0:
export_df = candidate_details.copy()
if export_df.index.name == "pipe_id":
export_df = export_df.reset_index()
else:
export_df = pd.DataFrame(
{
"pipe_id": [str(pipe_id) for pipe_id in fallback_similarity.index],
"final_similarity": [float(value) for value in fallback_similarity.values],
}
)
export_df["exit_condition"] = exit_condition
export_df["iterations"] = int(iteration_count)
export_df["similarity_mode"] = similarity_mode
parent_dir = os.path.dirname(output_path)
if parent_dir:
os.makedirs(parent_dir, exist_ok=True)
export_df.to_csv(output_path, index=False, encoding="utf-8-sig")
return output_path
def cal_DtoTop1(
G0, pipe_leak, located_pipe, pipe_start_node_all, pipe_end_node_all, pipe_length
):
@@ -327,12 +367,18 @@ def DN_search_multi_simple_add_flow_count_new(
Top_sensor_num,
if_gy,
pressure_threshold,
leak_mag=400 / 3600,
leak_mag,
n_workers=1,
stage_timing=None,
partition_on_full_graph=True,
visualize_partition=False,
visualize_pause_seconds=0.3,
final_candidates_csv_path=None,
):
if stage_timing is None:
stage_timing = {}
exit_condition = "unknown"
final_candidates_csv = None
iter_count = 0
all_node_iter = copy.deepcopy(all_node)
candidate_pipe_input = copy.deepcopy(candidate_pipe_input_initial) # 可能漏损管段
@@ -351,6 +397,8 @@ def DN_search_multi_simple_add_flow_count_new(
effective_sensor = list(dpressure.index)
simulation_times = 0 # 模拟次数
if len(dpressure) > 0:
break_flag = 0
last_round_candidate_details = None
cos_h = 0
dis_h = 0
@@ -363,6 +411,7 @@ def DN_search_multi_simple_add_flow_count_new(
final_area = []
final_center = []
group_num = cal_group_num(candidate_pipe_input, group_basic_num)
partition_nodes = all_node if partition_on_full_graph else all_node_iter
# group 分组,得出候选漏损中心
stage_start = perf_counter()
@@ -370,7 +419,7 @@ def DN_search_multi_simple_add_flow_count_new(
metis_grouping_pipe_weight(
G0,
wn,
all_node_iter,
partition_nodes,
candidate_pipe_input,
group_num,
node_x,
@@ -383,6 +432,23 @@ def DN_search_multi_simple_add_flow_count_new(
)
)
_accumulate_stage(stage_timing, "group_partitioning", stage_start)
if visualize_partition:
visualize_metis_partition(
G0,
candidate_center_list,
candidate_group_list,
node_x,
node_y,
pipe_start_node_all,
pipe_end_node_all,
title=(
f"METIS Partition Iteration {iter_count + 1} | "
f"candidate pipes={len(candidate_pipe_input)} "
f"groups={len(candidate_group_list)}"
),
block=False,
pause_seconds=visualize_pause_seconds,
)
simulation_times = simulation_times + len(candidate_center_list)
# pick_pressure_leak
# pressure_leak = pressure_leak_all.loc[candidate_center_list].loc[:, :]
@@ -455,7 +521,9 @@ def DN_search_multi_simple_add_flow_count_new(
n_workers=n_workers,
)
)
_accumulate_stage(stage_timing, "signature_for_extra_centers", stage_start)
_accumulate_stage(
stage_timing, "signature_for_extra_centers", stage_start
)
pressure_leak = pd.concat([pressure_leak, pressure_add], axis=0)
if (flow_leak is not None) and (flow_add is not None):
flow_leak = pd.concat([flow_leak, flow_add], axis=0)
@@ -468,7 +536,7 @@ def DN_search_multi_simple_add_flow_count_new(
candidate_center_list + add_center
)
stage_start = perf_counter()
similarity, cos_h, dis_h, dis_f_h, break_flag = (
similarity, cos_h, dis_h, dis_f_h, break_flag, similarity_details = (
cal_similarity_all_multi_new_sq_improve_double_lzr(
candidate_center_list_sup,
similarity_mode,
@@ -494,8 +562,10 @@ def DN_search_multi_simple_add_flow_count_new(
max_flow,
)
)
last_round_candidate_details = similarity_details
_accumulate_stage(stage_timing, "similarity_ranking", stage_start)
if break_flag == 1:
exit_condition = "similarity_break_flag"
break
new_similarity = update_similarity(
@@ -525,12 +595,15 @@ def DN_search_multi_simple_add_flow_count_new(
final_area = list(set(final_area))
final_center = list(set(final_center))
if if_end == 1:
exit_condition = "candidate_area_if_end"
break
elif len(candidate_pipe_input) == len(final_area):
exit_condition = "candidate_size_no_change"
break
else:
candidate_pipe_input = final_area
all_node_iter = all_node_new_1
if not partition_on_full_graph:
all_node_iter = all_node_new_1
iter_count += 1
sys.stdout.write(
"\r"
@@ -577,7 +650,14 @@ def DN_search_multi_simple_add_flow_count_new(
# 做法:让 _ensure_signatures_for_centers 额外返回 need_cnt,再 simulation_times += need_cnt
stage_start = perf_counter()
similarity_sp, cos_h, dis_h, dis_f_h, break_flag = (
(
similarity_sp,
cos_h,
dis_h,
dis_f_h,
break_flag,
similarity_details,
) = (
cal_similarity_all_multi_new_sq_improve_double_lzr(
final_area_pipe,
similarity_mode,
@@ -603,6 +683,7 @@ def DN_search_multi_simple_add_flow_count_new(
max_flow,
)
)
last_round_candidate_details = similarity_details
_accumulate_stage(stage_timing, "similarity_final", stage_start)
else:
@@ -628,7 +709,16 @@ def DN_search_multi_simple_add_flow_count_new(
)
t2 = datetime.now()
dt = (t2 - t1).seconds
final_candidates_csv = _write_last_round_candidates_csv(
csv_path=final_candidates_csv_path,
exit_condition=exit_condition,
iteration_count=iter_count + 1,
similarity_mode=similarity_mode,
candidate_details=last_round_candidate_details,
fallback_similarity=similarity_sp,
)
else:
exit_condition = "no_effective_sensor_after_threshold"
dpressure = (pressure_predict - pressure_monitor).mean()
dpressure = dpressure.abs()
@@ -639,9 +729,27 @@ def DN_search_multi_simple_add_flow_count_new(
similarity_sp = similarity_sp.sort_values(ascending=False)
t2 = datetime.now()
dt = (t2 - t1).seconds
final_candidates_csv = _write_last_round_candidates_csv(
csv_path=final_candidates_csv_path,
exit_condition=exit_condition,
iteration_count=0,
similarity_mode=similarity_mode,
candidate_details=None,
fallback_similarity=similarity_sp,
)
stage_timing["iterations"] = iter_count + 1 if len(dpressure) > 0 else 0
stage_timing["total_elapsed_seconds"] = float(dt)
return similarity_sp.index[0], dt, simulation_times, wn, similarity_sp
stage_timing["exit_condition"] = exit_condition
stage_timing["final_candidates_csv"] = final_candidates_csv
return (
similarity_sp.index[0],
dt,
simulation_times,
wn,
similarity_sp,
exit_condition,
final_candidates_csv,
)
class BurstLocator:
@@ -25,9 +25,7 @@ def pick_center_pipe(node_x, node_y, candidate_pipe, pipe_start_node, pipe_end_n
start_nodes = pipe_start_node[candidate_pipe_list]
end_nodes = pipe_end_node[candidate_pipe_list]
x_vals = (
node_x[start_nodes].to_numpy() + node_x[start_nodes].to_numpy()
) / 2.0
x_vals = (node_x[start_nodes].to_numpy() + node_x[start_nodes].to_numpy()) / 2.0
y_vals = (node_y[end_nodes].to_numpy() + node_y[end_nodes].to_numpy()) / 2.0
mean_x = float(np.mean(x_vals))
mean_y = float(np.mean(y_vals))
@@ -227,7 +225,16 @@ def metis_grouping_pipe_weight(
def visualize_metis_partition(
G, center_pipes, pipe_groups, node_x, node_y, pipe_start_node_all, pipe_end_node_all
G,
center_pipes,
pipe_groups,
node_x,
node_y,
pipe_start_node_all,
pipe_end_node_all,
title: str | None = None,
block: bool = True,
pause_seconds: float | None = None,
):
"""
可视化METIS分区结果(单图模式)
@@ -240,7 +247,8 @@ def visualize_metis_partition(
pipe_start_node_all: 管道起点字典(dict)
pipe_end_node_all: 管道终点字典(dict)
"""
plt.figure(figsize=(9, 10))
fig = plt.figure("metis_partition_convergence", figsize=(22.51, 12.48))
fig.clf()
# 生成颜色映射(自动扩展颜色数量)
colors = plt.cm.tab20(np.linspace(0, 1, len(pipe_groups)))
@@ -294,15 +302,16 @@ def visualize_metis_partition(
# --- 添加图例和标注 ---
# 分组图例
group_labels = [f"Group {i + 1}" for i in range(len(pipe_groups))]
plt.legend(
legend_handles,
group_labels,
loc="upper right",
title="Partitions",
fontsize=8,
title_fontsize=10,
)
if legend_handles:
group_labels = [f"Group {i + 1}" for i in range(len(pipe_groups))]
plt.legend(
legend_handles,
group_labels,
loc="upper right",
title="Partitions",
fontsize=8,
title_fontsize=10,
)
# 中心管道标注(可选)
for i, center in enumerate(center_pipes):
@@ -325,14 +334,17 @@ def visualize_metis_partition(
)
# --- 图形美化 ---
plt.title("Water Network Partitioning Overview", fontsize=14, pad=20)
plt.title(title or "Water Network Partitioning Overview", fontsize=14, pad=20)
plt.xlabel("X Coordinate", fontsize=10)
plt.ylabel("Y Coordinate", fontsize=10)
plt.grid(True, alpha=0.2, linestyle=":")
plt.tight_layout()
# 显示图形
plt.show()
plt.show(block=block)
if pause_seconds is not None:
plt.pause(max(0.0, float(pause_seconds)))
return fig
def generate_adjlist_with_all_edges(G, delimiter):
@@ -231,6 +231,35 @@ def cal_sq_all_multi(
cos_sensor_num,
flow_sensor_num,
):
"""融合多种相似性并输出按时刻与候选管段组织的综合相似度。
该函数会根据模式开关(是否仅流量、是否仅 COS、是否包含流量)对
`similarity_cos`、`similarity_dis`、`similarity_f` 做标准化,并计算
权重 `sq_cos/sq_dis/sq_f` 后进行加权融合。
Args:
similarity_cos: 压力余弦相似性(DataFrame/Series,通常为时刻 x 候选管段)。
similarity_dis: 压力距离相似性(DataFrame/Series,通常为时刻 x 候选管段)。
similarity_f: 流量距离相似性(DataFrame/Series,通常为时刻 x 候选管段)。
candidate_pipe: 候选管段列表,用于输出列索引。
timestep_list_spc: 时刻列表,用于输出行索引。
if_flow: 是否启用流量相似性(1 启用,0 禁用)。
if_only_cos: 相似性模式标识(0: COS+DIS;1: COS;其他值按分支定义处理)。
if_only_flow: 是否仅使用流量相似性(1 是,0 否)。
cos_h_input: 外部给定的 COS 权重(强制权重模式下使用)。
dis_h_input: 外部给定的 DIS 权重(强制权重模式下使用)。
dis_f_h_input: 外部给定的流量权重(强制权重模式下使用)。
if_compalsive: 是否使用外部强制权重(1 使用输入权重,0 自动计算权重)。
cos_sensor_num: 压力传感器数量,用于权重调整。
flow_sensor_num: 流量传感器数量,用于权重调整。
Returns:
tuple[pd.DataFrame | pd.Series, float, float, float]:
- output_similarity_pd: 综合相似性结果。
- sq_cos: 最终 COS 权重。
- sq_dis: 最终 DIS 权重。
- sq_f: 最终流量权重。
"""
if if_only_flow == 1:
similarity_f, h_f = cal_sq_single_array(
similarity_f.values.reshape((-1, 1)), if_direct=2
@@ -429,6 +458,7 @@ def cal_similarity_all_multi_new_sq_improve_double_lzr(
max_flow,
):
similarity = pd.Series(dtype=float, index=candidate_pipe)
similarity_detail: pd.DataFrame | None = None
important_p_sensor = cal_top_sensors(monitor_p, predict_p, Top_sensor_num)
# important_f_sensor, basic_f = cal_top_f_sensor(normal_f)
important_f_sensor = monitor_f.columns
@@ -548,13 +578,57 @@ def cal_similarity_all_multi_new_sq_improve_double_lzr(
:, each_candidate
].mean()
similarity = similarity.sort_values(ascending=False)
detail_index = [str(pipe) for pipe in candidate_pipe]
similarity_detail = pd.DataFrame(index=detail_index)
similarity_detail.index.name = "pipe_id"
if isinstance(total_similarity_cos, pd.DataFrame) and len(total_similarity_cos) > 0:
pressure_cos_mean = (
total_similarity_cos.mean(axis=0)
.reindex(candidate_pipe)
.to_numpy(dtype=float)
)
else:
pressure_cos_mean = np.full(len(candidate_pipe), np.nan)
if isinstance(total_similarity_dis, pd.DataFrame) and len(total_similarity_dis) > 0:
pressure_dis_mean = (
total_similarity_dis.mean(axis=0)
.reindex(candidate_pipe)
.to_numpy(dtype=float)
)
else:
pressure_dis_mean = np.full(len(candidate_pipe), np.nan)
if isinstance(total_similarity_dis_f, pd.DataFrame) and len(total_similarity_dis_f) > 0:
flow_dis_mean = (
total_similarity_dis_f.mean(axis=0)
.reindex(candidate_pipe)
.to_numpy(dtype=float)
)
else:
flow_dis_mean = np.full(len(candidate_pipe), np.nan)
similarity_detail["pressure_cos_mean"] = pressure_cos_mean
similarity_detail["pressure_dis_mean"] = pressure_dis_mean
similarity_detail["flow_dis_mean"] = flow_dis_mean
similarity_detail["weight_cos"] = float(cos_h)
similarity_detail["weight_dis"] = float(dis_h)
similarity_detail["weight_flow"] = float(dis_f_h)
similarity_detail["final_similarity"] = (
similarity.reindex(candidate_pipe).to_numpy(dtype=float)
)
similarity_detail["similarity_rank"] = (
similarity_detail["final_similarity"].rank(method="dense", ascending=False)
).astype(int)
similarity_detail["pressure_sensor_count"] = int(len(important_p_sensor))
similarity_detail["flow_sensor_count"] = int(len(important_f_sensor))
similarity_detail = similarity_detail.sort_values(
by="final_similarity", ascending=False
)
else:
break_flag = 1
similarity = 0
cos_h = 0
dis_h = 0
dis_f_h = 0
return similarity, cos_h, dis_h, dis_f_h, break_flag
return similarity, cos_h, dis_h, dis_f_h, break_flag, similarity_detail
def cal_similarity_all_cos_dis(
@@ -744,6 +818,7 @@ def adjust_ratio(similarity_mode, cos_h, dis_h, dis_f_h, low_limit=0.1):
return cos_h, dis_h, dis_f_h
# 返回相似性计算的模式(不同权重),是否计算流量相似性,是否只计算cos相似性,是否只计算流量相似性。
def decode_mode(similarity_mode):
if similarity_mode == "COS":
if_flow = 0