From 05ca940c9f2c70c583bd3a479003b03b61d9f200 Mon Sep 17 00:00:00 2001 From: Jiang Date: Sat, 7 Mar 2026 15:31:04 +0800 Subject: [PATCH] =?UTF-8?q?=E4=BC=98=E5=8C=96=E7=88=86=E7=AE=A1=E5=AE=9A?= =?UTF-8?q?=E4=BD=8D=E7=AE=97=E6=B3=95=EF=BC=8C=E5=A2=9E=E5=8A=A0=E5=A4=9A?= =?UTF-8?q?=E8=BF=9B=E7=A8=8B=E6=94=AF=E6=8C=81?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../burst_location/burst_location.py | 15 ++++++ .../burst_location/burst_locator.py | 38 +++++++++++++- .../burst_location/leak_simulator.py | 50 +++++++++++++++---- 3 files changed, 93 insertions(+), 10 deletions(-) diff --git a/app/algorithms/burst_location/burst_location.py b/app/algorithms/burst_location/burst_location.py index e16f07d..996950c 100644 --- a/app/algorithms/burst_location/burst_location.py +++ b/app/algorithms/burst_location/burst_location.py @@ -1,5 +1,6 @@ import argparse import json +from multiprocessing import cpu_count from pathlib import Path from typing import Any, Iterable @@ -17,6 +18,8 @@ from .network_model import ( read_inf_inp_other, ) +DEFAULT_N_WORKERS = max(1, min(cpu_count() - 1, 4)) + def _read_id_list_json(path): if path is None: @@ -111,6 +114,7 @@ def run_burst_location( normal_flow: pd.Series | None = None, min_dpressure: float = 2.0, basic_pressure: float = 10.0, + n_workers: int = DEFAULT_N_WORKERS, ) -> dict[str, Any]: if pressure_scada_ids is None or len(pressure_scada_ids) == 0: raise ValueError("pressure_scada_ids cannot be empty.") @@ -171,6 +175,7 @@ def run_burst_location( similarity_mode = "CAD_new_gy" max_flow = pd.Series(dtype=float) + stage_timing: dict[str, Any] = {} located_pipe, elapsed_seconds, simulation_times, _, similarity_series = ( DN_search_multi_simple_add_flow_count_new( wn=wn, @@ -203,6 +208,8 @@ def run_burst_location( if_gy=0, pressure_threshold=float(min_dpressure), leak_mag=float(burst_leakage), + n_workers=max(1, int(n_workers)), + stage_timing=stage_timing, ) ) @@ -213,6 +220,7 @@ def run_burst_location( "simulation_times": int(simulation_times), "top_candidates": _build_top_candidates(similarity_series), "similarity_mode": similarity_mode, + "stage_timing_seconds": stage_timing, } @@ -252,6 +260,12 @@ def _parse_args(): default=10.0, help="(可选)基础服务压力,默认 10.0", ) + parser.add_argument( + "--n-workers", + type=int, + default=DEFAULT_N_WORKERS, + help="(可选)特征中心模拟进程数,默认 max(1, min(cpu_count()-1, 4))", + ) return parser.parse_args() @@ -268,6 +282,7 @@ def main(): normal_flow=_read_series_csv(args.normal_flow_csv), min_dpressure=args.min_dpressure, basic_pressure=args.basic_pressure, + n_workers=args.n_workers, ) print(json.dumps(result, ensure_ascii=False)) diff --git a/app/algorithms/burst_location/burst_locator.py b/app/algorithms/burst_location/burst_locator.py index 599ba59..5bea186 100644 --- a/app/algorithms/burst_location/burst_locator.py +++ b/app/algorithms/burst_location/burst_locator.py @@ -4,6 +4,7 @@ import copy import math import sys from datetime import datetime +from time import perf_counter import networkx as nx import numpy as np @@ -29,6 +30,7 @@ def _ensure_signatures_for_centers( pressure_monitor, flow_monitor, # 用来推断传感器列名 leak_mag, # 泄漏量,比如 400/3600 + n_workers=1, ): """ 只为缺失的中心补算 SLF(调用你现有的 cal_signature_pipe_multi_pf), @@ -63,7 +65,12 @@ def _ensure_signatures_for_centers( # 3) 若有缺失中心,仅为这些中心补算一次 if len(need) > 0: p_new, _ = cal_signature_pipe_multi_pf( - wn, leak_mag, need, timestep_list, sensor_name_all + wn, + leak_mag, + need, + timestep_list, + sensor_name_all, + n_workers=n_workers, ) # 初始化空缓存时,做一次“同构化” if pressure_leak_all is None or len(pressure_leak_all) == 0: @@ -206,6 +213,12 @@ def _dedupe_preserve_order(items): return output +def _accumulate_stage(stage_timing, stage_name, started_at): + stage_timing[stage_name] = stage_timing.get(stage_name, 0.0) + ( + perf_counter() - started_at + ) + + def cal_DtoTop1( G0, pipe_leak, located_pipe, pipe_start_node_all, pipe_end_node_all, pipe_length ): @@ -312,7 +325,11 @@ def DN_search_multi_simple_add_flow_count_new( if_gy, pressure_threshold, leak_mag=400 / 3600, + n_workers=1, + stage_timing=None, ): + if stage_timing is None: + stage_timing = {} iter_count = 0 all_node_iter = copy.deepcopy(all_node) candidate_pipe_input = copy.deepcopy(candidate_pipe_input_initial) # 可能漏损管段 @@ -345,6 +362,7 @@ def DN_search_multi_simple_add_flow_count_new( group_num = cal_group_num(candidate_pipe_input, group_basic_num) # group 分组,得出候选漏损中心 + stage_start = perf_counter() candidate_center_list, candidate_group_list, new_all_node = ( metis_grouping_pipe_weight( G0, @@ -361,12 +379,14 @@ def DN_search_multi_simple_add_flow_count_new( couple_node_length, ) ) + _accumulate_stage(stage_timing, "group_partitioning", stage_start) simulation_times = simulation_times + len(candidate_center_list) # pick_pressure_leak # pressure_leak = pressure_leak_all.loc[candidate_center_list].loc[:, :] # flow_leak = flow_leak_all.loc[candidate_center_list].loc[:, :] # —— 新增泄漏量(保持你现在的一致,或从外部传入)—— # —— 只为缺失中心补算,然后取本轮需要的中心子集 —— + stage_start = perf_counter() pressure_leak, flow_leak, pressure_leak_all, flow_leak_all = ( _ensure_signatures_for_centers( wn=wn, @@ -377,8 +397,10 @@ def DN_search_multi_simple_add_flow_count_new( pressure_monitor=pressure_monitor, flow_monitor=flow_monitor, leak_mag=leak_mag, + n_workers=n_workers, ) ) + _accumulate_stage(stage_timing, "signature_for_candidates", stage_start) # pressure_leak_f= pressure_leak.swaplevel() @@ -414,6 +436,7 @@ def DN_search_multi_simple_add_flow_count_new( # -------------------------------------------------------- # 只为 add_center 里还没算过的中心补算,并与本轮中心合并 if len(add_center) > 0: + stage_start = perf_counter() pressure_add, flow_add, pressure_leak_all, flow_leak_all = ( _ensure_signatures_for_centers( wn=wn, @@ -424,8 +447,10 @@ def DN_search_multi_simple_add_flow_count_new( pressure_monitor=pressure_monitor, flow_monitor=flow_monitor, leak_mag=leak_mag, # 与上面一致 + n_workers=n_workers, ) ) + _accumulate_stage(stage_timing, "signature_for_extra_centers", stage_start) pressure_leak = pd.concat([pressure_leak, pressure_add], axis=0) if (flow_leak is not None) and (flow_add is not None): flow_leak = pd.concat([flow_leak, flow_add], axis=0) @@ -437,6 +462,7 @@ def DN_search_multi_simple_add_flow_count_new( candidate_center_list_sup = _dedupe_preserve_order( candidate_center_list + add_center ) + stage_start = perf_counter() similarity, cos_h, dis_h, dis_f_h, break_flag = ( cal_similarity_all_multi_new_sq_improve_double_lzr( candidate_center_list_sup, @@ -463,6 +489,7 @@ def DN_search_multi_simple_add_flow_count_new( max_flow, ) ) + _accumulate_stage(stage_timing, "similarity_ranking", stage_start) if break_flag == 1: break @@ -473,6 +500,7 @@ def DN_search_multi_simple_add_flow_count_new( cut_ratio, new_similarity = extra_judge(new_similarity) else: cut_ratio = 1 + stage_start = perf_counter() final_area_t, final_center_t, all_node_new_1, if_end = ( area_output_num_ki_improve( candidate_center_list, @@ -485,6 +513,7 @@ def DN_search_multi_simple_add_flow_count_new( cut_ratio, ) ) + _accumulate_stage(stage_timing, "candidate_area_selection", stage_start) final_area = final_area + final_area_t final_center = final_center + final_center_t @@ -522,6 +551,7 @@ def DN_search_multi_simple_add_flow_count_new( final_area_pipe = list(final_area) # 确保是 list # 只为还没算过的管段补齐 SLF(按需计算) + stage_start = perf_counter() pressure_leak_sp, flow_leak_sp, pressure_leak_all, flow_leak_all = ( _ensure_signatures_for_centers( wn=wn, @@ -532,12 +562,15 @@ def DN_search_multi_simple_add_flow_count_new( pressure_monitor=pressure_monitor, flow_monitor=flow_monitor, leak_mag=leak_mag, + n_workers=n_workers, ) ) + _accumulate_stage(stage_timing, "signature_for_final_area", stage_start) # 如果你要精确统计模拟次数,这里可以加上“本次新补的数量”, # 做法:让 _ensure_signatures_for_centers 额外返回 need_cnt,再 simulation_times += need_cnt + stage_start = perf_counter() similarity_sp, cos_h, dis_h, dis_f_h, break_flag = ( cal_similarity_all_multi_new_sq_improve_double_lzr( final_area_pipe, @@ -564,6 +597,7 @@ def DN_search_multi_simple_add_flow_count_new( max_flow, ) ) + _accumulate_stage(stage_timing, "similarity_final", stage_start) else: dpressure = (pressure_predict - pressure_monitor).mean() @@ -599,6 +633,8 @@ def DN_search_multi_simple_add_flow_count_new( similarity_sp = similarity_sp.sort_values(ascending=False) t2 = datetime.now() dt = (t2 - t1).seconds + stage_timing["iterations"] = iter_count + 1 if len(dpressure) > 0 else 0 + stage_timing["total_elapsed_seconds"] = float(dt) return similarity_sp.index[0], dt, simulation_times, wn, similarity_sp diff --git a/app/algorithms/burst_location/leak_simulator.py b/app/algorithms/burst_location/leak_simulator.py index dd7a5bc..1de1564 100644 --- a/app/algorithms/burst_location/leak_simulator.py +++ b/app/algorithms/burst_location/leak_simulator.py @@ -1,12 +1,16 @@ """漏损模拟模块。""" import math +import multiprocessing as mp import sys import pandas as pd import wntr _PIPE2LEAKNODE = None +_SIGNATURE_WN = None +_SIGNATURE_LEAK_MAG = None +_SIGNATURE_SENSOR_NAME = None def simple_add_leak(wn, leak_mag, leak_pipe): @@ -387,7 +391,7 @@ def cal_sum_demand(demand): def cal_signature_pipe_multi_pf( - wn, leak_mag, candidate_center, timestep_list, sensor_name + wn, leak_mag, candidate_center, timestep_list, sensor_name, n_workers=1 ): candidate_center_num = len(candidate_center) pressure_leak = pd.DataFrame( @@ -398,17 +402,45 @@ def cal_signature_pipe_multi_pf( # columns=sensor_f_name) pressure_leak = pressure_leak.sort_index() # flow_leak = flow_leak.sort_index() - for i in range(candidate_center_num): - wn, pressure_output = leak_simulation_pipe_dd_multi_pf( - wn, leak_mag, candidate_center[i], sensor_name - ) - # leak_or_not_list.append(leak_or_not) - pressure_leak.loc[(candidate_center[i], slice(None)), :] = pressure_output.to_numpy() - # flow_leak.loc[candidate_center[i]].loc[:, :] = flow_output - sys.stdout.write("\r" + "已经完成计算" + str(i + 1) + "个特征中心") + can_fork = "fork" in mp.get_all_start_methods() + if n_workers > 1 and candidate_center_num > 1 and can_fork: + _set_signature_worker_context(wn, leak_mag, sensor_name) + worker_count = min(n_workers, candidate_center_num) + with mp.get_context("fork").Pool(processes=worker_count) as pool: + for i, (center_name, pressure_array) in enumerate( + pool.imap(_run_signature_for_center, candidate_center) + ): + pressure_leak.loc[(center_name, slice(None)), :] = pressure_array + sys.stdout.write("\r" + "已经完成计算" + str(i + 1) + "个特征中心") + _set_signature_worker_context(None, None, None) + else: + for i in range(candidate_center_num): + wn, pressure_output = leak_simulation_pipe_dd_multi_pf( + wn, leak_mag, candidate_center[i], sensor_name + ) + # leak_or_not_list.append(leak_or_not) + pressure_leak.loc[(candidate_center[i], slice(None)), :] = ( + pressure_output.to_numpy() + ) + # flow_leak.loc[candidate_center[i]].loc[:, :] = flow_output + sys.stdout.write("\r" + "已经完成计算" + str(i + 1) + "个特征中心") return pressure_leak, candidate_center +def _set_signature_worker_context(wn, leak_mag, sensor_name): + global _SIGNATURE_WN, _SIGNATURE_LEAK_MAG, _SIGNATURE_SENSOR_NAME + _SIGNATURE_WN = wn + _SIGNATURE_LEAK_MAG = leak_mag + _SIGNATURE_SENSOR_NAME = sensor_name + + +def _run_signature_for_center(center_name): + _, pressure_output = leak_simulation_pipe_dd_multi_pf( + _SIGNATURE_WN, _SIGNATURE_LEAK_MAG, center_name, _SIGNATURE_SENSOR_NAME + ) + return center_name, pressure_output.to_numpy() + + def pick_pipe(all_pipes, pipe_diameter, limited_diameter): candidate_pipe = [] for each_pipe in all_pipes: