Files
TJWaterServerBinary/app/algorithms/leakage_identifier.py
T
2026-03-03 16:29:59 +08:00

500 lines
18 KiB
Python

import wntr
import numpy as np
import pandas as pd
import os
import time
import argparse
from typing import Any, List, Dict, Union
from pymoo.core.problem import Problem
from pymoo.core.callback import Callback
from pymoo.algorithms.soo.nonconvex.ga import GA
from pymoo.operators.crossover.sbx import SBX
from pymoo.operators.mutation.pm import PM
from pymoo.optimize import minimize as pymoo_minimize
from pymoo.termination.default import DefaultSingleObjectiveTermination
class LeakageIdentifier:
FLOW_UNIT_TO_M3S = {
"m3/s": 1.0,
"m3/h": 1.0 / 3600.0,
"L/s": 1.0 / 1000.0,
"L/min": 1.0 / 60000.0,
}
@classmethod
def _flow_to_m3s(cls, value: float, unit: str) -> float:
if unit not in cls.FLOW_UNIT_TO_M3S:
raise ValueError(f"不支持的流量单位: {unit}")
return float(value) * cls.FLOW_UNIT_TO_M3S[unit]
@classmethod
def _flow_from_m3s(cls, value_m3s: float, unit: str) -> float:
if unit not in cls.FLOW_UNIT_TO_M3S:
raise ValueError(f"不支持的流量单位: {unit}")
return float(value_m3s) / cls.FLOW_UNIT_TO_M3S[unit]
@staticmethod
def _effective_area_ratios(
raw_ratios: Union["np.ndarray", Dict[str, float]],
area_ids: List[str],
nodes_by_area: Dict[str, List[str]],
allocatable_counts: Union[Dict[str, int], None] = None,
) -> Dict[str, float]:
"""将输入比例转换为有效区域比例,确保有效区域比例和为 1。"""
area_count = len(area_ids)
if area_count == 0:
return {}
if isinstance(raw_ratios, dict):
ratios = np.array(
[float(raw_ratios.get(area_id, 0.0)) for area_id in area_ids],
dtype=float,
)
else:
arr = np.asarray(raw_ratios, dtype=float).reshape(-1)
ratios = np.zeros(area_count, dtype=float)
fill_len = min(area_count, arr.shape[0])
if fill_len > 0:
ratios[:fill_len] = arr[:fill_len]
# 仅保留非负比例,负值按 0 处理
ratios = np.clip(ratios, a_min=0.0, a_max=None)
# 仅在有效区域(存在可分配节点)内归一化
if allocatable_counts is not None:
valid_mask = np.array(
[int(allocatable_counts.get(area_id, 0)) > 0 for area_id in area_ids],
dtype=bool,
)
else:
valid_mask = np.array(
[len(nodes_by_area.get(area_id, [])) > 0 for area_id in area_ids],
dtype=bool,
)
if not np.any(valid_mask):
raise ValueError("没有可分配漏损的有效分区,无法满足漏损总量约束。")
effective = np.zeros(area_count, dtype=float)
valid_sum = float(np.sum(ratios[valid_mask]))
if valid_sum > 0:
effective[valid_mask] = ratios[valid_mask] / valid_sum
else:
# 若输入全为 0,则在有效区域内均分,保证总和仍为 1
valid_count = int(np.sum(valid_mask))
effective[valid_mask] = 1.0 / valid_count
return {area_id: float(effective[idx]) for idx, area_id in enumerate(area_ids)}
@staticmethod
def _normalize_area_map_df(df: pd.DataFrame) -> pd.DataFrame:
"""标准化区域映射列名为 ID 和 Area。"""
if "ID" in df.columns and "Area" in df.columns:
return df
if "ID" in df.columns and "now" in df.columns:
df = df.rename(columns={"now": "Area"})
return df
df = df.copy()
df.columns = ["ID", "Area"] + list(df.columns[2:])
return df
def __init__(
self,
inp_path: str,
sensor_nodes: List[str],
area_map: Union[str, Dict[str, str]],
start_time: float = 0,
duration: float = 24,
timestep: float = 5,
q_sum: float = 0.2,
):
"""
初始化漏损识别器。
参数:
inp_path: EPANET .inp 文件路径。
sensor_nodes: 用作压力传感器的节点 ID 列表。
area_map: 节点到区域的映射。可以是 CSV 文件路径(列:ID, Area),也可以是字典 {NodeID: AreaID}。
start_time: 模拟开始时间(小时)。
duration: 模拟持续时间(小时)。
timestep: 模拟时间步长(分钟)。
q_sum: 假设的总漏损流量 (m3/s)。
"""
self.inp_path = inp_path
self.sensor_nodes = sensor_nodes
self.start_time = start_time
self.duration = duration
self.timestep = timestep
self.q_sum = q_sum
# 加载管网模型(仅一次)
self.wn = wntr.network.WaterNetworkModel(self.inp_path)
# 优化 WNTR 设置以提高速度
self.wn.options.hydraulic.demand_model = "DD"
self.wn.options.time.duration = float(self.duration) * 3600
self.wn.options.time.hydraulic_timestep = float(self.timestep) * 60
self.wn.options.time.pattern_timestep = float(self.timestep) * 60
self.wn.options.time.report_timestep = float(self.timestep) * 60
# 加载区域映射
if isinstance(area_map, str):
self.area_map_df = self._load_area_map(area_map)
elif isinstance(area_map, dict):
self.area_map_df = self._normalize_area_map_df(
pd.DataFrame(list(area_map.items()), columns=["ID", "Area"])
)
else:
raise ValueError("area_map 必须是 CSV 文件路径或字典。")
self.area_ids = sorted(self.area_map_df["Area"].unique())
self.num_areas = len(self.area_ids)
# 按区域对节点进行预分类,以便更快查找
self.nodes_by_area = {
area: self.area_map_df[self.area_map_df["Area"] == area]["ID"].tolist()
for area in self.area_ids
}
def _load_area_map(self, path: str) -> pd.DataFrame:
"""加载并验证节点-区域映射文件。"""
df = pd.read_csv(path, dtype={"ID": str, "Area": str})
return self._normalize_area_map_df(df)
def run_identification(
self,
observed_pressure_data: Union[
str, pd.DataFrame, Dict[str, List[Any]], List[Dict[str, Any]]
],
output_dir: str = "Results",
pop_size: int = 50,
max_gen: int = 100,
output_flow_unit: str = "m3/s",
save_result: bool = True,
ftol: float = 1e-3,
ftol_period: int = 15,
):
"""
运行遗传算法以识别漏损分布。
参数:
observed_pressure_data: 包含 SCADA 压力数据的 CSV 文件路径或 DataFrame/字典列表数据。
output_dir: 结果保存目录。
pop_size: GA 的种群大小。
max_gen: GA 的最大代数。
output_flow_unit: 输出漏损流量的单位。
save_result: 是否保存识别结果到本地 CSV。
ftol: 目标值收敛容差(连续 ftol_period 代改善 < ftol 则停止)。
ftol_period: 收敛检测的窗口代数。
"""
if save_result:
os.makedirs(output_dir, exist_ok=True)
# 加载观测数据
if isinstance(observed_pressure_data, str):
obs_df = pd.read_csv(observed_pressure_data)
observed_name = os.path.basename(observed_pressure_data)
elif isinstance(observed_pressure_data, pd.DataFrame):
obs_df = observed_pressure_data.copy()
observed_name = "observed_pressure.csv"
else:
obs_df = pd.DataFrame(observed_pressure_data)
observed_name = "observed_pressure.csv"
# 准备 pymoo 问题实例
problem = LeakageProblem(
self.wn,
self.nodes_by_area,
self.area_ids,
self.sensor_nodes,
obs_df,
q_sum=self.q_sum,
)
# 配置 pymoo GA 算法
n_var = self.num_areas
algorithm = GA(
pop_size=pop_size,
crossover=SBX(prob=0.9, eta=15),
mutation=PM(prob=1.0 / max(1, n_var), eta=20),
eliminate_duplicates=False,
)
# 终止条件:收敛检测 + 最大代数
termination = DefaultSingleObjectiveTermination(
ftol=ftol,
period=ftol_period,
n_max_gen=max_gen,
)
# 回调:记录每代信息
callback = _ProgressCallback()
t0 = time.time()
res = pymoo_minimize(
problem,
algorithm,
termination,
seed=42,
verbose=True,
callback=callback,
)
elapsed = time.time() - t0
# 提取最优解
best_ind = res.X # 最优个体(漏损比例原始值)
best_obj = float(res.F[0])
# 输出终止信息
print(f"\n优化完成。耗时: {elapsed:.1f}s")
print(f"总代数: {res.algorithm.n_gen}, 总评估次数: {problem._eval_count}")
print(f"最佳目标值: {best_obj:.6f}")
# 保存到文件
effective_ratio_map = self._effective_area_ratios(
best_ind,
self.area_ids,
self.nodes_by_area,
allocatable_counts=problem.allocatable_counts,
)
normalized_ratios = [
effective_ratio_map.get(area_id, 0.0) for area_id in self.area_ids
]
leakage_flow_m3s = [ratio * self.q_sum for ratio in normalized_ratios]
leakage_flow_output = [
self._flow_from_m3s(value_m3s, output_flow_unit)
for value_m3s in leakage_flow_m3s
]
result_df = pd.DataFrame(
{
"Area": self.area_ids,
"LeakageRatioRaw": best_ind,
"LeakageRatio": normalized_ratios,
"LeakageFlow_m3_per_s": leakage_flow_m3s,
f"LeakageFlow_{output_flow_unit.replace('/', '_per_')}": leakage_flow_output,
}
)
result_path = None
if save_result:
result_path = os.path.join(
output_dir, f"identified_leakage_{observed_name}"
)
result_df.to_csv(result_path, index=False)
print(f"结果已保存至 {result_path}")
result_df.attrs["result_path"] = result_path
return result_df
class _ProgressCallback(Callback):
"""每代回调:记录进度。"""
def __init__(self):
super().__init__()
self.gen_times = []
self._t_last = None
def notify(self, algorithm):
now = time.time()
if self._t_last is not None:
self.gen_times.append(now - self._t_last)
self._t_last = now
class LeakageProblem(Problem):
"""pymoo 批量评估问题定义。
搜索空间:n 维 [0, 1] 实数 -> 通过 _effective_area_ratios 归一化到单纯形。
目标:模拟压力与观测压力之间的归一化误差范数。
无显式约束(sum=1 由归一化自动保证)。
"""
def __init__(
self,
wn,
nodes_by_area,
area_ids,
sensor_nodes,
observed_data,
q_sum: float = 0.2,
):
n_var = len(area_ids)
super().__init__(
n_var=n_var,
n_obj=1,
n_ieq_constr=0,
xl=np.zeros(n_var),
xu=np.ones(n_var),
)
self.wn = wn
self.nodes_by_area = nodes_by_area
self.area_ids = area_ids
self.sensor_nodes = sensor_nodes
self.q_sum = q_sum
# 预处理观测数据以匹配模拟格式
try:
missing_sensors = [
s for s in self.sensor_nodes if s not in observed_data.columns
]
if not missing_sensors:
self.obs_matrix = observed_data[self.sensor_nodes].values
else:
self.obs_matrix = observed_data.values[:, : len(self.sensor_nodes)]
except Exception:
self.obs_matrix = observed_data.values[:, : len(self.sensor_nodes)]
duration_sec = float(self.wn.options.time.duration)
step_sec = float(self.wn.options.time.hydraulic_timestep)
if step_sec > 0:
max_steps = int(duration_sec / step_sec) + 1
self.obs_matrix = self.obs_matrix[:max_steps, :]
# 预先缓存每个区域的需水对象,减少每次适应度计算的节点查找
self.demand_objs_by_area = {}
for area_id in self.area_ids:
demand_objs = []
for node_name in self.nodes_by_area.get(area_id, []):
if node_name not in self.wn.node_name_list:
continue
node = self.wn.get_node(node_name)
if (
hasattr(node, "demand_timeseries_list")
and len(node.demand_timeseries_list) > 0
):
demand_objs.append(node.demand_timeseries_list[0])
self.demand_objs_by_area[area_id] = demand_objs
self.allocatable_counts = {
area_id: len(self.demand_objs_by_area.get(area_id, []))
for area_id in self.area_ids
}
if not any(count > 0 for count in self.allocatable_counts.values()):
raise ValueError("没有可分配漏损的有效分区,无法满足漏损总量约束。")
# 评估计数器(诊断用)
self._eval_count = 0
def _evaluate(self, X, out, *args, **kwargs):
"""批量评估种群。
X: 形状 (pop_size, n_var) 的决策变量矩阵。
"""
n_pop = X.shape[0]
self._eval_count += n_pop
F = np.zeros((n_pop, 1))
for i in range(n_pop):
F[i, 0] = self._evaluate_single(X[i])
out["F"] = F
def _evaluate_single(self, x):
"""评估单个个体,返回归一化误差范数。"""
leak_ratios = x
# 将漏损分布归一化
effective_ratio_map = LeakageIdentifier._effective_area_ratios(
leak_ratios,
self.area_ids,
self.nodes_by_area,
allocatable_counts=self.allocatable_counts,
)
# 跟踪修改以便稍后恢复
modifications = []
for j, area_id in enumerate(self.area_ids):
ratio = effective_ratio_map.get(area_id, 0.0)
if ratio <= 0:
continue
demand_objs = self.demand_objs_by_area.get(area_id, [])
if not demand_objs:
continue
per_node_leak = self.q_sum * ratio / len(demand_objs)
for demand_obj in demand_objs:
original_val = demand_obj.base_value
demand_obj.base_value = original_val + per_node_leak
modifications.append((demand_obj, original_val))
try:
sim = wntr.sim.EpanetSimulator(self.wn)
results = sim.run_sim()
sim_pressure = results.node["pressure"].loc[:, self.sensor_nodes]
n_steps = min(sim_pressure.shape[0], self.obs_matrix.shape[0])
sim_vals = sim_pressure.values[:n_steps, :]
obs_vals = self.obs_matrix[:n_steps, :]
diff = sim_vals - obs_vals
# 按行最大值归一化
row_max = np.max(np.abs(diff), axis=1, keepdims=True)
row_max[row_max == 0] = 1.0 # 防止除以零
normalized_diff = diff / row_max
# 目标:归一化差值矩阵的 2-范数
return float(np.linalg.norm(normalized_diff))
except Exception:
return 1e9
finally:
for demand_obj, original_val in modifications:
demand_obj.base_value = original_val
def main() -> int:
parser = argparse.ArgumentParser(description="漏损区域识别")
parser.add_argument("--inp", required=True, help=".inp 文件路径")
parser.add_argument("--map", help="节点-区域映射 CSV 路径")
parser.add_argument("--scada", help="SCADA 压力 CSV 路径 (观测数据)")
parser.add_argument("--sensors", help="传感器节点 ID 列表 (逗号分隔)")
parser.add_argument("--output", default="Results", help="输出目录")
parser.add_argument("--pop_size", type=int, default=50, help="种群大小")
parser.add_argument("--max_gen", type=int, default=100, help="最大代数")
parser.add_argument("--duration", type=float, default=24, help="模拟时长(小时)")
parser.add_argument("--q_sum", type=float, default=0.241, help="总漏损流量")
parser.add_argument(
"--q_sum_unit",
default="m3/s",
choices=list(LeakageIdentifier.FLOW_UNIT_TO_M3S.keys()),
help="q_sum 输入单位(建议与现场习惯一致,内部统一换算为 m3/s)",
)
args = parser.parse_args()
if not args.map or not args.scada or not args.sensors:
parser.error("--map、--scada、--sensors 为必填")
q_sum_m3s = LeakageIdentifier._flow_to_m3s(args.q_sum, args.q_sum_unit)
sensors = [sensor.strip() for sensor in args.sensors.split(",") if sensor.strip()]
identifier = LeakageIdentifier(
args.inp, sensors, args.map, duration=args.duration, q_sum=q_sum_m3s
)
identifier.run_identification(
args.scada,
args.output,
pop_size=args.pop_size,
max_gen=args.max_gen,
output_flow_unit=args.q_sum_unit,
)
return 0
if __name__ == "__main__":
raise SystemExit(main())