增加 wn_inp_path 参数以支持多进程处理
This commit is contained in:
@@ -179,6 +179,7 @@ def run_burst_location(
|
|||||||
located_pipe, elapsed_seconds, simulation_times, _, similarity_series = (
|
located_pipe, elapsed_seconds, simulation_times, _, similarity_series = (
|
||||||
DN_search_multi_simple_add_flow_count_new(
|
DN_search_multi_simple_add_flow_count_new(
|
||||||
wn=wn,
|
wn=wn,
|
||||||
|
wn_inp_path=str(inp_path),
|
||||||
G0=G0,
|
G0=G0,
|
||||||
all_node=all_node,
|
all_node=all_node,
|
||||||
node_x=node_x,
|
node_x=node_x,
|
||||||
|
|||||||
@@ -23,6 +23,7 @@ from .similarity_calculator import (
|
|||||||
|
|
||||||
def _ensure_signatures_for_centers(
|
def _ensure_signatures_for_centers(
|
||||||
wn,
|
wn,
|
||||||
|
wn_inp_path,
|
||||||
center_list, # 本轮要用到的中心(list[str])
|
center_list, # 本轮要用到的中心(list[str])
|
||||||
pressure_leak_all,
|
pressure_leak_all,
|
||||||
flow_leak_all, # 全量缓存(可为空 DF)
|
flow_leak_all, # 全量缓存(可为空 DF)
|
||||||
@@ -71,6 +72,7 @@ def _ensure_signatures_for_centers(
|
|||||||
timestep_list,
|
timestep_list,
|
||||||
sensor_name_all,
|
sensor_name_all,
|
||||||
n_workers=n_workers,
|
n_workers=n_workers,
|
||||||
|
wn_inp_path=wn_inp_path,
|
||||||
)
|
)
|
||||||
# 初始化空缓存时,做一次“同构化”
|
# 初始化空缓存时,做一次“同构化”
|
||||||
if pressure_leak_all is None or len(pressure_leak_all) == 0:
|
if pressure_leak_all is None or len(pressure_leak_all) == 0:
|
||||||
@@ -296,6 +298,7 @@ def cal_SD(located_pipe, real_pipe, pipe_x, pipe_y):
|
|||||||
|
|
||||||
def DN_search_multi_simple_add_flow_count_new(
|
def DN_search_multi_simple_add_flow_count_new(
|
||||||
wn,
|
wn,
|
||||||
|
wn_inp_path,
|
||||||
G0,
|
G0,
|
||||||
all_node,
|
all_node,
|
||||||
node_x,
|
node_x,
|
||||||
@@ -390,6 +393,7 @@ def DN_search_multi_simple_add_flow_count_new(
|
|||||||
pressure_leak, flow_leak, pressure_leak_all, flow_leak_all = (
|
pressure_leak, flow_leak, pressure_leak_all, flow_leak_all = (
|
||||||
_ensure_signatures_for_centers(
|
_ensure_signatures_for_centers(
|
||||||
wn=wn,
|
wn=wn,
|
||||||
|
wn_inp_path=wn_inp_path,
|
||||||
center_list=candidate_center_list,
|
center_list=candidate_center_list,
|
||||||
pressure_leak_all=pressure_leak_all,
|
pressure_leak_all=pressure_leak_all,
|
||||||
flow_leak_all=flow_leak_all,
|
flow_leak_all=flow_leak_all,
|
||||||
@@ -440,6 +444,7 @@ def DN_search_multi_simple_add_flow_count_new(
|
|||||||
pressure_add, flow_add, pressure_leak_all, flow_leak_all = (
|
pressure_add, flow_add, pressure_leak_all, flow_leak_all = (
|
||||||
_ensure_signatures_for_centers(
|
_ensure_signatures_for_centers(
|
||||||
wn=wn,
|
wn=wn,
|
||||||
|
wn_inp_path=wn_inp_path,
|
||||||
center_list=add_center,
|
center_list=add_center,
|
||||||
pressure_leak_all=pressure_leak_all,
|
pressure_leak_all=pressure_leak_all,
|
||||||
flow_leak_all=flow_leak_all,
|
flow_leak_all=flow_leak_all,
|
||||||
@@ -555,6 +560,7 @@ def DN_search_multi_simple_add_flow_count_new(
|
|||||||
pressure_leak_sp, flow_leak_sp, pressure_leak_all, flow_leak_all = (
|
pressure_leak_sp, flow_leak_sp, pressure_leak_all, flow_leak_all = (
|
||||||
_ensure_signatures_for_centers(
|
_ensure_signatures_for_centers(
|
||||||
wn=wn,
|
wn=wn,
|
||||||
|
wn_inp_path=wn_inp_path,
|
||||||
center_list=final_area_pipe, # 这次要用的“最终区域里的所有管段”
|
center_list=final_area_pipe, # 这次要用的“最终区域里的所有管段”
|
||||||
pressure_leak_all=pressure_leak_all, # 累积缓存(会被更新)
|
pressure_leak_all=pressure_leak_all, # 累积缓存(会被更新)
|
||||||
flow_leak_all=flow_leak_all,
|
flow_leak_all=flow_leak_all,
|
||||||
|
|||||||
@@ -2,15 +2,55 @@
|
|||||||
|
|
||||||
import math
|
import math
|
||||||
import multiprocessing as mp
|
import multiprocessing as mp
|
||||||
|
import os
|
||||||
import sys
|
import sys
|
||||||
|
|
||||||
import pandas as pd
|
import pandas as pd
|
||||||
import wntr
|
import wntr
|
||||||
|
|
||||||
_PIPE2LEAKNODE = None
|
_PIPE2LEAKNODE = None
|
||||||
_SIGNATURE_WN = None
|
_SIGNATURE_WORKER_DATA = {}
|
||||||
_SIGNATURE_LEAK_MAG = None
|
|
||||||
_SIGNATURE_SENSOR_NAME = None
|
|
||||||
|
def _cleanup_temp_files(prefix):
|
||||||
|
for ext in [".inp", ".rpt", ".bin", ".out"]:
|
||||||
|
temp_file = prefix + ext
|
||||||
|
if os.path.exists(temp_file):
|
||||||
|
try:
|
||||||
|
os.remove(temp_file)
|
||||||
|
except OSError:
|
||||||
|
pass
|
||||||
|
|
||||||
|
|
||||||
|
def _make_temp_prefix(tag):
|
||||||
|
temp_dir = os.path.abspath(os.path.join("temp", "burst_location"))
|
||||||
|
os.makedirs(temp_dir, exist_ok=True)
|
||||||
|
safe_tag = str(tag).replace(os.sep, "_").replace(" ", "_")
|
||||||
|
return os.path.join(temp_dir, f"{safe_tag}_{os.getpid()}")
|
||||||
|
|
||||||
|
|
||||||
|
def _snapshot_hydraulic_options(wn):
|
||||||
|
options = wn.options
|
||||||
|
return {
|
||||||
|
"demand_model": options.hydraulic.demand_model,
|
||||||
|
"duration": float(options.time.duration),
|
||||||
|
"hydraulic_timestep": float(options.time.hydraulic_timestep),
|
||||||
|
"pattern_timestep": float(options.time.pattern_timestep),
|
||||||
|
"report_timestep": float(options.time.report_timestep),
|
||||||
|
"required_pressure": float(options.hydraulic.required_pressure),
|
||||||
|
"minimum_pressure": float(options.hydraulic.minimum_pressure),
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
def _apply_hydraulic_options(wn, option_values):
|
||||||
|
options = wn.options
|
||||||
|
options.hydraulic.demand_model = option_values["demand_model"]
|
||||||
|
options.time.duration = option_values["duration"]
|
||||||
|
options.time.hydraulic_timestep = option_values["hydraulic_timestep"]
|
||||||
|
options.time.pattern_timestep = option_values["pattern_timestep"]
|
||||||
|
options.time.report_timestep = option_values["report_timestep"]
|
||||||
|
options.hydraulic.required_pressure = option_values["required_pressure"]
|
||||||
|
options.hydraulic.minimum_pressure = option_values["minimum_pressure"]
|
||||||
|
|
||||||
|
|
||||||
def simple_add_leak(wn, leak_mag, leak_pipe):
|
def simple_add_leak(wn, leak_mag, leak_pipe):
|
||||||
@@ -209,7 +249,9 @@ def ensure_mid_node(wn, leak_pipe):
|
|||||||
raise KeyError(f"Cannot ensure mid node for pipe '{leak_pipe}'.")
|
raise KeyError(f"Cannot ensure mid node for pipe '{leak_pipe}'.")
|
||||||
|
|
||||||
|
|
||||||
def leak_simulation_pipe_dd_multi_pf(wn, leak_mag, leak_pipe, sensor_name):
|
def leak_simulation_pipe_dd_multi_pf(
|
||||||
|
wn, leak_mag, leak_pipe, sensor_name, file_prefix=None
|
||||||
|
):
|
||||||
"""
|
"""
|
||||||
优化版:
|
优化版:
|
||||||
- 不再 remove/add link/node
|
- 不再 remove/add link/node
|
||||||
@@ -233,7 +275,10 @@ def leak_simulation_pipe_dd_multi_pf(wn, leak_mag, leak_pipe, sensor_name):
|
|||||||
|
|
||||||
# 仿真
|
# 仿真
|
||||||
sim = wntr.sim.EpanetSimulator(wn)
|
sim = wntr.sim.EpanetSimulator(wn)
|
||||||
|
if file_prefix is None:
|
||||||
results = sim.run_sim()
|
results = sim.run_sim()
|
||||||
|
else:
|
||||||
|
results = sim.run_sim(file_prefix=file_prefix)
|
||||||
|
|
||||||
# 输出(保持列顺序)
|
# 输出(保持列顺序)
|
||||||
pressure_output = results.node["pressure"].loc[:, sensor_name]
|
pressure_output = results.node["pressure"].loc[:, sensor_name]
|
||||||
@@ -242,6 +287,8 @@ def leak_simulation_pipe_dd_multi_pf(wn, leak_mag, leak_pipe, sensor_name):
|
|||||||
finally:
|
finally:
|
||||||
# 关闭泄漏:还原 base_value
|
# 关闭泄漏:还原 base_value
|
||||||
ts_obj.base_value = orig_base
|
ts_obj.base_value = orig_base
|
||||||
|
if file_prefix is not None:
|
||||||
|
_cleanup_temp_files(file_prefix)
|
||||||
|
|
||||||
|
|
||||||
def prepare_leak_infrastructure(wn, candidate_pipes):
|
def prepare_leak_infrastructure(wn, candidate_pipes):
|
||||||
@@ -391,7 +438,13 @@ def cal_sum_demand(demand):
|
|||||||
|
|
||||||
|
|
||||||
def cal_signature_pipe_multi_pf(
|
def cal_signature_pipe_multi_pf(
|
||||||
wn, leak_mag, candidate_center, timestep_list, sensor_name, n_workers=1
|
wn,
|
||||||
|
leak_mag,
|
||||||
|
candidate_center,
|
||||||
|
timestep_list,
|
||||||
|
sensor_name,
|
||||||
|
n_workers=1,
|
||||||
|
wn_inp_path=None,
|
||||||
):
|
):
|
||||||
candidate_center_num = len(candidate_center)
|
candidate_center_num = len(candidate_center)
|
||||||
pressure_leak = pd.DataFrame(
|
pressure_leak = pd.DataFrame(
|
||||||
@@ -402,21 +455,41 @@ def cal_signature_pipe_multi_pf(
|
|||||||
# columns=sensor_f_name)
|
# columns=sensor_f_name)
|
||||||
pressure_leak = pressure_leak.sort_index()
|
pressure_leak = pressure_leak.sort_index()
|
||||||
# flow_leak = flow_leak.sort_index()
|
# flow_leak = flow_leak.sort_index()
|
||||||
can_fork = "fork" in mp.get_all_start_methods()
|
can_parallel = (
|
||||||
if n_workers > 1 and candidate_center_num > 1 and can_fork:
|
n_workers > 1
|
||||||
_set_signature_worker_context(wn, leak_mag, sensor_name)
|
and candidate_center_num > 1
|
||||||
|
and wn_inp_path is not None
|
||||||
|
and len(str(wn_inp_path)) > 0
|
||||||
|
)
|
||||||
|
if can_parallel:
|
||||||
|
option_values = _snapshot_hydraulic_options(wn)
|
||||||
worker_count = min(n_workers, candidate_center_num)
|
worker_count = min(n_workers, candidate_center_num)
|
||||||
with mp.get_context("fork").Pool(processes=worker_count) as pool:
|
start_methods = mp.get_all_start_methods()
|
||||||
|
context_name = "spawn" if "spawn" in start_methods else start_methods[0]
|
||||||
|
with mp.get_context(context_name).Pool(
|
||||||
|
processes=worker_count,
|
||||||
|
initializer=_signature_worker_init,
|
||||||
|
initargs=(
|
||||||
|
str(wn_inp_path),
|
||||||
|
float(leak_mag),
|
||||||
|
list(sensor_name),
|
||||||
|
option_values,
|
||||||
|
),
|
||||||
|
) as pool:
|
||||||
for i, (center_name, pressure_array) in enumerate(
|
for i, (center_name, pressure_array) in enumerate(
|
||||||
pool.imap(_run_signature_for_center, candidate_center)
|
pool.imap(_signature_worker_run_center, candidate_center)
|
||||||
):
|
):
|
||||||
pressure_leak.loc[(center_name, slice(None)), :] = pressure_array
|
pressure_leak.loc[(center_name, slice(None)), :] = pressure_array
|
||||||
sys.stdout.write("\r" + "已经完成计算" + str(i + 1) + "个特征中心")
|
sys.stdout.write("\r" + "已经完成计算" + str(i + 1) + "个特征中心")
|
||||||
_set_signature_worker_context(None, None, None)
|
|
||||||
else:
|
else:
|
||||||
for i in range(candidate_center_num):
|
for i in range(candidate_center_num):
|
||||||
|
temp_prefix = _make_temp_prefix(f"sig_{i}")
|
||||||
wn, pressure_output = leak_simulation_pipe_dd_multi_pf(
|
wn, pressure_output = leak_simulation_pipe_dd_multi_pf(
|
||||||
wn, leak_mag, candidate_center[i], sensor_name
|
wn,
|
||||||
|
leak_mag,
|
||||||
|
candidate_center[i],
|
||||||
|
sensor_name,
|
||||||
|
file_prefix=temp_prefix,
|
||||||
)
|
)
|
||||||
# leak_or_not_list.append(leak_or_not)
|
# leak_or_not_list.append(leak_or_not)
|
||||||
pressure_leak.loc[(candidate_center[i], slice(None)), :] = (
|
pressure_leak.loc[(candidate_center[i], slice(None)), :] = (
|
||||||
@@ -427,16 +500,26 @@ def cal_signature_pipe_multi_pf(
|
|||||||
return pressure_leak, candidate_center
|
return pressure_leak, candidate_center
|
||||||
|
|
||||||
|
|
||||||
def _set_signature_worker_context(wn, leak_mag, sensor_name):
|
def _signature_worker_init(inp_path, leak_mag, sensor_name, option_values):
|
||||||
global _SIGNATURE_WN, _SIGNATURE_LEAK_MAG, _SIGNATURE_SENSOR_NAME
|
global _SIGNATURE_WORKER_DATA
|
||||||
_SIGNATURE_WN = wn
|
wn = wntr.network.WaterNetworkModel(inp_path)
|
||||||
_SIGNATURE_LEAK_MAG = leak_mag
|
_apply_hydraulic_options(wn, option_values)
|
||||||
_SIGNATURE_SENSOR_NAME = sensor_name
|
_SIGNATURE_WORKER_DATA = {
|
||||||
|
"wn": wn,
|
||||||
|
"leak_mag": leak_mag,
|
||||||
|
"sensor_name": sensor_name,
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
def _run_signature_for_center(center_name):
|
def _signature_worker_run_center(center_name):
|
||||||
|
data = _SIGNATURE_WORKER_DATA
|
||||||
|
temp_prefix = _make_temp_prefix(f"sig_worker_{center_name}")
|
||||||
_, pressure_output = leak_simulation_pipe_dd_multi_pf(
|
_, pressure_output = leak_simulation_pipe_dd_multi_pf(
|
||||||
_SIGNATURE_WN, _SIGNATURE_LEAK_MAG, center_name, _SIGNATURE_SENSOR_NAME
|
data["wn"],
|
||||||
|
data["leak_mag"],
|
||||||
|
center_name,
|
||||||
|
data["sensor_name"],
|
||||||
|
file_prefix=temp_prefix,
|
||||||
)
|
)
|
||||||
return center_name, pressure_output.to_numpy()
|
return center_name, pressure_output.to_numpy()
|
||||||
|
|
||||||
|
|||||||
Reference in New Issue
Block a user