import math import os from collections import deque from datetime import datetime from typing import Any import numpy as np import pandas as pd from app.algorithms.leakage_identifier import LeakageIdentifier from app.infra.db.influxdb import api as influxdb_api from app.services.scheme_management import ( query_leakage_identify_scheme_detail, query_leakage_identify_schemes, scheme_name_exists, store_leakage_identify_result, store_scheme_info, ) from app.services.tjnetwork import ( dump_inp, get_all_scada_info, get_network_link_nodes, get_network_node_coords, ) def run_leakage_identification( network: str, observed_pressure_data: ( str | pd.DataFrame | dict[str, list[Any]] | list[dict[str, Any]] | None ) = None, start_time: float = 0, duration: float = 24, timestep: float = 5, q_sum: float = 0.2, q_sum_unit: str = "m3/s", output_dir: str = "Results", pop_size: int = 50, max_gen: int = 100, output_flow_unit: str = "m3/s", dma_count: int | None = None, scada_start: datetime | str | None = None, scada_end: datetime | str | None = None, sensor_nodes: list[str] | None = None, scheme_name: str | None = None, username: str = "admin", ) -> dict[str, Any]: os.makedirs(output_dir, exist_ok=True) inp_path = os.path.join(output_dir, f"{network}.leakage.inp") dump_inp(network, inp_path, "2") selected_sensor_nodes = ( list(dict.fromkeys([node for node in (sensor_nodes or []) if node])) if sensor_nodes else _get_pressure_sensor_nodes(network) ) if not selected_sensor_nodes: raise ValueError("未提供有效传感器节点,且系统未识别到可用压力传感器。") area_map, areas, drawing_payload = _build_area_map_by_topology( network, selected_sensor_nodes, dma_count ) observed_source = "request_payload" if scada_start is not None or scada_end is not None: observed_df = _build_observed_pressure_from_scada( network=network, sensor_nodes=selected_sensor_nodes, scada_start=scada_start, scada_end=scada_end, ) observed_source = "backend_timerange" else: if observed_pressure_data is None: raise ValueError("未提供 observed_pressure_data,且未提供 scada_start/scada_end。") observed_df = observed_pressure_data q_sum_m3s = LeakageIdentifier._flow_to_m3s(q_sum, q_sum_unit) identifier = LeakageIdentifier( inp_path=inp_path, sensor_nodes=selected_sensor_nodes, area_map=area_map, start_time=start_time, duration=duration, timestep=timestep, q_sum=q_sum_m3s, ) result_df = identifier.run_identification( observed_pressure_data=observed_df, output_dir=output_dir, pop_size=pop_size, max_gen=max_gen, output_flow_unit=output_flow_unit, save_result=False, ) rows = result_df.to_dict(orient="records") payload = { "result_path": result_df.attrs.get("result_path"), "sensor_nodes": selected_sensor_nodes, "observed_source": observed_source, "area_count": len(set(area_map.values())), "node_area_map": area_map, "areas": areas, "drawing_payload": drawing_payload, "rows": rows, } if scheme_name: if scheme_name_exists(network, scheme_name): raise ValueError(f"方案名称已存在: {scheme_name}") scheme_start_time = ( _to_datetime(scada_start).isoformat() if scada_start is not None else datetime.now().isoformat() ) scheme_detail = { "network": network, "dma_count": dma_count, "sensor_nodes": selected_sensor_nodes, "scada_start": _to_datetime(scada_start).isoformat() if scada_start is not None else None, "scada_end": _to_datetime(scada_end).isoformat() if scada_end is not None else None, "algorithm_params": { "start_time": start_time, "duration": duration, "timestep": timestep, "q_sum": q_sum, "q_sum_unit": q_sum_unit, "output_flow_unit": output_flow_unit, "pop_size": pop_size, "max_gen": max_gen, }, "result_summary": { "area_count": len(set(area_map.values())), "max_leakage": max( (float(row.get("LeakageFlow_m3_per_s", 0.0)) for row in rows), default=0.0 ), }, } store_scheme_info( name=network, scheme_name=scheme_name, scheme_type="dma_leak_identification", username=username, scheme_start_time=scheme_start_time, scheme_detail=scheme_detail, ) store_leakage_identify_result( name=network, scheme_name=scheme_name, network=network, sensor_nodes=selected_sensor_nodes, result_rows=rows, node_area_map=area_map, areas=areas, drawing_payload=drawing_payload, ) payload["scheme_name"] = scheme_name return payload def list_leakage_identify_schemes( network: str, query_date: datetime | str | None = None ) -> list[dict[str, Any]]: parsed_date = _to_datetime(query_date).date() if query_date is not None else None return query_leakage_identify_schemes( name=network, network=network, query_date=parsed_date ) def get_leakage_identify_scheme_detail(network: str, scheme_name: str) -> dict[str, Any]: result = query_leakage_identify_scheme_detail(network, scheme_name) if not result: raise ValueError(f"未找到漏损识别方案: {scheme_name}") return result def _get_pressure_sensor_nodes(network: str) -> list[str]: scada_info = get_all_scada_info(network) sensor_nodes: list[str] = [] for item in scada_info: scada_type = str(item.get("type", "")).lower() if scada_type != "pressure": continue node_id = item.get("associated_element_id") if isinstance(node_id, str) and node_id: sensor_nodes.append(node_id) sensor_nodes = list(dict.fromkeys(sensor_nodes)) if not sensor_nodes: raise ValueError("未找到压力传感器对应节点(scada_info.type=pressure)。") return sensor_nodes def _build_area_map_by_topology( network: str, sensor_nodes: list[str], dma_count: int | None ) -> tuple[dict[str, str], list[dict[str, Any]], dict[str, Any]]: node_coords = get_network_node_coords(network) all_nodes = list(node_coords.keys()) if not all_nodes: raise ValueError("管网中未获取到可分区节点。") available_sensors = [node for node in sensor_nodes if node in node_coords] if not available_sensors: raise ValueError("无可用压力传感器,无法生成虚拟分区。") area_count = _resolve_dma_count(dma_count, available_sensors, all_nodes) sensor_area_map = _cluster_sensors_to_areas(available_sensors, node_coords, area_count) adjacency = _build_adjacency(network, all_nodes) distance_by_sensor = { sensor: _bfs_distances(adjacency, sensor) for sensor in available_sensors } assignment_count = {sensor: 0 for sensor in available_sensors} area_map: dict[str, str] = {} for node_id in sorted(all_nodes): sensor = _choose_sensor_for_node( node_id=node_id, sensors=available_sensors, node_coords=node_coords, distance_by_sensor=distance_by_sensor, assignment_count=assignment_count, ) assignment_count[sensor] += 1 area_map[node_id] = sensor_area_map[sensor] if not area_map: raise ValueError("虚拟分区结果为空,无法生成节点区域映射。") areas = _build_area_meta(area_map, sensor_area_map) drawing_payload = _build_drawing_payload(areas, node_coords) return area_map, areas, drawing_payload def _resolve_dma_count( dma_count: int | None, sensor_nodes: list[str], all_nodes: list[str] ) -> int: if dma_count is None: return min(len(sensor_nodes), len(all_nodes)) if dma_count <= 0: raise ValueError("dma_count 必须大于 0。") if dma_count > len(all_nodes): raise ValueError("dma_count 不能大于可分区节点数量。") if dma_count > len(sensor_nodes): raise ValueError("dma_count 不能大于可用传感器数量。") return dma_count def _cluster_sensors_to_areas( sensor_nodes: list[str], node_coords: dict[str, dict[str, float]], area_count: int ) -> dict[str, str]: if area_count >= len(sensor_nodes): return {sensor: str(i + 1) for i, sensor in enumerate(sensor_nodes)} points = np.array( [[float(node_coords[s]["x"]), float(node_coords[s]["y"])] for s in sensor_nodes], dtype=float, ) centers = points[:area_count].copy() labels = np.zeros(points.shape[0], dtype=int) for _ in range(20): d2 = ((points[:, None, :] - centers[None, :, :]) ** 2).sum(axis=2) new_labels = d2.argmin(axis=1) if np.array_equal(labels, new_labels): break labels = new_labels for i in range(area_count): cluster_points = points[labels == i] if cluster_points.size > 0: centers[i] = cluster_points.mean(axis=0) return {sensor: str(int(labels[idx]) + 1) for idx, sensor in enumerate(sensor_nodes)} def _build_adjacency(network: str, all_nodes: list[str]) -> dict[str, set[str]]: adjacency: dict[str, set[str]] = {node: set() for node in all_nodes} for link in get_network_link_nodes(network): parts = str(link).split(":") if len(parts) < 4: continue node1, node2 = parts[-2], parts[-1] if node1 in adjacency and node2 in adjacency: adjacency[node1].add(node2) adjacency[node2].add(node1) return adjacency def _bfs_distances(adjacency: dict[str, set[str]], start: str) -> dict[str, int]: distances: dict[str, int] = {start: 0} queue: deque[str] = deque([start]) while queue: node = queue.popleft() for neighbor in adjacency.get(node, set()): if neighbor in distances: continue distances[neighbor] = distances[node] + 1 queue.append(neighbor) return distances def _choose_sensor_for_node( node_id: str, sensors: list[str], node_coords: dict[str, dict[str, float]], distance_by_sensor: dict[str, dict[str, int]], assignment_count: dict[str, int], ) -> str: min_distance = None candidates: list[str] = [] for sensor in sensors: d = distance_by_sensor.get(sensor, {}).get(node_id) if d is None: continue if min_distance is None or d < min_distance: min_distance = d candidates = [sensor] elif d == min_distance: candidates.append(sensor) if not candidates: node_coord = node_coords[node_id] return min( sensors, key=lambda sensor: _euclidean_distance( node_coord, node_coords.get(sensor, node_coord) ), ) return min(candidates, key=lambda sensor: (assignment_count[sensor], sensor)) def _euclidean_distance(a: dict[str, float], b: dict[str, float]) -> float: return math.hypot(float(a["x"]) - float(b["x"]), float(a["y"]) - float(b["y"])) def _build_area_meta( area_map: dict[str, str], sensor_area_map: dict[str, str] ) -> list[dict[str, Any]]: nodes_by_area: dict[str, list[str]] = {} for node_id, area_id in area_map.items(): nodes_by_area.setdefault(area_id, []).append(node_id) sensors_by_area: dict[str, list[str]] = {} for sensor, area_id in sensor_area_map.items(): sensors_by_area.setdefault(area_id, []).append(sensor) areas: list[dict[str, Any]] = [] for area_id in sorted(nodes_by_area.keys(), key=lambda x: int(x)): node_ids = sorted(nodes_by_area.get(area_id, [])) sensor_nodes = sorted(sensors_by_area.get(area_id, [])) areas.append( { "area_id": area_id, "sensor_nodes": sensor_nodes, "node_ids": node_ids, "node_count": len(node_ids), } ) return areas def _build_drawing_payload( areas: list[dict[str, Any]], node_coords: dict[str, dict[str, float]] ) -> dict[str, Any]: features: list[dict[str, Any]] = [] for area in areas: points = [ ( float(node_coords[node_id]["x"]), float(node_coords[node_id]["y"]), ) for node_id in area["node_ids"] if node_id in node_coords ] ring = _points_to_polygon_ring(points) features.append( { "type": "Feature", "properties": { "area_id": area["area_id"], "node_count": area["node_count"], "sensor_nodes": area["sensor_nodes"], }, "geometry": {"type": "Polygon", "coordinates": [ring]}, } ) return {"type": "FeatureCollection", "features": features} def _points_to_polygon_ring(points: list[tuple[float, float]]) -> list[list[float]]: if not points: return [] unique_points = list(dict.fromkeys(points)) if len(unique_points) == 1: x, y = unique_points[0] delta = 1e-6 return [ [x - delta, y - delta], [x + delta, y - delta], [x + delta, y + delta], [x - delta, y + delta], [x - delta, y - delta], ] if len(unique_points) == 2: (x1, y1), (x2, y2) = unique_points dx, dy = x2 - x1, y2 - y1 length = math.hypot(dx, dy) if length == 0: return _points_to_polygon_ring([unique_points[0]]) width = max(length * 0.02, 1e-6) nx, ny = -dy / length * width, dx / length * width return [ [x1 + nx, y1 + ny], [x2 + nx, y2 + ny], [x2 - nx, y2 - ny], [x1 - nx, y1 - ny], [x1 + nx, y1 + ny], ] hull = _convex_hull(unique_points) ring = [[x, y] for x, y in hull] ring.append([hull[0][0], hull[0][1]]) return ring def _convex_hull(points: list[tuple[float, float]]) -> list[tuple[float, float]]: pts = sorted(points) if len(pts) <= 1: return pts def cross( o: tuple[float, float], a: tuple[float, float], b: tuple[float, float] ) -> float: return (a[0] - o[0]) * (b[1] - o[1]) - (a[1] - o[1]) * (b[0] - o[0]) lower: list[tuple[float, float]] = [] for p in pts: while len(lower) >= 2 and cross(lower[-2], lower[-1], p) <= 0: lower.pop() lower.append(p) upper: list[tuple[float, float]] = [] for p in reversed(pts): while len(upper) >= 2 and cross(upper[-2], upper[-1], p) <= 0: upper.pop() upper.append(p) return lower[:-1] + upper[:-1] def _build_observed_pressure_from_scada( network: str, sensor_nodes: list[str], scada_start: datetime | str | None, scada_end: datetime | str | None, ) -> pd.DataFrame: if scada_start is None or scada_end is None: raise ValueError("使用后端 SCADA 查询时必须同时提供 scada_start 与 scada_end。") start_dt = _to_datetime(scada_start) end_dt = _to_datetime(scada_end) if start_dt >= end_dt: raise ValueError("SCADA 时间窗非法:scada_start 必须早于 scada_end。") node_query_id: dict[str, str] = {} for item in get_all_scada_info(network): if str(item.get("type", "")).lower() != "pressure": continue node_id = item.get("associated_element_id") query_id = item.get("api_query_id") if isinstance(node_id, str) and node_id and isinstance(query_id, str) and query_id: node_query_id[node_id] = query_id query_ids = [node_query_id[node] for node in sensor_nodes if node in node_query_id] if not query_ids: raise ValueError("未找到可用于压力观测的 SCADA api_query_id。") scada_data = influxdb_api.query_SCADA_data_by_device_ID_and_timerange( query_ids_list=query_ids, start_time=start_dt.isoformat(), end_time=end_dt.isoformat(), ) available_lengths = [ len(scada_data.get(query_id, [])) for query_id in query_ids if len(scada_data.get(query_id, [])) > 0 ] if not available_lengths: raise ValueError("指定时间窗内未查询到压力 SCADA 数据。") min_len = min(available_lengths) obs_df = pd.DataFrame() for node_id in sensor_nodes: query_id = node_query_id.get(node_id) if not query_id: continue records = scada_data.get(query_id, [])[:min_len] if len(records) < min_len: continue obs_df[node_id] = [float(item["value"]) for item in records] if obs_df.empty: raise ValueError("SCADA 压力数据无法构建观测矩阵。") return obs_df def _to_datetime(value: datetime | str) -> datetime: if isinstance(value, datetime): return value return datetime.fromisoformat(value)