Files
TJWaterServerBinary/app/services/leakage_identifier.py
T

528 lines
18 KiB
Python

import math
import os
from collections import deque
from datetime import datetime
from typing import Any
import numpy as np
import pandas as pd
import wntr
from app.algorithms.leakage_identifier import LeakageIdentifier
from app.infra.db.timescaledb.internal_queries import InternalQueries
from app.services.scheme_management import (
query_leakage_identify_scheme_detail,
query_leakage_identify_schemes,
scheme_name_exists,
store_leakage_identify_result,
store_scheme_info,
)
from app.services.tjnetwork import (
dump_inp,
get_all_scada_info,
get_network_link_nodes,
get_network_node_coords,
)
DEFAULT_N_WORKERS = max(1, min((os.cpu_count() or 1) - 1, 4))
def run_leakage_identification(
network: str,
username: str,
observed_pressure_data: (
str | pd.DataFrame | dict[str, list[Any]] | list[dict[str, Any]] | None
) = None,
start_time: float = 0,
duration: float = 24,
timestep: float = 5,
q_sum: float = 0.2,
q_sum_unit: str = "m3/s",
output_dir: str = "db_inp",
pop_size: int = 50,
max_gen: int = 100,
n_workers: int = DEFAULT_N_WORKERS,
output_flow_unit: str = "m3/s",
dma_count: int | None = None,
scada_start: datetime | str | None = None,
scada_end: datetime | str | None = None,
sensor_nodes: list[str] | None = None,
scheme_name: str | None = None,
) -> dict[str, Any]:
os.makedirs(output_dir, exist_ok=True)
inp_path = _prepare_leakage_inp(network)
selected_sensor_nodes = (
list(dict.fromkeys([node for node in (sensor_nodes or []) if node]))
if sensor_nodes
else _get_pressure_sensor_nodes(network)
)
if not selected_sensor_nodes:
raise ValueError("未提供有效传感器节点,且系统未识别到可用压力传感器。")
area_map, areas, node_coords = _build_area_map_by_topology(
network, selected_sensor_nodes, dma_count
)
observed_source = "request_payload"
if scada_start is not None or scada_end is not None:
observed_df = _build_observed_pressure_from_scada(
network=network,
sensor_nodes=selected_sensor_nodes,
scada_start=scada_start,
scada_end=scada_end,
)
observed_source = "backend_timerange"
else:
if observed_pressure_data is None:
raise ValueError(
"未提供 observed_pressure_data,且未提供 scada_start/scada_end。"
)
observed_df = observed_pressure_data
q_sum_m3s = LeakageIdentifier._flow_to_m3s(q_sum, q_sum_unit)
identifier = LeakageIdentifier(
inp_path=inp_path,
sensor_nodes=selected_sensor_nodes,
area_map=area_map,
start_time=start_time,
duration=duration,
timestep=timestep,
q_sum=q_sum_m3s,
)
result_df = identifier.run_identification(
observed_pressure_data=observed_df,
output_dir=output_dir,
pop_size=pop_size,
max_gen=max_gen,
n_workers=n_workers,
output_flow_unit=output_flow_unit,
save_result=False,
)
rows = result_df.to_dict(orient="records")
# node_visual_payload = _build_node_visual_payload(area_map, node_coords, rows)
# drawing_payload = _build_drawing_payload(node_visual_payload)
payload = {
"result_path": result_df.attrs.get("result_path"),
"sensor_nodes": selected_sensor_nodes,
"observed_source": observed_source,
"area_count": len(set(area_map.values())),
"node_area_map": area_map,
"areas": areas,
# "node_visual_payload": node_visual_payload,
# "drawing_payload": drawing_payload,
"rows": rows,
}
if scheme_name:
if scheme_name_exists(network, scheme_name):
raise ValueError(f"方案名称已存在: {scheme_name}")
scheme_start_time = (
_to_datetime(scada_start).isoformat()
if scada_start is not None
else datetime.now().isoformat()
)
scheme_detail = {
"network": network,
"dma_count": dma_count,
"sensor_nodes": selected_sensor_nodes,
"scada_start": (
_to_datetime(scada_start).isoformat()
if scada_start is not None
else None
),
"scada_end": (
_to_datetime(scada_end).isoformat() if scada_end is not None else None
),
"algorithm_params": {
"start_time": start_time,
"duration": duration,
"timestep": timestep,
"q_sum": q_sum,
"q_sum_unit": q_sum_unit,
"output_flow_unit": output_flow_unit,
"pop_size": pop_size,
"max_gen": max_gen,
"n_workers": n_workers,
},
"result_summary": {
"area_count": len(set(area_map.values())),
"max_leakage": max(
(float(row.get("LeakageFlow_m3_per_s", 0.0)) for row in rows),
default=0.0,
),
},
}
store_scheme_info(
name=network,
scheme_name=scheme_name,
scheme_type="dma_leak_identification",
username=username,
scheme_start_time=scheme_start_time,
scheme_detail=scheme_detail,
)
store_leakage_identify_result(
name=network,
scheme_name=scheme_name,
network=network,
sensor_nodes=selected_sensor_nodes,
result_rows=rows,
node_area_map=area_map,
areas=areas,
drawing_payload={},
)
payload["scheme_name"] = scheme_name
return payload
def list_leakage_identify_schemes(
network: str, query_date: datetime | str | None = None
) -> list[dict[str, Any]]:
parsed_date = _to_datetime(query_date).date() if query_date is not None else None
return query_leakage_identify_schemes(
name=network, network=network, query_date=parsed_date
)
def get_leakage_identify_scheme_detail(
network: str, scheme_name: str
) -> dict[str, Any]:
result = query_leakage_identify_scheme_detail(network, scheme_name)
if not result:
raise ValueError(f"未找到漏损识别方案: {scheme_name}")
return result
def _get_pressure_sensor_nodes(network: str) -> list[str]:
scada_info = get_all_scada_info(network)
sensor_nodes: list[str] = []
for item in scada_info:
scada_type = str(item.get("type", "")).lower()
if scada_type != "pressure":
continue
node_id = item.get("associated_element_id")
if isinstance(node_id, str) and node_id:
sensor_nodes.append(node_id)
sensor_nodes = list(dict.fromkeys(sensor_nodes))
if not sensor_nodes:
raise ValueError("未找到压力传感器对应节点(scada_info.type=pressure)。")
return sensor_nodes
def _build_area_map_by_topology(
network: str, sensor_nodes: list[str], dma_count: int | None
) -> tuple[dict[str, str], list[dict[str, Any]], dict[str, dict[str, float]]]:
node_coords = get_network_node_coords(network)
all_nodes = list(node_coords.keys())
if not all_nodes:
raise ValueError("管网中未获取到可分区节点。")
available_sensors = [node for node in sensor_nodes if node in node_coords]
if not available_sensors:
raise ValueError("无可用压力传感器,无法生成虚拟分区。")
area_count = _resolve_dma_count(dma_count, available_sensors, all_nodes)
sensor_area_map = _cluster_sensors_to_areas(
available_sensors, node_coords, area_count
)
adjacency = _build_adjacency(network, all_nodes)
distance_by_sensor = {
sensor: _bfs_distances(adjacency, sensor) for sensor in available_sensors
}
assignment_count = {sensor: 0 for sensor in available_sensors}
area_map: dict[str, str] = {}
for node_id in sorted(all_nodes):
sensor = _choose_sensor_for_node(
node_id=node_id,
sensors=available_sensors,
node_coords=node_coords,
distance_by_sensor=distance_by_sensor,
assignment_count=assignment_count,
)
assignment_count[sensor] += 1
area_map[node_id] = sensor_area_map[sensor]
if not area_map:
raise ValueError("虚拟分区结果为空,无法生成节点区域映射。")
areas = _build_area_meta(area_map, sensor_area_map)
return area_map, areas, node_coords
def _resolve_dma_count(
dma_count: int | None, sensor_nodes: list[str], all_nodes: list[str]
) -> int:
if dma_count is None:
return min(len(sensor_nodes), len(all_nodes))
if dma_count <= 0:
raise ValueError("dma_count 必须大于 0。")
if dma_count > len(all_nodes):
raise ValueError("dma_count 不能大于可分区节点数量。")
if dma_count > len(sensor_nodes):
raise ValueError("dma_count 不能大于可用传感器数量。")
return dma_count
def _cluster_sensors_to_areas(
sensor_nodes: list[str], node_coords: dict[str, dict[str, float]], area_count: int
) -> dict[str, str]:
if area_count >= len(sensor_nodes):
return {sensor: str(i + 1) for i, sensor in enumerate(sensor_nodes)}
points = np.array(
[
[float(node_coords[s]["x"]), float(node_coords[s]["y"])]
for s in sensor_nodes
],
dtype=float,
)
centers = points[:area_count].copy()
labels = np.zeros(points.shape[0], dtype=int)
for _ in range(20):
d2 = ((points[:, None, :] - centers[None, :, :]) ** 2).sum(axis=2)
new_labels = d2.argmin(axis=1)
if np.array_equal(labels, new_labels):
break
labels = new_labels
for i in range(area_count):
cluster_points = points[labels == i]
if cluster_points.size > 0:
centers[i] = cluster_points.mean(axis=0)
return {
sensor: str(int(labels[idx]) + 1) for idx, sensor in enumerate(sensor_nodes)
}
def _build_adjacency(network: str, all_nodes: list[str]) -> dict[str, set[str]]:
adjacency: dict[str, set[str]] = {node: set() for node in all_nodes}
for link in get_network_link_nodes(network):
parts = str(link).split(":")
if len(parts) < 4:
continue
node1, node2 = parts[-2], parts[-1]
if node1 in adjacency and node2 in adjacency:
adjacency[node1].add(node2)
adjacency[node2].add(node1)
return adjacency
def _bfs_distances(adjacency: dict[str, set[str]], start: str) -> dict[str, int]:
distances: dict[str, int] = {start: 0}
queue: deque[str] = deque([start])
while queue:
node = queue.popleft()
for neighbor in adjacency.get(node, set()):
if neighbor in distances:
continue
distances[neighbor] = distances[node] + 1
queue.append(neighbor)
return distances
def _choose_sensor_for_node(
node_id: str,
sensors: list[str],
node_coords: dict[str, dict[str, float]],
distance_by_sensor: dict[str, dict[str, int]],
assignment_count: dict[str, int],
) -> str:
min_distance = None
candidates: list[str] = []
for sensor in sensors:
d = distance_by_sensor.get(sensor, {}).get(node_id)
if d is None:
continue
if min_distance is None or d < min_distance:
min_distance = d
candidates = [sensor]
elif d == min_distance:
candidates.append(sensor)
if not candidates:
node_coord = node_coords[node_id]
return min(
sensors,
key=lambda sensor: _euclidean_distance(
node_coord, node_coords.get(sensor, node_coord)
),
)
return min(candidates, key=lambda sensor: (assignment_count[sensor], sensor))
def _euclidean_distance(a: dict[str, float], b: dict[str, float]) -> float:
return math.hypot(float(a["x"]) - float(b["x"]), float(a["y"]) - float(b["y"]))
def _build_area_meta(
area_map: dict[str, str], sensor_area_map: dict[str, str]
) -> list[dict[str, Any]]:
nodes_by_area: dict[str, list[str]] = {}
for node_id, area_id in area_map.items():
nodes_by_area.setdefault(area_id, []).append(node_id)
sensors_by_area: dict[str, list[str]] = {}
for sensor, area_id in sensor_area_map.items():
sensors_by_area.setdefault(area_id, []).append(sensor)
areas: list[dict[str, Any]] = []
for area_id in sorted(nodes_by_area.keys(), key=lambda x: int(x)):
node_ids = sorted(nodes_by_area.get(area_id, []))
sensor_nodes = sorted(sensors_by_area.get(area_id, []))
areas.append(
{
"area_id": area_id,
"sensor_nodes": sensor_nodes,
"node_ids": node_ids,
"node_count": len(node_ids),
}
)
return areas
def _build_area_node_map(area_map: dict[str, str]) -> dict[str, list[str]]:
area_node_map: dict[str, list[str]] = {}
for node_id, area_id in area_map.items():
area_node_map.setdefault(area_id, []).append(node_id)
for area_id in list(area_node_map.keys()):
area_node_map[area_id] = sorted(area_node_map[area_id])
return area_node_map
def _build_node_visual_payload(
area_map: dict[str, str],
node_coords: dict[str, dict[str, float]],
rows: list[dict[str, Any]],
) -> dict[str, Any]:
area_leakage_map = _build_area_leakage_map(rows)
max_leakage = max(area_leakage_map.values(), default=0.0)
features: list[dict[str, Any]] = []
for node_id, area_id in area_map.items():
coord = node_coords.get(node_id)
if not coord:
continue
leakage_flow = float(area_leakage_map.get(area_id, 0.0))
leakage_level = _classify_leakage_level(leakage_flow, max_leakage)
features.append(
{
"type": "Feature",
"properties": {
"node_id": node_id,
"area_id": area_id,
"leakage_flow_m3_per_s": leakage_flow,
"leakage_level": leakage_level,
},
"geometry": {
"type": "Point",
"coordinates": [float(coord["x"]), float(coord["y"])],
},
}
)
return {"type": "FeatureCollection", "features": features}
def _build_area_leakage_map(rows: list[dict[str, Any]]) -> dict[str, float]:
area_leakage_map: dict[str, float] = {}
for row in rows:
area_id = str(row.get("Area", "")).strip()
if not area_id:
continue
area_leakage_map[area_id] = float(row.get("LeakageFlow_m3_per_s", 0.0))
return area_leakage_map
def _classify_leakage_level(leakage_flow: float, max_leakage: float) -> str:
if max_leakage <= 0:
return "normal"
ratio = leakage_flow / max_leakage
if ratio >= 0.75:
return "high"
if ratio >= 0.4:
return "medium"
if ratio > 0:
return "low"
return "normal"
def _build_drawing_payload(node_visual_payload: dict[str, Any]) -> dict[str, Any]:
return node_visual_payload
def _build_observed_pressure_from_scada(
network: str,
sensor_nodes: list[str],
scada_start: datetime | str | None,
scada_end: datetime | str | None,
) -> pd.DataFrame:
if scada_start is None or scada_end is None:
raise ValueError("使用后端 SCADA 查询时必须同时提供 scada_start 与 scada_end。")
start_dt = _to_datetime(scada_start)
end_dt = _to_datetime(scada_end)
if start_dt >= end_dt:
raise ValueError("SCADA 时间窗非法:scada_start 必须早于 scada_end。")
node_query_id: dict[str, str] = {}
for item in get_all_scada_info(network):
if str(item.get("type", "")).lower() != "pressure":
continue
node_id = item.get("associated_element_id")
query_id = item.get("api_query_id")
if (
isinstance(node_id, str)
and node_id
and isinstance(query_id, str)
and query_id
):
node_query_id[node_id] = query_id
query_ids = [node_query_id[node] for node in sensor_nodes if node in node_query_id]
if not query_ids:
raise ValueError("未找到可用于压力观测的 SCADA api_query_id。")
scada_data = InternalQueries.query_scada_by_ids_timerange(
db_name=network,
device_ids=query_ids,
start_time=start_dt.isoformat(),
end_time=end_dt.isoformat(),
)
available_lengths = [
len(scada_data.get(query_id, []))
for query_id in query_ids
if len(scada_data.get(query_id, [])) > 0
]
if not available_lengths:
raise ValueError("指定时间窗内未查询到压力 SCADA 数据。")
min_len = min(available_lengths)
obs_df = pd.DataFrame()
for node_id in sensor_nodes:
query_id = node_query_id.get(node_id)
if not query_id:
continue
records = scada_data.get(query_id, [])[:min_len]
if len(records) < min_len:
continue
obs_df[node_id] = [float(item["value"]) for item in records]
if obs_df.empty:
raise ValueError("SCADA 压力数据无法构建观测矩阵。")
return obs_df
def _to_datetime(value: datetime | str) -> datetime:
if isinstance(value, datetime):
return value
return datetime.fromisoformat(value)
def _prepare_leakage_inp(network: str) -> str:
project_root = os.path.abspath(os.path.join(os.path.dirname(__file__), "..", ".."))
db_inp_dir = os.path.join(project_root, "db_inp")
os.makedirs(db_inp_dir, exist_ok=True)
inp_path = os.path.join(db_inp_dir, f"{network}.leakage.inp")
if os.path.isfile(inp_path) and os.path.getsize(inp_path) > 0:
return inp_path
dump_inp(network, inp_path, "2")
if not os.path.isfile(inp_path) or os.path.getsize(inp_path) <= 0:
raise ValueError(f"漏损识别 INP 文件无效: {inp_path}")
return inp_path