优化漏损识别器,支持多进程评估

This commit is contained in:
2026-03-05 18:18:28 +08:00
parent b8aee14c00
commit 63d3458fb4
8 changed files with 425 additions and 182 deletions
+218 -69
View File
@@ -1,10 +1,11 @@
import wntr
import numpy as np
import pandas as pd
import os
import time
import argparse
from typing import Any, List, Dict, Union
import pandas as pd
import os
import time
import argparse
from multiprocessing import Pool, cpu_count
from typing import Any, List, Dict, Union
from pymoo.core.problem import Problem
from pymoo.core.callback import Callback
@@ -12,10 +13,123 @@ from pymoo.algorithms.soo.nonconvex.ga import GA
from pymoo.operators.crossover.sbx import SBX
from pymoo.operators.mutation.pm import PM
from pymoo.optimize import minimize as pymoo_minimize
from pymoo.termination.default import DefaultSingleObjectiveTermination
class LeakageIdentifier:
from pymoo.termination.default import DefaultSingleObjectiveTermination
_worker_data: dict[str, Any] = {}
DEFAULT_N_WORKERS = max(1, min(cpu_count() - 1, 4))
def _cleanup_temp_files(prefix: str) -> None:
for ext in [".inp", ".rpt", ".bin", ".out"]:
temp_file = prefix + ext
if os.path.exists(temp_file):
try:
os.remove(temp_file)
except OSError:
pass
def _worker_init(
inp_path: str,
sensor_nodes: list[str],
area_ids: list[str],
nodes_by_area: dict[str, list[str]],
obs_matrix: np.ndarray,
q_sum: float,
duration_sec: float,
timestep_sec: float,
) -> None:
global _worker_data
wn = wntr.network.WaterNetworkModel(inp_path)
wn.options.hydraulic.demand_model = "DD"
wn.options.time.duration = duration_sec
wn.options.time.hydraulic_timestep = timestep_sec
wn.options.time.pattern_timestep = timestep_sec
wn.options.time.report_timestep = timestep_sec
demand_objs_by_area = {}
allocatable_counts = {}
for area_id in area_ids:
demand_objs = []
for node_name in nodes_by_area.get(area_id, []):
if node_name not in wn.node_name_list:
continue
node = wn.get_node(node_name)
if (
hasattr(node, "demand_timeseries_list")
and len(node.demand_timeseries_list) > 0
):
demand_objs.append(node.demand_timeseries_list[0])
demand_objs_by_area[area_id] = demand_objs
allocatable_counts[area_id] = len(demand_objs)
_worker_data = {
"wn": wn,
"sensor_nodes": sensor_nodes,
"area_ids": area_ids,
"nodes_by_area": nodes_by_area,
"demand_objs_by_area": demand_objs_by_area,
"allocatable_counts": allocatable_counts,
"obs_matrix": obs_matrix,
"q_sum": q_sum,
}
def _worker_evaluate(raw_ratios: np.ndarray) -> float:
d = _worker_data
effective_ratio_map = LeakageIdentifier._effective_area_ratios(
raw_ratios,
d["area_ids"],
d["nodes_by_area"],
allocatable_counts=d["allocatable_counts"],
)
modifications = []
for area_id in d["area_ids"]:
ratio = effective_ratio_map.get(area_id, 0.0)
if ratio <= 0:
continue
demand_objs = d["demand_objs_by_area"].get(area_id, [])
if not demand_objs:
continue
per_node_leak = d["q_sum"] * ratio / len(demand_objs)
for demand_obj in demand_objs:
original_val = demand_obj.base_value
demand_obj.base_value = original_val + per_node_leak
modifications.append((demand_obj, original_val))
temp_dir = os.path.abspath(os.path.join("temp", "leakage"))
os.makedirs(temp_dir, exist_ok=True)
prefix = os.path.join(temp_dir, f"temp_{os.getpid()}")
try:
sim = wntr.sim.EpanetSimulator(d["wn"])
results = sim.run_sim(file_prefix=prefix)
sim_pressure = results.node["pressure"].loc[:, d["sensor_nodes"]]
n_steps = min(sim_pressure.shape[0], d["obs_matrix"].shape[0])
sim_vals = sim_pressure.values[:n_steps, :]
obs_vals = d["obs_matrix"][:n_steps, :]
diff = sim_vals - obs_vals
row_max = np.max(np.abs(diff), axis=1, keepdims=True)
row_max[row_max == 0] = 1.0
normalized_diff = diff / row_max
return float(np.linalg.norm(normalized_diff))
except Exception:
return 1e9
finally:
for demand_obj, original_val in modifications:
demand_obj.base_value = original_val
_cleanup_temp_files(prefix)
class LeakageIdentifier:
FLOW_UNIT_TO_M3S = {
"m3/s": 1.0,
"m3/h": 1.0 / 3600.0,
@@ -165,19 +279,20 @@ class LeakageIdentifier:
df = pd.read_csv(path, dtype={"ID": str, "Area": str})
return self._normalize_area_map_df(df)
def run_identification(
self,
observed_pressure_data: Union[
str, pd.DataFrame, Dict[str, List[Any]], List[Dict[str, Any]]
],
def run_identification(
self,
observed_pressure_data: Union[
str, pd.DataFrame, Dict[str, List[Any]], List[Dict[str, Any]]
],
output_dir: str = "Results",
pop_size: int = 50,
max_gen: int = 100,
output_flow_unit: str = "m3/s",
save_result: bool = True,
ftol: float = 1e-3,
ftol_period: int = 15,
):
save_result: bool = True,
ftol: float = 1e-3,
ftol_period: int = 15,
n_workers: int = DEFAULT_N_WORKERS,
):
"""
运行遗传算法以识别漏损分布。
@@ -187,10 +302,11 @@ class LeakageIdentifier:
pop_size: GA 的种群大小。
max_gen: GA 的最大代数。
output_flow_unit: 输出漏损流量的单位。
save_result: 是否保存识别结果到本地 CSV。
ftol: 目标值收敛容差(连续 ftol_period 代改善 < ftol 则停止)。
ftol_period: 收敛检测的窗口代数。
"""
save_result: 是否保存识别结果到本地 CSV。
ftol: 目标值收敛容差(连续 ftol_period 代改善 < ftol 则停止)。
ftol_period: 收敛检测的窗口代数。
n_workers: 并行工作进程数(1=串行,>1=并行评估)。
"""
if save_result:
os.makedirs(output_dir, exist_ok=True)
@@ -206,14 +322,16 @@ class LeakageIdentifier:
observed_name = "observed_pressure.csv"
# 准备 pymoo 问题实例
problem = LeakageProblem(
self.wn,
self.nodes_by_area,
self.area_ids,
self.sensor_nodes,
obs_df,
q_sum=self.q_sum,
)
problem = LeakageProblem(
self.wn,
self.nodes_by_area,
self.area_ids,
self.sensor_nodes,
obs_df,
q_sum=self.q_sum,
n_workers=n_workers,
inp_path=os.path.abspath(self.inp_path),
)
# 配置 pymoo GA 算法
n_var = self.num_areas
@@ -234,16 +352,19 @@ class LeakageIdentifier:
# 回调:记录每代信息
callback = _ProgressCallback()
t0 = time.time()
res = pymoo_minimize(
problem,
algorithm,
termination,
seed=42,
verbose=True,
callback=callback,
)
elapsed = time.time() - t0
t0 = time.time()
try:
res = pymoo_minimize(
problem,
algorithm,
termination,
seed=42,
verbose=True,
callback=callback,
)
finally:
problem.close()
elapsed = time.time() - t0
# 提取最优解
best_ind = res.X # 最优个体(漏损比例原始值)
@@ -305,7 +426,7 @@ class _ProgressCallback(Callback):
self._t_last = now
class LeakageProblem(Problem):
class LeakageProblem(Problem):
"""pymoo 批量评估问题定义。
搜索空间:n 维 [0, 1] 实数 -> 通过 _effective_area_ratios 归一化到单纯形。
@@ -313,15 +434,17 @@ class LeakageProblem(Problem):
无显式约束(sum=1 由归一化自动保证)。
"""
def __init__(
self,
wn,
nodes_by_area,
area_ids,
sensor_nodes,
observed_data,
q_sum: float = 0.2,
):
def __init__(
self,
wn,
nodes_by_area,
area_ids,
sensor_nodes,
observed_data,
q_sum: float = 0.2,
n_workers: int = DEFAULT_N_WORKERS,
inp_path: str | None = None,
):
n_var = len(area_ids)
super().__init__(
@@ -335,8 +458,10 @@ class LeakageProblem(Problem):
self.wn = wn
self.nodes_by_area = nodes_by_area
self.area_ids = area_ids
self.sensor_nodes = sensor_nodes
self.q_sum = q_sum
self.sensor_nodes = sensor_nodes
self.q_sum = q_sum
self.n_workers = max(1, int(n_workers))
self.inp_path = inp_path
# 预处理观测数据以匹配模拟格式
try:
@@ -375,11 +500,31 @@ class LeakageProblem(Problem):
area_id: len(self.demand_objs_by_area.get(area_id, []))
for area_id in self.area_ids
}
if not any(count > 0 for count in self.allocatable_counts.values()):
raise ValueError("没有可分配漏损的有效分区,无法满足漏损总量约束。")
# 评估计数器(诊断用)
self._eval_count = 0
if not any(count > 0 for count in self.allocatable_counts.values()):
raise ValueError("没有可分配漏损的有效分区,无法满足漏损总量约束。")
# 评估计数器(诊断用)
self._eval_count = 0
self._pool = None
if self.n_workers > 1:
if not self.inp_path:
raise ValueError("并行评估需要提供 inp_path。")
duration_sec = float(self.wn.options.time.duration)
timestep_sec = float(self.wn.options.time.hydraulic_timestep)
self._pool = Pool(
processes=self.n_workers,
initializer=_worker_init,
initargs=(
self.inp_path,
list(self.sensor_nodes),
list(self.area_ids),
{k: list(v) for k, v in self.nodes_by_area.items()},
self.obs_matrix.copy(),
self.q_sum,
duration_sec,
timestep_sec,
),
)
def _evaluate(self, X, out, *args, **kwargs):
"""批量评估种群。
@@ -389,10 +534,15 @@ class LeakageProblem(Problem):
n_pop = X.shape[0]
self._eval_count += n_pop
F = np.zeros((n_pop, 1))
for i in range(n_pop):
F[i, 0] = self._evaluate_single(X[i])
out["F"] = F
if self._pool is not None:
results = self._pool.map(_worker_evaluate, [X[i] for i in range(n_pop)])
out["F"] = np.array(results, dtype=float).reshape(-1, 1)
return
F = np.zeros((n_pop, 1))
for i in range(n_pop):
F[i, 0] = self._evaluate_single(X[i])
out["F"] = F
def _evaluate_single(self, x):
"""评估单个个体,返回归一化误差范数。"""
@@ -457,14 +607,13 @@ class LeakageProblem(Problem):
for demand_obj, original_val in modifications:
demand_obj.base_value = original_val
# 操作完成后删除临时文件
for ext in [".inp", ".rpt", ".bin", ".out"]:
temp_file = prefix + ext
if os.path.exists(temp_file):
try:
os.remove(temp_file)
except OSError:
pass
_cleanup_temp_files(prefix)
def close(self) -> None:
if self._pool is not None:
self._pool.close()
self._pool.join()
self._pool = None
def main() -> int: