"""Burst (pipe-break) location entry point.

Loads a water-network ``.inp`` model, aligns burst-time and normal-time SCADA
readings (pressure, and optionally flow) to the requested sensor IDs, and runs
``DN_search_multi_simple_add_flow_count_new`` to locate the burst pipe. Usable
as a library via :func:`run_burst_location` or as a CLI via :func:`main`.
"""

import argparse
import json
import logging
from multiprocessing import cpu_count
from pathlib import Path
from typing import Any, Iterable

import pandas as pd

from app.algorithms.burst_location import leak_simulator

from .burst_locator import (
    DN_search_multi_simple_add_flow_count_new,
)
from .network_model import (
    _build_node_pipe_maps,
    cal_node_coordinate,
    construct_graph,
    load_inp,
    read_inf_inp,
    read_inf_inp_other,
)

# Use all CPU cores but one, and never fewer than one worker.
DEFAULT_N_WORKERS = max(1, cpu_count() - 1)

# Default output path of the final-round candidate CSV; shared by the library
# function and the CLI so the two defaults cannot drift apart.
DEFAULT_FINAL_CANDIDATES_CSV_PATH = "temp/burst_location/final_round_candidates.csv"

logger = logging.getLogger(__name__)


def _read_id_list_json(path: str | Path | None) -> list[str] | None:
    """Read a list of SCADA IDs from a JSON file.

    Two layouts are accepted: a bare JSON list (``["a", "b"]``) or an object
    with an ``"ids"`` list (``{"ids": ["a", "b"]}``). Every entry is converted
    to ``str`` so IDs compare consistently with the CSV readers.

    Args:
        path: Path to the JSON file, or ``None``.

    Returns:
        The IDs as strings, or ``None`` when ``path`` is ``None``.

    Raises:
        ValueError: If the file is not valid JSON or has an unsupported layout.
    """
    if path is None:
        return None
    # "utf-8-sig" also accepts files that start with a UTF-8 BOM, which
    # json.loads would otherwise reject as invalid JSON.
    data = json.loads(Path(path).read_text(encoding="utf-8-sig"))
    if isinstance(data, list):
        return [str(item) for item in data]
    if isinstance(data, dict):
        if isinstance(data.get("ids"), list):
            return [str(item) for item in data["ids"]]
        raise ValueError(f"ID JSON must be list or dict with key 'ids': {path}")
    raise ValueError(f"Unsupported ID JSON format: {path}")


def _read_series_csv(path: str | Path | None) -> pd.Series | None:
    """Read an ``id,value`` CSV into a float Series indexed by string ID.

    When both an ``id`` and a ``value`` column exist they are used; otherwise
    the first two columns are taken as ID and value, respectively.

    Args:
        path: Path to the CSV file, or ``None``.

    Returns:
        A float Series (index: IDs as strings), or ``None`` when ``path`` is
        ``None``. Cells that are empty or not numeric become NaN.

    Raises:
        ValueError: If the CSV has fewer than two columns.
    """
    if path is None:
        return None
    # Read every cell as text so IDs keep their exact spelling. Letting pandas
    # infer dtypes would turn "007" into 7, or "1001" into 1001.0 when the
    # column has blanks, and those IDs would no longer match the JSON IDs.
    df = pd.read_csv(path, dtype=str)
    if df.shape[1] < 2:
        raise ValueError(f"CSV must contain at least two columns (id,value): {path}")
    if {"id", "value"}.issubset(df.columns):
        id_col, value_col = "id", "value"
    else:
        id_col, value_col = df.columns[0], df.columns[1]
    # Non-numeric values become NaN here; _align_scada_series reports them by
    # ID if they belong to a requested sensor.
    values = pd.to_numeric(df[value_col], errors="coerce").astype(float)
    return pd.Series(
        values.to_numpy(),
        index=df[id_col].astype(str).to_numpy(),
        dtype=float,
    )


def _align_scada_series(
    series: pd.Series, ids: Iterable[str], series_name: str
) -> pd.Series:
    """Select ``ids`` from ``series`` (in that order) as numeric values.

    The input series is not modified; its index is compared as strings.

    Args:
        series: Readings indexed by sensor ID.
        ids: Sensor IDs to select; converted to ``str``.
        series_name: Name used in error messages.

    Returns:
        A numeric Series indexed by ``ids``, in the order given.

    Raises:
        ValueError: If an ID is missing, appears more than once in ``series``,
            or has a non-numeric / NaN value. Only the first 10 offending IDs
            are listed in the message.
    """
    ids = [str(item) for item in ids]
    aligned = series.copy()
    aligned.index = aligned.index.map(str)

    missing_ids = [item for item in ids if item not in aligned.index]
    if missing_ids:
        preview = ", ".join(missing_ids[:10])
        raise ValueError(f"{series_name} missing IDs: {preview}")

    # A requested ID present more than once is ambiguous, and .loc would
    # return every duplicate, producing extra columns downstream.
    requested = set(ids)
    duplicated_ids = list(
        dict.fromkeys(
            item
            for item in aligned.index[aligned.index.duplicated()]
            if item in requested
        )
    )
    if duplicated_ids:
        preview = ", ".join(duplicated_ids[:10])
        raise ValueError(f"{series_name} has duplicate IDs: {preview}")

    aligned = pd.to_numeric(aligned.loc[ids], errors="coerce")
    invalid_ids = aligned[aligned.isna()].index.tolist()
    if invalid_ids:
        preview = ", ".join(invalid_ids[:10])
        raise ValueError(
            f"{series_name} contains non-numeric values for IDs: {preview}"
        )
    return aligned


def _validate_flow_inputs(
    flow_scada_ids: list[str] | None,
    burst_flow: pd.Series | None,
    normal_flow: pd.Series | None,
) -> tuple[bool, list[str]]:
    """Check that the optional flow inputs are given all together or not at all.

    Returns:
        ``(True, flow_ids_as_str)`` when all three are given, otherwise
        ``(False, [])``.

    Raises:
        ValueError: If only some of the three are given, or the ID list is
            empty while flow data is provided.
    """
    flow_inputs = [flow_scada_ids, burst_flow, normal_flow]
    has_any_flow = any(value is not None for value in flow_inputs)
    has_all_flow = all(value is not None for value in flow_inputs)
    if has_any_flow and not has_all_flow:
        raise ValueError(
            "flow_scada_ids, burst_flow, and normal_flow must be provided together."
        )
    if not has_all_flow:
        return False, []
    flow_ids = [str(item) for item in flow_scada_ids]
    if not flow_ids:
        raise ValueError("flow_scada_ids cannot be empty when flow data is provided.")
    return True, flow_ids


def _build_top_candidates(
    similarity_series: pd.Series, top_n: int = 10
) -> list[dict[str, Any]]:
    """Convert the first ``top_n`` entries of ``similarity_series`` to dicts.

    Entries are taken in the series' existing order. NOTE(review): this
    presumably relies on the search returning the series sorted by descending
    similarity — confirm against ``DN_search_multi_simple_add_flow_count_new``.

    Returns:
        ``[{"pipe_id": str, "similarity": float}, ...]`` with at most
        ``top_n`` items.
    """
    top_series = similarity_series.iloc[:top_n]
    return [
        {"pipe_id": str(pipe_id), "similarity": float(score)}
        for pipe_id, score in top_series.items()
    ]


def _to_single_row_frame(series: pd.Series, row_label: Any) -> pd.DataFrame:
    """Return ``series`` as a one-row DataFrame whose only row is ``row_label``.

    Columns are the series index (the sensor IDs), in order. Using an explicit
    label keeps every monitor/normal frame on the same row label regardless of
    the ``name`` each input Series happened to carry.
    """
    frame = series.to_frame().T
    frame.index = [row_label]
    return frame


def run_burst_location(
    wn_inp_path: str,
    pressure_scada_ids: list[str],
    burst_pressure: pd.Series,
    normal_pressure: pd.Series,
    burst_leakage: float,
    flow_scada_ids: list[str] | None = None,
    burst_flow: pd.Series | None = None,
    normal_flow: pd.Series | None = None,
    min_dpressure: float = 2.0,
    basic_pressure: float = 10.0,
    n_workers: int = DEFAULT_N_WORKERS,
    partition_on_full_graph: bool = True,
    visualize_partition: bool = True,
    visualize_pause_seconds: float = 0.3,
    final_candidates_csv_path: str | None = DEFAULT_FINAL_CANDIDATES_CSV_PATH,
) -> dict[str, Any]:
    """Locate a burst pipe from one snapshot of SCADA readings.

    Args:
        wn_inp_path: Path to the network ``.inp`` file.
        pressure_scada_ids: Pressure sensor IDs; each must be present in both
            pressure series.
        burst_pressure: Pressure readings during the burst, indexed by ID.
        normal_pressure: Pressure readings under normal operation, indexed by
            ID. Its ``name`` (or ``0`` if unnamed) becomes the single timestep
            label used for every frame passed to the search.
        burst_leakage: Burst leak flow; passed to
            ``leak_simulator.cal_possible_pipe`` and as ``leak_mag``. Units
            follow the network model — TODO confirm.
        flow_scada_ids, burst_flow, normal_flow: Optional flow data; give all
            three or none. With flow data the similarity mode is ``"CDF"``,
            without it ``"CAD_new_gy"``.
        min_dpressure: Forwarded as ``pressure_threshold``.
        basic_pressure: Forwarded to ``load_inp`` as ``require_p`` (PDD mode).
        n_workers: Worker count for the search; clamped to at least 1.
        partition_on_full_graph, visualize_partition, visualize_pause_seconds:
            Forwarded to the search unchanged.
        final_candidates_csv_path: Output path for the final-round candidate
            CSV, forwarded to the search.

    Returns:
        A dict with ``located_pipe``, ``burst_leakage``, ``elapsed_seconds``,
        ``simulation_times``, ``top_candidates`` (up to 10), ``similarity_mode``,
        ``exit_condition``, ``final_candidates_csv`` and
        ``stage_timing_seconds`` (the dict handed to the search as
        ``stage_timing``).

    Raises:
        ValueError: On invalid or inconsistent inputs.
        RuntimeError: If the search itself raises; the original exception is
            chained as ``__cause__``.
    """
    if pressure_scada_ids is None or len(pressure_scada_ids) == 0:
        raise ValueError("pressure_scada_ids cannot be empty.")
    if burst_pressure is None or normal_pressure is None:
        raise ValueError("burst_pressure and normal_pressure are required.")
    has_all_flow, flow_ids = _validate_flow_inputs(
        flow_scada_ids=flow_scada_ids,
        burst_flow=burst_flow,
        normal_flow=normal_flow,
    )

    # --- Network model ---------------------------------------------------
    inp_path = Path(wn_inp_path)
    wn = load_inp(
        inp_name=inp_path.name,
        # load_inp appears to expect the directory as a string ending in "/".
        inp_location=str(inp_path.parent) + "/",
        inp_time=0,
        driven_mode="PDD",
        require_p=float(basic_pressure),
        minimum_p=0.0,
    )
    (
        all_node,
        _,
        node_coordinates,
        all_pipe,
        _,
        _,
        pipe_length,
        pipe_diameter,
    ) = read_inf_inp(wn)
    # Initial candidate pipes derived from the leak size and pipe diameters.
    candidate_pipe, _ = leak_simulator.cal_possible_pipe(
        burst_leakage, all_pipe, pipe_diameter
    )
    _, pipe_start_node_all, pipe_end_node_all = read_inf_inp_other(wn)
    node_x, node_y = cal_node_coordinate(all_node, node_coordinates)
    G0 = construct_graph(wn)
    node_pipe_dic, couple_node_length = _build_node_pipe_maps(
        all_node,
        all_pipe,
        pipe_start_node_all,
        pipe_end_node_all,
        pipe_length,
    )
    # Maps node ID -> positional index in all_node.
    all_node_series = pd.Series(range(len(all_node)), index=all_node)

    # --- Pressure observations (one-row frames, columns = sensor IDs) ----
    pressure_ids = [str(item) for item in pressure_scada_ids]
    normal_pressure_aligned = _align_scada_series(
        normal_pressure, pressure_ids, "normal_pressure"
    )
    burst_pressure_aligned = _align_scada_series(
        burst_pressure, pressure_ids, "burst_pressure"
    )
    pressure_normal = normal_pressure_aligned.to_frame().T
    timestep_list = list(pressure_normal.index)
    row_label = timestep_list[0]
    # Relabel so every frame shares the timestep in timestep_list, even when
    # the input Series carry different names.
    pressure_monitor = _to_single_row_frame(burst_pressure_aligned, row_label)
    pressure_predict = pressure_normal.copy()

    # --- Optional flow observations --------------------------------------
    if has_all_flow:
        normal_flow_aligned = _align_scada_series(normal_flow, flow_ids, "normal_flow")
        burst_flow_aligned = _align_scada_series(burst_flow, flow_ids, "burst_flow")
        flow_normal = _to_single_row_frame(normal_flow_aligned, row_label)
        flow_monitor = _to_single_row_frame(burst_flow_aligned, row_label)
        flow_predict = flow_normal.copy()
        similarity_mode = "CDF"
        max_flow = flow_normal.iloc[0, :].abs()
    else:
        # Empty (column-less) frames on the same timestep index.
        flow_normal = pd.DataFrame(index=timestep_list)
        flow_monitor = pd.DataFrame(index=timestep_list)
        flow_predict = pd.DataFrame(index=timestep_list)
        similarity_mode = "CAD_new_gy"
        max_flow = pd.Series(dtype=float)

    # Passed to the search as stage_timing and returned as-is.
    stage_timing: dict[str, Any] = {}
    try:
        (
            located_pipe,
            elapsed_seconds,
            simulation_times,
            _,
            similarity_series,
            exit_condition,
            final_candidates_csv,
        ) = DN_search_multi_simple_add_flow_count_new(
            wn=wn,
            wn_inp_path=str(inp_path),
            G0=G0,
            all_node=all_node,
            node_x=node_x,
            node_y=node_y,
            pipe_start_node_all=pipe_start_node_all,
            pipe_end_node_all=pipe_end_node_all,
            couple_node_length=couple_node_length,
            node_pipe_dic=node_pipe_dic,
            all_node_series=all_node_series,
            top_group_ratio=0.3,
            top_pipe_num_max=80,
            top_pipe_num_min=10,
            candidate_pipe_input_initial=candidate_pipe,
            similarity_mode=similarity_mode,
            pressure_monitor=pressure_monitor,
            pressure_predict=pressure_predict,
            pressure_normal=pressure_normal,
            pressure_leak_all=None,
            flow_monitor=flow_monitor,
            flow_predict=flow_predict,
            flow_normal=flow_normal,
            flow_leak_all=None,
            timestep_list=timestep_list,
            max_flow=max_flow,
            group_basic_num=30,
            Top_sensor_num=min(5, len(pressure_ids)),
            if_gy=0,
            pressure_threshold=float(min_dpressure),
            leak_mag=float(burst_leakage),
            n_workers=max(1, int(n_workers)),
            stage_timing=stage_timing,
            partition_on_full_graph=partition_on_full_graph,
            visualize_partition=visualize_partition,
            visualize_pause_seconds=visualize_pause_seconds,
            final_candidates_csv_path=final_candidates_csv_path,
        )
    except Exception as exc:
        logger.exception("Burst location algorithm execution failed.")
        raise RuntimeError(f"Failed to run burst location algorithm: {exc}") from exc

    return {
        "located_pipe": located_pipe,
        "burst_leakage": float(burst_leakage),
        "elapsed_seconds": elapsed_seconds,
        "simulation_times": int(simulation_times),
        "top_candidates": _build_top_candidates(similarity_series),
        "similarity_mode": similarity_mode,
        "exit_condition": exit_condition,
        "final_candidates_csv": final_candidates_csv,
        "stage_timing_seconds": stage_timing,
    }


def _parse_args(argv: list[str] | None = None) -> argparse.Namespace:
    """Parse CLI arguments.

    Args:
        argv: Argument list to parse; ``None`` means ``sys.argv[1:]``.
    """
    parser = argparse.ArgumentParser(description="爆管定位主函数入口")
    parser.add_argument("--wn-inp", required=True, help="EPANET inp 文件路径")
    parser.add_argument(
        "--pressure-ids-json", required=True, help="压力SCADA ID列表 JSON 文件"
    )
    parser.add_argument(
        "--flow-ids-json", default=None, help="(可选)流量SCADA ID列表 JSON 文件"
    )
    parser.add_argument(
        "--burst-pressure-csv", required=True, help="爆管时压力 CSV(id,value)"
    )
    parser.add_argument(
        "--normal-pressure-csv", required=True, help="正常时压力 CSV(id,value)"
    )
    parser.add_argument(
        "--burst-flow-csv", default=None, help="(可选)爆管时流量 CSV(id,value)"
    )
    parser.add_argument(
        "--normal-flow-csv", default=None, help="(可选)正常时流量 CSV(id,value)"
    )
    parser.add_argument(
        "--burst-leakage", type=float, required=True, help="爆管漏损流量"
    )
    parser.add_argument(
        "--min-dpressure",
        type=float,
        default=2.0,
        help="(可选)最小压降阈值,默认 2.0",
    )
    parser.add_argument(
        "--basic-pressure",
        type=float,
        default=10.0,
        help="(可选)基础服务压力,默认 10.0",
    )
    parser.add_argument(
        "--n-workers",
        type=int,
        default=DEFAULT_N_WORKERS,
        # Must describe DEFAULT_N_WORKERS exactly.
        help="(可选)特征中心模拟进程数,默认 max(1, cpu_count()-1)",
    )
    parser.add_argument(
        "--final-candidates-csv-path",
        default=DEFAULT_FINAL_CANDIDATES_CSV_PATH,
        help="(可选)最后一轮候选管道明细 CSV 输出路径",
    )
    return parser.parse_args(argv)


def main(argv: list[str] | None = None) -> None:
    """CLI entry point: read the input files, run the search, print JSON.

    Args:
        argv: Argument list to parse; ``None`` means ``sys.argv[1:]``.
    """
    args = _parse_args(argv)
    result = run_burst_location(
        wn_inp_path=args.wn_inp,
        pressure_scada_ids=_read_id_list_json(args.pressure_ids_json),
        burst_pressure=_read_series_csv(args.burst_pressure_csv),
        normal_pressure=_read_series_csv(args.normal_pressure_csv),
        burst_leakage=args.burst_leakage,
        flow_scada_ids=_read_id_list_json(args.flow_ids_json),
        burst_flow=_read_series_csv(args.burst_flow_csv),
        normal_flow=_read_series_csv(args.normal_flow_csv),
        min_dpressure=args.min_dpressure,
        basic_pressure=args.basic_pressure,
        n_workers=args.n_workers,
        final_candidates_csv_path=args.final_candidates_csv_path,
    )
    print(json.dumps(result, ensure_ascii=False))


if __name__ == "__main__":
    main()