From a7e3b6aff912b7c317faace0e250774a58e93506 Mon Sep 17 00:00:00 2001 From: Jiang Date: Sat, 7 Mar 2026 15:34:40 +0800 Subject: [PATCH] =?UTF-8?q?=E5=A2=9E=E5=8A=A0=20wn=5Finp=5Fpath=20?= =?UTF-8?q?=E5=8F=82=E6=95=B0=E4=BB=A5=E6=94=AF=E6=8C=81=E5=A4=9A=E8=BF=9B?= =?UTF-8?q?=E7=A8=8B=E5=A4=84=E7=90=86?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../burst_location/burst_location.py | 1 + .../burst_location/burst_locator.py | 6 + .../burst_location/leak_simulator.py | 123 +++++++++++++++--- 3 files changed, 110 insertions(+), 20 deletions(-) diff --git a/app/algorithms/burst_location/burst_location.py b/app/algorithms/burst_location/burst_location.py index 996950c..1c8122a 100644 --- a/app/algorithms/burst_location/burst_location.py +++ b/app/algorithms/burst_location/burst_location.py @@ -179,6 +179,7 @@ def run_burst_location( located_pipe, elapsed_seconds, simulation_times, _, similarity_series = ( DN_search_multi_simple_add_flow_count_new( wn=wn, + wn_inp_path=str(inp_path), G0=G0, all_node=all_node, node_x=node_x, diff --git a/app/algorithms/burst_location/burst_locator.py b/app/algorithms/burst_location/burst_locator.py index 5bea186..e081c86 100644 --- a/app/algorithms/burst_location/burst_locator.py +++ b/app/algorithms/burst_location/burst_locator.py @@ -23,6 +23,7 @@ from .similarity_calculator import ( def _ensure_signatures_for_centers( wn, + wn_inp_path, center_list, # 本轮要用到的中心(list[str]) pressure_leak_all, flow_leak_all, # 全量缓存(可为空 DF) @@ -71,6 +72,7 @@ def _ensure_signatures_for_centers( timestep_list, sensor_name_all, n_workers=n_workers, + wn_inp_path=wn_inp_path, ) # 初始化空缓存时,做一次“同构化” if pressure_leak_all is None or len(pressure_leak_all) == 0: @@ -296,6 +298,7 @@ def cal_SD(located_pipe, real_pipe, pipe_x, pipe_y): def DN_search_multi_simple_add_flow_count_new( wn, + wn_inp_path, G0, all_node, node_x, @@ -390,6 +393,7 @@ def DN_search_multi_simple_add_flow_count_new( pressure_leak, flow_leak, pressure_leak_all, flow_leak_all = ( _ensure_signatures_for_centers( wn=wn, + wn_inp_path=wn_inp_path, center_list=candidate_center_list, pressure_leak_all=pressure_leak_all, flow_leak_all=flow_leak_all, @@ -440,6 +444,7 @@ def DN_search_multi_simple_add_flow_count_new( pressure_add, flow_add, pressure_leak_all, flow_leak_all = ( _ensure_signatures_for_centers( wn=wn, + wn_inp_path=wn_inp_path, center_list=add_center, pressure_leak_all=pressure_leak_all, flow_leak_all=flow_leak_all, @@ -555,6 +560,7 @@ def DN_search_multi_simple_add_flow_count_new( pressure_leak_sp, flow_leak_sp, pressure_leak_all, flow_leak_all = ( _ensure_signatures_for_centers( wn=wn, + wn_inp_path=wn_inp_path, center_list=final_area_pipe, # 这次要用的“最终区域里的所有管段” pressure_leak_all=pressure_leak_all, # 累积缓存(会被更新) flow_leak_all=flow_leak_all, diff --git a/app/algorithms/burst_location/leak_simulator.py b/app/algorithms/burst_location/leak_simulator.py index 1de1564..213cd0a 100644 --- a/app/algorithms/burst_location/leak_simulator.py +++ b/app/algorithms/burst_location/leak_simulator.py @@ -2,15 +2,55 @@ import math import multiprocessing as mp +import os import sys import pandas as pd import wntr _PIPE2LEAKNODE = None -_SIGNATURE_WN = None -_SIGNATURE_LEAK_MAG = None -_SIGNATURE_SENSOR_NAME = None +_SIGNATURE_WORKER_DATA = {} + + +def _cleanup_temp_files(prefix): + for ext in [".inp", ".rpt", ".bin", ".out"]: + temp_file = prefix + ext + if os.path.exists(temp_file): + try: + os.remove(temp_file) + except OSError: + pass + + +def _make_temp_prefix(tag): + temp_dir = os.path.abspath(os.path.join("temp", "burst_location")) + os.makedirs(temp_dir, exist_ok=True) + safe_tag = str(tag).replace(os.sep, "_").replace(" ", "_") + return os.path.join(temp_dir, f"{safe_tag}_{os.getpid()}") + + +def _snapshot_hydraulic_options(wn): + options = wn.options + return { + "demand_model": options.hydraulic.demand_model, + "duration": float(options.time.duration), + "hydraulic_timestep": float(options.time.hydraulic_timestep), + "pattern_timestep": float(options.time.pattern_timestep), + "report_timestep": float(options.time.report_timestep), + "required_pressure": float(options.hydraulic.required_pressure), + "minimum_pressure": float(options.hydraulic.minimum_pressure), + } + + +def _apply_hydraulic_options(wn, option_values): + options = wn.options + options.hydraulic.demand_model = option_values["demand_model"] + options.time.duration = option_values["duration"] + options.time.hydraulic_timestep = option_values["hydraulic_timestep"] + options.time.pattern_timestep = option_values["pattern_timestep"] + options.time.report_timestep = option_values["report_timestep"] + options.hydraulic.required_pressure = option_values["required_pressure"] + options.hydraulic.minimum_pressure = option_values["minimum_pressure"] def simple_add_leak(wn, leak_mag, leak_pipe): @@ -209,7 +249,9 @@ def ensure_mid_node(wn, leak_pipe): raise KeyError(f"Cannot ensure mid node for pipe '{leak_pipe}'.") -def leak_simulation_pipe_dd_multi_pf(wn, leak_mag, leak_pipe, sensor_name): +def leak_simulation_pipe_dd_multi_pf( + wn, leak_mag, leak_pipe, sensor_name, file_prefix=None +): """ 优化版: - 不再 remove/add link/node @@ -233,7 +275,10 @@ def leak_simulation_pipe_dd_multi_pf(wn, leak_mag, leak_pipe, sensor_name): # 仿真 sim = wntr.sim.EpanetSimulator(wn) - results = sim.run_sim() + if file_prefix is None: + results = sim.run_sim() + else: + results = sim.run_sim(file_prefix=file_prefix) # 输出(保持列顺序) pressure_output = results.node["pressure"].loc[:, sensor_name] @@ -242,6 +287,8 @@ def leak_simulation_pipe_dd_multi_pf(wn, leak_mag, leak_pipe, sensor_name): finally: # 关闭泄漏:还原 base_value ts_obj.base_value = orig_base + if file_prefix is not None: + _cleanup_temp_files(file_prefix) def prepare_leak_infrastructure(wn, candidate_pipes): @@ -391,7 +438,13 @@ def cal_sum_demand(demand): def cal_signature_pipe_multi_pf( - wn, leak_mag, candidate_center, timestep_list, sensor_name, n_workers=1 + wn, + leak_mag, + candidate_center, + timestep_list, + sensor_name, + n_workers=1, + wn_inp_path=None, ): candidate_center_num = len(candidate_center) pressure_leak = pd.DataFrame( @@ -402,21 +455,41 @@ def cal_signature_pipe_multi_pf( # columns=sensor_f_name) pressure_leak = pressure_leak.sort_index() # flow_leak = flow_leak.sort_index() - can_fork = "fork" in mp.get_all_start_methods() - if n_workers > 1 and candidate_center_num > 1 and can_fork: - _set_signature_worker_context(wn, leak_mag, sensor_name) + can_parallel = ( + n_workers > 1 + and candidate_center_num > 1 + and wn_inp_path is not None + and len(str(wn_inp_path)) > 0 + ) + if can_parallel: + option_values = _snapshot_hydraulic_options(wn) worker_count = min(n_workers, candidate_center_num) - with mp.get_context("fork").Pool(processes=worker_count) as pool: + start_methods = mp.get_all_start_methods() + context_name = "spawn" if "spawn" in start_methods else start_methods[0] + with mp.get_context(context_name).Pool( + processes=worker_count, + initializer=_signature_worker_init, + initargs=( + str(wn_inp_path), + float(leak_mag), + list(sensor_name), + option_values, + ), + ) as pool: for i, (center_name, pressure_array) in enumerate( - pool.imap(_run_signature_for_center, candidate_center) + pool.imap(_signature_worker_run_center, candidate_center) ): pressure_leak.loc[(center_name, slice(None)), :] = pressure_array sys.stdout.write("\r" + "已经完成计算" + str(i + 1) + "个特征中心") - _set_signature_worker_context(None, None, None) else: for i in range(candidate_center_num): + temp_prefix = _make_temp_prefix(f"sig_{i}") wn, pressure_output = leak_simulation_pipe_dd_multi_pf( - wn, leak_mag, candidate_center[i], sensor_name + wn, + leak_mag, + candidate_center[i], + sensor_name, + file_prefix=temp_prefix, ) # leak_or_not_list.append(leak_or_not) pressure_leak.loc[(candidate_center[i], slice(None)), :] = ( @@ -427,16 +500,26 @@ def cal_signature_pipe_multi_pf( return pressure_leak, candidate_center -def _set_signature_worker_context(wn, leak_mag, sensor_name): - global _SIGNATURE_WN, _SIGNATURE_LEAK_MAG, _SIGNATURE_SENSOR_NAME - _SIGNATURE_WN = wn - _SIGNATURE_LEAK_MAG = leak_mag - _SIGNATURE_SENSOR_NAME = sensor_name +def _signature_worker_init(inp_path, leak_mag, sensor_name, option_values): + global _SIGNATURE_WORKER_DATA + wn = wntr.network.WaterNetworkModel(inp_path) + _apply_hydraulic_options(wn, option_values) + _SIGNATURE_WORKER_DATA = { + "wn": wn, + "leak_mag": leak_mag, + "sensor_name": sensor_name, + } -def _run_signature_for_center(center_name): +def _signature_worker_run_center(center_name): + data = _SIGNATURE_WORKER_DATA + temp_prefix = _make_temp_prefix(f"sig_worker_{center_name}") _, pressure_output = leak_simulation_pipe_dd_multi_pf( - _SIGNATURE_WN, _SIGNATURE_LEAK_MAG, center_name, _SIGNATURE_SENSOR_NAME + data["wn"], + data["leak_mag"], + center_name, + data["sensor_name"], + file_prefix=temp_prefix, ) return center_name, pressure_output.to_numpy()