diff --git a/app/algorithms/leakage_identifier.py b/app/algorithms/leakage_identifier.py index c85755e..328ec63 100644 --- a/app/algorithms/leakage_identifier.py +++ b/app/algorithms/leakage_identifier.py @@ -1,10 +1,11 @@ import wntr import numpy as np -import pandas as pd -import os -import time -import argparse -from typing import Any, List, Dict, Union +import pandas as pd +import os +import time +import argparse +from multiprocessing import Pool, cpu_count +from typing import Any, List, Dict, Union from pymoo.core.problem import Problem from pymoo.core.callback import Callback @@ -12,10 +13,123 @@ from pymoo.algorithms.soo.nonconvex.ga import GA from pymoo.operators.crossover.sbx import SBX from pymoo.operators.mutation.pm import PM from pymoo.optimize import minimize as pymoo_minimize -from pymoo.termination.default import DefaultSingleObjectiveTermination - - -class LeakageIdentifier: +from pymoo.termination.default import DefaultSingleObjectiveTermination + + +_worker_data: dict[str, Any] = {} +DEFAULT_N_WORKERS = max(1, min(cpu_count() - 1, 4)) + + +def _cleanup_temp_files(prefix: str) -> None: + for ext in [".inp", ".rpt", ".bin", ".out"]: + temp_file = prefix + ext + if os.path.exists(temp_file): + try: + os.remove(temp_file) + except OSError: + pass + + +def _worker_init( + inp_path: str, + sensor_nodes: list[str], + area_ids: list[str], + nodes_by_area: dict[str, list[str]], + obs_matrix: np.ndarray, + q_sum: float, + duration_sec: float, + timestep_sec: float, +) -> None: + global _worker_data + wn = wntr.network.WaterNetworkModel(inp_path) + wn.options.hydraulic.demand_model = "DD" + wn.options.time.duration = duration_sec + wn.options.time.hydraulic_timestep = timestep_sec + wn.options.time.pattern_timestep = timestep_sec + wn.options.time.report_timestep = timestep_sec + + demand_objs_by_area = {} + allocatable_counts = {} + for area_id in area_ids: + demand_objs = [] + for node_name in nodes_by_area.get(area_id, []): + if node_name not in wn.node_name_list: + continue + node = wn.get_node(node_name) + if ( + hasattr(node, "demand_timeseries_list") + and len(node.demand_timeseries_list) > 0 + ): + demand_objs.append(node.demand_timeseries_list[0]) + demand_objs_by_area[area_id] = demand_objs + allocatable_counts[area_id] = len(demand_objs) + + _worker_data = { + "wn": wn, + "sensor_nodes": sensor_nodes, + "area_ids": area_ids, + "nodes_by_area": nodes_by_area, + "demand_objs_by_area": demand_objs_by_area, + "allocatable_counts": allocatable_counts, + "obs_matrix": obs_matrix, + "q_sum": q_sum, + } + + +def _worker_evaluate(raw_ratios: np.ndarray) -> float: + d = _worker_data + effective_ratio_map = LeakageIdentifier._effective_area_ratios( + raw_ratios, + d["area_ids"], + d["nodes_by_area"], + allocatable_counts=d["allocatable_counts"], + ) + + modifications = [] + for area_id in d["area_ids"]: + ratio = effective_ratio_map.get(area_id, 0.0) + if ratio <= 0: + continue + + demand_objs = d["demand_objs_by_area"].get(area_id, []) + if not demand_objs: + continue + + per_node_leak = d["q_sum"] * ratio / len(demand_objs) + for demand_obj in demand_objs: + original_val = demand_obj.base_value + demand_obj.base_value = original_val + per_node_leak + modifications.append((demand_obj, original_val)) + + temp_dir = os.path.abspath(os.path.join("temp", "leakage")) + os.makedirs(temp_dir, exist_ok=True) + prefix = os.path.join(temp_dir, f"temp_{os.getpid()}") + + try: + sim = wntr.sim.EpanetSimulator(d["wn"]) + results = sim.run_sim(file_prefix=prefix) + sim_pressure = results.node["pressure"].loc[:, d["sensor_nodes"]] + + n_steps = min(sim_pressure.shape[0], d["obs_matrix"].shape[0]) + sim_vals = sim_pressure.values[:n_steps, :] + obs_vals = d["obs_matrix"][:n_steps, :] + diff = sim_vals - obs_vals + + row_max = np.max(np.abs(diff), axis=1, keepdims=True) + row_max[row_max == 0] = 1.0 + normalized_diff = diff / row_max + return float(np.linalg.norm(normalized_diff)) + + except Exception: + return 1e9 + + finally: + for demand_obj, original_val in modifications: + demand_obj.base_value = original_val + _cleanup_temp_files(prefix) + + +class LeakageIdentifier: FLOW_UNIT_TO_M3S = { "m3/s": 1.0, "m3/h": 1.0 / 3600.0, @@ -165,19 +279,20 @@ class LeakageIdentifier: df = pd.read_csv(path, dtype={"ID": str, "Area": str}) return self._normalize_area_map_df(df) - def run_identification( - self, - observed_pressure_data: Union[ - str, pd.DataFrame, Dict[str, List[Any]], List[Dict[str, Any]] - ], + def run_identification( + self, + observed_pressure_data: Union[ + str, pd.DataFrame, Dict[str, List[Any]], List[Dict[str, Any]] + ], output_dir: str = "Results", pop_size: int = 50, max_gen: int = 100, output_flow_unit: str = "m3/s", - save_result: bool = True, - ftol: float = 1e-3, - ftol_period: int = 15, - ): + save_result: bool = True, + ftol: float = 1e-3, + ftol_period: int = 15, + n_workers: int = DEFAULT_N_WORKERS, + ): """ 运行遗传算法以识别漏损分布。 @@ -187,10 +302,11 @@ class LeakageIdentifier: pop_size: GA 的种群大小。 max_gen: GA 的最大代数。 output_flow_unit: 输出漏损流量的单位。 - save_result: 是否保存识别结果到本地 CSV。 - ftol: 目标值收敛容差(连续 ftol_period 代改善 < ftol 则停止)。 - ftol_period: 收敛检测的窗口代数。 - """ + save_result: 是否保存识别结果到本地 CSV。 + ftol: 目标值收敛容差(连续 ftol_period 代改善 < ftol 则停止)。 + ftol_period: 收敛检测的窗口代数。 + n_workers: 并行工作进程数(1=串行,>1=并行评估)。 + """ if save_result: os.makedirs(output_dir, exist_ok=True) @@ -206,14 +322,16 @@ class LeakageIdentifier: observed_name = "observed_pressure.csv" # 准备 pymoo 问题实例 - problem = LeakageProblem( - self.wn, - self.nodes_by_area, - self.area_ids, - self.sensor_nodes, - obs_df, - q_sum=self.q_sum, - ) + problem = LeakageProblem( + self.wn, + self.nodes_by_area, + self.area_ids, + self.sensor_nodes, + obs_df, + q_sum=self.q_sum, + n_workers=n_workers, + inp_path=os.path.abspath(self.inp_path), + ) # 配置 pymoo GA 算法 n_var = self.num_areas @@ -234,16 +352,19 @@ class LeakageIdentifier: # 回调:记录每代信息 callback = _ProgressCallback() - t0 = time.time() - res = pymoo_minimize( - problem, - algorithm, - termination, - seed=42, - verbose=True, - callback=callback, - ) - elapsed = time.time() - t0 + t0 = time.time() + try: + res = pymoo_minimize( + problem, + algorithm, + termination, + seed=42, + verbose=True, + callback=callback, + ) + finally: + problem.close() + elapsed = time.time() - t0 # 提取最优解 best_ind = res.X # 最优个体(漏损比例原始值) @@ -305,7 +426,7 @@ class _ProgressCallback(Callback): self._t_last = now -class LeakageProblem(Problem): +class LeakageProblem(Problem): """pymoo 批量评估问题定义。 搜索空间:n 维 [0, 1] 实数 -> 通过 _effective_area_ratios 归一化到单纯形。 @@ -313,15 +434,17 @@ class LeakageProblem(Problem): 无显式约束(sum=1 由归一化自动保证)。 """ - def __init__( - self, - wn, - nodes_by_area, - area_ids, - sensor_nodes, - observed_data, - q_sum: float = 0.2, - ): + def __init__( + self, + wn, + nodes_by_area, + area_ids, + sensor_nodes, + observed_data, + q_sum: float = 0.2, + n_workers: int = DEFAULT_N_WORKERS, + inp_path: str | None = None, + ): n_var = len(area_ids) super().__init__( @@ -335,8 +458,10 @@ class LeakageProblem(Problem): self.wn = wn self.nodes_by_area = nodes_by_area self.area_ids = area_ids - self.sensor_nodes = sensor_nodes - self.q_sum = q_sum + self.sensor_nodes = sensor_nodes + self.q_sum = q_sum + self.n_workers = max(1, int(n_workers)) + self.inp_path = inp_path # 预处理观测数据以匹配模拟格式 try: @@ -375,11 +500,31 @@ class LeakageProblem(Problem): area_id: len(self.demand_objs_by_area.get(area_id, [])) for area_id in self.area_ids } - if not any(count > 0 for count in self.allocatable_counts.values()): - raise ValueError("没有可分配漏损的有效分区,无法满足漏损总量约束。") - - # 评估计数器(诊断用) - self._eval_count = 0 + if not any(count > 0 for count in self.allocatable_counts.values()): + raise ValueError("没有可分配漏损的有效分区,无法满足漏损总量约束。") + + # 评估计数器(诊断用) + self._eval_count = 0 + self._pool = None + if self.n_workers > 1: + if not self.inp_path: + raise ValueError("并行评估需要提供 inp_path。") + duration_sec = float(self.wn.options.time.duration) + timestep_sec = float(self.wn.options.time.hydraulic_timestep) + self._pool = Pool( + processes=self.n_workers, + initializer=_worker_init, + initargs=( + self.inp_path, + list(self.sensor_nodes), + list(self.area_ids), + {k: list(v) for k, v in self.nodes_by_area.items()}, + self.obs_matrix.copy(), + self.q_sum, + duration_sec, + timestep_sec, + ), + ) def _evaluate(self, X, out, *args, **kwargs): """批量评估种群。 @@ -389,10 +534,15 @@ class LeakageProblem(Problem): n_pop = X.shape[0] self._eval_count += n_pop - F = np.zeros((n_pop, 1)) - for i in range(n_pop): - F[i, 0] = self._evaluate_single(X[i]) - out["F"] = F + if self._pool is not None: + results = self._pool.map(_worker_evaluate, [X[i] for i in range(n_pop)]) + out["F"] = np.array(results, dtype=float).reshape(-1, 1) + return + + F = np.zeros((n_pop, 1)) + for i in range(n_pop): + F[i, 0] = self._evaluate_single(X[i]) + out["F"] = F def _evaluate_single(self, x): """评估单个个体,返回归一化误差范数。""" @@ -457,14 +607,13 @@ class LeakageProblem(Problem): for demand_obj, original_val in modifications: demand_obj.base_value = original_val - # 操作完成后删除临时文件 - for ext in [".inp", ".rpt", ".bin", ".out"]: - temp_file = prefix + ext - if os.path.exists(temp_file): - try: - os.remove(temp_file) - except OSError: - pass + _cleanup_temp_files(prefix) + + def close(self) -> None: + if self._pool is not None: + self._pool.close() + self._pool.join() + self._pool = None def main() -> int: diff --git a/app/api/v1/endpoints/leakage.py b/app/api/v1/endpoints/leakage.py index 5c6b62e..8277745 100644 --- a/app/api/v1/endpoints/leakage.py +++ b/app/api/v1/endpoints/leakage.py @@ -1,11 +1,11 @@ +import os from typing import Any from datetime import datetime from fastapi import APIRouter, Depends, HTTPException from pydantic import BaseModel -from app.auth.dependencies import get_current_user -from app.domain.schemas.user import UserInDB +from app.auth.keycloak_dependencies import get_current_keycloak_username from app.services.leakage_identifier import ( get_leakage_identify_scheme_detail, list_leakage_identify_schemes, @@ -13,6 +13,7 @@ from app.services.leakage_identifier import ( ) router = APIRouter() +DEFAULT_N_WORKERS = max(1, min((os.cpu_count() or 1) - 1, 4)) class LeakageIdentifyRequest(BaseModel): @@ -28,6 +29,7 @@ class LeakageIdentifyRequest(BaseModel): output_dir: str = "db_inp" pop_size: int = 50 max_gen: int = 100 + n_workers: int = DEFAULT_N_WORKERS output_flow_unit: str = "m3/s" dma_count: int | None = None scada_start: datetime | None = None @@ -38,12 +40,11 @@ class LeakageIdentifyRequest(BaseModel): @router.post("/identify/") async def identify_leakage( - data: LeakageIdentifyRequest, current_user: UserInDB = Depends(get_current_user) + data: LeakageIdentifyRequest, + username: str = Depends(get_current_keycloak_username), ) -> dict[str, Any]: try: - return run_leakage_identification( - **data.model_dump(), username=current_user.username - ) + return run_leakage_identification(**data.model_dump(), username=username) except Exception as exc: raise HTTPException(status_code=400, detail=str(exc)) diff --git a/app/auth/keycloak_dependencies.py b/app/auth/keycloak_dependencies.py index 403189e..6b34936 100644 --- a/app/auth/keycloak_dependencies.py +++ b/app/auth/keycloak_dependencies.py @@ -61,3 +61,43 @@ async def get_current_keycloak_sub( detail="Invalid subject claim", headers={"WWW-Authenticate": "Bearer"}, ) from exc + + +async def get_current_keycloak_username( + token: str | None = Depends(oauth2_optional), +) -> str: + if not token: + raise HTTPException( + status_code=status.HTTP_401_UNAUTHORIZED, + detail="Not authenticated", + headers={"WWW-Authenticate": "Bearer"}, + ) + if settings.KEYCLOAK_PUBLIC_KEY: + key = settings.KEYCLOAK_PUBLIC_KEY.replace("\\n", "\n") + algorithms = [settings.KEYCLOAK_ALGORITHM] + else: + key = settings.SECRET_KEY + algorithms = [settings.ALGORITHM] + + try: + payload = jwt.decode( + token, + key, + algorithms=algorithms, + audience=settings.KEYCLOAK_AUDIENCE or None, + ) + except JWTError as exc: + raise HTTPException( + status_code=status.HTTP_401_UNAUTHORIZED, + detail="Invalid token", + headers={"WWW-Authenticate": "Bearer"}, + ) from exc + + username = payload.get("preferred_username") or payload.get("username") + if not username: + raise HTTPException( + status_code=status.HTTP_401_UNAUTHORIZED, + detail="Missing username claim", + headers={"WWW-Authenticate": "Bearer"}, + ) + return str(username) diff --git a/app/infra/db/timescaledb/internal_queries.py b/app/infra/db/timescaledb/internal_queries.py index 7c5f3c0..29d735e 100644 --- a/app/infra/db/timescaledb/internal_queries.py +++ b/app/infra/db/timescaledb/internal_queries.py @@ -120,3 +120,53 @@ class InternalQueries: time.sleep(1) else: raise + + @staticmethod + def query_scada_by_ids_timerange( + device_ids: List[str], + start_time: str | datetime, + end_time: str | datetime, + db_name: str = None, + max_retries: int = 3, + ) -> dict[str, list[dict]]: + """查询指定时间窗的 SCADA 数据,返回 {device_id: [{time, value}, ...]}。""" + start_dt = ( + datetime.fromisoformat(start_time) + if isinstance(start_time, str) + else start_time + ) + end_dt = ( + datetime.fromisoformat(end_time) if isinstance(end_time, str) else end_time + ) + + for attempt in range(max_retries): + try: + conn_string = ( + timescaledb_info.get_pgconn_string(db_name=db_name) + if db_name + else timescaledb_info.get_pgconn_string() + ) + with psycopg.Connection.connect(conn_string) as conn: + rows = ScadaRepository.get_scada_by_ids_time_range_sync( + conn, device_ids, start_dt, end_dt + ) + result: dict[str, list[dict]] = { + device_id: [] for device_id in device_ids + } + for row in rows: + device_id = row["device_id"] + value = row.get("cleaned_value") + if value is None: + value = row.get("monitored_value") + result.setdefault(device_id, []).append( + {"time": row["time"].isoformat(), "value": value} + ) + for device_id in result: + result[device_id].sort(key=lambda item: item["time"]) + return result + except Exception as e: + logger.error(f"查询尝试 {attempt + 1} 失败: {e}") + if attempt < max_retries - 1: + time.sleep(1) + else: + raise diff --git a/app/infra/db/timescaledb/schemas/scada.py b/app/infra/db/timescaledb/schemas/scada.py index e879c14..bc8717f 100644 --- a/app/infra/db/timescaledb/schemas/scada.py +++ b/app/infra/db/timescaledb/schemas/scada.py @@ -2,6 +2,7 @@ from typing import List, Any from datetime import datetime from collections import defaultdict from psycopg import AsyncConnection, Connection, sql +from psycopg.rows import dict_row class ScadaRepository: @@ -46,7 +47,7 @@ class ScadaRepository: start_time: datetime, end_time: datetime, ) -> List[dict]: - with conn.cursor() as cur: + with conn.cursor(row_factory=dict_row) as cur: cur.execute( "SELECT * FROM scada.scada_data WHERE device_id = ANY(%s) AND time >= %s AND time <= %s", (device_ids, start_time, end_time), diff --git a/app/services/leakage_identifier.py b/app/services/leakage_identifier.py index 9dfa6da..359515f 100644 --- a/app/services/leakage_identifier.py +++ b/app/services/leakage_identifier.py @@ -9,7 +9,7 @@ import pandas as pd import wntr from app.algorithms.leakage_identifier import LeakageIdentifier -from app.infra.db.influxdb import api as influxdb_api +from app.infra.db.timescaledb.internal_queries import InternalQueries from app.services.scheme_management import ( query_leakage_identify_scheme_detail, query_leakage_identify_schemes, @@ -24,6 +24,8 @@ from app.services.tjnetwork import ( get_network_node_coords, ) +DEFAULT_N_WORKERS = max(1, min((os.cpu_count() or 1) - 1, 4)) + def run_leakage_identification( network: str, @@ -38,6 +40,7 @@ def run_leakage_identification( output_dir: str = "db_inp", pop_size: int = 50, max_gen: int = 100, + n_workers: int = DEFAULT_N_WORKERS, output_flow_unit: str = "m3/s", dma_count: int | None = None, scada_start: datetime | str | None = None, @@ -57,7 +60,7 @@ def run_leakage_identification( if not selected_sensor_nodes: raise ValueError("未提供有效传感器节点,且系统未识别到可用压力传感器。") - area_map, areas, drawing_payload = _build_area_map_by_topology( + area_map, areas, node_coords = _build_area_map_by_topology( network, selected_sensor_nodes, dma_count ) @@ -72,7 +75,9 @@ def run_leakage_identification( observed_source = "backend_timerange" else: if observed_pressure_data is None: - raise ValueError("未提供 observed_pressure_data,且未提供 scada_start/scada_end。") + raise ValueError( + "未提供 observed_pressure_data,且未提供 scada_start/scada_end。" + ) observed_df = observed_pressure_data q_sum_m3s = LeakageIdentifier._flow_to_m3s(q_sum, q_sum_unit) @@ -90,10 +95,13 @@ def run_leakage_identification( output_dir=output_dir, pop_size=pop_size, max_gen=max_gen, + n_workers=n_workers, output_flow_unit=output_flow_unit, save_result=False, ) rows = result_df.to_dict(orient="records") + # node_visual_payload = _build_node_visual_payload(area_map, node_coords, rows) + # drawing_payload = _build_drawing_payload(node_visual_payload) payload = { "result_path": result_df.attrs.get("result_path"), "sensor_nodes": selected_sensor_nodes, @@ -101,21 +109,30 @@ def run_leakage_identification( "area_count": len(set(area_map.values())), "node_area_map": area_map, "areas": areas, - "drawing_payload": drawing_payload, + # "node_visual_payload": node_visual_payload, + # "drawing_payload": drawing_payload, "rows": rows, } if scheme_name: if scheme_name_exists(network, scheme_name): raise ValueError(f"方案名称已存在: {scheme_name}") scheme_start_time = ( - _to_datetime(scada_start).isoformat() if scada_start is not None else datetime.now().isoformat() + _to_datetime(scada_start).isoformat() + if scada_start is not None + else datetime.now().isoformat() ) scheme_detail = { "network": network, "dma_count": dma_count, "sensor_nodes": selected_sensor_nodes, - "scada_start": _to_datetime(scada_start).isoformat() if scada_start is not None else None, - "scada_end": _to_datetime(scada_end).isoformat() if scada_end is not None else None, + "scada_start": ( + _to_datetime(scada_start).isoformat() + if scada_start is not None + else None + ), + "scada_end": ( + _to_datetime(scada_end).isoformat() if scada_end is not None else None + ), "algorithm_params": { "start_time": start_time, "duration": duration, @@ -125,11 +142,13 @@ def run_leakage_identification( "output_flow_unit": output_flow_unit, "pop_size": pop_size, "max_gen": max_gen, + "n_workers": n_workers, }, "result_summary": { "area_count": len(set(area_map.values())), "max_leakage": max( - (float(row.get("LeakageFlow_m3_per_s", 0.0)) for row in rows), default=0.0 + (float(row.get("LeakageFlow_m3_per_s", 0.0)) for row in rows), + default=0.0, ), }, } @@ -149,7 +168,7 @@ def run_leakage_identification( result_rows=rows, node_area_map=area_map, areas=areas, - drawing_payload=drawing_payload, + drawing_payload={}, ) payload["scheme_name"] = scheme_name return payload @@ -164,7 +183,9 @@ def list_leakage_identify_schemes( ) -def get_leakage_identify_scheme_detail(network: str, scheme_name: str) -> dict[str, Any]: +def get_leakage_identify_scheme_detail( + network: str, scheme_name: str +) -> dict[str, Any]: result = query_leakage_identify_scheme_detail(network, scheme_name) if not result: raise ValueError(f"未找到漏损识别方案: {scheme_name}") @@ -189,7 +210,7 @@ def _get_pressure_sensor_nodes(network: str) -> list[str]: def _build_area_map_by_topology( network: str, sensor_nodes: list[str], dma_count: int | None -) -> tuple[dict[str, str], list[dict[str, Any]], dict[str, Any]]: +) -> tuple[dict[str, str], list[dict[str, Any]], dict[str, dict[str, float]]]: node_coords = get_network_node_coords(network) all_nodes = list(node_coords.keys()) if not all_nodes: @@ -199,7 +220,9 @@ def _build_area_map_by_topology( if not available_sensors: raise ValueError("无可用压力传感器,无法生成虚拟分区。") area_count = _resolve_dma_count(dma_count, available_sensors, all_nodes) - sensor_area_map = _cluster_sensors_to_areas(available_sensors, node_coords, area_count) + sensor_area_map = _cluster_sensors_to_areas( + available_sensors, node_coords, area_count + ) adjacency = _build_adjacency(network, all_nodes) distance_by_sensor = { sensor: _bfs_distances(adjacency, sensor) for sensor in available_sensors @@ -222,8 +245,7 @@ def _build_area_map_by_topology( raise ValueError("虚拟分区结果为空,无法生成节点区域映射。") areas = _build_area_meta(area_map, sensor_area_map) - drawing_payload = _build_drawing_payload(areas, node_coords) - return area_map, areas, drawing_payload + return area_map, areas, node_coords def _resolve_dma_count( @@ -247,7 +269,10 @@ def _cluster_sensors_to_areas( return {sensor: str(i + 1) for i, sensor in enumerate(sensor_nodes)} points = np.array( - [[float(node_coords[s]["x"]), float(node_coords[s]["y"])] for s in sensor_nodes], + [ + [float(node_coords[s]["x"]), float(node_coords[s]["y"])] + for s in sensor_nodes + ], dtype=float, ) centers = points[:area_count].copy() @@ -262,7 +287,9 @@ def _cluster_sensors_to_areas( cluster_points = points[labels == i] if cluster_points.size > 0: centers[i] = cluster_points.mean(axis=0) - return {sensor: str(int(labels[idx]) + 1) for idx, sensor in enumerate(sensor_nodes)} + return { + sensor: str(int(labels[idx]) + 1) for idx, sensor in enumerate(sensor_nodes) + } def _build_adjacency(network: str, all_nodes: list[str]) -> dict[str, set[str]]: @@ -350,93 +377,72 @@ def _build_area_meta( return areas -def _build_drawing_payload( - areas: list[dict[str, Any]], node_coords: dict[str, dict[str, float]] +def _build_area_node_map(area_map: dict[str, str]) -> dict[str, list[str]]: + area_node_map: dict[str, list[str]] = {} + for node_id, area_id in area_map.items(): + area_node_map.setdefault(area_id, []).append(node_id) + for area_id in list(area_node_map.keys()): + area_node_map[area_id] = sorted(area_node_map[area_id]) + return area_node_map + + +def _build_node_visual_payload( + area_map: dict[str, str], + node_coords: dict[str, dict[str, float]], + rows: list[dict[str, Any]], ) -> dict[str, Any]: + area_leakage_map = _build_area_leakage_map(rows) + max_leakage = max(area_leakage_map.values(), default=0.0) features: list[dict[str, Any]] = [] - for area in areas: - points = [ - ( - float(node_coords[node_id]["x"]), - float(node_coords[node_id]["y"]), - ) - for node_id in area["node_ids"] - if node_id in node_coords - ] - ring = _points_to_polygon_ring(points) + for node_id, area_id in area_map.items(): + coord = node_coords.get(node_id) + if not coord: + continue + leakage_flow = float(area_leakage_map.get(area_id, 0.0)) + leakage_level = _classify_leakage_level(leakage_flow, max_leakage) features.append( { "type": "Feature", "properties": { - "area_id": area["area_id"], - "node_count": area["node_count"], - "sensor_nodes": area["sensor_nodes"], + "node_id": node_id, + "area_id": area_id, + "leakage_flow_m3_per_s": leakage_flow, + "leakage_level": leakage_level, + }, + "geometry": { + "type": "Point", + "coordinates": [float(coord["x"]), float(coord["y"])], }, - "geometry": {"type": "Polygon", "coordinates": [ring]}, } ) return {"type": "FeatureCollection", "features": features} -def _points_to_polygon_ring(points: list[tuple[float, float]]) -> list[list[float]]: - if not points: - return [] - unique_points = list(dict.fromkeys(points)) - if len(unique_points) == 1: - x, y = unique_points[0] - delta = 1e-6 - return [ - [x - delta, y - delta], - [x + delta, y - delta], - [x + delta, y + delta], - [x - delta, y + delta], - [x - delta, y - delta], - ] - if len(unique_points) == 2: - (x1, y1), (x2, y2) = unique_points - dx, dy = x2 - x1, y2 - y1 - length = math.hypot(dx, dy) - if length == 0: - return _points_to_polygon_ring([unique_points[0]]) - width = max(length * 0.02, 1e-6) - nx, ny = -dy / length * width, dx / length * width - return [ - [x1 + nx, y1 + ny], - [x2 + nx, y2 + ny], - [x2 - nx, y2 - ny], - [x1 - nx, y1 - ny], - [x1 + nx, y1 + ny], - ] - - hull = _convex_hull(unique_points) - ring = [[x, y] for x, y in hull] - ring.append([hull[0][0], hull[0][1]]) - return ring +def _build_area_leakage_map(rows: list[dict[str, Any]]) -> dict[str, float]: + area_leakage_map: dict[str, float] = {} + for row in rows: + area_id = str(row.get("Area", "")).strip() + if not area_id: + continue + area_leakage_map[area_id] = float(row.get("LeakageFlow_m3_per_s", 0.0)) + return area_leakage_map -def _convex_hull(points: list[tuple[float, float]]) -> list[tuple[float, float]]: - pts = sorted(points) - if len(pts) <= 1: - return pts +def _classify_leakage_level(leakage_flow: float, max_leakage: float) -> str: + if max_leakage <= 0: + return "normal" + ratio = leakage_flow / max_leakage + if ratio >= 0.75: + return "high" + if ratio >= 0.4: + return "medium" + if ratio > 0: + return "low" + return "normal" - def cross( - o: tuple[float, float], a: tuple[float, float], b: tuple[float, float] - ) -> float: - return (a[0] - o[0]) * (b[1] - o[1]) - (a[1] - o[1]) * (b[0] - o[0]) - lower: list[tuple[float, float]] = [] - for p in pts: - while len(lower) >= 2 and cross(lower[-2], lower[-1], p) <= 0: - lower.pop() - lower.append(p) - - upper: list[tuple[float, float]] = [] - for p in reversed(pts): - while len(upper) >= 2 and cross(upper[-2], upper[-1], p) <= 0: - upper.pop() - upper.append(p) - - return lower[:-1] + upper[:-1] +def _build_drawing_payload(node_visual_payload: dict[str, Any]) -> dict[str, Any]: + return node_visual_payload def _build_observed_pressure_from_scada( @@ -459,15 +465,21 @@ def _build_observed_pressure_from_scada( continue node_id = item.get("associated_element_id") query_id = item.get("api_query_id") - if isinstance(node_id, str) and node_id and isinstance(query_id, str) and query_id: + if ( + isinstance(node_id, str) + and node_id + and isinstance(query_id, str) + and query_id + ): node_query_id[node_id] = query_id query_ids = [node_query_id[node] for node in sensor_nodes if node in node_query_id] if not query_ids: raise ValueError("未找到可用于压力观测的 SCADA api_query_id。") - scada_data = influxdb_api.query_SCADA_data_by_device_ID_and_timerange( - query_ids_list=query_ids, + scada_data = InternalQueries.query_scada_by_ids_timerange( + db_name=network, + device_ids=query_ids, start_time=start_dt.isoformat(), end_time=end_dt.isoformat(), ) @@ -507,19 +519,9 @@ def _prepare_leakage_inp(network: str) -> str: db_inp_dir = os.path.join(project_root, "db_inp") os.makedirs(db_inp_dir, exist_ok=True) inp_path = os.path.join(db_inp_dir, f"{network}.leakage.inp") - if _is_valid_inp_file(inp_path): + if os.path.isfile(inp_path) and os.path.getsize(inp_path) > 0: return inp_path dump_inp(network, inp_path, "2") - if not _is_valid_inp_file(inp_path): + if not os.path.isfile(inp_path) or os.path.getsize(inp_path) <= 0: raise ValueError(f"漏损识别 INP 文件无效: {inp_path}") return inp_path - - -def _is_valid_inp_file(inp_path: str) -> bool: - if not os.path.isfile(inp_path) or os.path.getsize(inp_path) <= 0: - return False - try: - wntr.network.WaterNetworkModel(inp_path) - return True - except Exception: - return False diff --git a/app/services/scheme_management.py b/app/services/scheme_management.py index 7a3eb3b..e35d0a2 100644 --- a/app/services/scheme_management.py +++ b/app/services/scheme_management.py @@ -220,7 +220,7 @@ def store_leakage_identify_result( result_rows: list[dict], node_area_map: dict[str, str], areas: list[dict], - drawing_payload: dict, + drawing_payload: dict | None = None, run_status: str = "completed", error_message: str | None = None, ) -> None: @@ -257,7 +257,7 @@ def store_leakage_identify_result( json.dumps(result_rows), json.dumps(node_area_map), json.dumps(areas), - json.dumps(drawing_payload), + json.dumps(drawing_payload or {}), ), ) conn.commit() diff --git a/tests/api/test_leakage_endpoints.py b/tests/api/test_leakage_endpoints.py index bab360d..db8ec69 100644 --- a/tests/api/test_leakage_endpoints.py +++ b/tests/api/test_leakage_endpoints.py @@ -8,8 +8,8 @@ from app.api.v1.endpoints import leakage as leakage_endpoint def _build_client() -> TestClient: app = FastAPI() app.include_router(leakage_endpoint.router, prefix="/api/v1/leakage") - app.dependency_overrides[leakage_endpoint.get_current_user] = lambda: SimpleNamespace( - username="tester" + app.dependency_overrides[leakage_endpoint.get_current_keycloak_username] = ( + lambda: "tester" ) return TestClient(app)