From 61f69752969f8abccc8658ab3f0214ae6429a1a0 Mon Sep 17 00:00:00 2001 From: Jiang Date: Wed, 4 Mar 2026 15:21:31 +0800 Subject: [PATCH] =?UTF-8?q?=E5=AE=8C=E5=96=84=E5=8C=BA=E5=9F=9F=E6=BC=8F?= =?UTF-8?q?=E6=8D=9F=E8=AF=86=E5=88=AB?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- app/algorithms/leakage_identifier.py | 17 +- app/api/v1/endpoints/leakage.py | 35 +- app/services/leakage_identifier.py | 458 +++++++++++++++++++++++++-- app/services/scheme_management.py | 193 +++++++++++ tests/api/test_leakage_endpoints.py | 63 ++++ 5 files changed, 732 insertions(+), 34 deletions(-) create mode 100644 tests/api/test_leakage_endpoints.py diff --git a/app/algorithms/leakage_identifier.py b/app/algorithms/leakage_identifier.py index 8717dc4..c85755e 100644 --- a/app/algorithms/leakage_identifier.py +++ b/app/algorithms/leakage_identifier.py @@ -425,10 +425,14 @@ class LeakageProblem(Problem): demand_obj.base_value = original_val + per_node_leak modifications.append((demand_obj, original_val)) + # 结果保存在根目录的temp/leakage文件夹中 + temp_dir = os.path.abspath(os.path.join("temp", "leakage")) + os.makedirs(temp_dir, exist_ok=True) + prefix = os.path.join(temp_dir, f"temp_{os.getpid()}") + try: sim = wntr.sim.EpanetSimulator(self.wn) - results = sim.run_sim() - + results = sim.run_sim(file_prefix=prefix) sim_pressure = results.node["pressure"].loc[:, self.sensor_nodes] n_steps = min(sim_pressure.shape[0], self.obs_matrix.shape[0]) @@ -453,6 +457,15 @@ class LeakageProblem(Problem): for demand_obj, original_val in modifications: demand_obj.base_value = original_val + # 操作完成后删除临时文件 + for ext in [".inp", ".rpt", ".bin", ".out"]: + temp_file = prefix + ext + if os.path.exists(temp_file): + try: + os.remove(temp_file) + except OSError: + pass + def main() -> int: parser = argparse.ArgumentParser(description="漏损区域识别") diff --git a/app/api/v1/endpoints/leakage.py b/app/api/v1/endpoints/leakage.py index c83aab2..e8ae06c 100644 --- a/app/api/v1/endpoints/leakage.py +++ b/app/api/v1/endpoints/leakage.py @@ -1,16 +1,21 @@ from typing import Any +from datetime import datetime from fastapi import APIRouter, HTTPException from pydantic import BaseModel -from app.services.leakage_identifier import run_leakage_identification +from app.services.leakage_identifier import ( + get_leakage_identify_scheme_detail, + list_leakage_identify_schemes, + run_leakage_identification, +) router = APIRouter() class LeakageIdentifyRequest(BaseModel): network: str - observed_pressure_data: str | dict[str, list[Any]] | list[dict[str, Any]] + observed_pressure_data: str | dict[str, list[Any]] | list[dict[str, Any]] | None = None start_time: float = 0 duration: float = 24 timestep: float = 5 @@ -20,6 +25,12 @@ class LeakageIdentifyRequest(BaseModel): pop_size: int = 50 max_gen: int = 100 output_flow_unit: str = "m3/s" + dma_count: int | None = None + scada_start: datetime | None = None + scada_end: datetime | None = None + sensor_nodes: list[str] | None = None + scheme_name: str | None = None + username: str = "admin" @router.post("/identify/") @@ -28,3 +39,23 @@ async def identify_leakage(data: LeakageIdentifyRequest) -> dict[str, Any]: return run_leakage_identification(**data.dict()) except Exception as exc: raise HTTPException(status_code=400, detail=str(exc)) + + +@router.get("/schemes/") +async def query_leakage_schemes( + network: str, query_date: datetime | None = None +) -> list[dict[str, Any]]: + try: + return list_leakage_identify_schemes(network=network, query_date=query_date) + except Exception as exc: + raise HTTPException(status_code=400, detail=str(exc)) + + +@router.get("/schemes/{scheme_name}") +async def query_leakage_scheme_detail( + network: str, scheme_name: str +) -> dict[str, Any]: + try: + return get_leakage_identify_scheme_detail(network=network, scheme_name=scheme_name) + except Exception as exc: + raise HTTPException(status_code=400, detail=str(exc)) diff --git a/app/services/leakage_identifier.py b/app/services/leakage_identifier.py index 66d97e8..ca19988 100644 --- a/app/services/leakage_identifier.py +++ b/app/services/leakage_identifier.py @@ -1,21 +1,34 @@ +import math import os +from collections import deque +from datetime import datetime from typing import Any +import numpy as np import pandas as pd from app.algorithms.leakage_identifier import LeakageIdentifier +from app.infra.db.influxdb import api as influxdb_api +from app.services.scheme_management import ( + query_leakage_identify_scheme_detail, + query_leakage_identify_schemes, + scheme_name_exists, + store_leakage_identify_result, + store_scheme_info, +) from app.services.tjnetwork import ( - PARTITION_TYPE_KWAY, - calculate_district_metering_area_for_nodes, dump_inp, get_all_scada_info, + get_network_link_nodes, get_network_node_coords, ) def run_leakage_identification( network: str, - observed_pressure_data: str | pd.DataFrame | dict[str, list[Any]] | list[dict[str, Any]], + observed_pressure_data: ( + str | pd.DataFrame | dict[str, list[Any]] | list[dict[str, Any]] | None + ) = None, start_time: float = 0, duration: float = 24, timestep: float = 5, @@ -25,17 +38,47 @@ def run_leakage_identification( pop_size: int = 50, max_gen: int = 100, output_flow_unit: str = "m3/s", + dma_count: int | None = None, + scada_start: datetime | str | None = None, + scada_end: datetime | str | None = None, + sensor_nodes: list[str] | None = None, + scheme_name: str | None = None, + username: str = "admin", ) -> dict[str, Any]: os.makedirs(output_dir, exist_ok=True) inp_path = os.path.join(output_dir, f"{network}.leakage.inp") dump_inp(network, inp_path, "2") - sensor_nodes = _get_pressure_sensor_nodes(network) - area_map = _build_area_map_by_spectral_partition(network, sensor_nodes) + + selected_sensor_nodes = ( + list(dict.fromkeys([node for node in (sensor_nodes or []) if node])) + if sensor_nodes + else _get_pressure_sensor_nodes(network) + ) + if not selected_sensor_nodes: + raise ValueError("未提供有效传感器节点,且系统未识别到可用压力传感器。") + + area_map, areas, drawing_payload = _build_area_map_by_topology( + network, selected_sensor_nodes, dma_count + ) + + observed_source = "request_payload" + if scada_start is not None or scada_end is not None: + observed_df = _build_observed_pressure_from_scada( + network=network, + sensor_nodes=selected_sensor_nodes, + scada_start=scada_start, + scada_end=scada_end, + ) + observed_source = "backend_timerange" + else: + if observed_pressure_data is None: + raise ValueError("未提供 observed_pressure_data,且未提供 scada_start/scada_end。") + observed_df = observed_pressure_data q_sum_m3s = LeakageIdentifier._flow_to_m3s(q_sum, q_sum_unit) identifier = LeakageIdentifier( inp_path=inp_path, - sensor_nodes=sensor_nodes, + sensor_nodes=selected_sensor_nodes, area_map=area_map, start_time=start_time, duration=duration, @@ -43,26 +86,97 @@ def run_leakage_identification( q_sum=q_sum_m3s, ) result_df = identifier.run_identification( - observed_pressure_data=observed_pressure_data, + observed_pressure_data=observed_df, output_dir=output_dir, pop_size=pop_size, max_gen=max_gen, output_flow_unit=output_flow_unit, save_result=False, ) - return { + rows = result_df.to_dict(orient="records") + payload = { "result_path": result_df.attrs.get("result_path"), - "sensor_nodes": sensor_nodes, + "sensor_nodes": selected_sensor_nodes, + "observed_source": observed_source, "area_count": len(set(area_map.values())), - "rows": result_df.to_dict(orient="records"), + "node_area_map": area_map, + "areas": areas, + "drawing_payload": drawing_payload, + "rows": rows, } + if scheme_name: + if scheme_name_exists(network, scheme_name): + raise ValueError(f"方案名称已存在: {scheme_name}") + scheme_start_time = ( + _to_datetime(scada_start).isoformat() if scada_start is not None else datetime.now().isoformat() + ) + scheme_detail = { + "network": network, + "dma_count": dma_count, + "sensor_nodes": selected_sensor_nodes, + "scada_start": _to_datetime(scada_start).isoformat() if scada_start is not None else None, + "scada_end": _to_datetime(scada_end).isoformat() if scada_end is not None else None, + "algorithm_params": { + "start_time": start_time, + "duration": duration, + "timestep": timestep, + "q_sum": q_sum, + "q_sum_unit": q_sum_unit, + "output_flow_unit": output_flow_unit, + "pop_size": pop_size, + "max_gen": max_gen, + }, + "result_summary": { + "area_count": len(set(area_map.values())), + "max_leakage": max( + (float(row.get("LeakageFlow_m3_per_s", 0.0)) for row in rows), default=0.0 + ), + }, + } + store_scheme_info( + name=network, + scheme_name=scheme_name, + scheme_type="dma_leak_identification", + username=username, + scheme_start_time=scheme_start_time, + scheme_detail=scheme_detail, + ) + store_leakage_identify_result( + name=network, + scheme_name=scheme_name, + network=network, + sensor_nodes=selected_sensor_nodes, + result_rows=rows, + node_area_map=area_map, + areas=areas, + drawing_payload=drawing_payload, + ) + payload["scheme_name"] = scheme_name + return payload + + +def list_leakage_identify_schemes( + network: str, query_date: datetime | str | None = None +) -> list[dict[str, Any]]: + parsed_date = _to_datetime(query_date).date() if query_date is not None else None + return query_leakage_identify_schemes( + name=network, network=network, query_date=parsed_date + ) + + +def get_leakage_identify_scheme_detail(network: str, scheme_name: str) -> dict[str, Any]: + result = query_leakage_identify_scheme_detail(network, scheme_name) + if not result: + raise ValueError(f"未找到漏损识别方案: {scheme_name}") + return result def _get_pressure_sensor_nodes(network: str) -> list[str]: scada_info = get_all_scada_info(network) sensor_nodes: list[str] = [] for item in scada_info: - if item.get("type") != "pressure": + scada_type = str(item.get("type", "")).lower() + if scada_type != "pressure": continue node_id = item.get("associated_element_id") if isinstance(node_id, str) and node_id: @@ -73,32 +187,316 @@ def _get_pressure_sensor_nodes(network: str) -> list[str]: return sensor_nodes -def _build_area_map_by_spectral_partition( - network: str, sensor_nodes: list[str] -) -> dict[str, str]: +def _build_area_map_by_topology( + network: str, sensor_nodes: list[str], dma_count: int | None +) -> tuple[dict[str, str], list[dict[str, Any]], dict[str, Any]]: node_coords = get_network_node_coords(network) all_nodes = list(node_coords.keys()) if not all_nodes: raise ValueError("管网中未获取到可分区节点。") - part_count = min(len(sensor_nodes), len(all_nodes)) - if part_count <= 0: + available_sensors = [node for node in sensor_nodes if node in node_coords] + if not available_sensors: raise ValueError("无可用压力传感器,无法生成虚拟分区。") + area_count = _resolve_dma_count(dma_count, available_sensors, all_nodes) + sensor_area_map = _cluster_sensors_to_areas(available_sensors, node_coords, area_count) + adjacency = _build_adjacency(network, all_nodes) + distance_by_sensor = { + sensor: _bfs_distances(adjacency, sensor) for sensor in available_sensors + } - groups = calculate_district_metering_area_for_nodes( - network, - all_nodes, - part_count=part_count, - part_type=PARTITION_TYPE_KWAY, - ) - if not groups: - raise ValueError("虚拟分区计算失败,未返回分区结果。") - + assignment_count = {sensor: 0 for sensor in available_sensors} area_map: dict[str, str] = {} - for idx, group_nodes in enumerate(groups, start=1): - area_id = str(idx) - for node_id in group_nodes: - area_map[node_id] = area_id + for node_id in sorted(all_nodes): + sensor = _choose_sensor_for_node( + node_id=node_id, + sensors=available_sensors, + node_coords=node_coords, + distance_by_sensor=distance_by_sensor, + assignment_count=assignment_count, + ) + assignment_count[sensor] += 1 + area_map[node_id] = sensor_area_map[sensor] + if not area_map: raise ValueError("虚拟分区结果为空,无法生成节点区域映射。") - return area_map + + areas = _build_area_meta(area_map, sensor_area_map) + drawing_payload = _build_drawing_payload(areas, node_coords) + return area_map, areas, drawing_payload + + +def _resolve_dma_count( + dma_count: int | None, sensor_nodes: list[str], all_nodes: list[str] +) -> int: + if dma_count is None: + return min(len(sensor_nodes), len(all_nodes)) + if dma_count <= 0: + raise ValueError("dma_count 必须大于 0。") + if dma_count > len(all_nodes): + raise ValueError("dma_count 不能大于可分区节点数量。") + if dma_count > len(sensor_nodes): + raise ValueError("dma_count 不能大于可用传感器数量。") + return dma_count + + +def _cluster_sensors_to_areas( + sensor_nodes: list[str], node_coords: dict[str, dict[str, float]], area_count: int +) -> dict[str, str]: + if area_count >= len(sensor_nodes): + return {sensor: str(i + 1) for i, sensor in enumerate(sensor_nodes)} + + points = np.array( + [[float(node_coords[s]["x"]), float(node_coords[s]["y"])] for s in sensor_nodes], + dtype=float, + ) + centers = points[:area_count].copy() + labels = np.zeros(points.shape[0], dtype=int) + for _ in range(20): + d2 = ((points[:, None, :] - centers[None, :, :]) ** 2).sum(axis=2) + new_labels = d2.argmin(axis=1) + if np.array_equal(labels, new_labels): + break + labels = new_labels + for i in range(area_count): + cluster_points = points[labels == i] + if cluster_points.size > 0: + centers[i] = cluster_points.mean(axis=0) + return {sensor: str(int(labels[idx]) + 1) for idx, sensor in enumerate(sensor_nodes)} + + +def _build_adjacency(network: str, all_nodes: list[str]) -> dict[str, set[str]]: + adjacency: dict[str, set[str]] = {node: set() for node in all_nodes} + for link in get_network_link_nodes(network): + parts = str(link).split(":") + if len(parts) < 4: + continue + node1, node2 = parts[-2], parts[-1] + if node1 in adjacency and node2 in adjacency: + adjacency[node1].add(node2) + adjacency[node2].add(node1) + return adjacency + + +def _bfs_distances(adjacency: dict[str, set[str]], start: str) -> dict[str, int]: + distances: dict[str, int] = {start: 0} + queue: deque[str] = deque([start]) + while queue: + node = queue.popleft() + for neighbor in adjacency.get(node, set()): + if neighbor in distances: + continue + distances[neighbor] = distances[node] + 1 + queue.append(neighbor) + return distances + + +def _choose_sensor_for_node( + node_id: str, + sensors: list[str], + node_coords: dict[str, dict[str, float]], + distance_by_sensor: dict[str, dict[str, int]], + assignment_count: dict[str, int], +) -> str: + min_distance = None + candidates: list[str] = [] + for sensor in sensors: + d = distance_by_sensor.get(sensor, {}).get(node_id) + if d is None: + continue + if min_distance is None or d < min_distance: + min_distance = d + candidates = [sensor] + elif d == min_distance: + candidates.append(sensor) + if not candidates: + node_coord = node_coords[node_id] + return min( + sensors, + key=lambda sensor: _euclidean_distance( + node_coord, node_coords.get(sensor, node_coord) + ), + ) + return min(candidates, key=lambda sensor: (assignment_count[sensor], sensor)) + + +def _euclidean_distance(a: dict[str, float], b: dict[str, float]) -> float: + return math.hypot(float(a["x"]) - float(b["x"]), float(a["y"]) - float(b["y"])) + + +def _build_area_meta( + area_map: dict[str, str], sensor_area_map: dict[str, str] +) -> list[dict[str, Any]]: + nodes_by_area: dict[str, list[str]] = {} + for node_id, area_id in area_map.items(): + nodes_by_area.setdefault(area_id, []).append(node_id) + + sensors_by_area: dict[str, list[str]] = {} + for sensor, area_id in sensor_area_map.items(): + sensors_by_area.setdefault(area_id, []).append(sensor) + + areas: list[dict[str, Any]] = [] + for area_id in sorted(nodes_by_area.keys(), key=lambda x: int(x)): + node_ids = sorted(nodes_by_area.get(area_id, [])) + sensor_nodes = sorted(sensors_by_area.get(area_id, [])) + areas.append( + { + "area_id": area_id, + "sensor_nodes": sensor_nodes, + "node_ids": node_ids, + "node_count": len(node_ids), + } + ) + return areas + + +def _build_drawing_payload( + areas: list[dict[str, Any]], node_coords: dict[str, dict[str, float]] +) -> dict[str, Any]: + features: list[dict[str, Any]] = [] + for area in areas: + points = [ + ( + float(node_coords[node_id]["x"]), + float(node_coords[node_id]["y"]), + ) + for node_id in area["node_ids"] + if node_id in node_coords + ] + ring = _points_to_polygon_ring(points) + features.append( + { + "type": "Feature", + "properties": { + "area_id": area["area_id"], + "node_count": area["node_count"], + "sensor_nodes": area["sensor_nodes"], + }, + "geometry": {"type": "Polygon", "coordinates": [ring]}, + } + ) + return {"type": "FeatureCollection", "features": features} + + +def _points_to_polygon_ring(points: list[tuple[float, float]]) -> list[list[float]]: + if not points: + return [] + unique_points = list(dict.fromkeys(points)) + if len(unique_points) == 1: + x, y = unique_points[0] + delta = 1e-6 + return [ + [x - delta, y - delta], + [x + delta, y - delta], + [x + delta, y + delta], + [x - delta, y + delta], + [x - delta, y - delta], + ] + if len(unique_points) == 2: + (x1, y1), (x2, y2) = unique_points + dx, dy = x2 - x1, y2 - y1 + length = math.hypot(dx, dy) + if length == 0: + return _points_to_polygon_ring([unique_points[0]]) + width = max(length * 0.02, 1e-6) + nx, ny = -dy / length * width, dx / length * width + return [ + [x1 + nx, y1 + ny], + [x2 + nx, y2 + ny], + [x2 - nx, y2 - ny], + [x1 - nx, y1 - ny], + [x1 + nx, y1 + ny], + ] + + hull = _convex_hull(unique_points) + ring = [[x, y] for x, y in hull] + ring.append([hull[0][0], hull[0][1]]) + return ring + + +def _convex_hull(points: list[tuple[float, float]]) -> list[tuple[float, float]]: + pts = sorted(points) + if len(pts) <= 1: + return pts + + def cross( + o: tuple[float, float], a: tuple[float, float], b: tuple[float, float] + ) -> float: + return (a[0] - o[0]) * (b[1] - o[1]) - (a[1] - o[1]) * (b[0] - o[0]) + + lower: list[tuple[float, float]] = [] + for p in pts: + while len(lower) >= 2 and cross(lower[-2], lower[-1], p) <= 0: + lower.pop() + lower.append(p) + + upper: list[tuple[float, float]] = [] + for p in reversed(pts): + while len(upper) >= 2 and cross(upper[-2], upper[-1], p) <= 0: + upper.pop() + upper.append(p) + + return lower[:-1] + upper[:-1] + + +def _build_observed_pressure_from_scada( + network: str, + sensor_nodes: list[str], + scada_start: datetime | str | None, + scada_end: datetime | str | None, +) -> pd.DataFrame: + if scada_start is None or scada_end is None: + raise ValueError("使用后端 SCADA 查询时必须同时提供 scada_start 与 scada_end。") + + start_dt = _to_datetime(scada_start) + end_dt = _to_datetime(scada_end) + if start_dt >= end_dt: + raise ValueError("SCADA 时间窗非法:scada_start 必须早于 scada_end。") + + node_query_id: dict[str, str] = {} + for item in get_all_scada_info(network): + if str(item.get("type", "")).lower() != "pressure": + continue + node_id = item.get("associated_element_id") + query_id = item.get("api_query_id") + if isinstance(node_id, str) and node_id and isinstance(query_id, str) and query_id: + node_query_id[node_id] = query_id + + query_ids = [node_query_id[node] for node in sensor_nodes if node in node_query_id] + if not query_ids: + raise ValueError("未找到可用于压力观测的 SCADA api_query_id。") + + scada_data = influxdb_api.query_SCADA_data_by_device_ID_and_timerange( + query_ids_list=query_ids, + start_time=start_dt.isoformat(), + end_time=end_dt.isoformat(), + ) + + available_lengths = [ + len(scada_data.get(query_id, [])) + for query_id in query_ids + if len(scada_data.get(query_id, [])) > 0 + ] + if not available_lengths: + raise ValueError("指定时间窗内未查询到压力 SCADA 数据。") + min_len = min(available_lengths) + + obs_df = pd.DataFrame() + for node_id in sensor_nodes: + query_id = node_query_id.get(node_id) + if not query_id: + continue + records = scada_data.get(query_id, [])[:min_len] + if len(records) < min_len: + continue + obs_df[node_id] = [float(item["value"]) for item in records] + + if obs_df.empty: + raise ValueError("SCADA 压力数据无法构建观测矩阵。") + return obs_df + + +def _to_datetime(value: datetime | str) -> datetime: + if isinstance(value, datetime): + return value + return datetime.fromisoformat(value) diff --git a/app/services/scheme_management.py b/app/services/scheme_management.py index 1fe7cc3..7a3eb3b 100644 --- a/app/services/scheme_management.py +++ b/app/services/scheme_management.py @@ -1,5 +1,6 @@ import ast import json +from datetime import date import geopandas as gpd import pandas as pd @@ -170,6 +171,198 @@ def query_scheme_list(name: str) -> list: print(f"查询错误:{e}") +def ensure_leakage_identify_result_table(name: str) -> None: + conn_string = get_pgconn_string(db_name=name) + with psycopg.connect(conn_string) as conn: + with conn.cursor() as cur: + cur.execute( + """ + CREATE TABLE IF NOT EXISTS public.leakage_identify_result ( + id BIGSERIAL PRIMARY KEY, + scheme_name VARCHAR(255) NOT NULL, + network VARCHAR(255) NOT NULL, + created_at TIMESTAMPTZ NOT NULL DEFAULT NOW(), + run_status VARCHAR(32) NOT NULL DEFAULT 'completed', + error_message TEXT, + sensor_nodes JSONB NOT NULL DEFAULT '[]'::jsonb, + result_rows JSONB NOT NULL DEFAULT '[]'::jsonb, + node_area_map JSONB NOT NULL DEFAULT '{}'::jsonb, + areas JSONB NOT NULL DEFAULT '[]'::jsonb, + drawing_payload JSONB NOT NULL DEFAULT '{"type":"FeatureCollection","features":[]}'::jsonb, + CONSTRAINT uq_leakage_identify_result_scheme UNIQUE (scheme_name), + CONSTRAINT fk_leakage_identify_result_scheme + FOREIGN KEY (scheme_name) + REFERENCES public.scheme_list (scheme_name) + ON DELETE CASCADE + ); + """ + ) + cur.execute( + "CREATE INDEX IF NOT EXISTS idx_leakage_identify_result_network ON public.leakage_identify_result (network);" + ) + cur.execute( + "CREATE INDEX IF NOT EXISTS idx_leakage_identify_result_created_at ON public.leakage_identify_result (created_at DESC);" + ) + cur.execute( + "CREATE INDEX IF NOT EXISTS idx_leakage_identify_result_run_status ON public.leakage_identify_result (run_status);" + ) + cur.execute( + "CREATE INDEX IF NOT EXISTS idx_leakage_identify_result_rows_gin ON public.leakage_identify_result USING GIN (result_rows);" + ) + conn.commit() + + +def store_leakage_identify_result( + name: str, + scheme_name: str, + network: str, + sensor_nodes: list[str], + result_rows: list[dict], + node_area_map: dict[str, str], + areas: list[dict], + drawing_payload: dict, + run_status: str = "completed", + error_message: str | None = None, +) -> None: + ensure_leakage_identify_result_table(name) + conn_string = get_pgconn_string(db_name=name) + with psycopg.connect(conn_string) as conn: + with conn.cursor() as cur: + cur.execute( + """ + INSERT INTO public.leakage_identify_result + ( + scheme_name, network, run_status, error_message, + sensor_nodes, result_rows, node_area_map, areas, drawing_payload + ) + VALUES (%s, %s, %s, %s, %s::jsonb, %s::jsonb, %s::jsonb, %s::jsonb, %s::jsonb) + ON CONFLICT (scheme_name) + DO UPDATE SET + network = EXCLUDED.network, + run_status = EXCLUDED.run_status, + error_message = EXCLUDED.error_message, + sensor_nodes = EXCLUDED.sensor_nodes, + result_rows = EXCLUDED.result_rows, + node_area_map = EXCLUDED.node_area_map, + areas = EXCLUDED.areas, + drawing_payload = EXCLUDED.drawing_payload, + created_at = NOW(); + """, + ( + scheme_name, + network, + run_status, + error_message, + json.dumps(sensor_nodes), + json.dumps(result_rows), + json.dumps(node_area_map), + json.dumps(areas), + json.dumps(drawing_payload), + ), + ) + conn.commit() + + +def query_leakage_identify_schemes( + name: str, + network: str, + scheme_type: str = "dma_leak_identification", + query_date: date | None = None, +) -> list[dict]: + conn_string = get_pgconn_string(db_name=name) + with psycopg.connect(conn_string) as conn: + with conn.cursor() as cur: + if query_date is None: + cur.execute( + """ + SELECT scheme_id, scheme_name, scheme_type, username, create_time, scheme_start_time, scheme_detail + FROM public.scheme_list + WHERE scheme_type = %s + ORDER BY create_time DESC + """, + (scheme_type,), + ) + else: + cur.execute( + """ + SELECT scheme_id, scheme_name, scheme_type, username, create_time, scheme_start_time, scheme_detail + FROM public.scheme_list + WHERE scheme_type = %s AND DATE(create_time) = %s + ORDER BY create_time DESC + """, + (scheme_type, query_date), + ) + rows = cur.fetchall() + result = [] + for row in rows: + detail = row[6] if isinstance(row[6], dict) else {} + if network and detail.get("network") not in (None, network): + continue + result.append( + { + "scheme_id": row[0], + "scheme_name": row[1], + "scheme_type": row[2], + "username": row[3], + "create_time": row[4], + "scheme_start_time": row[5], + "scheme_detail": detail, + } + ) + return result + + +def query_leakage_identify_scheme_detail(name: str, scheme_name: str) -> dict: + ensure_leakage_identify_result_table(name) + conn_string = get_pgconn_string(db_name=name) + with psycopg.connect(conn_string) as conn: + with conn.cursor() as cur: + cur.execute( + """ + SELECT scheme_id, scheme_name, scheme_type, username, create_time, scheme_start_time, scheme_detail + FROM public.scheme_list + WHERE scheme_name = %s + LIMIT 1 + """, + (scheme_name,), + ) + base_row = cur.fetchone() + if base_row is None: + return {} + cur.execute( + """ + SELECT network, created_at, run_status, error_message, sensor_nodes, result_rows, node_area_map, areas, drawing_payload + FROM public.leakage_identify_result + WHERE scheme_name = %s + LIMIT 1 + """, + (scheme_name,), + ) + result_row = cur.fetchone() + if result_row is None: + return {} + return { + "scheme_id": base_row[0], + "scheme_name": base_row[1], + "scheme_type": base_row[2], + "username": base_row[3], + "create_time": base_row[4], + "scheme_start_time": base_row[5], + "scheme_detail": base_row[6] if isinstance(base_row[6], dict) else {}, + "network": result_row[0], + "result_created_at": result_row[1], + "run_status": result_row[2], + "error_message": result_row[3], + "sensor_nodes": result_row[4] if isinstance(result_row[4], list) else [], + "rows": result_row[5] if isinstance(result_row[5], list) else [], + "node_area_map": result_row[6] if isinstance(result_row[6], dict) else {}, + "areas": result_row[7] if isinstance(result_row[7], list) else [], + "drawing_payload": ( + result_row[8] if isinstance(result_row[8], dict) else {"type": "FeatureCollection", "features": []} + ), + } + + # 2025/03/23 def upload_shp_to_pg(name: str, table_name: str, role: str, shp_file_path: str): """ diff --git a/tests/api/test_leakage_endpoints.py b/tests/api/test_leakage_endpoints.py new file mode 100644 index 0000000..7a0cc10 --- /dev/null +++ b/tests/api/test_leakage_endpoints.py @@ -0,0 +1,63 @@ +from fastapi import FastAPI +from fastapi.testclient import TestClient + +from app.api.v1.endpoints import leakage as leakage_endpoint + + +def _build_client() -> TestClient: + app = FastAPI() + app.include_router(leakage_endpoint.router, prefix="/api/v1/leakage") + return TestClient(app) + + +def test_identify_leakage_success(monkeypatch): + def fake_run_leakage_identification(**kwargs): + assert kwargs["network"] == "demo" + return {"rows": [], "area_count": 0} + + monkeypatch.setattr( + leakage_endpoint, "run_leakage_identification", fake_run_leakage_identification + ) + client = _build_client() + response = client.post( + "/api/v1/leakage/identify/", + json={ + "network": "demo", + "scada_start": "2026-01-01T00:00:00+08:00", + "scada_end": "2026-01-01T01:00:00+08:00", + "scheme_name": "dma_001", + }, + ) + assert response.status_code == 200 + assert response.json()["area_count"] == 0 + + +def test_query_leakage_schemes_success(monkeypatch): + monkeypatch.setattr( + leakage_endpoint, + "list_leakage_identify_schemes", + lambda network, query_date=None: [ + {"scheme_name": "dma_001", "scheme_type": "dma_leak_identification"} + ], + ) + client = _build_client() + response = client.get("/api/v1/leakage/schemes/", params={"network": "demo"}) + assert response.status_code == 200 + assert response.json()[0]["scheme_name"] == "dma_001" + + +def test_query_leakage_scheme_detail_success(monkeypatch): + monkeypatch.setattr( + leakage_endpoint, + "get_leakage_identify_scheme_detail", + lambda network, scheme_name: { + "scheme_name": scheme_name, + "rows": [{"Area": "1", "LeakageFlow_m3_per_s": 0.1}], + }, + ) + client = _build_client() + response = client.get( + "/api/v1/leakage/schemes/dma_001", params={"network": "demo"} + ) + assert response.status_code == 200 + assert response.json()["scheme_name"] == "dma_001"