Files
TJWaterServerBinary/app/algorithms/leakage_identifier.py
T

491 lines
19 KiB
Python

import wntr
import numpy as np
import pandas as pd
import os
import argparse
from typing import Any, List, Dict, Union
try:
import geatpy as ea
_GEATPY_IMPORT_ERROR = None
except Exception as import_error:
ea = None
_GEATPY_IMPORT_ERROR = import_error
class LeakageIdentifier:
FLOW_UNIT_TO_M3S = {
"m3/s": 1.0,
"m3/h": 1.0 / 3600.0,
"L/s": 1.0 / 1000.0,
"L/min": 1.0 / 60000.0,
}
@classmethod
def _flow_to_m3s(cls, value: float, unit: str) -> float:
if unit not in cls.FLOW_UNIT_TO_M3S:
raise ValueError(f"不支持的流量单位: {unit}")
return float(value) * cls.FLOW_UNIT_TO_M3S[unit]
@classmethod
def _flow_from_m3s(cls, value_m3s: float, unit: str) -> float:
if unit not in cls.FLOW_UNIT_TO_M3S:
raise ValueError(f"不支持的流量单位: {unit}")
return float(value_m3s) / cls.FLOW_UNIT_TO_M3S[unit]
@staticmethod
def _effective_area_ratios(
raw_ratios: Union["np.ndarray", Dict[str, float]],
area_ids: List[str],
nodes_by_area: Dict[str, List[str]],
allocatable_counts: Union[Dict[str, int], None] = None,
) -> Dict[str, float]:
"""将输入比例转换为有效区域比例,确保有效区域比例和为 1。"""
area_count = len(area_ids)
if area_count == 0:
return {}
if isinstance(raw_ratios, dict):
ratios = np.array(
[float(raw_ratios.get(area_id, 0.0)) for area_id in area_ids],
dtype=float,
)
else:
arr = np.asarray(raw_ratios, dtype=float).reshape(-1)
ratios = np.zeros(area_count, dtype=float)
fill_len = min(area_count, arr.shape[0])
if fill_len > 0:
ratios[:fill_len] = arr[:fill_len]
# 仅保留非负比例,负值按 0 处理
ratios = np.clip(ratios, a_min=0.0, a_max=None)
# 仅在有效区域(存在可分配节点)内归一化
if allocatable_counts is not None:
valid_mask = np.array(
[int(allocatable_counts.get(area_id, 0)) > 0 for area_id in area_ids],
dtype=bool,
)
else:
valid_mask = np.array(
[len(nodes_by_area.get(area_id, [])) > 0 for area_id in area_ids],
dtype=bool,
)
if not np.any(valid_mask):
raise ValueError("没有可分配漏损的有效分区,无法满足漏损总量约束。")
effective = np.zeros(area_count, dtype=float)
valid_sum = float(np.sum(ratios[valid_mask]))
if valid_sum > 0:
effective[valid_mask] = ratios[valid_mask] / valid_sum
else:
# 若输入全为 0,则在有效区域内均分,保证总和仍为 1
valid_count = int(np.sum(valid_mask))
effective[valid_mask] = 1.0 / valid_count
return {area_id: float(effective[idx]) for idx, area_id in enumerate(area_ids)}
@staticmethod
def _normalize_area_map_df(df: pd.DataFrame) -> pd.DataFrame:
"""标准化区域映射列名为 ID 和 Area。"""
if "ID" in df.columns and "Area" in df.columns:
return df
if "ID" in df.columns and "now" in df.columns:
df = df.rename(columns={"now": "Area"})
return df
df = df.copy()
df.columns = ["ID", "Area"] + list(df.columns[2:])
return df
def __init__(
self,
inp_path: str,
sensor_nodes: List[str],
area_map: Union[str, Dict[str, str]],
start_time: float = 0,
duration: float = 24,
timestep: float = 5,
q_sum: float = 0.2,
):
"""
初始化漏损识别器。
参数:
inp_path: EPANET .inp 文件路径。
sensor_nodes: 用作压力传感器的节点 ID 列表。
area_map: 节点到区域的映射。可以是 CSV 文件路径(列:ID, Area),也可以是字典 {NodeID: AreaID}。
start_time: 模拟开始时间(小时)。
duration: 模拟持续时间(小时)。
timestep: 模拟时间步长(分钟)。
q_sum: 假设的总漏损流量 (m3/s)。
"""
self.inp_path = inp_path
self.sensor_nodes = sensor_nodes
self.start_time = start_time
self.duration = duration
self.timestep = timestep
self.q_sum = q_sum
# 加载管网模型(仅一次)
self.wn = wntr.network.WaterNetworkModel(self.inp_path)
# 优化 WNTR 设置以提高速度
self.wn.options.hydraulic.demand_model = "DD"
self.wn.options.time.duration = float(self.duration) * 3600
self.wn.options.time.hydraulic_timestep = float(self.timestep) * 60
self.wn.options.time.pattern_timestep = float(self.timestep) * 60
self.wn.options.time.report_timestep = float(self.timestep) * 60
# 加载区域映射
if isinstance(area_map, str):
self.area_map_df = self._load_area_map(area_map)
elif isinstance(area_map, dict):
self.area_map_df = self._normalize_area_map_df(
pd.DataFrame(list(area_map.items()), columns=["ID", "Area"])
)
else:
raise ValueError("area_map 必须是 CSV 文件路径或字典。")
self.area_ids = sorted(self.area_map_df["Area"].unique())
self.num_areas = len(self.area_ids)
# 按区域对节点进行预分类,以便更快查找
self.nodes_by_area = {
area: self.area_map_df[self.area_map_df["Area"] == area]["ID"].tolist()
for area in self.area_ids
}
def _load_area_map(self, path: str) -> pd.DataFrame:
"""加载并验证节点-区域映射文件。"""
df = pd.read_csv(path, dtype={"ID": str, "Area": str})
return self._normalize_area_map_df(df)
def run_identification(
self,
observed_pressure_data: Union[
str, pd.DataFrame, Dict[str, List[Any]], List[Dict[str, Any]]
],
output_dir: str = "Results",
pop_size: int = 50,
max_gen: int = 100,
output_flow_unit: str = "m3/s",
save_result: bool = True,
):
"""
运行遗传算法以识别漏损分布。
参数:
observed_pressure_data: 包含 SCADA 压力数据的 CSV 文件路径或 DataFrame/字典列表数据。
output_dir: 结果保存目录。
pop_size: GA 的种群大小。
max_gen: GA 的最大代数。
save_result: 是否保存识别结果到本地 CSV。
"""
if ea is None:
raise ImportError(
"geatpy 无法导入,无法运行识别。请安装兼容版本或修复依赖。"
) from _GEATPY_IMPORT_ERROR
if save_result:
os.makedirs(output_dir, exist_ok=True)
# 加载观测数据
# 假设观测数据格式:行=时间,列=传感器(或原始格式)
# 原始代码: scadapd = pd.read_csv(scada); Rs = scadapd.values[1 : Rlen + 1, :]
# 我们需要将其标准化。假设带有标题的标准 CSV 或匹配原始格式。
if isinstance(observed_pressure_data, str):
obs_df = pd.read_csv(observed_pressure_data)
observed_name = os.path.basename(observed_pressure_data)
elif isinstance(observed_pressure_data, pd.DataFrame):
obs_df = observed_pressure_data.copy()
observed_name = "observed_pressure.csv"
else:
obs_df = pd.DataFrame(observed_pressure_data)
observed_name = "observed_pressure.csv"
# 提取特定传感器和时间步长的观测压力
# 这部分需要与数据存储方式对齐。
# 对于此重构,我们将遵循读取对应模拟步骤值的原始逻辑。
# 准备问题实例
problem = LeakageProblem(
self.wn,
self.nodes_by_area,
self.area_ids,
self.sensor_nodes,
obs_df,
q_sum=self.q_sum,
)
# 配置算法
algorithm = ea.soea_SEGA_templet(
problem, ea.Population(Encoding="RI", NIND=pop_size)
)
algorithm.MAXGEN = max_gen
algorithm.mutOper.Pm = 0.5 # 变异概率 (for Real Encoding, this is usually probability per individual or variable)
algorithm.recOper.XOVR = 0.9 # 交叉概率
algorithm.logTras = 1 # 每代记录一次
# 运行
res = ea.optimize(
algorithm,
verbose=True,
drawing=0,
outputMsg=True,
drawLog=False,
saveFlag=False,
)
# 保存结果
best_ind = res["Vars"][0] # 最优个体(漏损比例)
best_obj = res["ObjV"][0][0] # 最优目标函数值
print(f"优化完成。最佳目标值: {best_obj}")
# 保存到文件
effective_ratio_map = self._effective_area_ratios(
best_ind,
self.area_ids,
self.nodes_by_area,
allocatable_counts=problem.allocatable_counts,
)
normalized_ratios = [
effective_ratio_map.get(area_id, 0.0) for area_id in self.area_ids
]
leakage_flow_m3s = [ratio * self.q_sum for ratio in normalized_ratios]
leakage_flow_output = [
self._flow_from_m3s(value_m3s, output_flow_unit)
for value_m3s in leakage_flow_m3s
]
result_df = pd.DataFrame(
{
"Area": self.area_ids,
"LeakageRatioRaw": best_ind,
"LeakageRatio": normalized_ratios,
"LeakageFlow_m3_per_s": leakage_flow_m3s,
f"LeakageFlow_{output_flow_unit.replace('/', '_per_')}": leakage_flow_output,
}
)
result_path = None
if save_result:
result_path = os.path.join(
output_dir, f"identified_leakage_{observed_name}"
)
result_df.to_csv(result_path, index=False)
print(f"结果已保存至 {result_path}")
result_df.attrs["result_path"] = result_path
return result_df
if ea is not None:
class LeakageProblem(ea.Problem):
def __init__(
self,
wn,
nodes_by_area,
area_ids,
sensor_nodes,
observed_data,
q_sum,
):
name = "LeakageIdentification"
M = 1 # 单目标
maxormins = [1] # 最小化
Dim = len(area_ids) # 维度 = 区域数量
varTypes = [0] * Dim # 连续变量
lb = [0] * Dim # 下界 0
ub = [1] * Dim # 上界 1
lbin = [1] * Dim # 包含下界
ubin = [1] * Dim # 包含上界
super().__init__(name, M, maxormins, Dim, varTypes, lb, ub, lbin, ubin)
self.wn = wn
self.nodes_by_area = nodes_by_area
self.area_ids = area_ids
self.sensor_nodes = sensor_nodes
self.q_sum = q_sum
# 预处理观测数据以匹配模拟格式
# 提取对应于传感器节点的列
try:
# 检查列是否存在
missing_sensors = [
s for s in self.sensor_nodes if s not in observed_data.columns
]
if not missing_sensors:
self.obs_matrix = observed_data[self.sensor_nodes].values
else:
# 回退:如果标题不匹配,假设列顺序与传感器节点一致
self.obs_matrix = observed_data.values[:, : len(self.sensor_nodes)]
except Exception:
self.obs_matrix = observed_data.values[:, : len(self.sensor_nodes)]
duration_sec = float(self.wn.options.time.duration)
step_sec = float(self.wn.options.time.hydraulic_timestep)
if step_sec > 0:
max_steps = int(duration_sec / step_sec) + 1
self.obs_matrix = self.obs_matrix[:max_steps, :]
# 预先缓存每个区域的需水对象,减少每次适应度计算的节点查找
self.demand_objs_by_area = {}
for area_id in self.area_ids:
demand_objs = []
for node_name in self.nodes_by_area.get(area_id, []):
if node_name not in self.wn.node_name_list:
continue
node = self.wn.get_node(node_name)
if (
hasattr(node, "demand_timeseries_list")
and len(node.demand_timeseries_list) > 0
):
demand_objs.append(node.demand_timeseries_list[0])
self.demand_objs_by_area[area_id] = demand_objs
self.allocatable_counts = {
area_id: len(self.demand_objs_by_area.get(area_id, []))
for area_id in self.area_ids
}
if not any(count > 0 for count in self.allocatable_counts.values()):
raise ValueError("没有可分配漏损的有效分区,无法满足漏损总量约束。")
def aimFunc(self, pop):
Vars = pop.Phen # 种群表现型(实数值)
NIND = Vars.shape[0]
ObjV = np.zeros((NIND, 1))
for i in range(NIND):
leak_ratios = Vars[i, :]
# 1. 将漏损分布应用到模型
effective_ratio_map = LeakageIdentifier._effective_area_ratios(
leak_ratios,
self.area_ids,
self.nodes_by_area,
allocatable_counts=self.allocatable_counts,
)
# 此时跟踪修改以便稍后恢复
modifications = [] # (node_obj, original_base_demand) 列表
for j, area_id in enumerate(self.area_ids):
ratio = effective_ratio_map.get(area_id, 0.0)
if ratio <= 0:
continue
demand_objs = self.demand_objs_by_area.get(area_id, [])
if not demand_objs:
continue
# 将漏损分配给区域内的节点
per_node_leak = self.q_sum * ratio / len(demand_objs)
for demand_obj in demand_objs:
original_val = demand_obj.base_value
demand_obj.base_value = original_val + per_node_leak
modifications.append((demand_obj, original_val))
# 2. 运行模拟
try:
sim = wntr.sim.EpanetSimulator(self.wn)
results = sim.run_sim()
# 3. 计算目标函数(误差)
sim_pressure = results.node["pressure"].loc[:, self.sensor_nodes]
# 对齐维度
# 仅比较重叠的时间步长
n_steps = min(sim_pressure.shape[0], self.obs_matrix.shape[0])
sim_vals = sim_pressure.values[:n_steps, :]
obs_vals = self.obs_matrix[:n_steps, :]
# 差值
diff = sim_vals - obs_vals
# 按行最大值归一化(根据原始代码逻辑)
# R1 = R0 - Rs
# Rmax = R1.max(axis=1) -> 该时间步长的最大绝对差值?
# 原始代码: Rmax = R1.max(axis=1) (该时间步长所有传感器的最大值)
# 注意:如果最大差值为 0 或负数,原始代码逻辑可能有缺陷,使用绝对最大值更安全
# Rmax = Rmax.reshape(-1, 1)
# R = R1 / Rmax
# 计算每个时间步长的最大差值
row_max = np.max(np.abs(diff), axis=1, keepdims=True)
row_max[row_max == 0] = 1.0 # 防止除以零
normalized_diff = diff / row_max
# 目标:归一化差值矩阵的 2-范数
error = np.linalg.norm(normalized_diff)
ObjV[i] = error
except Exception:
ObjV[i] = 1e9 # 失败时给予高惩罚
# 4. 恢复模型更改
for demand_obj, original_val in modifications:
demand_obj.base_value = original_val
pop.ObjV = ObjV
pass
else:
class LeakageProblem:
def __init__(self, *args, **kwargs):
raise ImportError(
"geatpy 无法导入,LeakageProblem 不可用。"
) from _GEATPY_IMPORT_ERROR
def main() -> int:
parser = argparse.ArgumentParser(description="漏损区域识别")
parser.add_argument("--inp", required=True, help=".inp 文件路径")
parser.add_argument("--map", help="节点-区域映射 CSV 路径")
parser.add_argument("--scada", help="SCADA 压力 CSV 路径 (观测数据)")
parser.add_argument("--sensors", help="传感器节点 ID 列表 (逗号分隔)")
parser.add_argument("--output", default="Results", help="输出目录")
parser.add_argument("--pop_size", type=int, default=50, help="种群大小")
parser.add_argument("--max_gen", type=int, default=100, help="最大代数")
parser.add_argument("--duration", type=float, default=24, help="模拟时长(小时)")
parser.add_argument("--q_sum", type=float, default=0.241, help="总漏损流量")
parser.add_argument(
"--q_sum_unit",
default="m3/s",
choices=list(LeakageIdentifier.FLOW_UNIT_TO_M3S.keys()),
help="q_sum 输入单位(建议与现场习惯一致,内部统一换算为 m3/s)",
)
args = parser.parse_args()
if not args.map or not args.scada or not args.sensors:
parser.error("--map、--scada、--sensors 为必填")
q_sum_m3s = LeakageIdentifier._flow_to_m3s(args.q_sum, args.q_sum_unit)
sensors = [sensor.strip() for sensor in args.sensors.split(",") if sensor.strip()]
identifier = LeakageIdentifier(
args.inp, sensors, args.map, duration=args.duration, q_sum=q_sum_m3s
)
identifier.run_identification(
args.scada,
args.output,
pop_size=args.pop_size,
max_gen=args.max_gen,
output_flow_unit=args.q_sum_unit,
)
return 0
if __name__ == "__main__":
raise SystemExit(main())