From bc74e94fbbf41ca49d249641a0f9810019527249 Mon Sep 17 00:00:00 2001 From: Jiang Date: Fri, 6 Mar 2026 16:19:14 +0800 Subject: [PATCH] =?UTF-8?q?=E9=87=8D=E6=9E=84=E7=88=86=E7=AE=A1=E5=AE=9A?= =?UTF-8?q?=E4=BD=8D=E7=9B=B8=E5=85=B3=E5=8A=9F=E8=83=BD=EF=BC=8C=E4=BC=98?= =?UTF-8?q?=E5=8C=96=E8=BE=93=E5=85=A5=E9=AA=8C=E8=AF=81=E4=B8=8EAPI?= =?UTF-8?q?=E6=8E=A5=E5=8F=A3?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../burst_location/burst_location.py | 73 ++-- app/api/v1/endpoints/burst_location.py | 41 +- app/services/burst_location.py | 354 ++++++++++++++++-- app/services/scheme_management.py | 126 ++++--- 4 files changed, 496 insertions(+), 98 deletions(-) diff --git a/app/algorithms/burst_location/burst_location.py b/app/algorithms/burst_location/burst_location.py index 0f24c02..e16f07d 100644 --- a/app/algorithms/burst_location/burst_location.py +++ b/app/algorithms/burst_location/burst_location.py @@ -1,9 +1,7 @@ -"""爆管定位部署入口(基于外部 SCADA 实测数据)。""" - import argparse import json from pathlib import Path -from typing import Iterable +from typing import Any, Iterable import pandas as pd @@ -69,23 +67,11 @@ def _align_scada_series( return aligned -def run_burst_location( - wn_inp_path, - pressure_scada_ids, - burst_pressure, - normal_pressure, - burst_leakage, - flow_scada_ids=None, - burst_flow=None, - normal_flow=None, - min_dpressure=2.0, - basic_pressure=10.0, -): - if pressure_scada_ids is None or len(pressure_scada_ids) == 0: - raise ValueError("pressure_scada_ids cannot be empty.") - if burst_pressure is None or normal_pressure is None: - raise ValueError("burst_pressure and normal_pressure are required.") - +def _validate_flow_inputs( + flow_scada_ids: list[str] | None, + burst_flow: pd.Series | None, + normal_flow: pd.Series | None, +) -> tuple[bool, list[str]]: has_any_flow = any( value is not None for value in [flow_scada_ids, burst_flow, normal_flow] ) @@ -97,6 +83,46 @@ def run_burst_location( "flow_scada_ids, burst_flow, and normal_flow must be provided together." ) + if not has_all_flow: + return False, [] + + flow_ids = [str(item) for item in (flow_scada_ids or [])] + if len(flow_ids) == 0: + raise ValueError("flow_scada_ids cannot be empty when flow data is provided.") + return True, flow_ids + + +def _build_top_candidates(similarity_series: pd.Series) -> list[dict[str, Any]]: + top_series = similarity_series.iloc[:10] + return [ + {"pipe_id": str(pipe_id), "similarity": float(score)} + for pipe_id, score in top_series.items() + ] + + +def run_burst_location( + wn_inp_path: str, + pressure_scada_ids: list[str], + burst_pressure: pd.Series, + normal_pressure: pd.Series, + burst_leakage: float, + flow_scada_ids: list[str] | None = None, + burst_flow: pd.Series | None = None, + normal_flow: pd.Series | None = None, + min_dpressure: float = 2.0, + basic_pressure: float = 10.0, +) -> dict[str, Any]: + if pressure_scada_ids is None or len(pressure_scada_ids) == 0: + raise ValueError("pressure_scada_ids cannot be empty.") + if burst_pressure is None or normal_pressure is None: + raise ValueError("burst_pressure and normal_pressure are required.") + + has_all_flow, flow_ids = _validate_flow_inputs( + flow_scada_ids=flow_scada_ids, + burst_flow=burst_flow, + normal_flow=normal_flow, + ) + inp_path = Path(wn_inp_path) wn = load_inp( inp_name=inp_path.name, @@ -131,11 +157,6 @@ def run_burst_location( timestep_list = list(pressure_normal.index) if has_all_flow: - flow_ids = [str(item) for item in flow_scada_ids] - if len(flow_ids) == 0: - raise ValueError( - "flow_scada_ids cannot be empty when flow data is provided." - ) normal_flow_aligned = _align_scada_series(normal_flow, flow_ids, "normal_flow") burst_flow_aligned = _align_scada_series(burst_flow, flow_ids, "burst_flow") flow_normal = normal_flow_aligned.to_frame().T @@ -190,7 +211,7 @@ def run_burst_location( "burst_leakage": float(burst_leakage), "elapsed_seconds": elapsed_seconds, "simulation_times": int(simulation_times), - "top_candidates": list(similarity_series.index[:10]), + "top_candidates": _build_top_candidates(similarity_series), "similarity_mode": similarity_mode, } diff --git a/app/api/v1/endpoints/burst_location.py b/app/api/v1/endpoints/burst_location.py index c3898cf..9173690 100644 --- a/app/api/v1/endpoints/burst_location.py +++ b/app/api/v1/endpoints/burst_location.py @@ -1,33 +1,62 @@ from typing import Any +from datetime import datetime from fastapi import APIRouter, Depends, HTTPException from pydantic import BaseModel from app.auth.keycloak_dependencies import get_current_keycloak_username -from app.services.burst_location import run_burst_location_by_network +from app.services.burst_location import ( + get_burst_location_scheme_detail, + list_burst_location_schemes, + run_burst_location_by_network, +) router = APIRouter() class BurstLocationRequest(BaseModel): network: str - pressure_scada_ids: list[str] - burst_pressure: dict[str, float] | list[dict[str, Any]] - normal_pressure: dict[str, float] | list[dict[str, Any]] + pressure_scada_ids: list[str] | None = None + burst_pressure: dict[str, float] | list[dict[str, Any]] | None = None + normal_pressure: dict[str, float] | list[dict[str, Any]] | None = None burst_leakage: float flow_scada_ids: list[str] | None = None burst_flow: dict[str, float] | list[dict[str, Any]] | None = None normal_flow: dict[str, float] | list[dict[str, Any]] | None = None min_dpressure: float = 2.0 basic_pressure: float = 10.0 + scada_burst_start: datetime | None = None + scada_burst_end: datetime | None = None + scada_normal_start: datetime | None = None + scada_normal_end: datetime | None = None + use_scada_flow: bool = False + scheme_name: str | None = None @router.post("/locate/") async def locate_burst( data: BurstLocationRequest, - _username: str = Depends(get_current_keycloak_username), + username: str = Depends(get_current_keycloak_username), ) -> dict[str, Any]: try: - return run_burst_location_by_network(**data.model_dump()) + return run_burst_location_by_network(**data.model_dump(), username=username) except (TypeError, ValueError) as exc: raise HTTPException(status_code=400, detail=str(exc)) + + +@router.get("/schemes/") +async def query_burst_schemes( + network: str, query_date: datetime | None = None +) -> list[dict[str, Any]]: + try: + return list_burst_location_schemes(network=network, query_date=query_date) + except Exception as exc: + raise HTTPException(status_code=400, detail=str(exc)) + + +@router.get("/schemes/{scheme_name}") +async def query_burst_scheme_detail(network: str, scheme_name: str) -> dict[str, Any]: + try: + return get_burst_location_scheme_detail(network=network, scheme_name=scheme_name) + except Exception as exc: + raise HTTPException(status_code=400, detail=str(exc)) diff --git a/app/services/burst_location.py b/app/services/burst_location.py index 702f27a..77ee833 100644 --- a/app/services/burst_location.py +++ b/app/services/burst_location.py @@ -1,15 +1,23 @@ from __future__ import annotations -from pathlib import Path +import os +from datetime import datetime from typing import Any -from uuid import uuid4 import pandas as pd from app.algorithms.burst_location import run_burst_location -from app.services.tjnetwork import dump_inp +from app.infra.db.timescaledb.internal_queries import InternalQueries +from app.services.scheme_management import ( + query_burst_location_scheme_detail, + query_burst_location_schemes, + scheme_name_exists, + store_scheme_info, +) +from app.services.tjnetwork import dump_inp, get_all_scada_info SeriesInput = pd.Series | dict[str, Any] | list[dict[str, Any]] +FLOW_SCADA_TYPES = {"pipe_flow", "flow", "demand"} def _normalize_series(data: SeriesInput, field_name: str) -> pd.Series: @@ -36,27 +44,120 @@ def _normalize_series(data: SeriesInput, field_name: str) -> pd.Series: def run_burst_location_by_network( *, network: str, - pressure_scada_ids: list[str], - burst_pressure: SeriesInput, - normal_pressure: SeriesInput, burst_leakage: float, + pressure_scada_ids: list[str] | None = None, + burst_pressure: SeriesInput | None = None, + normal_pressure: SeriesInput | None = None, flow_scada_ids: list[str] | None = None, burst_flow: SeriesInput | None = None, normal_flow: SeriesInput | None = None, min_dpressure: float = 2.0, basic_pressure: float = 10.0, + scada_burst_start: datetime | str | None = None, + scada_burst_end: datetime | str | None = None, + scada_normal_start: datetime | str | None = None, + scada_normal_end: datetime | str | None = None, + use_scada_flow: bool = False, + scheme_name: str | None = None, + username: str = "admin", ) -> dict[str, Any]: if not network: raise ValueError("network is required.") - tmp_filename = f"burst_location_{network}_{uuid4().hex}.inp" - inp_path = Path.cwd() / tmp_filename - - try: - dump_inp(network, tmp_filename) + selected_pressure_ids = ( + _dedupe_ids(pressure_scada_ids) + if pressure_scada_ids + else _get_sensor_nodes(network, data_type="pressure") + ) + if not selected_pressure_ids: + raise ValueError("未提供有效压力传感器,且系统未识别到可用压力传感器。") + use_scada_pressure = any( + value is not None + for value in [ + scada_burst_start, + scada_burst_end, + scada_normal_start, + scada_normal_end, + ] + ) + if use_scada_pressure: + ( + burst_start_dt, + burst_end_dt, + normal_start_dt, + normal_end_dt, + ) = _validate_scada_windows( + scada_burst_start=scada_burst_start, + scada_burst_end=scada_burst_end, + scada_normal_start=scada_normal_start, + scada_normal_end=scada_normal_end, + ) + burst_pressure_series, burst_pressure_samples = _build_observed_series_from_scada( + network=network, + sensor_ids=selected_pressure_ids, + start_dt=burst_start_dt, + end_dt=burst_end_dt, + data_type="pressure", + series_name="burst_pressure", + ) + ( + normal_pressure_series, + normal_pressure_samples, + ) = _build_observed_series_from_scada( + network=network, + sensor_ids=selected_pressure_ids, + start_dt=normal_start_dt, + end_dt=normal_end_dt, + data_type="pressure", + series_name="normal_pressure", + ) + observed_source = "backend_timerange" + else: + if burst_pressure is None or normal_pressure is None: + raise ValueError( + "未提供 burst_pressure/normal_pressure,且未提供完整 SCADA 时间窗参数。" + ) burst_pressure_series = _normalize_series(burst_pressure, "burst_pressure") normal_pressure_series = _normalize_series(normal_pressure, "normal_pressure") + burst_pressure_samples = 1 + normal_pressure_samples = 1 + observed_source = "request_payload" + burst_start_dt = burst_end_dt = normal_start_dt = normal_end_dt = None + + selected_flow_ids: list[str] | None = None + burst_flow_series: pd.Series | None = None + normal_flow_series: pd.Series | None = None + use_flow_scada_source = use_scada_pressure and ( + use_scada_flow or flow_scada_ids is not None + ) + if use_flow_scada_source: + selected_flow_ids = ( + _dedupe_ids(flow_scada_ids) + if flow_scada_ids is not None + else _get_sensor_nodes(network, data_type="flow") + ) + if not selected_flow_ids: + raise ValueError("未找到可用流量传感器,无法从 SCADA 查询流量数据。") + burst_flow_series, burst_flow_samples = _build_observed_series_from_scada( + network=network, + sensor_ids=selected_flow_ids, + start_dt=burst_start_dt, + end_dt=burst_end_dt, + data_type="flow", + series_name="burst_flow", + ) + normal_flow_series, normal_flow_samples = _build_observed_series_from_scada( + network=network, + sensor_ids=selected_flow_ids, + start_dt=normal_start_dt, + end_dt=normal_end_dt, + data_type="flow", + series_name="normal_flow", + ) + else: + if flow_scada_ids is not None: + selected_flow_ids = _dedupe_ids(flow_scada_ids) burst_flow_series = ( _normalize_series(burst_flow, "burst_flow") if burst_flow is not None else None ) @@ -65,19 +166,228 @@ def run_burst_location_by_network( if normal_flow is not None else None ) + burst_flow_samples = 1 if burst_flow_series is not None else 0 + normal_flow_samples = 1 if normal_flow_series is not None else 0 - return run_burst_location( - wn_inp_path=str(inp_path), - pressure_scada_ids=pressure_scada_ids, - burst_pressure=burst_pressure_series, - normal_pressure=normal_pressure_series, + inp_path = _prepare_burst_inp(network) + result = run_burst_location( + wn_inp_path=inp_path, + pressure_scada_ids=selected_pressure_ids, + burst_pressure=burst_pressure_series, + normal_pressure=normal_pressure_series, + burst_leakage=burst_leakage, + flow_scada_ids=selected_flow_ids, + burst_flow=burst_flow_series, + normal_flow=normal_flow_series, + min_dpressure=min_dpressure, + basic_pressure=basic_pressure, + ) + + payload: dict[str, Any] = { + **result, + "network": network, + "pressure_scada_ids": selected_pressure_ids, + "flow_scada_ids": selected_flow_ids or [], + "observed_source": observed_source, + "pressure_samples": { + "burst": burst_pressure_samples, + "normal": normal_pressure_samples, + }, + "flow_samples": {"burst": burst_flow_samples, "normal": normal_flow_samples}, + } + if use_scada_pressure: + payload["scada_window"] = { + "burst_start": burst_start_dt.isoformat(), + "burst_end": burst_end_dt.isoformat(), + "normal_start": normal_start_dt.isoformat(), + "normal_end": normal_end_dt.isoformat(), + } + if scheme_name: + _store_burst_scheme( + network=network, + scheme_name=scheme_name, + username=username, + payload=payload, burst_leakage=burst_leakage, - flow_scada_ids=flow_scada_ids, - burst_flow=burst_flow_series, - normal_flow=normal_flow_series, min_dpressure=min_dpressure, basic_pressure=basic_pressure, ) - finally: - if inp_path.exists(): - inp_path.unlink() + return payload + + +def list_burst_location_schemes( + network: str, query_date: datetime | str | None = None +) -> list[dict[str, Any]]: + parsed_date = _to_datetime(query_date).date() if query_date is not None else None + return query_burst_location_schemes(name=network, network=network, query_date=parsed_date) + + +def get_burst_location_scheme_detail(network: str, scheme_name: str) -> dict[str, Any]: + result = query_burst_location_scheme_detail(network, scheme_name) + if not result: + raise ValueError(f"未找到爆管定位方案: {scheme_name}") + return result + + +def _store_burst_scheme( + *, + network: str, + scheme_name: str, + username: str, + payload: dict[str, Any], + burst_leakage: float, + min_dpressure: float, + basic_pressure: float, +) -> None: + if scheme_name_exists(network, scheme_name): + raise ValueError(f"方案名称已存在: {scheme_name}") + + now_iso = datetime.now().isoformat() + scheme_detail = { + "network": network, + "pressure_scada_ids": payload.get("pressure_scada_ids", []), + "flow_scada_ids": payload.get("flow_scada_ids", []), + "observed_source": payload.get("observed_source"), + "algorithm_params": { + "burst_leakage": burst_leakage, + "min_dpressure": min_dpressure, + "basic_pressure": basic_pressure, + }, + "scada_window": payload.get("scada_window"), + "result_summary": { + "located_pipe": payload.get("located_pipe"), + "simulation_times": payload.get("simulation_times"), + "similarity_mode": payload.get("similarity_mode"), + }, + "result_payload": payload, + } + store_scheme_info( + name=network, + scheme_name=scheme_name, + scheme_type="burst_location", + username=username, + scheme_start_time=now_iso, + scheme_detail=scheme_detail, + ) + + +def _validate_scada_windows( + *, + scada_burst_start: datetime | str | None, + scada_burst_end: datetime | str | None, + scada_normal_start: datetime | str | None, + scada_normal_end: datetime | str | None, +) -> tuple[datetime, datetime, datetime, datetime]: + values = [scada_burst_start, scada_burst_end, scada_normal_start, scada_normal_end] + if any(v is None for v in values): + raise ValueError( + "使用后端 SCADA 查询时,必须同时提供 scada_burst_start/scada_burst_end/scada_normal_start/scada_normal_end。" + ) + burst_start_dt = _to_datetime(scada_burst_start) + burst_end_dt = _to_datetime(scada_burst_end) + normal_start_dt = _to_datetime(scada_normal_start) + normal_end_dt = _to_datetime(scada_normal_end) + if burst_start_dt >= burst_end_dt: + raise ValueError("爆管时段 SCADA 时间窗非法:scada_burst_start 必须早于 scada_burst_end。") + if normal_start_dt >= normal_end_dt: + raise ValueError( + "正常时段 SCADA 时间窗非法:scada_normal_start 必须早于 scada_normal_end。" + ) + return burst_start_dt, burst_end_dt, normal_start_dt, normal_end_dt + + +def _build_observed_series_from_scada( + *, + network: str, + sensor_ids: list[str], + start_dt: datetime, + end_dt: datetime, + data_type: str, + series_name: str, +) -> tuple[pd.Series, int]: + scada_mapping = _build_scada_mapping(network=network, data_type=data_type) + missing_ids = [sensor_id for sensor_id in sensor_ids if sensor_id not in scada_mapping] + if missing_ids: + preview = ", ".join(missing_ids[:10]) + raise ValueError(f"{series_name} 缺少可用 SCADA 映射: {preview}") + + query_ids = [scada_mapping[sensor_id] for sensor_id in sensor_ids] + scada_data = InternalQueries.query_scada_by_ids_timerange( + db_name=network, + device_ids=query_ids, + start_time=start_dt.isoformat(), + end_time=end_dt.isoformat(), + ) + values: dict[str, float] = {} + sample_counts: list[int] = [] + for sensor_id, query_id in zip(sensor_ids, query_ids): + records = scada_data.get(query_id, []) + numeric_values = [ + float(item["value"]) + for item in records + if item.get("value") is not None + ] + if not numeric_values: + raise ValueError(f"{series_name} 在时间窗内无有效数据: {sensor_id}") + values[sensor_id] = float(sum(numeric_values) / len(numeric_values)) + sample_counts.append(len(numeric_values)) + + return pd.Series(values, dtype=float), min(sample_counts) + + +def _build_scada_mapping(network: str, data_type: str) -> dict[str, str]: + mapping: dict[str, str] = {} + for item in get_all_scada_info(network): + scada_type = str(item.get("type", "")).lower() + if data_type == "pressure": + if scada_type != "pressure": + continue + elif data_type == "flow": + if scada_type not in FLOW_SCADA_TYPES: + continue + else: + raise ValueError(f"Unsupported data_type: {data_type}") + node_id = item.get("associated_element_id") + query_id = item.get("api_query_id") + if ( + isinstance(node_id, str) + and node_id + and isinstance(query_id, str) + and query_id + ): + mapping[node_id] = query_id + return mapping + + +def _get_sensor_nodes(network: str, data_type: str) -> list[str]: + mapping = _build_scada_mapping(network=network, data_type=data_type) + sensor_ids = sorted(mapping.keys()) + if not sensor_ids: + type_name = "压力" if data_type == "pressure" else "流量" + raise ValueError(f"未找到{type_name}传感器对应节点(scada_info.type)。") + return sensor_ids + + +def _dedupe_ids(ids: list[str] | None) -> list[str]: + if ids is None: + return [] + return list(dict.fromkeys([str(item) for item in ids if item])) + + +def _to_datetime(value: datetime | str) -> datetime: + if isinstance(value, datetime): + return value + return datetime.fromisoformat(value) + + +def _prepare_burst_inp(network: str) -> str: + project_root = os.path.abspath(os.path.join(os.path.dirname(__file__), "..", "..")) + db_inp_dir = os.path.join(project_root, "db_inp") + os.makedirs(db_inp_dir, exist_ok=True) + inp_path = os.path.join(db_inp_dir, f"{network}.burst.inp") + if os.path.isfile(inp_path) and os.path.getsize(inp_path) > 0: + return inp_path + dump_inp(network, inp_path, "2") + if not os.path.isfile(inp_path) or os.path.getsize(inp_path) <= 0: + raise ValueError(f"爆管定位 INP 文件无效: {inp_path}") + return inp_path diff --git a/app/services/scheme_management.py b/app/services/scheme_management.py index e35d0a2..c816eb6 100644 --- a/app/services/scheme_management.py +++ b/app/services/scheme_management.py @@ -171,47 +171,6 @@ def query_scheme_list(name: str) -> list: print(f"查询错误:{e}") -def ensure_leakage_identify_result_table(name: str) -> None: - conn_string = get_pgconn_string(db_name=name) - with psycopg.connect(conn_string) as conn: - with conn.cursor() as cur: - cur.execute( - """ - CREATE TABLE IF NOT EXISTS public.leakage_identify_result ( - id BIGSERIAL PRIMARY KEY, - scheme_name VARCHAR(255) NOT NULL, - network VARCHAR(255) NOT NULL, - created_at TIMESTAMPTZ NOT NULL DEFAULT NOW(), - run_status VARCHAR(32) NOT NULL DEFAULT 'completed', - error_message TEXT, - sensor_nodes JSONB NOT NULL DEFAULT '[]'::jsonb, - result_rows JSONB NOT NULL DEFAULT '[]'::jsonb, - node_area_map JSONB NOT NULL DEFAULT '{}'::jsonb, - areas JSONB NOT NULL DEFAULT '[]'::jsonb, - drawing_payload JSONB NOT NULL DEFAULT '{"type":"FeatureCollection","features":[]}'::jsonb, - CONSTRAINT uq_leakage_identify_result_scheme UNIQUE (scheme_name), - CONSTRAINT fk_leakage_identify_result_scheme - FOREIGN KEY (scheme_name) - REFERENCES public.scheme_list (scheme_name) - ON DELETE CASCADE - ); - """ - ) - cur.execute( - "CREATE INDEX IF NOT EXISTS idx_leakage_identify_result_network ON public.leakage_identify_result (network);" - ) - cur.execute( - "CREATE INDEX IF NOT EXISTS idx_leakage_identify_result_created_at ON public.leakage_identify_result (created_at DESC);" - ) - cur.execute( - "CREATE INDEX IF NOT EXISTS idx_leakage_identify_result_run_status ON public.leakage_identify_result (run_status);" - ) - cur.execute( - "CREATE INDEX IF NOT EXISTS idx_leakage_identify_result_rows_gin ON public.leakage_identify_result USING GIN (result_rows);" - ) - conn.commit() - - def store_leakage_identify_result( name: str, scheme_name: str, @@ -224,7 +183,6 @@ def store_leakage_identify_result( run_status: str = "completed", error_message: str | None = None, ) -> None: - ensure_leakage_identify_result_table(name) conn_string = get_pgconn_string(db_name=name) with psycopg.connect(conn_string) as conn: with conn.cursor() as cur: @@ -313,7 +271,6 @@ def query_leakage_identify_schemes( def query_leakage_identify_scheme_detail(name: str, scheme_name: str) -> dict: - ensure_leakage_identify_result_table(name) conn_string = get_pgconn_string(db_name=name) with psycopg.connect(conn_string) as conn: with conn.cursor() as cur: @@ -358,11 +315,92 @@ def query_leakage_identify_scheme_detail(name: str, scheme_name: str) -> dict: "node_area_map": result_row[6] if isinstance(result_row[6], dict) else {}, "areas": result_row[7] if isinstance(result_row[7], list) else [], "drawing_payload": ( - result_row[8] if isinstance(result_row[8], dict) else {"type": "FeatureCollection", "features": []} + result_row[8] + if isinstance(result_row[8], dict) + else {"type": "FeatureCollection", "features": []} ), } +def query_burst_location_schemes( + name: str, + network: str, + scheme_type: str = "burst_location", + query_date: date | None = None, +) -> list[dict]: + conn_string = get_pgconn_string(db_name=name) + with psycopg.connect(conn_string) as conn: + with conn.cursor() as cur: + if query_date is None: + cur.execute( + """ + SELECT scheme_id, scheme_name, scheme_type, username, create_time, scheme_start_time, scheme_detail + FROM public.scheme_list + WHERE scheme_type = %s + ORDER BY create_time DESC + """, + (scheme_type,), + ) + else: + cur.execute( + """ + SELECT scheme_id, scheme_name, scheme_type, username, create_time, scheme_start_time, scheme_detail + FROM public.scheme_list + WHERE scheme_type = %s AND DATE(create_time) = %s + ORDER BY create_time DESC + """, + (scheme_type, query_date), + ) + rows = cur.fetchall() + result = [] + for row in rows: + detail = row[6] if isinstance(row[6], dict) else {} + if network and detail.get("network") not in (None, network): + continue + result.append( + { + "scheme_id": row[0], + "scheme_name": row[1], + "scheme_type": row[2], + "username": row[3], + "create_time": row[4], + "scheme_start_time": row[5], + "scheme_detail": detail, + } + ) + return result + + +def query_burst_location_scheme_detail(name: str, scheme_name: str) -> dict: + conn_string = get_pgconn_string(db_name=name) + with psycopg.connect(conn_string) as conn: + with conn.cursor() as cur: + cur.execute( + """ + SELECT scheme_id, scheme_name, scheme_type, username, create_time, scheme_start_time, scheme_detail + FROM public.scheme_list + WHERE scheme_name = %s + LIMIT 1 + """, + (scheme_name,), + ) + base_row = cur.fetchone() + if base_row is None: + return {} + detail = base_row[6] if isinstance(base_row[6], dict) else {} + return { + "scheme_id": base_row[0], + "scheme_name": base_row[1], + "scheme_type": base_row[2], + "username": base_row[3], + "create_time": base_row[4], + "scheme_start_time": base_row[5], + "scheme_detail": detail, + "network": detail.get("network"), + "result_payload": detail.get("result_payload", {}), + } + + # 2025/03/23 def upload_shp_to_pg(name: str, table_name: str, role: str, shp_file_path: str): """