Files
TJWaterServerBinary/app/algorithms/burst_location/burst_location.py
T

343 lines
12 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
import argparse
import json
import logging
from multiprocessing import cpu_count
from pathlib import Path
from typing import Any, Iterable
import pandas as pd
from app.algorithms.burst_location import leak_simulator
from .burst_locator import (
DN_search_multi_simple_add_flow_count_new,
)
from .network_model import (
_build_node_pipe_maps,
cal_node_coordinate,
construct_graph,
load_inp,
read_inf_inp,
read_inf_inp_other,
)
# Default number of worker processes: one fewer than the CPU count, clamped
# to the range [1, 4].
DEFAULT_N_WORKERS = max(1, min(cpu_count() - 1, 4))
# Uncapped alternative, kept for reference:
# DEFAULT_N_WORKERS = max(1, cpu_count() - 1)
# Module-level logger named after this module.
logger = logging.getLogger(__name__)
def _read_id_list_json(path):
    """Load a list of IDs from a UTF-8 JSON file and return them as strings.

    Two layouts are accepted: a top-level JSON array, or a JSON object whose
    ``"ids"`` entry is an array.

    Args:
        path: Location of the JSON file, or ``None``.

    Returns:
        A list with every ID converted via ``str``, or ``None`` when *path*
        is ``None``.

    Raises:
        ValueError: If the document is an object without a list under
            ``"ids"``, or is neither an array nor an object.
    """
    if path is None:
        return None

    payload = json.loads(Path(path).read_text(encoding="utf-8"))

    if isinstance(payload, dict):
        raw_ids = payload.get("ids")
        # A missing key yields None here, which is rejected like any other
        # non-list value.
        if not isinstance(raw_ids, list):
            raise ValueError(
                f"ID JSON must be list or dict with key 'ids': {path}"
            )
    elif isinstance(payload, list):
        raw_ids = payload
    else:
        raise ValueError(f"Unsupported ID JSON format: {path}")

    return list(map(str, raw_ids))
def _read_series_csv(path):
    """Read an ``id,value`` CSV file into a float Series indexed by string ID.

    If the file has columns named ``id`` and ``value`` they are used;
    otherwise the first column is taken as the ID and the second as the value.

    Every column is read as text so that IDs are kept exactly as written.
    Letting pandas infer the ID dtype would turn ``007`` into ``7`` and, when
    the column contains a blank cell, ``123`` into ``123.0``; such IDs would
    then no longer match the configured SCADA ID strings.

    Args:
        path: Location of the CSV file, or ``None``.

    Returns:
        A ``float`` Series whose index holds the IDs as ``str``, or ``None``
        when *path* is ``None``.

    Raises:
        ValueError: If the file has fewer than two columns or the value
            column contains text that cannot be parsed as a number.
    """
    if path is None:
        return None

    df = pd.read_csv(path, dtype=str)
    if df.shape[1] < 2:
        raise ValueError(f"CSV must contain at least two columns (id,value): {path}")

    if {"id", "value"}.issubset(df.columns):
        id_col, value_col = "id", "value"
    else:
        id_col, value_col = df.columns[0], df.columns[1]

    # Only the value column is numeric; convert it explicitly now that the
    # whole frame was read as text.
    values = pd.to_numeric(df[value_col], errors="raise").astype(float)
    index = [str(item) for item in df[id_col]]
    return pd.Series(values.to_numpy(), index=index, dtype=float)
def _align_scada_series(
    series: pd.Series, ids: Iterable[str], series_name: str
) -> pd.Series:
    """Return the readings of *series* selected and ordered by *ids*.

    Both the requested IDs and the series index are compared as strings. The
    input series is not modified.

    Args:
        series: Readings indexed by sensor ID.
        ids: IDs to select, in the order the result should have.
        series_name: Name used in error messages.

    Returns:
        A numeric Series with one entry per requested ID, in request order.

    Raises:
        ValueError: If a requested ID is absent from the series, occurs more
            than once in the series index, or maps to a value that is not
            numeric (including NaN).
    """
    ids = [str(item) for item in ids]
    aligned = series.copy()
    aligned.index = aligned.index.map(str)

    missing_ids = [item for item in ids if item not in aligned.index]
    if missing_ids:
        preview = ", ".join(missing_ids[:10])
        raise ValueError(f"{series_name} missing IDs: {preview}")

    # A label that occurs twice would make ``.loc`` return several rows for a
    # single requested ID, so the result would no longer line up with ``ids``.
    # Duplicates among IDs that were not requested are harmless and ignored.
    duplicated_labels = set(aligned.index[aligned.index.duplicated()])
    duplicate_ids = [
        item for item in dict.fromkeys(ids) if item in duplicated_labels
    ]
    if duplicate_ids:
        preview = ", ".join(duplicate_ids[:10])
        raise ValueError(f"{series_name} contains duplicate IDs: {preview}")

    aligned = pd.to_numeric(aligned.loc[ids], errors="coerce")
    invalid_ids = aligned[aligned.isna()].index.tolist()
    if invalid_ids:
        preview = ", ".join(invalid_ids[:10])
        raise ValueError(
            f"{series_name} contains non-numeric values for IDs: {preview}"
        )
    return aligned
def _validate_flow_inputs(
flow_scada_ids: list[str] | None,
burst_flow: pd.Series | None,
normal_flow: pd.Series | None,
) -> tuple[bool, list[str]]:
has_any_flow = any(
value is not None for value in [flow_scada_ids, burst_flow, normal_flow]
)
has_all_flow = all(
value is not None for value in [flow_scada_ids, burst_flow, normal_flow]
)
if has_any_flow and not has_all_flow:
raise ValueError(
"flow_scada_ids, burst_flow, and normal_flow must be provided together."
)
if not has_all_flow:
return False, []
flow_ids = [str(item) for item in (flow_scada_ids or [])]
if len(flow_ids) == 0:
raise ValueError("flow_scada_ids cannot be empty when flow data is provided.")
return True, flow_ids
def _build_top_candidates(
    similarity_series: pd.Series, top_n: int = 10
) -> list[dict[str, Any]]:
    """Convert the leading entries of a similarity Series into plain dicts.

    The entries are taken in the order in which they appear; no sorting is
    done here.
    NOTE(review): this presumably expects the caller to pass a Series already
    ranked best-first - confirm against the locator's return value.

    Args:
        similarity_series: Scores indexed by pipe ID.
        top_n: Maximum number of leading entries to return. Defaults to 10.

    Returns:
        A list of ``{"pipe_id": str, "similarity": float}`` dicts with at
        most *top_n* items.

    Raises:
        ValueError: If *top_n* is negative.
    """
    if top_n < 0:
        # A negative stop in ``iloc[:top_n]`` would drop entries from the end
        # instead of limiting the count.
        raise ValueError("top_n must be non-negative.")
    return [
        {"pipe_id": str(pipe_id), "similarity": float(score)}
        for pipe_id, score in similarity_series.iloc[:top_n].items()
    ]
def run_burst_location(
    wn_inp_path: str,
    pressure_scada_ids: list[str],
    burst_pressure: pd.Series,
    normal_pressure: pd.Series,
    burst_leakage: float,
    flow_scada_ids: list[str] | None = None,
    burst_flow: pd.Series | None = None,
    normal_flow: pd.Series | None = None,
    min_dpressure: float = 2.0,
    basic_pressure: float = 10.0,
    n_workers: int = DEFAULT_N_WORKERS,
    partition_on_full_graph: bool = True,
    visualize_partition: bool = True,
    visualize_pause_seconds: float = 0.3,
    final_candidates_csv_path: str | None = (
        "temp/burst_location/final_round_candidates.csv"
    ),
) -> dict[str, Any]:
    """Locate a burst pipe from SCADA readings and summarise the outcome.

    The function validates its inputs, loads the network model from the INP
    file, derives the topology helpers the locator needs, turns the SCADA
    Series into single-row frames, and calls
    ``DN_search_multi_simple_add_flow_count_new``.

    Args:
        wn_inp_path: Path of the INP file to load.
        pressure_scada_ids: IDs of the pressure sensors; must not be empty.
        burst_pressure: Pressure readings during the burst, indexed by ID.
        normal_pressure: Pressure readings under normal conditions.
        burst_leakage: Leakage magnitude; used for the initial candidate
            selection and passed to the locator as ``leak_mag``.
        flow_scada_ids: Optional flow sensor IDs. Must be given together with
            *burst_flow* and *normal_flow*.
        burst_flow: Optional flow readings during the burst.
        normal_flow: Optional flow readings under normal conditions.
        min_dpressure: Passed to the locator as ``pressure_threshold``.
        basic_pressure: Passed to ``load_inp`` as ``require_p``.
        n_workers: Worker count passed to the locator; values below 1 are
            raised to 1.
        partition_on_full_graph: Forwarded to the locator unchanged.
        visualize_partition: Forwarded to the locator unchanged.
        visualize_pause_seconds: Forwarded to the locator unchanged.
        final_candidates_csv_path: Forwarded to the locator unchanged.

    Returns:
        A dict with the keys ``located_pipe``, ``burst_leakage``,
        ``elapsed_seconds``, ``simulation_times``, ``top_candidates``,
        ``similarity_mode``, ``exit_condition``, ``final_candidates_csv`` and
        ``stage_timing_seconds``.

    Raises:
        ValueError: If the pressure inputs are missing or empty, the flow
            inputs are only partially supplied, or a Series fails alignment.
        RuntimeError: If the locator call (including preparing its
            arguments or unpacking its result) raises any exception.
    """
    # --- Input guards ------------------------------------------------------
    if pressure_scada_ids is None or len(pressure_scada_ids) == 0:
        raise ValueError("pressure_scada_ids cannot be empty.")
    if burst_pressure is None or normal_pressure is None:
        raise ValueError("burst_pressure and normal_pressure are required.")
    use_flow, flow_ids = _validate_flow_inputs(
        flow_scada_ids, burst_flow, normal_flow
    )

    # --- Network model and topology helpers --------------------------------
    inp_file = Path(wn_inp_path)
    wn = load_inp(
        inp_name=inp_file.name,
        # load_inp receives the directory as a string ending with "/".
        inp_location=f"{inp_file.parent}/",
        inp_time=0,
        driven_mode="PDD",
        require_p=float(basic_pressure),
        minimum_p=0.0,
    )
    (
        nodes,
        _,
        node_coordinates,
        pipes,
        _,
        _,
        pipe_lengths,
        pipe_diameters,
    ) = read_inf_inp(wn)
    initial_candidates, _ = leak_simulator.cal_possible_pipe(
        burst_leakage, pipes, pipe_diameters
    )
    _, pipe_start_nodes, pipe_end_nodes = read_inf_inp_other(wn)
    node_x, node_y = cal_node_coordinate(nodes, node_coordinates)
    graph = construct_graph(wn)
    node_pipe_map, node_pair_lengths = _build_node_pipe_maps(
        nodes,
        pipes,
        pipe_start_nodes,
        pipe_end_nodes,
        pipe_lengths,
    )
    # Maps each node label to its position in ``nodes``.
    node_positions = pd.Series(range(len(nodes)), index=nodes)

    # --- Pressure data as single-row frames ---------------------------------
    pressure_ids = [str(item) for item in pressure_scada_ids]
    normal_p = _align_scada_series(normal_pressure, pressure_ids, "normal_pressure")
    burst_p = _align_scada_series(burst_pressure, pressure_ids, "burst_pressure")
    pressure_normal = normal_p.to_frame().T
    pressure_monitor = burst_p.to_frame().T
    pressure_predict = pressure_normal.copy()
    timesteps = list(pressure_normal.index)

    # --- Flow data (optional) decides the similarity mode --------------------
    if use_flow:
        normal_q = _align_scada_series(normal_flow, flow_ids, "normal_flow")
        burst_q = _align_scada_series(burst_flow, flow_ids, "burst_flow")
        flow_normal = normal_q.to_frame().T
        flow_monitor = burst_q.to_frame().T
        flow_predict = flow_normal.copy()
        similarity_mode = "CDF"
        max_flow = flow_normal.iloc[0, :].abs()
    else:
        # Without flow data the locator still receives frames, just with no
        # columns, sharing the pressure frame's row labels.
        flow_normal = pd.DataFrame(index=timesteps)
        flow_monitor = pd.DataFrame(index=timesteps)
        flow_predict = pd.DataFrame(index=timesteps)
        similarity_mode = "CAD_new_gy"
        max_flow = pd.Series(dtype=float)

    # Passed to the locator and returned under "stage_timing_seconds".
    stage_timing: dict[str, Any] = {}

    # --- Run the locator ------------------------------------------------------
    try:
        # Built inside the ``try`` so that a failing conversion below is
        # reported as RuntimeError like any other locator failure.
        locator_kwargs = {
            "wn": wn,
            "wn_inp_path": str(inp_file),
            "G0": graph,
            "all_node": nodes,
            "node_x": node_x,
            "node_y": node_y,
            "pipe_start_node_all": pipe_start_nodes,
            "pipe_end_node_all": pipe_end_nodes,
            "pipe_diameter": pipe_diameters,
            "couple_node_length": node_pair_lengths,
            "node_pipe_dic": node_pipe_map,
            "all_node_series": node_positions,
            "top_group_ratio": 0.3,
            "top_pipe_num_max": 80,
            "top_pipe_num_min": 10,
            "candidate_pipe_input_initial": initial_candidates,
            "similarity_mode": similarity_mode,
            "pressure_monitor": pressure_monitor,
            "pressure_predict": pressure_predict,
            "pressure_normal": pressure_normal,
            "pressure_leak_all": None,
            "flow_monitor": flow_monitor,
            "flow_predict": flow_predict,
            "flow_normal": flow_normal,
            "flow_leak_all": None,
            "timestep_list": timesteps,
            "max_flow": max_flow,
            "group_basic_num": 30,
            "Top_sensor_num": min(5, len(pressure_ids)),
            "if_gy": 0,
            "pressure_threshold": float(min_dpressure),
            "leak_mag": float(burst_leakage),
            "n_workers": max(1, int(n_workers)),
            "stage_timing": stage_timing,
            "partition_on_full_graph": partition_on_full_graph,
            "visualize_partition": visualize_partition,
            "visualize_pause_seconds": visualize_pause_seconds,
            "final_candidates_csv_path": final_candidates_csv_path,
        }
        (
            located_pipe,
            elapsed_seconds,
            simulation_times,
            _,
            similarity_series,
            exit_condition,
            final_candidates_csv,
        ) = DN_search_multi_simple_add_flow_count_new(**locator_kwargs)
    except Exception as exc:
        logger.exception("Burst location algorithm execution failed.")
        raise RuntimeError(f"Failed to run burst location algorithm: {exc}") from exc

    # --- Summary ---------------------------------------------------------------
    summary: dict[str, Any] = {
        "located_pipe": located_pipe,
        "burst_leakage": float(burst_leakage),
        "elapsed_seconds": elapsed_seconds,
        "simulation_times": int(simulation_times),
        "top_candidates": _build_top_candidates(similarity_series),
        "similarity_mode": similarity_mode,
        "exit_condition": exit_condition,
        "final_candidates_csv": final_candidates_csv,
        "stage_timing_seconds": stage_timing,
    }
    return summary
def _parse_args():
parser = argparse.ArgumentParser(description="爆管定位主函数入口")
parser.add_argument("--wn-inp", required=True, help="EPANET inp 文件路径")
parser.add_argument(
"--pressure-ids-json", required=True, help="压力SCADA ID列表 JSON 文件"
)
parser.add_argument(
"--flow-ids-json", default=None, help="(可选)流量SCADA ID列表 JSON 文件"
)
parser.add_argument(
"--burst-pressure-csv", required=True, help="爆管时压力 CSVid,value"
)
parser.add_argument(
"--normal-pressure-csv", required=True, help="正常时压力 CSVid,value"
)
parser.add_argument(
"--burst-flow-csv", default=None, help="(可选)爆管时流量 CSV(id,value"
)
parser.add_argument(
"--normal-flow-csv", default=None, help="(可选)正常时流量 CSV(id,value"
)
parser.add_argument(
"--burst-leakage", type=float, required=True, help="爆管漏损流量"
)
parser.add_argument(
"--min-dpressure",
type=float,
default=2.0,
help="(可选)最小压降阈值,默认 2.0",
)
parser.add_argument(
"--basic-pressure",
type=float,
default=10.0,
help="(可选)基础服务压力,默认 10.0",
)
parser.add_argument(
"--n-workers",
type=int,
default=DEFAULT_N_WORKERS,
help="(可选)特征中心模拟进程数,默认 max(1, min(cpu_count()-1, 4))",
)
parser.add_argument(
"--final-candidates-csv-path",
default="temp/burst_location/final_round_candidates.csv",
help="(可选)最后一轮候选管道明细 CSV 输出路径",
)
return parser.parse_args()
def main():
    """Run burst location from the command line and print the result as JSON.

    Parses the CLI options, loads the ID lists and reading files they point
    to, calls ``run_burst_location`` and writes the returned dict to stdout
    as a single JSON document with non-ASCII characters left unescaped.
    """
    cli = _parse_args()

    # Files are loaded in the same order in which the values are passed to
    # run_burst_location below, so the first failing input is reported first.
    pressure_ids = _read_id_list_json(cli.pressure_ids_json)
    burst_pressure = _read_series_csv(cli.burst_pressure_csv)
    normal_pressure = _read_series_csv(cli.normal_pressure_csv)
    flow_ids = _read_id_list_json(cli.flow_ids_json)
    burst_flow = _read_series_csv(cli.burst_flow_csv)
    normal_flow = _read_series_csv(cli.normal_flow_csv)

    outcome = run_burst_location(
        wn_inp_path=cli.wn_inp,
        pressure_scada_ids=pressure_ids,
        burst_pressure=burst_pressure,
        normal_pressure=normal_pressure,
        burst_leakage=cli.burst_leakage,
        flow_scada_ids=flow_ids,
        burst_flow=burst_flow,
        normal_flow=normal_flow,
        min_dpressure=cli.min_dpressure,
        basic_pressure=cli.basic_pressure,
        n_workers=cli.n_workers,
        final_candidates_csv_path=cli.final_candidates_csv_path,
    )
    print(json.dumps(outcome, ensure_ascii=False))
# Run the command-line entry point only when this file is executed as a
# script, not when it is imported as a module.
if __name__ == "__main__":
    main()