From d0abad3c6556fdf74bc7a8fd71531cc5998774dc Mon Sep 17 00:00:00 2001 From: Jiang Date: Tue, 3 Mar 2026 16:29:59 +0800 Subject: [PATCH] =?UTF-8?q?=E4=BD=BF=E7=94=A8pymoo=E5=AE=9E=E7=8E=B0?= =?UTF-8?q?=E9=81=97=E4=BC=A0=E7=AE=97=E6=B3=95?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- app/algorithms/leakage_identifier.py | 349 ++++++++++++++------------- app/native/api/s40_schema.py | 31 +-- requirements.txt | 1 + 3 files changed, 197 insertions(+), 184 deletions(-) diff --git a/app/algorithms/leakage_identifier.py b/app/algorithms/leakage_identifier.py index 697be86..8717dc4 100644 --- a/app/algorithms/leakage_identifier.py +++ b/app/algorithms/leakage_identifier.py @@ -2,17 +2,17 @@ import wntr import numpy as np import pandas as pd import os +import time import argparse from typing import Any, List, Dict, Union -try: - import geatpy as ea - - _GEATPY_IMPORT_ERROR = None - -except Exception as import_error: - ea = None - _GEATPY_IMPORT_ERROR = import_error +from pymoo.core.problem import Problem +from pymoo.core.callback import Callback +from pymoo.algorithms.soo.nonconvex.ga import GA +from pymoo.operators.crossover.sbx import SBX +from pymoo.operators.mutation.pm import PM +from pymoo.optimize import minimize as pymoo_minimize +from pymoo.termination.default import DefaultSingleObjectiveTermination class LeakageIdentifier: @@ -175,6 +175,8 @@ class LeakageIdentifier: max_gen: int = 100, output_flow_unit: str = "m3/s", save_result: bool = True, + ftol: float = 1e-3, + ftol_period: int = 15, ): """ 运行遗传算法以识别漏损分布。 @@ -184,19 +186,15 @@ class LeakageIdentifier: output_dir: 结果保存目录。 pop_size: GA 的种群大小。 max_gen: GA 的最大代数。 + output_flow_unit: 输出漏损流量的单位。 save_result: 是否保存识别结果到本地 CSV。 + ftol: 目标值收敛容差(连续 ftol_period 代改善 < ftol 则停止)。 + ftol_period: 收敛检测的窗口代数。 """ - if ea is None: - raise ImportError( - "geatpy 无法导入,无法运行识别。请安装兼容版本或修复依赖。" - ) from _GEATPY_IMPORT_ERROR if save_result: os.makedirs(output_dir, exist_ok=True) # 加载观测数据 - # 假设观测数据格式:行=时间,列=传感器(或原始格式) - # 原始代码: scadapd = pd.read_csv(scada); Rs = scadapd.values[1 : Rlen + 1, :] - # 我们需要将其标准化。假设带有标题的标准 CSV 或匹配原始格式。 if isinstance(observed_pressure_data, str): obs_df = pd.read_csv(observed_pressure_data) observed_name = os.path.basename(observed_pressure_data) @@ -206,11 +204,8 @@ class LeakageIdentifier: else: obs_df = pd.DataFrame(observed_pressure_data) observed_name = "observed_pressure.csv" - # 提取特定传感器和时间步长的观测压力 - # 这部分需要与数据存储方式对齐。 - # 对于此重构,我们将遵循读取对应模拟步骤值的原始逻辑。 - # 准备问题实例 + # 准备 pymoo 问题实例 problem = LeakageProblem( self.wn, self.nodes_by_area, @@ -220,30 +215,44 @@ class LeakageIdentifier: q_sum=self.q_sum, ) - # 配置算法 - algorithm = ea.soea_SEGA_templet( - problem, ea.Population(Encoding="RI", NIND=pop_size) + # 配置 pymoo GA 算法 + n_var = self.num_areas + algorithm = GA( + pop_size=pop_size, + crossover=SBX(prob=0.9, eta=15), + mutation=PM(prob=1.0 / max(1, n_var), eta=20), + eliminate_duplicates=False, ) - algorithm.MAXGEN = max_gen - algorithm.mutOper.Pm = 0.5 # 变异概率 (for Real Encoding, this is usually probability per individual or variable) - algorithm.recOper.XOVR = 0.9 # 交叉概率 - algorithm.logTras = 1 # 每代记录一次 - # 运行 - res = ea.optimize( + # 终止条件:收敛检测 + 最大代数 + termination = DefaultSingleObjectiveTermination( + ftol=ftol, + period=ftol_period, + n_max_gen=max_gen, + ) + + # 回调:记录每代信息 + callback = _ProgressCallback() + + t0 = time.time() + res = pymoo_minimize( + problem, algorithm, + termination, + seed=42, verbose=True, - drawing=0, - outputMsg=True, - drawLog=False, - saveFlag=False, + callback=callback, ) + elapsed = time.time() - t0 - # 保存结果 - best_ind = res["Vars"][0] # 最优个体(漏损比例) - best_obj = res["ObjV"][0][0] # 最优目标函数值 + # 提取最优解 + best_ind = res.X # 最优个体(漏损比例原始值) + best_obj = float(res.F[0]) - print(f"优化完成。最佳目标值: {best_obj}") + # 输出终止信息 + print(f"\n优化完成。耗时: {elapsed:.1f}s") + print(f"总代数: {res.algorithm.n_gen}, 总评估次数: {problem._eval_count}") + print(f"最佳目标值: {best_obj:.6f}") # 保存到文件 effective_ratio_map = self._effective_area_ratios( @@ -281,168 +290,168 @@ class LeakageIdentifier: return result_df -if ea is not None: +class _ProgressCallback(Callback): + """每代回调:记录进度。""" - class LeakageProblem(ea.Problem): - def __init__( - self, - wn, - nodes_by_area, - area_ids, - sensor_nodes, - observed_data, - q_sum, - ): - name = "LeakageIdentification" - M = 1 # 单目标 - maxormins = [1] # 最小化 - Dim = len(area_ids) # 维度 = 区域数量 - varTypes = [0] * Dim # 连续变量 - lb = [0] * Dim # 下界 0 - ub = [1] * Dim # 上界 1 - lbin = [1] * Dim # 包含下界 - ubin = [1] * Dim # 包含上界 + def __init__(self): + super().__init__() + self.gen_times = [] + self._t_last = None - super().__init__(name, M, maxormins, Dim, varTypes, lb, ub, lbin, ubin) + def notify(self, algorithm): + now = time.time() + if self._t_last is not None: + self.gen_times.append(now - self._t_last) + self._t_last = now - self.wn = wn - self.nodes_by_area = nodes_by_area - self.area_ids = area_ids - self.sensor_nodes = sensor_nodes - self.q_sum = q_sum - # 预处理观测数据以匹配模拟格式 - # 提取对应于传感器节点的列 - try: - # 检查列是否存在 - missing_sensors = [ - s for s in self.sensor_nodes if s not in observed_data.columns - ] - if not missing_sensors: - self.obs_matrix = observed_data[self.sensor_nodes].values - else: - # 回退:如果标题不匹配,假设列顺序与传感器节点一致 - self.obs_matrix = observed_data.values[:, : len(self.sensor_nodes)] - except Exception: +class LeakageProblem(Problem): + """pymoo 批量评估问题定义。 + + 搜索空间:n 维 [0, 1] 实数 -> 通过 _effective_area_ratios 归一化到单纯形。 + 目标:模拟压力与观测压力之间的归一化误差范数。 + 无显式约束(sum=1 由归一化自动保证)。 + """ + + def __init__( + self, + wn, + nodes_by_area, + area_ids, + sensor_nodes, + observed_data, + q_sum: float = 0.2, + ): + n_var = len(area_ids) + + super().__init__( + n_var=n_var, + n_obj=1, + n_ieq_constr=0, + xl=np.zeros(n_var), + xu=np.ones(n_var), + ) + + self.wn = wn + self.nodes_by_area = nodes_by_area + self.area_ids = area_ids + self.sensor_nodes = sensor_nodes + self.q_sum = q_sum + + # 预处理观测数据以匹配模拟格式 + try: + missing_sensors = [ + s for s in self.sensor_nodes if s not in observed_data.columns + ] + if not missing_sensors: + self.obs_matrix = observed_data[self.sensor_nodes].values + else: self.obs_matrix = observed_data.values[:, : len(self.sensor_nodes)] + except Exception: + self.obs_matrix = observed_data.values[:, : len(self.sensor_nodes)] - duration_sec = float(self.wn.options.time.duration) - step_sec = float(self.wn.options.time.hydraulic_timestep) - if step_sec > 0: - max_steps = int(duration_sec / step_sec) + 1 - self.obs_matrix = self.obs_matrix[:max_steps, :] + duration_sec = float(self.wn.options.time.duration) + step_sec = float(self.wn.options.time.hydraulic_timestep) + if step_sec > 0: + max_steps = int(duration_sec / step_sec) + 1 + self.obs_matrix = self.obs_matrix[:max_steps, :] - # 预先缓存每个区域的需水对象,减少每次适应度计算的节点查找 - self.demand_objs_by_area = {} - for area_id in self.area_ids: - demand_objs = [] - for node_name in self.nodes_by_area.get(area_id, []): - if node_name not in self.wn.node_name_list: - continue - node = self.wn.get_node(node_name) - if ( - hasattr(node, "demand_timeseries_list") - and len(node.demand_timeseries_list) > 0 - ): - demand_objs.append(node.demand_timeseries_list[0]) - self.demand_objs_by_area[area_id] = demand_objs + # 预先缓存每个区域的需水对象,减少每次适应度计算的节点查找 + self.demand_objs_by_area = {} + for area_id in self.area_ids: + demand_objs = [] + for node_name in self.nodes_by_area.get(area_id, []): + if node_name not in self.wn.node_name_list: + continue + node = self.wn.get_node(node_name) + if ( + hasattr(node, "demand_timeseries_list") + and len(node.demand_timeseries_list) > 0 + ): + demand_objs.append(node.demand_timeseries_list[0]) + self.demand_objs_by_area[area_id] = demand_objs - self.allocatable_counts = { - area_id: len(self.demand_objs_by_area.get(area_id, [])) - for area_id in self.area_ids - } - if not any(count > 0 for count in self.allocatable_counts.values()): - raise ValueError("没有可分配漏损的有效分区,无法满足漏损总量约束。") + self.allocatable_counts = { + area_id: len(self.demand_objs_by_area.get(area_id, [])) + for area_id in self.area_ids + } + if not any(count > 0 for count in self.allocatable_counts.values()): + raise ValueError("没有可分配漏损的有效分区,无法满足漏损总量约束。") - def aimFunc(self, pop): - Vars = pop.Phen # 种群表现型(实数值) - NIND = Vars.shape[0] - ObjV = np.zeros((NIND, 1)) + # 评估计数器(诊断用) + self._eval_count = 0 - for i in range(NIND): - leak_ratios = Vars[i, :] + def _evaluate(self, X, out, *args, **kwargs): + """批量评估种群。 - # 1. 将漏损分布应用到模型 - effective_ratio_map = LeakageIdentifier._effective_area_ratios( - leak_ratios, - self.area_ids, - self.nodes_by_area, - allocatable_counts=self.allocatable_counts, - ) + X: 形状 (pop_size, n_var) 的决策变量矩阵。 + """ + n_pop = X.shape[0] + self._eval_count += n_pop - # 此时跟踪修改以便稍后恢复 - modifications = [] # (node_obj, original_base_demand) 列表 + F = np.zeros((n_pop, 1)) + for i in range(n_pop): + F[i, 0] = self._evaluate_single(X[i]) + out["F"] = F - for j, area_id in enumerate(self.area_ids): - ratio = effective_ratio_map.get(area_id, 0.0) - if ratio <= 0: - continue + def _evaluate_single(self, x): + """评估单个个体,返回归一化误差范数。""" + leak_ratios = x - demand_objs = self.demand_objs_by_area.get(area_id, []) - if not demand_objs: - continue + # 将漏损分布归一化 + effective_ratio_map = LeakageIdentifier._effective_area_ratios( + leak_ratios, + self.area_ids, + self.nodes_by_area, + allocatable_counts=self.allocatable_counts, + ) - # 将漏损分配给区域内的节点 - per_node_leak = self.q_sum * ratio / len(demand_objs) + # 跟踪修改以便稍后恢复 + modifications = [] - for demand_obj in demand_objs: - original_val = demand_obj.base_value - demand_obj.base_value = original_val + per_node_leak - modifications.append((demand_obj, original_val)) + for j, area_id in enumerate(self.area_ids): + ratio = effective_ratio_map.get(area_id, 0.0) + if ratio <= 0: + continue - # 2. 运行模拟 - try: - sim = wntr.sim.EpanetSimulator(self.wn) - results = sim.run_sim() + demand_objs = self.demand_objs_by_area.get(area_id, []) + if not demand_objs: + continue - # 3. 计算目标函数(误差) - sim_pressure = results.node["pressure"].loc[:, self.sensor_nodes] + per_node_leak = self.q_sum * ratio / len(demand_objs) - # 对齐维度 - # 仅比较重叠的时间步长 - n_steps = min(sim_pressure.shape[0], self.obs_matrix.shape[0]) - sim_vals = sim_pressure.values[:n_steps, :] - obs_vals = self.obs_matrix[:n_steps, :] + for demand_obj in demand_objs: + original_val = demand_obj.base_value + demand_obj.base_value = original_val + per_node_leak + modifications.append((demand_obj, original_val)) - # 差值 - diff = sim_vals - obs_vals + try: + sim = wntr.sim.EpanetSimulator(self.wn) + results = sim.run_sim() - # 按行最大值归一化(根据原始代码逻辑) - # R1 = R0 - Rs - # Rmax = R1.max(axis=1) -> 该时间步长的最大绝对差值? - # 原始代码: Rmax = R1.max(axis=1) (该时间步长所有传感器的最大值) - # 注意:如果最大差值为 0 或负数,原始代码逻辑可能有缺陷,使用绝对最大值更安全 - # Rmax = Rmax.reshape(-1, 1) - # R = R1 / Rmax + sim_pressure = results.node["pressure"].loc[:, self.sensor_nodes] - # 计算每个时间步长的最大差值 - row_max = np.max(np.abs(diff), axis=1, keepdims=True) - row_max[row_max == 0] = 1.0 # 防止除以零 + n_steps = min(sim_pressure.shape[0], self.obs_matrix.shape[0]) + sim_vals = sim_pressure.values[:n_steps, :] + obs_vals = self.obs_matrix[:n_steps, :] - normalized_diff = diff / row_max + diff = sim_vals - obs_vals - # 目标:归一化差值矩阵的 2-范数 - error = np.linalg.norm(normalized_diff) - ObjV[i] = error + # 按行最大值归一化 + row_max = np.max(np.abs(diff), axis=1, keepdims=True) + row_max[row_max == 0] = 1.0 # 防止除以零 - except Exception: - ObjV[i] = 1e9 # 失败时给予高惩罚 + normalized_diff = diff / row_max - # 4. 恢复模型更改 - for demand_obj, original_val in modifications: - demand_obj.base_value = original_val + # 目标:归一化差值矩阵的 2-范数 + return float(np.linalg.norm(normalized_diff)) - pop.ObjV = ObjV + except Exception: + return 1e9 - pass - -else: - - class LeakageProblem: - def __init__(self, *args, **kwargs): - raise ImportError( - "geatpy 无法导入,LeakageProblem 不可用。" - ) from _GEATPY_IMPORT_ERROR + finally: + for demand_obj, original_val in modifications: + demand_obj.base_value = original_val def main() -> int: diff --git a/app/native/api/s40_schema.py b/app/native/api/s40_schema.py index be21ffb..4a646ee 100644 --- a/app/native/api/s40_schema.py +++ b/app/native/api/s40_schema.py @@ -1,14 +1,17 @@ from .database import * from .s0_base import * + def get_scheme_schema(name: str) -> dict[str, dict[Any, Any]]: - return { 'id' : {'type': 'str' , 'optional': False , 'readonly': True }, - 'name' : {'type': 'str' , 'optional': False , 'readonly': False}, - 'type' : {'type': 'str' , 'optional': False , 'readonly': False}, - "create_time": {'type': 'str' , 'optional': False , 'readonly': True }, - "start_time" : {'type': 'str' , 'optional': False , 'readonly': True }, - "detail" : {'type': 'str' , 'optional': False , 'readonly': True } } - + return { + "id": {"type": "str", "optional": False, "readonly": True}, + "name": {"type": "str", "optional": False, "readonly": False}, + "type": {"type": "str", "optional": False, "readonly": False}, + "create_time": {"type": "str", "optional": False, "readonly": True}, + "start_time": {"type": "str", "optional": False, "readonly": True}, + "detail": {"type": "str", "optional": False, "readonly": True}, + } + def get_scheme(name: str, schema_name: str) -> dict[Any, Any]: t = try_read(name, f"select * from scheme_list where scheme_name = '{schema_name}'") @@ -16,15 +19,15 @@ def get_scheme(name: str, schema_name: str) -> dict[Any, Any]: return {} d = {} - d['id'] = str(t['scheme_id']) - d['name'] = str(t['scheme_name']) - d['type'] = str(t['scheme_type']) - d['create_time'] = str(t['create_time']) - d['start_time'] = str(t['start_time']) - d['detail'] = str(t['detail']) + d["id"] = str(t["scheme_id"]) + d["name"] = str(t["scheme_name"]) + d["type"] = str(t["scheme_type"]) + d["create_time"] = str(t["create_time"]) + d["start_time"] = str(t["start_time"]) + d["detail"] = str(t["detail"]) return d + def get_all_schemes(name: str) -> list[dict[Any, Any]]: return read_all(name, "select * from scheme_list") - diff --git a/requirements.txt b/requirements.txt index 234d2f0..9391bd3 100644 --- a/requirements.txt +++ b/requirements.txt @@ -165,3 +165,4 @@ wntr==1.3.2 wrapt==1.17.3 zipp==3.23.0 zmq==0.0.0 +pymoo==0.6.1.6 \ No newline at end of file