From f9111ab9c15d0fcf8a984d0823c45d5871a0fabe Mon Sep 17 00:00:00 2001 From: Jiang Date: Mon, 9 Mar 2026 11:29:57 +0800 Subject: [PATCH] =?UTF-8?q?=E5=87=8F=E5=B0=91=E7=88=86=E7=AE=A1=E5=AE=9A?= =?UTF-8?q?=E4=BD=8D=E4=BB=A3=E7=A0=81=E4=B8=AD=E5=BC=95=E5=85=A5=E7=9A=84?= =?UTF-8?q?=E4=B8=8D=E7=A1=AE=E5=AE=9A=E6=80=A7?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../burst_location/burst_location.py | 1 + .../burst_location/burst_locator.py | 30 ++++-- .../burst_location/leak_simulator.py | 13 ++- .../burst_location/network_partitioner.py | 97 ++++++++++++++++--- .../burst_location/similarity_calculator.py | 35 ++++--- 5 files changed, 145 insertions(+), 31 deletions(-) diff --git a/app/algorithms/burst_location/burst_location.py b/app/algorithms/burst_location/burst_location.py index af39ee8..0613cb0 100644 --- a/app/algorithms/burst_location/burst_location.py +++ b/app/algorithms/burst_location/burst_location.py @@ -220,6 +220,7 @@ def run_burst_location( node_y=node_y, pipe_start_node_all=pipe_start_node_all, pipe_end_node_all=pipe_end_node_all, + pipe_diameter=pipe_diameter, couple_node_length=couple_node_length, node_pipe_dic=node_pipe_dic, all_node_series=all_node_series, diff --git a/app/algorithms/burst_location/burst_locator.py b/app/algorithms/burst_location/burst_locator.py index c7307b6..981f634 100644 --- a/app/algorithms/burst_location/burst_locator.py +++ b/app/algorithms/burst_location/burst_locator.py @@ -345,6 +345,7 @@ def DN_search_multi_simple_add_flow_count_new( node_y, pipe_start_node_all, pipe_end_node_all, + pipe_diameter, couple_node_length, node_pipe_dic, all_node_series, @@ -415,7 +416,12 @@ def DN_search_multi_simple_add_flow_count_new( # group 分组,得出候选漏损中心 stage_start = perf_counter() - candidate_center_list, candidate_group_list, new_all_node = ( + ( + candidate_center_list, + candidate_group_list, + new_all_node, + candidate_center_candidates, + ) = ( metis_grouping_pipe_weight( G0, wn, @@ -429,6 +435,7 @@ def DN_search_multi_simple_add_flow_count_new( node_pipe_dic, all_node_series, couple_node_length, + pipe_diameter, ) ) _accumulate_stage(stage_timing, "group_partitioning", stage_start) @@ -479,19 +486,30 @@ def DN_search_multi_simple_add_flow_count_new( add_center = [] leak_center_dict = dict() for i in range(len(candidate_center_list)): - houxuan_center = [] + primary_center = candidate_center_list[i] + houxuan_center = [ + center + for center in candidate_center_candidates[i] + if center != primary_center + ] candidate_group_set = set(candidate_group_list[i]) for each_center in record_center_dataset: if ( each_center in candidate_group_set - and each_center != candidate_center_list[i] + and each_center != primary_center ): houxuan_center.append(each_center) add_center = add_center + houxuan_center - houxuan_center.append(candidate_center_list[i]) - leak_center_dict[candidate_center_list[i]] = houxuan_center + leak_center_dict[primary_center] = _dedupe_preserve_order( + houxuan_center + [primary_center] + ) add_center = _dedupe_preserve_order(add_center) - for each_center in candidate_center_list: + for each_group_centers in candidate_center_candidates: + for each_center in each_group_centers: + if each_center not in record_center_set: + record_center_dataset.append(each_center) + record_center_set.add(each_center) + for each_center in add_center: if each_center not in record_center_set: record_center_dataset.append(each_center) record_center_set.add(each_center) diff --git a/app/algorithms/burst_location/leak_simulator.py b/app/algorithms/burst_location/leak_simulator.py index 213cd0a..4b8547d 100644 --- a/app/algorithms/burst_location/leak_simulator.py +++ b/app/algorithms/burst_location/leak_simulator.py @@ -474,6 +474,7 @@ def cal_signature_pipe_multi_pf( float(leak_mag), list(sensor_name), option_values, + list(candidate_center), ), ) as pool: for i, (center_name, pressure_array) in enumerate( @@ -482,6 +483,9 @@ def cal_signature_pipe_multi_pf( pressure_leak.loc[(center_name, slice(None)), :] = pressure_array sys.stdout.write("\r" + "已经完成计算" + str(i + 1) + "个特征中心") else: + # Pre-insert all mid-nodes so every simulation sees the same topology + for center in candidate_center: + ensure_mid_node(wn, center) for i in range(candidate_center_num): temp_prefix = _make_temp_prefix(f"sig_{i}") wn, pressure_output = leak_simulation_pipe_dd_multi_pf( @@ -500,10 +504,17 @@ def cal_signature_pipe_multi_pf( return pressure_leak, candidate_center -def _signature_worker_init(inp_path, leak_mag, sensor_name, option_values): +def _signature_worker_init( + inp_path, leak_mag, sensor_name, option_values, candidate_centers=None +): global _SIGNATURE_WORKER_DATA wn = wntr.network.WaterNetworkModel(inp_path) _apply_hydraulic_options(wn, option_values) + # Pre-insert ALL mid-nodes so every simulation runs on the same topology, + # regardless of which worker handles which task. + if candidate_centers is not None: + for center in candidate_centers: + ensure_mid_node(wn, center) _SIGNATURE_WORKER_DATA = { "wn": wn, "leak_mag": leak_mag, diff --git a/app/algorithms/burst_location/network_partitioner.py b/app/algorithms/burst_location/network_partitioner.py index 81d4cf5..1c43668 100644 --- a/app/algorithms/burst_location/network_partitioner.py +++ b/app/algorithms/burst_location/network_partitioner.py @@ -20,6 +20,17 @@ def _to_metis_edge_weight(edge_weight): return max(1, int(round(weight))) +def _dedupe_preserve_order(items): + seen = set() + output = [] + for item in items: + if item in seen: + continue + seen.add(item) + output.append(item) + return output + + def pick_center_pipe(node_x, node_y, candidate_pipe, pipe_start_node, pipe_end_node): candidate_pipe_list = list(candidate_pipe) start_nodes = pipe_start_node[candidate_pipe_list] @@ -34,14 +45,52 @@ def pick_center_pipe(node_x, node_y, candidate_pipe, pipe_start_node, pipe_end_n return candidate_pipe_list[center_idx] +def pick_max_diameter_pipe(candidate_pipe, pipe_diameter): + candidate_pipe_list = list(candidate_pipe) + diameters = pd.to_numeric( + pipe_diameter.reindex(candidate_pipe_list), errors="coerce" + ).dropna() + if len(diameters) != len(candidate_pipe_list): + missing = sorted(set(candidate_pipe_list) - set(diameters.index)) + preview = ", ".join(map(str, missing[:10])) + raise ValueError(f"Missing or invalid diameter for pipes: {preview}") + + max_diameter = float(diameters.max()) + max_diameter_pipes = sorted( + [pipe for pipe, diameter in diameters.items() if float(diameter) == max_diameter], + key=str, + ) + return max_diameter_pipes[0] + + +def pick_dual_center_pipes( + node_x, node_y, candidate_pipe, pipe_start_node, pipe_end_node, pipe_diameter +): + geometric_center = pick_center_pipe( + node_x, node_y, candidate_pipe, pipe_start_node, pipe_end_node + ) + diameter_center = pick_max_diameter_pipe(candidate_pipe, pipe_diameter) + return _dedupe_preserve_order([geometric_center, diameter_center]) + + def find_new_center_pipe( - node_x, node_y, candidate_pipe, pipe_start_node, pipe_end_node, record_center + node_x, + node_y, + candidate_pipe, + pipe_start_node, + pipe_end_node, + pipe_diameter, + record_center, ): new_candidate_pipe = sorted(set(candidate_pipe) - set(record_center)) if new_candidate_pipe == []: new_candidate_pipe = candidate_pipe center_t = pick_center_pipe( - node_x, node_y, new_candidate_pipe, pipe_start_node, pipe_end_node + node_x, + node_y, + new_candidate_pipe, + pipe_start_node, + pipe_end_node, ) return center_t @@ -66,6 +115,7 @@ def metis_grouping_pipe_weight( node_pipe_dic, all_node_series, couple_node_length, + pipe_diameter, ): all_node_iter_series_new = all_node_series[all_node_iter] all_node_iter_series_new = all_node_iter_series_new.sort_values(ascending=True) @@ -82,10 +132,12 @@ def metis_grouping_pipe_weight( correspond_dic = {} count_node = 0 w = [] - for node_name, neighbors in G1.adjacency(): + for node_name in all_node_iter_new: + neighbors = G1[node_name] w_temp = [] n_t = [node_dict[node_name]] - for neighbor_name, edge_data in neighbors.items(): + for neighbor_name in sorted(neighbors.keys()): + edge_data = neighbors[neighbor_name] edge_key = f"{node_name},{neighbor_name}" reverse_edge_key = f"{neighbor_name},{node_name}" if edge_key in couple_node_length: @@ -122,14 +174,14 @@ def metis_grouping_pipe_weight( w_f = w_f + w_new[i] # (edgecuts, parts) = pymetis.part_graph(nparts=group_num, adjacency=adjacency_list_new) - # metis_options = pymetis.Options() - # metis_options.seed = 42 + metis_options = pymetis.Options() + metis_options.seed = 42 (edgecuts, parts) = pymetis.part_graph( nparts=group_num, adjncy=final_adjacency_list, xadj=xadj, eweights=w_f, - # options=metis_options, + options=metis_options, ) # (edgecuts, parts) = pymetis.part_graph(nparts=group_num, adjacency=adjacency_list_new) candidate_group_list = [[] * 1 for i in range(group_num)] @@ -145,6 +197,7 @@ def metis_grouping_pipe_weight( new_center = [] new_group = [] + new_center_candidates = [] new_all_node = [] candidate_pipe_set = set(candidate_pipe_input) all_grouped_pipe = [] @@ -158,7 +211,7 @@ def metis_grouping_pipe_weight( # 求交集 nodeset = G_sub.nodes() pipeset_set = set(cal_area_node_linked_pipe(nodeset, node_pipe_dic)) - candidate_pipe = list(pipeset_set.intersection(candidate_pipe_set)) + candidate_pipe = sorted(pipeset_set.intersection(candidate_pipe_set)) # 判断集合是否保留 if len(candidate_pipe) > 0: @@ -170,21 +223,30 @@ def metis_grouping_pipe_weight( pipe_start_node_all, pipe_end_node_all, ) + center_candidates_t = pick_dual_center_pipes( + node_x, + node_y, + candidate_pipe, + pipe_start_node_all, + pipe_end_node_all, + pipe_diameter, + ) # 更新 new_center.append(center_t) + new_center_candidates.append(center_candidates_t) new_group.append(candidate_pipe) new_all_node.append(nodeset) all_grouped_pipe = all_grouped_pipe + candidate_pipe else: - for c in sub_graphs: + for c in sorted(sub_graphs, key=lambda c: min(c)): G_temp = G0.subgraph(c) nodeset = G_temp.nodes() pipeset = cal_area_node_linked_pipe(nodeset, node_pipe_dic) pipeset_set = set(pipeset) # 求交集 - candidate_pipe = list(pipeset_set.intersection(candidate_pipe_set)) + candidate_pipe = sorted(pipeset_set.intersection(candidate_pipe_set)) # print(len(candidate_node)) # 判断集合是否保留 if len(candidate_pipe) > 0: @@ -196,8 +258,17 @@ def metis_grouping_pipe_weight( pipe_start_node_all, pipe_end_node_all, ) + center_candidates_t = pick_dual_center_pipes( + node_x, + node_y, + candidate_pipe, + pipe_start_node_all, + pipe_end_node_all, + pipe_diameter, + ) # 更新 new_center.append(center_t) + new_center_candidates.append(center_candidates_t) new_group.append(candidate_pipe) new_all_node.append(nodeset) all_grouped_pipe = all_grouped_pipe + candidate_pipe @@ -217,8 +288,12 @@ def metis_grouping_pipe_weight( each_group, pipe_start_node_all, pipe_end_node_all, + pipe_diameter, record_center, ) + new_center_candidates[c_g] = _dedupe_preserve_order( + [new_center[c_g]] + list(new_center_candidates[c_g]) + ) record_center.append(new_center[c_g]) c_g += 1 @@ -227,7 +302,7 @@ def metis_grouping_pipe_weight( # node_x, node_y, # pipe_start_node_all, pipe_end_node_all # ) - return new_center, new_group, new_all_node + return new_center, new_group, new_all_node, new_center_candidates def visualize_metis_partition( diff --git a/app/algorithms/burst_location/similarity_calculator.py b/app/algorithms/burst_location/similarity_calculator.py index 6ada573..31aee39 100644 --- a/app/algorithms/burst_location/similarity_calculator.py +++ b/app/algorithms/burst_location/similarity_calculator.py @@ -44,7 +44,7 @@ def cal_similarity_simple_return_dd( similarity_dis = 0 else:""" none_flag = 0 - sensor_for_cos = list( + sensor_for_cos = sorted( set(dpressure_s.index).intersection(set(act_dpressure.index)) ) """if len(dpressure_s) ==0 or len(dpressure) ==0: @@ -74,7 +74,7 @@ def cal_similarity_simple_return_dd( none_flag = 1 else:""" none_flag = 0 - important_sensor = list( + important_sensor = sorted( set(important_sensor).intersection(set(act_dpressure.index)) ) part_dpressure = dpressure_s[important_sensor] - dpressure[important_sensor] @@ -107,7 +107,7 @@ def cal_similarity_simple_return_dd( elif similarity_mode == "CAD_new_gy" or similarity_mode == "CDF": # cos - sensor_for_cos = list( + sensor_for_cos = sorted( set(dpressure_s.index).intersection(set(act_dpressure.index)) ) if len(sensor_for_cos) == 0 and len(dpressure_s) == 0: @@ -141,7 +141,7 @@ def cal_similarity_simple_return_dd( similarity_cos = s1 / s2 # DIS - important_sensor_new = list( + important_sensor_new = sorted( set(important_sensor).intersection(set(act_dpressure.index)) ) if len(important_sensor_new) == 0: @@ -172,7 +172,7 @@ def cal_similarity_simple_return_dd( similarity_cos = 0 none_flag = 0 # DIS - important_sensor_new = list( + important_sensor_new = sorted( set(important_sensor).intersection(set(act_dpressure.index)) ) if len(important_sensor_new) == 0: @@ -762,15 +762,24 @@ def update_similarity(leak_candidate_center, similarity, leak_center_dict): return similarity_new -def extra_judge(similarity): +def extra_judge( + similarity, min_candidates_to_prune: int = 200, std_relax_factor: float = 0.5 +): + if len(similarity.index) == 0: + return 1.0, similarity + if len(similarity.index) < int(min_candidates_to_prune): + return 1.0, similarity + mean_similarity = float(similarity.mean()) - record_sensor = [] - record_value = [] - for i in range(len(similarity.index)): - if similarity.iloc[i] >= mean_similarity - 1e-10: - record_value.append(similarity.iloc[i]) - record_sensor.append(similarity.index[i]) - out_put_similarity = pd.Series(record_value, index=record_sensor, dtype=float) + std_similarity = float(similarity.std()) + if not math.isfinite(std_similarity): + std_similarity = 0.0 + + threshold = mean_similarity - float(std_relax_factor) * std_similarity + out_put_similarity = similarity[similarity >= threshold - 1e-10] + if len(out_put_similarity.index) == 0: + out_put_similarity = similarity.iloc[:1] + cut_ratio = len(out_put_similarity.index) / len(similarity.index) return cut_ratio, out_put_similarity