diff --git a/.env.local b/.env.local deleted file mode 100644 index d488870..0000000 --- a/.env.local +++ /dev/null @@ -1,23 +0,0 @@ -NETWORK_NAME="tjwater" -KEYCLOAK_PUBLIC_KEY="-----BEGIN PUBLIC KEY-----\nMIIBIjANBgkqhkiG9w0BAQEFAAOCAQ8AMIIBCgKCAQEApBjdgjImuFfKsZ+FWFlsZSG0Kftduc2o0qA/warFezaYmi8+7fiuuhLErLUbjGPSEU3WpsVxPe5PIs+AJJn/z9uBXXXo/pYggHvp48hlwr6MIYX5xtby7MLM/bHL2ACN4m7FNs/Gilkkbt4515sMFUiwJzd6Wj6FvQdGDDGx/7bVGgiVQRJvrrMZN5zD4i8cFiTQIcGKbURJjre/zWWiA+7gEwArp9ujjBuaINooiQLQM39C9Z5QJcp5nhaztOBiJJgiJOHi5MLpIhI1p1ViVBXKXRMuPhtTXLAz+r/sC44XZS/6V8uUPuLNin9o0jHk/CqJ3GkK3xJBQoWgplkwuQIDAQAB\n-----END PUBLIC KEY-----" -KEYCLOAK_ALGORITHM="RS256" -KEYCLOAK_AUDIENCE="account" - -DB_NAME="tjwater" -DB_HOST="192.168.1.114" -DB_PORT="5432" -DB_USER="tjwater" -DB_PASSWORD="Tjwater@123456" - -TIMESCALEDB_DB_NAME="tjwater" -TIMESCALEDB_DB_HOST="192.168.1.114" -TIMESCALEDB_DB_PORT="5433" -TIMESCALEDB_DB_USER="tjwater" -TIMESCALEDB_DB_PASSWORD="Tjwater@123456" - -METADATA_DB_NAME="system_hub" -METADATA_DB_HOST="192.168.1.114" -METADATA_DB_PORT="5432" -METADATA_DB_USER="tjwater" -METADATA_DB_PASSWORD="Tjwater@123456" -DATABASE_ENCRYPTION_KEY="rJC2VqLg4KrlSq+DGJcYm869q4v5KB2dFAeuQTe0I50=" diff --git a/app/algorithms/leakage_identifier.py b/app/algorithms/leakage_identifier.py new file mode 100644 index 0000000..697be86 --- /dev/null +++ b/app/algorithms/leakage_identifier.py @@ -0,0 +1,490 @@ +import wntr +import numpy as np +import pandas as pd +import os +import argparse +from typing import Any, List, Dict, Union + +try: + import geatpy as ea + + _GEATPY_IMPORT_ERROR = None + +except Exception as import_error: + ea = None + _GEATPY_IMPORT_ERROR = import_error + + +class LeakageIdentifier: + FLOW_UNIT_TO_M3S = { + "m3/s": 1.0, + "m3/h": 1.0 / 3600.0, + "L/s": 1.0 / 1000.0, + "L/min": 1.0 / 60000.0, + } + + @classmethod + def _flow_to_m3s(cls, value: float, unit: str) -> float: + if unit not in cls.FLOW_UNIT_TO_M3S: + raise ValueError(f"不支持的流量单位: {unit}") + return float(value) * cls.FLOW_UNIT_TO_M3S[unit] + + @classmethod + def _flow_from_m3s(cls, value_m3s: float, unit: str) -> float: + if unit not in cls.FLOW_UNIT_TO_M3S: + raise ValueError(f"不支持的流量单位: {unit}") + return float(value_m3s) / cls.FLOW_UNIT_TO_M3S[unit] + + @staticmethod + def _effective_area_ratios( + raw_ratios: Union["np.ndarray", Dict[str, float]], + area_ids: List[str], + nodes_by_area: Dict[str, List[str]], + allocatable_counts: Union[Dict[str, int], None] = None, + ) -> Dict[str, float]: + """将输入比例转换为有效区域比例,确保有效区域比例和为 1。""" + area_count = len(area_ids) + if area_count == 0: + return {} + + if isinstance(raw_ratios, dict): + ratios = np.array( + [float(raw_ratios.get(area_id, 0.0)) for area_id in area_ids], + dtype=float, + ) + else: + arr = np.asarray(raw_ratios, dtype=float).reshape(-1) + ratios = np.zeros(area_count, dtype=float) + fill_len = min(area_count, arr.shape[0]) + if fill_len > 0: + ratios[:fill_len] = arr[:fill_len] + + # 仅保留非负比例,负值按 0 处理 + ratios = np.clip(ratios, a_min=0.0, a_max=None) + + # 仅在有效区域(存在可分配节点)内归一化 + if allocatable_counts is not None: + valid_mask = np.array( + [int(allocatable_counts.get(area_id, 0)) > 0 for area_id in area_ids], + dtype=bool, + ) + else: + valid_mask = np.array( + [len(nodes_by_area.get(area_id, [])) > 0 for area_id in area_ids], + dtype=bool, + ) + if not np.any(valid_mask): + raise ValueError("没有可分配漏损的有效分区,无法满足漏损总量约束。") + + effective = np.zeros(area_count, dtype=float) + valid_sum = float(np.sum(ratios[valid_mask])) + + if valid_sum > 0: + effective[valid_mask] = ratios[valid_mask] / valid_sum + else: + # 若输入全为 0,则在有效区域内均分,保证总和仍为 1 + valid_count = int(np.sum(valid_mask)) + effective[valid_mask] = 1.0 / valid_count + + return {area_id: float(effective[idx]) for idx, area_id in enumerate(area_ids)} + + @staticmethod + def _normalize_area_map_df(df: pd.DataFrame) -> pd.DataFrame: + """标准化区域映射列名为 ID 和 Area。""" + if "ID" in df.columns and "Area" in df.columns: + return df + + if "ID" in df.columns and "now" in df.columns: + df = df.rename(columns={"now": "Area"}) + return df + + df = df.copy() + df.columns = ["ID", "Area"] + list(df.columns[2:]) + return df + + def __init__( + self, + inp_path: str, + sensor_nodes: List[str], + area_map: Union[str, Dict[str, str]], + start_time: float = 0, + duration: float = 24, + timestep: float = 5, + q_sum: float = 0.2, + ): + """ + 初始化漏损识别器。 + + 参数: + inp_path: EPANET .inp 文件路径。 + sensor_nodes: 用作压力传感器的节点 ID 列表。 + area_map: 节点到区域的映射。可以是 CSV 文件路径(列:ID, Area),也可以是字典 {NodeID: AreaID}。 + start_time: 模拟开始时间(小时)。 + duration: 模拟持续时间(小时)。 + timestep: 模拟时间步长(分钟)。 + q_sum: 假设的总漏损流量 (m3/s)。 + """ + self.inp_path = inp_path + self.sensor_nodes = sensor_nodes + self.start_time = start_time + self.duration = duration + self.timestep = timestep + self.q_sum = q_sum + + # 加载管网模型(仅一次) + self.wn = wntr.network.WaterNetworkModel(self.inp_path) + + # 优化 WNTR 设置以提高速度 + self.wn.options.hydraulic.demand_model = "DD" + self.wn.options.time.duration = float(self.duration) * 3600 + self.wn.options.time.hydraulic_timestep = float(self.timestep) * 60 + self.wn.options.time.pattern_timestep = float(self.timestep) * 60 + self.wn.options.time.report_timestep = float(self.timestep) * 60 + + # 加载区域映射 + if isinstance(area_map, str): + self.area_map_df = self._load_area_map(area_map) + elif isinstance(area_map, dict): + self.area_map_df = self._normalize_area_map_df( + pd.DataFrame(list(area_map.items()), columns=["ID", "Area"]) + ) + else: + raise ValueError("area_map 必须是 CSV 文件路径或字典。") + + self.area_ids = sorted(self.area_map_df["Area"].unique()) + self.num_areas = len(self.area_ids) + + # 按区域对节点进行预分类,以便更快查找 + self.nodes_by_area = { + area: self.area_map_df[self.area_map_df["Area"] == area]["ID"].tolist() + for area in self.area_ids + } + + def _load_area_map(self, path: str) -> pd.DataFrame: + """加载并验证节点-区域映射文件。""" + df = pd.read_csv(path, dtype={"ID": str, "Area": str}) + return self._normalize_area_map_df(df) + + def run_identification( + self, + observed_pressure_data: Union[ + str, pd.DataFrame, Dict[str, List[Any]], List[Dict[str, Any]] + ], + output_dir: str = "Results", + pop_size: int = 50, + max_gen: int = 100, + output_flow_unit: str = "m3/s", + save_result: bool = True, + ): + """ + 运行遗传算法以识别漏损分布。 + + 参数: + observed_pressure_data: 包含 SCADA 压力数据的 CSV 文件路径或 DataFrame/字典列表数据。 + output_dir: 结果保存目录。 + pop_size: GA 的种群大小。 + max_gen: GA 的最大代数。 + save_result: 是否保存识别结果到本地 CSV。 + """ + if ea is None: + raise ImportError( + "geatpy 无法导入,无法运行识别。请安装兼容版本或修复依赖。" + ) from _GEATPY_IMPORT_ERROR + if save_result: + os.makedirs(output_dir, exist_ok=True) + + # 加载观测数据 + # 假设观测数据格式:行=时间,列=传感器(或原始格式) + # 原始代码: scadapd = pd.read_csv(scada); Rs = scadapd.values[1 : Rlen + 1, :] + # 我们需要将其标准化。假设带有标题的标准 CSV 或匹配原始格式。 + if isinstance(observed_pressure_data, str): + obs_df = pd.read_csv(observed_pressure_data) + observed_name = os.path.basename(observed_pressure_data) + elif isinstance(observed_pressure_data, pd.DataFrame): + obs_df = observed_pressure_data.copy() + observed_name = "observed_pressure.csv" + else: + obs_df = pd.DataFrame(observed_pressure_data) + observed_name = "observed_pressure.csv" + # 提取特定传感器和时间步长的观测压力 + # 这部分需要与数据存储方式对齐。 + # 对于此重构,我们将遵循读取对应模拟步骤值的原始逻辑。 + + # 准备问题实例 + problem = LeakageProblem( + self.wn, + self.nodes_by_area, + self.area_ids, + self.sensor_nodes, + obs_df, + q_sum=self.q_sum, + ) + + # 配置算法 + algorithm = ea.soea_SEGA_templet( + problem, ea.Population(Encoding="RI", NIND=pop_size) + ) + algorithm.MAXGEN = max_gen + algorithm.mutOper.Pm = 0.5 # 变异概率 (for Real Encoding, this is usually probability per individual or variable) + algorithm.recOper.XOVR = 0.9 # 交叉概率 + algorithm.logTras = 1 # 每代记录一次 + + # 运行 + res = ea.optimize( + algorithm, + verbose=True, + drawing=0, + outputMsg=True, + drawLog=False, + saveFlag=False, + ) + + # 保存结果 + best_ind = res["Vars"][0] # 最优个体(漏损比例) + best_obj = res["ObjV"][0][0] # 最优目标函数值 + + print(f"优化完成。最佳目标值: {best_obj}") + + # 保存到文件 + effective_ratio_map = self._effective_area_ratios( + best_ind, + self.area_ids, + self.nodes_by_area, + allocatable_counts=problem.allocatable_counts, + ) + normalized_ratios = [ + effective_ratio_map.get(area_id, 0.0) for area_id in self.area_ids + ] + leakage_flow_m3s = [ratio * self.q_sum for ratio in normalized_ratios] + leakage_flow_output = [ + self._flow_from_m3s(value_m3s, output_flow_unit) + for value_m3s in leakage_flow_m3s + ] + + result_df = pd.DataFrame( + { + "Area": self.area_ids, + "LeakageRatioRaw": best_ind, + "LeakageRatio": normalized_ratios, + "LeakageFlow_m3_per_s": leakage_flow_m3s, + f"LeakageFlow_{output_flow_unit.replace('/', '_per_')}": leakage_flow_output, + } + ) + result_path = None + if save_result: + result_path = os.path.join( + output_dir, f"identified_leakage_{observed_name}" + ) + result_df.to_csv(result_path, index=False) + print(f"结果已保存至 {result_path}") + result_df.attrs["result_path"] = result_path + return result_df + + +if ea is not None: + + class LeakageProblem(ea.Problem): + def __init__( + self, + wn, + nodes_by_area, + area_ids, + sensor_nodes, + observed_data, + q_sum, + ): + name = "LeakageIdentification" + M = 1 # 单目标 + maxormins = [1] # 最小化 + Dim = len(area_ids) # 维度 = 区域数量 + varTypes = [0] * Dim # 连续变量 + lb = [0] * Dim # 下界 0 + ub = [1] * Dim # 上界 1 + lbin = [1] * Dim # 包含下界 + ubin = [1] * Dim # 包含上界 + + super().__init__(name, M, maxormins, Dim, varTypes, lb, ub, lbin, ubin) + + self.wn = wn + self.nodes_by_area = nodes_by_area + self.area_ids = area_ids + self.sensor_nodes = sensor_nodes + self.q_sum = q_sum + + # 预处理观测数据以匹配模拟格式 + # 提取对应于传感器节点的列 + try: + # 检查列是否存在 + missing_sensors = [ + s for s in self.sensor_nodes if s not in observed_data.columns + ] + if not missing_sensors: + self.obs_matrix = observed_data[self.sensor_nodes].values + else: + # 回退:如果标题不匹配,假设列顺序与传感器节点一致 + self.obs_matrix = observed_data.values[:, : len(self.sensor_nodes)] + except Exception: + self.obs_matrix = observed_data.values[:, : len(self.sensor_nodes)] + + duration_sec = float(self.wn.options.time.duration) + step_sec = float(self.wn.options.time.hydraulic_timestep) + if step_sec > 0: + max_steps = int(duration_sec / step_sec) + 1 + self.obs_matrix = self.obs_matrix[:max_steps, :] + + # 预先缓存每个区域的需水对象,减少每次适应度计算的节点查找 + self.demand_objs_by_area = {} + for area_id in self.area_ids: + demand_objs = [] + for node_name in self.nodes_by_area.get(area_id, []): + if node_name not in self.wn.node_name_list: + continue + node = self.wn.get_node(node_name) + if ( + hasattr(node, "demand_timeseries_list") + and len(node.demand_timeseries_list) > 0 + ): + demand_objs.append(node.demand_timeseries_list[0]) + self.demand_objs_by_area[area_id] = demand_objs + + self.allocatable_counts = { + area_id: len(self.demand_objs_by_area.get(area_id, [])) + for area_id in self.area_ids + } + if not any(count > 0 for count in self.allocatable_counts.values()): + raise ValueError("没有可分配漏损的有效分区,无法满足漏损总量约束。") + + def aimFunc(self, pop): + Vars = pop.Phen # 种群表现型(实数值) + NIND = Vars.shape[0] + ObjV = np.zeros((NIND, 1)) + + for i in range(NIND): + leak_ratios = Vars[i, :] + + # 1. 将漏损分布应用到模型 + effective_ratio_map = LeakageIdentifier._effective_area_ratios( + leak_ratios, + self.area_ids, + self.nodes_by_area, + allocatable_counts=self.allocatable_counts, + ) + + # 此时跟踪修改以便稍后恢复 + modifications = [] # (node_obj, original_base_demand) 列表 + + for j, area_id in enumerate(self.area_ids): + ratio = effective_ratio_map.get(area_id, 0.0) + if ratio <= 0: + continue + + demand_objs = self.demand_objs_by_area.get(area_id, []) + if not demand_objs: + continue + + # 将漏损分配给区域内的节点 + per_node_leak = self.q_sum * ratio / len(demand_objs) + + for demand_obj in demand_objs: + original_val = demand_obj.base_value + demand_obj.base_value = original_val + per_node_leak + modifications.append((demand_obj, original_val)) + + # 2. 运行模拟 + try: + sim = wntr.sim.EpanetSimulator(self.wn) + results = sim.run_sim() + + # 3. 计算目标函数(误差) + sim_pressure = results.node["pressure"].loc[:, self.sensor_nodes] + + # 对齐维度 + # 仅比较重叠的时间步长 + n_steps = min(sim_pressure.shape[0], self.obs_matrix.shape[0]) + sim_vals = sim_pressure.values[:n_steps, :] + obs_vals = self.obs_matrix[:n_steps, :] + + # 差值 + diff = sim_vals - obs_vals + + # 按行最大值归一化(根据原始代码逻辑) + # R1 = R0 - Rs + # Rmax = R1.max(axis=1) -> 该时间步长的最大绝对差值? + # 原始代码: Rmax = R1.max(axis=1) (该时间步长所有传感器的最大值) + # 注意:如果最大差值为 0 或负数,原始代码逻辑可能有缺陷,使用绝对最大值更安全 + # Rmax = Rmax.reshape(-1, 1) + # R = R1 / Rmax + + # 计算每个时间步长的最大差值 + row_max = np.max(np.abs(diff), axis=1, keepdims=True) + row_max[row_max == 0] = 1.0 # 防止除以零 + + normalized_diff = diff / row_max + + # 目标:归一化差值矩阵的 2-范数 + error = np.linalg.norm(normalized_diff) + ObjV[i] = error + + except Exception: + ObjV[i] = 1e9 # 失败时给予高惩罚 + + # 4. 恢复模型更改 + for demand_obj, original_val in modifications: + demand_obj.base_value = original_val + + pop.ObjV = ObjV + + pass + +else: + + class LeakageProblem: + def __init__(self, *args, **kwargs): + raise ImportError( + "geatpy 无法导入,LeakageProblem 不可用。" + ) from _GEATPY_IMPORT_ERROR + + +def main() -> int: + parser = argparse.ArgumentParser(description="漏损区域识别") + parser.add_argument("--inp", required=True, help=".inp 文件路径") + parser.add_argument("--map", help="节点-区域映射 CSV 路径") + parser.add_argument("--scada", help="SCADA 压力 CSV 路径 (观测数据)") + parser.add_argument("--sensors", help="传感器节点 ID 列表 (逗号分隔)") + parser.add_argument("--output", default="Results", help="输出目录") + + parser.add_argument("--pop_size", type=int, default=50, help="种群大小") + parser.add_argument("--max_gen", type=int, default=100, help="最大代数") + parser.add_argument("--duration", type=float, default=24, help="模拟时长(小时)") + parser.add_argument("--q_sum", type=float, default=0.241, help="总漏损流量") + parser.add_argument( + "--q_sum_unit", + default="m3/s", + choices=list(LeakageIdentifier.FLOW_UNIT_TO_M3S.keys()), + help="q_sum 输入单位(建议与现场习惯一致,内部统一换算为 m3/s)", + ) + + args = parser.parse_args() + + if not args.map or not args.scada or not args.sensors: + parser.error("--map、--scada、--sensors 为必填") + + q_sum_m3s = LeakageIdentifier._flow_to_m3s(args.q_sum, args.q_sum_unit) + + sensors = [sensor.strip() for sensor in args.sensors.split(",") if sensor.strip()] + identifier = LeakageIdentifier( + args.inp, sensors, args.map, duration=args.duration, q_sum=q_sum_m3s + ) + + identifier.run_identification( + args.scada, + args.output, + pop_size=args.pop_size, + max_gen=args.max_gen, + output_flow_unit=args.q_sum_unit, + ) + return 0 + + +if __name__ == "__main__": + raise SystemExit(main()) diff --git a/app/api/v1/endpoints/leakage.py b/app/api/v1/endpoints/leakage.py new file mode 100644 index 0000000..c83aab2 --- /dev/null +++ b/app/api/v1/endpoints/leakage.py @@ -0,0 +1,30 @@ +from typing import Any + +from fastapi import APIRouter, HTTPException +from pydantic import BaseModel + +from app.services.leakage_identifier import run_leakage_identification + +router = APIRouter() + + +class LeakageIdentifyRequest(BaseModel): + network: str + observed_pressure_data: str | dict[str, list[Any]] | list[dict[str, Any]] + start_time: float = 0 + duration: float = 24 + timestep: float = 5 + q_sum: float = 0.2 + q_sum_unit: str = "m3/s" + output_dir: str = "Results" + pop_size: int = 50 + max_gen: int = 100 + output_flow_unit: str = "m3/s" + + +@router.post("/identify/") +async def identify_leakage(data: LeakageIdentifyRequest) -> dict[str, Any]: + try: + return run_leakage_identification(**data.dict()) + except Exception as exc: + raise HTTPException(status_code=400, detail=str(exc)) diff --git a/app/api/v1/router.py b/app/api/v1/router.py index 25b4abd..bc13d8d 100644 --- a/app/api/v1/router.py +++ b/app/api/v1/router.py @@ -12,6 +12,7 @@ from app.api.v1.endpoints import ( misc, risk, cache, + leakage, user_management, # 新增:用户管理 audit, # 新增:审计日志 meta, @@ -83,6 +84,7 @@ api_router.include_router(schemes.router, tags=["Schemes"]) api_router.include_router(misc.router, tags=["Misc"]) api_router.include_router(risk.router, tags=["Risk"]) api_router.include_router(cache.router, tags=["Cache"]) +api_router.include_router(leakage.router, prefix="/leakage", tags=["Leakage"]) # Database Routers api_router.include_router(timescaledb_router, tags=["TimescaleDB"]) diff --git a/app/services/__init__.py b/app/services/__init__.py index 2fd9a9c..645a38a 100644 --- a/app/services/__init__.py +++ b/app/services/__init__.py @@ -15,6 +15,7 @@ from app.services.simulation_ops import ( scheduling_simulation, daily_scheduling_simulation, ) +from app.services.leakage_identifier import run_leakage_identification __all__ = [ "network_update", @@ -31,4 +32,5 @@ __all__ = [ "scheduling_simulation", "daily_scheduling_simulation", "analyze_valve_isolation", + "run_leakage_identification", ] diff --git a/app/services/leakage_identifier.py b/app/services/leakage_identifier.py new file mode 100644 index 0000000..66d97e8 --- /dev/null +++ b/app/services/leakage_identifier.py @@ -0,0 +1,104 @@ +import os +from typing import Any + +import pandas as pd + +from app.algorithms.leakage_identifier import LeakageIdentifier +from app.services.tjnetwork import ( + PARTITION_TYPE_KWAY, + calculate_district_metering_area_for_nodes, + dump_inp, + get_all_scada_info, + get_network_node_coords, +) + + +def run_leakage_identification( + network: str, + observed_pressure_data: str | pd.DataFrame | dict[str, list[Any]] | list[dict[str, Any]], + start_time: float = 0, + duration: float = 24, + timestep: float = 5, + q_sum: float = 0.2, + q_sum_unit: str = "m3/s", + output_dir: str = "Results", + pop_size: int = 50, + max_gen: int = 100, + output_flow_unit: str = "m3/s", +) -> dict[str, Any]: + os.makedirs(output_dir, exist_ok=True) + inp_path = os.path.join(output_dir, f"{network}.leakage.inp") + dump_inp(network, inp_path, "2") + sensor_nodes = _get_pressure_sensor_nodes(network) + area_map = _build_area_map_by_spectral_partition(network, sensor_nodes) + + q_sum_m3s = LeakageIdentifier._flow_to_m3s(q_sum, q_sum_unit) + identifier = LeakageIdentifier( + inp_path=inp_path, + sensor_nodes=sensor_nodes, + area_map=area_map, + start_time=start_time, + duration=duration, + timestep=timestep, + q_sum=q_sum_m3s, + ) + result_df = identifier.run_identification( + observed_pressure_data=observed_pressure_data, + output_dir=output_dir, + pop_size=pop_size, + max_gen=max_gen, + output_flow_unit=output_flow_unit, + save_result=False, + ) + return { + "result_path": result_df.attrs.get("result_path"), + "sensor_nodes": sensor_nodes, + "area_count": len(set(area_map.values())), + "rows": result_df.to_dict(orient="records"), + } + + +def _get_pressure_sensor_nodes(network: str) -> list[str]: + scada_info = get_all_scada_info(network) + sensor_nodes: list[str] = [] + for item in scada_info: + if item.get("type") != "pressure": + continue + node_id = item.get("associated_element_id") + if isinstance(node_id, str) and node_id: + sensor_nodes.append(node_id) + sensor_nodes = list(dict.fromkeys(sensor_nodes)) + if not sensor_nodes: + raise ValueError("未找到压力传感器对应节点(scada_info.type=pressure)。") + return sensor_nodes + + +def _build_area_map_by_spectral_partition( + network: str, sensor_nodes: list[str] +) -> dict[str, str]: + node_coords = get_network_node_coords(network) + all_nodes = list(node_coords.keys()) + if not all_nodes: + raise ValueError("管网中未获取到可分区节点。") + + part_count = min(len(sensor_nodes), len(all_nodes)) + if part_count <= 0: + raise ValueError("无可用压力传感器,无法生成虚拟分区。") + + groups = calculate_district_metering_area_for_nodes( + network, + all_nodes, + part_count=part_count, + part_type=PARTITION_TYPE_KWAY, + ) + if not groups: + raise ValueError("虚拟分区计算失败,未返回分区结果。") + + area_map: dict[str, str] = {} + for idx, group_nodes in enumerate(groups, start=1): + area_id = str(idx) + for node_id in group_nodes: + area_map[node_id] = area_id + if not area_map: + raise ValueError("虚拟分区结果为空,无法生成节点区域映射。") + return area_map