完善区域漏损识别

This commit is contained in:
2026-03-04 15:21:31 +08:00
parent d0abad3c65
commit 61f6975296
5 changed files with 732 additions and 34 deletions
+15 -2
View File
@@ -425,10 +425,14 @@ class LeakageProblem(Problem):
demand_obj.base_value = original_val + per_node_leak demand_obj.base_value = original_val + per_node_leak
modifications.append((demand_obj, original_val)) modifications.append((demand_obj, original_val))
# 结果保存在根目录的temp/leakage文件夹中
temp_dir = os.path.abspath(os.path.join("temp", "leakage"))
os.makedirs(temp_dir, exist_ok=True)
prefix = os.path.join(temp_dir, f"temp_{os.getpid()}")
try: try:
sim = wntr.sim.EpanetSimulator(self.wn) sim = wntr.sim.EpanetSimulator(self.wn)
results = sim.run_sim() results = sim.run_sim(file_prefix=prefix)
sim_pressure = results.node["pressure"].loc[:, self.sensor_nodes] sim_pressure = results.node["pressure"].loc[:, self.sensor_nodes]
n_steps = min(sim_pressure.shape[0], self.obs_matrix.shape[0]) n_steps = min(sim_pressure.shape[0], self.obs_matrix.shape[0])
@@ -453,6 +457,15 @@ class LeakageProblem(Problem):
for demand_obj, original_val in modifications: for demand_obj, original_val in modifications:
demand_obj.base_value = original_val demand_obj.base_value = original_val
# 操作完成后删除临时文件
for ext in [".inp", ".rpt", ".bin", ".out"]:
temp_file = prefix + ext
if os.path.exists(temp_file):
try:
os.remove(temp_file)
except OSError:
pass
def main() -> int: def main() -> int:
parser = argparse.ArgumentParser(description="漏损区域识别") parser = argparse.ArgumentParser(description="漏损区域识别")
+33 -2
View File
@@ -1,16 +1,21 @@
from typing import Any from typing import Any
from datetime import datetime
from fastapi import APIRouter, HTTPException from fastapi import APIRouter, HTTPException
from pydantic import BaseModel from pydantic import BaseModel
from app.services.leakage_identifier import run_leakage_identification from app.services.leakage_identifier import (
get_leakage_identify_scheme_detail,
list_leakage_identify_schemes,
run_leakage_identification,
)
router = APIRouter() router = APIRouter()
class LeakageIdentifyRequest(BaseModel): class LeakageIdentifyRequest(BaseModel):
network: str network: str
observed_pressure_data: str | dict[str, list[Any]] | list[dict[str, Any]] observed_pressure_data: str | dict[str, list[Any]] | list[dict[str, Any]] | None = None
start_time: float = 0 start_time: float = 0
duration: float = 24 duration: float = 24
timestep: float = 5 timestep: float = 5
@@ -20,6 +25,12 @@ class LeakageIdentifyRequest(BaseModel):
pop_size: int = 50 pop_size: int = 50
max_gen: int = 100 max_gen: int = 100
output_flow_unit: str = "m3/s" output_flow_unit: str = "m3/s"
dma_count: int | None = None
scada_start: datetime | None = None
scada_end: datetime | None = None
sensor_nodes: list[str] | None = None
scheme_name: str | None = None
username: str = "admin"
@router.post("/identify/") @router.post("/identify/")
@@ -28,3 +39,23 @@ async def identify_leakage(data: LeakageIdentifyRequest) -> dict[str, Any]:
return run_leakage_identification(**data.dict()) return run_leakage_identification(**data.dict())
except Exception as exc: except Exception as exc:
raise HTTPException(status_code=400, detail=str(exc)) raise HTTPException(status_code=400, detail=str(exc))
@router.get("/schemes/")
async def query_leakage_schemes(
network: str, query_date: datetime | None = None
) -> list[dict[str, Any]]:
try:
return list_leakage_identify_schemes(network=network, query_date=query_date)
except Exception as exc:
raise HTTPException(status_code=400, detail=str(exc))
@router.get("/schemes/{scheme_name}")
async def query_leakage_scheme_detail(
network: str, scheme_name: str
) -> dict[str, Any]:
try:
return get_leakage_identify_scheme_detail(network=network, scheme_name=scheme_name)
except Exception as exc:
raise HTTPException(status_code=400, detail=str(exc))
+428 -30
View File
@@ -1,21 +1,34 @@
import math
import os import os
from collections import deque
from datetime import datetime
from typing import Any from typing import Any
import numpy as np
import pandas as pd import pandas as pd
from app.algorithms.leakage_identifier import LeakageIdentifier from app.algorithms.leakage_identifier import LeakageIdentifier
from app.infra.db.influxdb import api as influxdb_api
from app.services.scheme_management import (
query_leakage_identify_scheme_detail,
query_leakage_identify_schemes,
scheme_name_exists,
store_leakage_identify_result,
store_scheme_info,
)
from app.services.tjnetwork import ( from app.services.tjnetwork import (
PARTITION_TYPE_KWAY,
calculate_district_metering_area_for_nodes,
dump_inp, dump_inp,
get_all_scada_info, get_all_scada_info,
get_network_link_nodes,
get_network_node_coords, get_network_node_coords,
) )
def run_leakage_identification( def run_leakage_identification(
network: str, network: str,
observed_pressure_data: str | pd.DataFrame | dict[str, list[Any]] | list[dict[str, Any]], observed_pressure_data: (
str | pd.DataFrame | dict[str, list[Any]] | list[dict[str, Any]] | None
) = None,
start_time: float = 0, start_time: float = 0,
duration: float = 24, duration: float = 24,
timestep: float = 5, timestep: float = 5,
@@ -25,17 +38,47 @@ def run_leakage_identification(
pop_size: int = 50, pop_size: int = 50,
max_gen: int = 100, max_gen: int = 100,
output_flow_unit: str = "m3/s", output_flow_unit: str = "m3/s",
dma_count: int | None = None,
scada_start: datetime | str | None = None,
scada_end: datetime | str | None = None,
sensor_nodes: list[str] | None = None,
scheme_name: str | None = None,
username: str = "admin",
) -> dict[str, Any]: ) -> dict[str, Any]:
os.makedirs(output_dir, exist_ok=True) os.makedirs(output_dir, exist_ok=True)
inp_path = os.path.join(output_dir, f"{network}.leakage.inp") inp_path = os.path.join(output_dir, f"{network}.leakage.inp")
dump_inp(network, inp_path, "2") dump_inp(network, inp_path, "2")
sensor_nodes = _get_pressure_sensor_nodes(network)
area_map = _build_area_map_by_spectral_partition(network, sensor_nodes) selected_sensor_nodes = (
list(dict.fromkeys([node for node in (sensor_nodes or []) if node]))
if sensor_nodes
else _get_pressure_sensor_nodes(network)
)
if not selected_sensor_nodes:
raise ValueError("未提供有效传感器节点,且系统未识别到可用压力传感器。")
area_map, areas, drawing_payload = _build_area_map_by_topology(
network, selected_sensor_nodes, dma_count
)
observed_source = "request_payload"
if scada_start is not None or scada_end is not None:
observed_df = _build_observed_pressure_from_scada(
network=network,
sensor_nodes=selected_sensor_nodes,
scada_start=scada_start,
scada_end=scada_end,
)
observed_source = "backend_timerange"
else:
if observed_pressure_data is None:
raise ValueError("未提供 observed_pressure_data,且未提供 scada_start/scada_end。")
observed_df = observed_pressure_data
q_sum_m3s = LeakageIdentifier._flow_to_m3s(q_sum, q_sum_unit) q_sum_m3s = LeakageIdentifier._flow_to_m3s(q_sum, q_sum_unit)
identifier = LeakageIdentifier( identifier = LeakageIdentifier(
inp_path=inp_path, inp_path=inp_path,
sensor_nodes=sensor_nodes, sensor_nodes=selected_sensor_nodes,
area_map=area_map, area_map=area_map,
start_time=start_time, start_time=start_time,
duration=duration, duration=duration,
@@ -43,26 +86,97 @@ def run_leakage_identification(
q_sum=q_sum_m3s, q_sum=q_sum_m3s,
) )
result_df = identifier.run_identification( result_df = identifier.run_identification(
observed_pressure_data=observed_pressure_data, observed_pressure_data=observed_df,
output_dir=output_dir, output_dir=output_dir,
pop_size=pop_size, pop_size=pop_size,
max_gen=max_gen, max_gen=max_gen,
output_flow_unit=output_flow_unit, output_flow_unit=output_flow_unit,
save_result=False, save_result=False,
) )
return { rows = result_df.to_dict(orient="records")
payload = {
"result_path": result_df.attrs.get("result_path"), "result_path": result_df.attrs.get("result_path"),
"sensor_nodes": sensor_nodes, "sensor_nodes": selected_sensor_nodes,
"observed_source": observed_source,
"area_count": len(set(area_map.values())), "area_count": len(set(area_map.values())),
"rows": result_df.to_dict(orient="records"), "node_area_map": area_map,
"areas": areas,
"drawing_payload": drawing_payload,
"rows": rows,
} }
if scheme_name:
if scheme_name_exists(network, scheme_name):
raise ValueError(f"方案名称已存在: {scheme_name}")
scheme_start_time = (
_to_datetime(scada_start).isoformat() if scada_start is not None else datetime.now().isoformat()
)
scheme_detail = {
"network": network,
"dma_count": dma_count,
"sensor_nodes": selected_sensor_nodes,
"scada_start": _to_datetime(scada_start).isoformat() if scada_start is not None else None,
"scada_end": _to_datetime(scada_end).isoformat() if scada_end is not None else None,
"algorithm_params": {
"start_time": start_time,
"duration": duration,
"timestep": timestep,
"q_sum": q_sum,
"q_sum_unit": q_sum_unit,
"output_flow_unit": output_flow_unit,
"pop_size": pop_size,
"max_gen": max_gen,
},
"result_summary": {
"area_count": len(set(area_map.values())),
"max_leakage": max(
(float(row.get("LeakageFlow_m3_per_s", 0.0)) for row in rows), default=0.0
),
},
}
store_scheme_info(
name=network,
scheme_name=scheme_name,
scheme_type="dma_leak_identification",
username=username,
scheme_start_time=scheme_start_time,
scheme_detail=scheme_detail,
)
store_leakage_identify_result(
name=network,
scheme_name=scheme_name,
network=network,
sensor_nodes=selected_sensor_nodes,
result_rows=rows,
node_area_map=area_map,
areas=areas,
drawing_payload=drawing_payload,
)
payload["scheme_name"] = scheme_name
return payload
def list_leakage_identify_schemes(
network: str, query_date: datetime | str | None = None
) -> list[dict[str, Any]]:
parsed_date = _to_datetime(query_date).date() if query_date is not None else None
return query_leakage_identify_schemes(
name=network, network=network, query_date=parsed_date
)
def get_leakage_identify_scheme_detail(network: str, scheme_name: str) -> dict[str, Any]:
result = query_leakage_identify_scheme_detail(network, scheme_name)
if not result:
raise ValueError(f"未找到漏损识别方案: {scheme_name}")
return result
def _get_pressure_sensor_nodes(network: str) -> list[str]: def _get_pressure_sensor_nodes(network: str) -> list[str]:
scada_info = get_all_scada_info(network) scada_info = get_all_scada_info(network)
sensor_nodes: list[str] = [] sensor_nodes: list[str] = []
for item in scada_info: for item in scada_info:
if item.get("type") != "pressure": scada_type = str(item.get("type", "")).lower()
if scada_type != "pressure":
continue continue
node_id = item.get("associated_element_id") node_id = item.get("associated_element_id")
if isinstance(node_id, str) and node_id: if isinstance(node_id, str) and node_id:
@@ -73,32 +187,316 @@ def _get_pressure_sensor_nodes(network: str) -> list[str]:
return sensor_nodes return sensor_nodes
def _build_area_map_by_spectral_partition( def _build_area_map_by_topology(
network: str, sensor_nodes: list[str] network: str, sensor_nodes: list[str], dma_count: int | None
) -> dict[str, str]: ) -> tuple[dict[str, str], list[dict[str, Any]], dict[str, Any]]:
node_coords = get_network_node_coords(network) node_coords = get_network_node_coords(network)
all_nodes = list(node_coords.keys()) all_nodes = list(node_coords.keys())
if not all_nodes: if not all_nodes:
raise ValueError("管网中未获取到可分区节点。") raise ValueError("管网中未获取到可分区节点。")
part_count = min(len(sensor_nodes), len(all_nodes)) available_sensors = [node for node in sensor_nodes if node in node_coords]
if part_count <= 0: if not available_sensors:
raise ValueError("无可用压力传感器,无法生成虚拟分区。") raise ValueError("无可用压力传感器,无法生成虚拟分区。")
area_count = _resolve_dma_count(dma_count, available_sensors, all_nodes)
sensor_area_map = _cluster_sensors_to_areas(available_sensors, node_coords, area_count)
adjacency = _build_adjacency(network, all_nodes)
distance_by_sensor = {
sensor: _bfs_distances(adjacency, sensor) for sensor in available_sensors
}
groups = calculate_district_metering_area_for_nodes( assignment_count = {sensor: 0 for sensor in available_sensors}
network,
all_nodes,
part_count=part_count,
part_type=PARTITION_TYPE_KWAY,
)
if not groups:
raise ValueError("虚拟分区计算失败,未返回分区结果。")
area_map: dict[str, str] = {} area_map: dict[str, str] = {}
for idx, group_nodes in enumerate(groups, start=1): for node_id in sorted(all_nodes):
area_id = str(idx) sensor = _choose_sensor_for_node(
for node_id in group_nodes: node_id=node_id,
area_map[node_id] = area_id sensors=available_sensors,
node_coords=node_coords,
distance_by_sensor=distance_by_sensor,
assignment_count=assignment_count,
)
assignment_count[sensor] += 1
area_map[node_id] = sensor_area_map[sensor]
if not area_map: if not area_map:
raise ValueError("虚拟分区结果为空,无法生成节点区域映射。") raise ValueError("虚拟分区结果为空,无法生成节点区域映射。")
return area_map
areas = _build_area_meta(area_map, sensor_area_map)
drawing_payload = _build_drawing_payload(areas, node_coords)
return area_map, areas, drawing_payload
def _resolve_dma_count(
dma_count: int | None, sensor_nodes: list[str], all_nodes: list[str]
) -> int:
if dma_count is None:
return min(len(sensor_nodes), len(all_nodes))
if dma_count <= 0:
raise ValueError("dma_count 必须大于 0。")
if dma_count > len(all_nodes):
raise ValueError("dma_count 不能大于可分区节点数量。")
if dma_count > len(sensor_nodes):
raise ValueError("dma_count 不能大于可用传感器数量。")
return dma_count
def _cluster_sensors_to_areas(
sensor_nodes: list[str], node_coords: dict[str, dict[str, float]], area_count: int
) -> dict[str, str]:
if area_count >= len(sensor_nodes):
return {sensor: str(i + 1) for i, sensor in enumerate(sensor_nodes)}
points = np.array(
[[float(node_coords[s]["x"]), float(node_coords[s]["y"])] for s in sensor_nodes],
dtype=float,
)
centers = points[:area_count].copy()
labels = np.zeros(points.shape[0], dtype=int)
for _ in range(20):
d2 = ((points[:, None, :] - centers[None, :, :]) ** 2).sum(axis=2)
new_labels = d2.argmin(axis=1)
if np.array_equal(labels, new_labels):
break
labels = new_labels
for i in range(area_count):
cluster_points = points[labels == i]
if cluster_points.size > 0:
centers[i] = cluster_points.mean(axis=0)
return {sensor: str(int(labels[idx]) + 1) for idx, sensor in enumerate(sensor_nodes)}
def _build_adjacency(network: str, all_nodes: list[str]) -> dict[str, set[str]]:
adjacency: dict[str, set[str]] = {node: set() for node in all_nodes}
for link in get_network_link_nodes(network):
parts = str(link).split(":")
if len(parts) < 4:
continue
node1, node2 = parts[-2], parts[-1]
if node1 in adjacency and node2 in adjacency:
adjacency[node1].add(node2)
adjacency[node2].add(node1)
return adjacency
def _bfs_distances(adjacency: dict[str, set[str]], start: str) -> dict[str, int]:
distances: dict[str, int] = {start: 0}
queue: deque[str] = deque([start])
while queue:
node = queue.popleft()
for neighbor in adjacency.get(node, set()):
if neighbor in distances:
continue
distances[neighbor] = distances[node] + 1
queue.append(neighbor)
return distances
def _choose_sensor_for_node(
node_id: str,
sensors: list[str],
node_coords: dict[str, dict[str, float]],
distance_by_sensor: dict[str, dict[str, int]],
assignment_count: dict[str, int],
) -> str:
min_distance = None
candidates: list[str] = []
for sensor in sensors:
d = distance_by_sensor.get(sensor, {}).get(node_id)
if d is None:
continue
if min_distance is None or d < min_distance:
min_distance = d
candidates = [sensor]
elif d == min_distance:
candidates.append(sensor)
if not candidates:
node_coord = node_coords[node_id]
return min(
sensors,
key=lambda sensor: _euclidean_distance(
node_coord, node_coords.get(sensor, node_coord)
),
)
return min(candidates, key=lambda sensor: (assignment_count[sensor], sensor))
def _euclidean_distance(a: dict[str, float], b: dict[str, float]) -> float:
return math.hypot(float(a["x"]) - float(b["x"]), float(a["y"]) - float(b["y"]))
def _build_area_meta(
area_map: dict[str, str], sensor_area_map: dict[str, str]
) -> list[dict[str, Any]]:
nodes_by_area: dict[str, list[str]] = {}
for node_id, area_id in area_map.items():
nodes_by_area.setdefault(area_id, []).append(node_id)
sensors_by_area: dict[str, list[str]] = {}
for sensor, area_id in sensor_area_map.items():
sensors_by_area.setdefault(area_id, []).append(sensor)
areas: list[dict[str, Any]] = []
for area_id in sorted(nodes_by_area.keys(), key=lambda x: int(x)):
node_ids = sorted(nodes_by_area.get(area_id, []))
sensor_nodes = sorted(sensors_by_area.get(area_id, []))
areas.append(
{
"area_id": area_id,
"sensor_nodes": sensor_nodes,
"node_ids": node_ids,
"node_count": len(node_ids),
}
)
return areas
def _build_drawing_payload(
areas: list[dict[str, Any]], node_coords: dict[str, dict[str, float]]
) -> dict[str, Any]:
features: list[dict[str, Any]] = []
for area in areas:
points = [
(
float(node_coords[node_id]["x"]),
float(node_coords[node_id]["y"]),
)
for node_id in area["node_ids"]
if node_id in node_coords
]
ring = _points_to_polygon_ring(points)
features.append(
{
"type": "Feature",
"properties": {
"area_id": area["area_id"],
"node_count": area["node_count"],
"sensor_nodes": area["sensor_nodes"],
},
"geometry": {"type": "Polygon", "coordinates": [ring]},
}
)
return {"type": "FeatureCollection", "features": features}
def _points_to_polygon_ring(points: list[tuple[float, float]]) -> list[list[float]]:
if not points:
return []
unique_points = list(dict.fromkeys(points))
if len(unique_points) == 1:
x, y = unique_points[0]
delta = 1e-6
return [
[x - delta, y - delta],
[x + delta, y - delta],
[x + delta, y + delta],
[x - delta, y + delta],
[x - delta, y - delta],
]
if len(unique_points) == 2:
(x1, y1), (x2, y2) = unique_points
dx, dy = x2 - x1, y2 - y1
length = math.hypot(dx, dy)
if length == 0:
return _points_to_polygon_ring([unique_points[0]])
width = max(length * 0.02, 1e-6)
nx, ny = -dy / length * width, dx / length * width
return [
[x1 + nx, y1 + ny],
[x2 + nx, y2 + ny],
[x2 - nx, y2 - ny],
[x1 - nx, y1 - ny],
[x1 + nx, y1 + ny],
]
hull = _convex_hull(unique_points)
ring = [[x, y] for x, y in hull]
ring.append([hull[0][0], hull[0][1]])
return ring
def _convex_hull(points: list[tuple[float, float]]) -> list[tuple[float, float]]:
pts = sorted(points)
if len(pts) <= 1:
return pts
def cross(
o: tuple[float, float], a: tuple[float, float], b: tuple[float, float]
) -> float:
return (a[0] - o[0]) * (b[1] - o[1]) - (a[1] - o[1]) * (b[0] - o[0])
lower: list[tuple[float, float]] = []
for p in pts:
while len(lower) >= 2 and cross(lower[-2], lower[-1], p) <= 0:
lower.pop()
lower.append(p)
upper: list[tuple[float, float]] = []
for p in reversed(pts):
while len(upper) >= 2 and cross(upper[-2], upper[-1], p) <= 0:
upper.pop()
upper.append(p)
return lower[:-1] + upper[:-1]
def _build_observed_pressure_from_scada(
network: str,
sensor_nodes: list[str],
scada_start: datetime | str | None,
scada_end: datetime | str | None,
) -> pd.DataFrame:
if scada_start is None or scada_end is None:
raise ValueError("使用后端 SCADA 查询时必须同时提供 scada_start 与 scada_end。")
start_dt = _to_datetime(scada_start)
end_dt = _to_datetime(scada_end)
if start_dt >= end_dt:
raise ValueError("SCADA 时间窗非法:scada_start 必须早于 scada_end。")
node_query_id: dict[str, str] = {}
for item in get_all_scada_info(network):
if str(item.get("type", "")).lower() != "pressure":
continue
node_id = item.get("associated_element_id")
query_id = item.get("api_query_id")
if isinstance(node_id, str) and node_id and isinstance(query_id, str) and query_id:
node_query_id[node_id] = query_id
query_ids = [node_query_id[node] for node in sensor_nodes if node in node_query_id]
if not query_ids:
raise ValueError("未找到可用于压力观测的 SCADA api_query_id。")
scada_data = influxdb_api.query_SCADA_data_by_device_ID_and_timerange(
query_ids_list=query_ids,
start_time=start_dt.isoformat(),
end_time=end_dt.isoformat(),
)
available_lengths = [
len(scada_data.get(query_id, []))
for query_id in query_ids
if len(scada_data.get(query_id, [])) > 0
]
if not available_lengths:
raise ValueError("指定时间窗内未查询到压力 SCADA 数据。")
min_len = min(available_lengths)
obs_df = pd.DataFrame()
for node_id in sensor_nodes:
query_id = node_query_id.get(node_id)
if not query_id:
continue
records = scada_data.get(query_id, [])[:min_len]
if len(records) < min_len:
continue
obs_df[node_id] = [float(item["value"]) for item in records]
if obs_df.empty:
raise ValueError("SCADA 压力数据无法构建观测矩阵。")
return obs_df
def _to_datetime(value: datetime | str) -> datetime:
if isinstance(value, datetime):
return value
return datetime.fromisoformat(value)
+193
View File
@@ -1,5 +1,6 @@
import ast import ast
import json import json
from datetime import date
import geopandas as gpd import geopandas as gpd
import pandas as pd import pandas as pd
@@ -170,6 +171,198 @@ def query_scheme_list(name: str) -> list:
print(f"查询错误:{e}") print(f"查询错误:{e}")
def ensure_leakage_identify_result_table(name: str) -> None:
conn_string = get_pgconn_string(db_name=name)
with psycopg.connect(conn_string) as conn:
with conn.cursor() as cur:
cur.execute(
"""
CREATE TABLE IF NOT EXISTS public.leakage_identify_result (
id BIGSERIAL PRIMARY KEY,
scheme_name VARCHAR(255) NOT NULL,
network VARCHAR(255) NOT NULL,
created_at TIMESTAMPTZ NOT NULL DEFAULT NOW(),
run_status VARCHAR(32) NOT NULL DEFAULT 'completed',
error_message TEXT,
sensor_nodes JSONB NOT NULL DEFAULT '[]'::jsonb,
result_rows JSONB NOT NULL DEFAULT '[]'::jsonb,
node_area_map JSONB NOT NULL DEFAULT '{}'::jsonb,
areas JSONB NOT NULL DEFAULT '[]'::jsonb,
drawing_payload JSONB NOT NULL DEFAULT '{"type":"FeatureCollection","features":[]}'::jsonb,
CONSTRAINT uq_leakage_identify_result_scheme UNIQUE (scheme_name),
CONSTRAINT fk_leakage_identify_result_scheme
FOREIGN KEY (scheme_name)
REFERENCES public.scheme_list (scheme_name)
ON DELETE CASCADE
);
"""
)
cur.execute(
"CREATE INDEX IF NOT EXISTS idx_leakage_identify_result_network ON public.leakage_identify_result (network);"
)
cur.execute(
"CREATE INDEX IF NOT EXISTS idx_leakage_identify_result_created_at ON public.leakage_identify_result (created_at DESC);"
)
cur.execute(
"CREATE INDEX IF NOT EXISTS idx_leakage_identify_result_run_status ON public.leakage_identify_result (run_status);"
)
cur.execute(
"CREATE INDEX IF NOT EXISTS idx_leakage_identify_result_rows_gin ON public.leakage_identify_result USING GIN (result_rows);"
)
conn.commit()
def store_leakage_identify_result(
name: str,
scheme_name: str,
network: str,
sensor_nodes: list[str],
result_rows: list[dict],
node_area_map: dict[str, str],
areas: list[dict],
drawing_payload: dict,
run_status: str = "completed",
error_message: str | None = None,
) -> None:
ensure_leakage_identify_result_table(name)
conn_string = get_pgconn_string(db_name=name)
with psycopg.connect(conn_string) as conn:
with conn.cursor() as cur:
cur.execute(
"""
INSERT INTO public.leakage_identify_result
(
scheme_name, network, run_status, error_message,
sensor_nodes, result_rows, node_area_map, areas, drawing_payload
)
VALUES (%s, %s, %s, %s, %s::jsonb, %s::jsonb, %s::jsonb, %s::jsonb, %s::jsonb)
ON CONFLICT (scheme_name)
DO UPDATE SET
network = EXCLUDED.network,
run_status = EXCLUDED.run_status,
error_message = EXCLUDED.error_message,
sensor_nodes = EXCLUDED.sensor_nodes,
result_rows = EXCLUDED.result_rows,
node_area_map = EXCLUDED.node_area_map,
areas = EXCLUDED.areas,
drawing_payload = EXCLUDED.drawing_payload,
created_at = NOW();
""",
(
scheme_name,
network,
run_status,
error_message,
json.dumps(sensor_nodes),
json.dumps(result_rows),
json.dumps(node_area_map),
json.dumps(areas),
json.dumps(drawing_payload),
),
)
conn.commit()
def query_leakage_identify_schemes(
name: str,
network: str,
scheme_type: str = "dma_leak_identification",
query_date: date | None = None,
) -> list[dict]:
conn_string = get_pgconn_string(db_name=name)
with psycopg.connect(conn_string) as conn:
with conn.cursor() as cur:
if query_date is None:
cur.execute(
"""
SELECT scheme_id, scheme_name, scheme_type, username, create_time, scheme_start_time, scheme_detail
FROM public.scheme_list
WHERE scheme_type = %s
ORDER BY create_time DESC
""",
(scheme_type,),
)
else:
cur.execute(
"""
SELECT scheme_id, scheme_name, scheme_type, username, create_time, scheme_start_time, scheme_detail
FROM public.scheme_list
WHERE scheme_type = %s AND DATE(create_time) = %s
ORDER BY create_time DESC
""",
(scheme_type, query_date),
)
rows = cur.fetchall()
result = []
for row in rows:
detail = row[6] if isinstance(row[6], dict) else {}
if network and detail.get("network") not in (None, network):
continue
result.append(
{
"scheme_id": row[0],
"scheme_name": row[1],
"scheme_type": row[2],
"username": row[3],
"create_time": row[4],
"scheme_start_time": row[5],
"scheme_detail": detail,
}
)
return result
def query_leakage_identify_scheme_detail(name: str, scheme_name: str) -> dict:
ensure_leakage_identify_result_table(name)
conn_string = get_pgconn_string(db_name=name)
with psycopg.connect(conn_string) as conn:
with conn.cursor() as cur:
cur.execute(
"""
SELECT scheme_id, scheme_name, scheme_type, username, create_time, scheme_start_time, scheme_detail
FROM public.scheme_list
WHERE scheme_name = %s
LIMIT 1
""",
(scheme_name,),
)
base_row = cur.fetchone()
if base_row is None:
return {}
cur.execute(
"""
SELECT network, created_at, run_status, error_message, sensor_nodes, result_rows, node_area_map, areas, drawing_payload
FROM public.leakage_identify_result
WHERE scheme_name = %s
LIMIT 1
""",
(scheme_name,),
)
result_row = cur.fetchone()
if result_row is None:
return {}
return {
"scheme_id": base_row[0],
"scheme_name": base_row[1],
"scheme_type": base_row[2],
"username": base_row[3],
"create_time": base_row[4],
"scheme_start_time": base_row[5],
"scheme_detail": base_row[6] if isinstance(base_row[6], dict) else {},
"network": result_row[0],
"result_created_at": result_row[1],
"run_status": result_row[2],
"error_message": result_row[3],
"sensor_nodes": result_row[4] if isinstance(result_row[4], list) else [],
"rows": result_row[5] if isinstance(result_row[5], list) else [],
"node_area_map": result_row[6] if isinstance(result_row[6], dict) else {},
"areas": result_row[7] if isinstance(result_row[7], list) else [],
"drawing_payload": (
result_row[8] if isinstance(result_row[8], dict) else {"type": "FeatureCollection", "features": []}
),
}
# 2025/03/23 # 2025/03/23
def upload_shp_to_pg(name: str, table_name: str, role: str, shp_file_path: str): def upload_shp_to_pg(name: str, table_name: str, role: str, shp_file_path: str):
""" """
+63
View File
@@ -0,0 +1,63 @@
from fastapi import FastAPI
from fastapi.testclient import TestClient
from app.api.v1.endpoints import leakage as leakage_endpoint
def _build_client() -> TestClient:
app = FastAPI()
app.include_router(leakage_endpoint.router, prefix="/api/v1/leakage")
return TestClient(app)
def test_identify_leakage_success(monkeypatch):
def fake_run_leakage_identification(**kwargs):
assert kwargs["network"] == "demo"
return {"rows": [], "area_count": 0}
monkeypatch.setattr(
leakage_endpoint, "run_leakage_identification", fake_run_leakage_identification
)
client = _build_client()
response = client.post(
"/api/v1/leakage/identify/",
json={
"network": "demo",
"scada_start": "2026-01-01T00:00:00+08:00",
"scada_end": "2026-01-01T01:00:00+08:00",
"scheme_name": "dma_001",
},
)
assert response.status_code == 200
assert response.json()["area_count"] == 0
def test_query_leakage_schemes_success(monkeypatch):
monkeypatch.setattr(
leakage_endpoint,
"list_leakage_identify_schemes",
lambda network, query_date=None: [
{"scheme_name": "dma_001", "scheme_type": "dma_leak_identification"}
],
)
client = _build_client()
response = client.get("/api/v1/leakage/schemes/", params={"network": "demo"})
assert response.status_code == 200
assert response.json()[0]["scheme_name"] == "dma_001"
def test_query_leakage_scheme_detail_success(monkeypatch):
monkeypatch.setattr(
leakage_endpoint,
"get_leakage_identify_scheme_detail",
lambda network, scheme_name: {
"scheme_name": scheme_name,
"rows": [{"Area": "1", "LeakageFlow_m3_per_s": 0.1}],
},
)
client = _build_client()
response = client.get(
"/api/v1/leakage/schemes/dma_001", params={"network": "demo"}
)
assert response.status_code == 200
assert response.json()["scheme_name"] == "dma_001"