优化爆管定位算法,增加多进程支持

This commit is contained in:
2026-03-07 15:31:04 +08:00
parent 0f8d33291d
commit 05ca940c9f
3 changed files with 93 additions and 10 deletions
+37 -1
View File
@@ -4,6 +4,7 @@ import copy
import math
import sys
from datetime import datetime
from time import perf_counter
import networkx as nx
import numpy as np
@@ -29,6 +30,7 @@ def _ensure_signatures_for_centers(
pressure_monitor,
flow_monitor, # 用来推断传感器列名
leak_mag, # 泄漏量,比如 400/3600
n_workers=1,
):
"""
只为缺失的中心补算 SLF(调用你现有的 cal_signature_pipe_multi_pf),
@@ -63,7 +65,12 @@ def _ensure_signatures_for_centers(
# 3) 若有缺失中心,仅为这些中心补算一次
if len(need) > 0:
p_new, _ = cal_signature_pipe_multi_pf(
wn, leak_mag, need, timestep_list, sensor_name_all
wn,
leak_mag,
need,
timestep_list,
sensor_name_all,
n_workers=n_workers,
)
# 初始化空缓存时,做一次“同构化”
if pressure_leak_all is None or len(pressure_leak_all) == 0:
@@ -206,6 +213,12 @@ def _dedupe_preserve_order(items):
return output
def _accumulate_stage(stage_timing, stage_name, started_at):
stage_timing[stage_name] = stage_timing.get(stage_name, 0.0) + (
perf_counter() - started_at
)
def cal_DtoTop1(
G0, pipe_leak, located_pipe, pipe_start_node_all, pipe_end_node_all, pipe_length
):
@@ -312,7 +325,11 @@ def DN_search_multi_simple_add_flow_count_new(
if_gy,
pressure_threshold,
leak_mag=400 / 3600,
n_workers=1,
stage_timing=None,
):
if stage_timing is None:
stage_timing = {}
iter_count = 0
all_node_iter = copy.deepcopy(all_node)
candidate_pipe_input = copy.deepcopy(candidate_pipe_input_initial) # 可能漏损管段
@@ -345,6 +362,7 @@ def DN_search_multi_simple_add_flow_count_new(
group_num = cal_group_num(candidate_pipe_input, group_basic_num)
# group 分组,得出候选漏损中心
stage_start = perf_counter()
candidate_center_list, candidate_group_list, new_all_node = (
metis_grouping_pipe_weight(
G0,
@@ -361,12 +379,14 @@ def DN_search_multi_simple_add_flow_count_new(
couple_node_length,
)
)
_accumulate_stage(stage_timing, "group_partitioning", stage_start)
simulation_times = simulation_times + len(candidate_center_list)
# pick_pressure_leak
# pressure_leak = pressure_leak_all.loc[candidate_center_list].loc[:, :]
# flow_leak = flow_leak_all.loc[candidate_center_list].loc[:, :]
# —— 新增泄漏量(保持你现在的一致,或从外部传入)——
# —— 只为缺失中心补算,然后取本轮需要的中心子集 ——
stage_start = perf_counter()
pressure_leak, flow_leak, pressure_leak_all, flow_leak_all = (
_ensure_signatures_for_centers(
wn=wn,
@@ -377,8 +397,10 @@ def DN_search_multi_simple_add_flow_count_new(
pressure_monitor=pressure_monitor,
flow_monitor=flow_monitor,
leak_mag=leak_mag,
n_workers=n_workers,
)
)
_accumulate_stage(stage_timing, "signature_for_candidates", stage_start)
# pressure_leak_f= pressure_leak.swaplevel()
@@ -414,6 +436,7 @@ def DN_search_multi_simple_add_flow_count_new(
# --------------------------------------------------------
# 只为 add_center 里还没算过的中心补算,并与本轮中心合并
if len(add_center) > 0:
stage_start = perf_counter()
pressure_add, flow_add, pressure_leak_all, flow_leak_all = (
_ensure_signatures_for_centers(
wn=wn,
@@ -424,8 +447,10 @@ def DN_search_multi_simple_add_flow_count_new(
pressure_monitor=pressure_monitor,
flow_monitor=flow_monitor,
leak_mag=leak_mag, # 与上面一致
n_workers=n_workers,
)
)
_accumulate_stage(stage_timing, "signature_for_extra_centers", stage_start)
pressure_leak = pd.concat([pressure_leak, pressure_add], axis=0)
if (flow_leak is not None) and (flow_add is not None):
flow_leak = pd.concat([flow_leak, flow_add], axis=0)
@@ -437,6 +462,7 @@ def DN_search_multi_simple_add_flow_count_new(
candidate_center_list_sup = _dedupe_preserve_order(
candidate_center_list + add_center
)
stage_start = perf_counter()
similarity, cos_h, dis_h, dis_f_h, break_flag = (
cal_similarity_all_multi_new_sq_improve_double_lzr(
candidate_center_list_sup,
@@ -463,6 +489,7 @@ def DN_search_multi_simple_add_flow_count_new(
max_flow,
)
)
_accumulate_stage(stage_timing, "similarity_ranking", stage_start)
if break_flag == 1:
break
@@ -473,6 +500,7 @@ def DN_search_multi_simple_add_flow_count_new(
cut_ratio, new_similarity = extra_judge(new_similarity)
else:
cut_ratio = 1
stage_start = perf_counter()
final_area_t, final_center_t, all_node_new_1, if_end = (
area_output_num_ki_improve(
candidate_center_list,
@@ -485,6 +513,7 @@ def DN_search_multi_simple_add_flow_count_new(
cut_ratio,
)
)
_accumulate_stage(stage_timing, "candidate_area_selection", stage_start)
final_area = final_area + final_area_t
final_center = final_center + final_center_t
@@ -522,6 +551,7 @@ def DN_search_multi_simple_add_flow_count_new(
final_area_pipe = list(final_area) # 确保是 list
# 只为还没算过的管段补齐 SLF(按需计算)
stage_start = perf_counter()
pressure_leak_sp, flow_leak_sp, pressure_leak_all, flow_leak_all = (
_ensure_signatures_for_centers(
wn=wn,
@@ -532,12 +562,15 @@ def DN_search_multi_simple_add_flow_count_new(
pressure_monitor=pressure_monitor,
flow_monitor=flow_monitor,
leak_mag=leak_mag,
n_workers=n_workers,
)
)
_accumulate_stage(stage_timing, "signature_for_final_area", stage_start)
# 如果你要精确统计模拟次数,这里可以加上“本次新补的数量”,
# 做法:让 _ensure_signatures_for_centers 额外返回 need_cnt,再 simulation_times += need_cnt
stage_start = perf_counter()
similarity_sp, cos_h, dis_h, dis_f_h, break_flag = (
cal_similarity_all_multi_new_sq_improve_double_lzr(
final_area_pipe,
@@ -564,6 +597,7 @@ def DN_search_multi_simple_add_flow_count_new(
max_flow,
)
)
_accumulate_stage(stage_timing, "similarity_final", stage_start)
else:
dpressure = (pressure_predict - pressure_monitor).mean()
@@ -599,6 +633,8 @@ def DN_search_multi_simple_add_flow_count_new(
similarity_sp = similarity_sp.sort_values(ascending=False)
t2 = datetime.now()
dt = (t2 - t1).seconds
stage_timing["iterations"] = iter_count + 1 if len(dpressure) > 0 else 0
stage_timing["total_elapsed_seconds"] = float(dt)
return similarity_sp.index[0], dt, simulation_times, wn, similarity_sp