为爆管侦测模块新增模拟方案支持及相关参数

This commit is contained in:
2026-03-11 16:15:02 +08:00
parent aa68bc73ca
commit 4ea0b8f05b
3 changed files with 116 additions and 9 deletions
@@ -121,7 +121,7 @@ class BurstDetector:
if total_points < self.points_per_day * 2:
raise ValueError("至少需要 2 天的观测数据才能执行爆管侦测。")
if total_points % self.points_per_day != 0:
raise ValueError("观测数据长度必须能被 points_per_day 整除,以便按天切分。")
raise ValueError("观测数据长度必须能被每日采样点数整除,以便按天切分。")
day_count = total_points // self.points_per_day
high_freq_features = np.zeros((day_count, sensor_count), dtype=float)
+3
View File
@@ -33,6 +33,9 @@ class BurstDetectionRequest(BaseModel):
scada_end: datetime | None = None
sensor_nodes: list[str] | None = None
scheme_name: str | None = None
data_source: str = "monitoring"
simulation_scheme_name: str | None = None
simulation_scheme_type: str | None = None
@router.post("/detect/")
+112 -8
View File
@@ -34,6 +34,9 @@ def run_burst_detection(
scada_end: datetime | str | None = None,
sensor_nodes: list[str] | None = None,
scheme_name: str | None = None,
data_source: str = "monitoring",
simulation_scheme_name: str | None = None,
simulation_scheme_type: str | None = None,
) -> dict[str, Any]:
"""
运行爆管侦测服务入口。
@@ -77,14 +80,28 @@ def run_burst_detection(
if selected_sensor_nodes is not None
else _get_pressure_sensor_nodes(network)
)
observed_df = _build_observed_pressure_from_scada(
network=network,
sensor_nodes=scada_sensor_nodes,
scada_start=scada_start,
scada_end=scada_end,
)
observed_input: pd.DataFrame | dict[str, list[Any]] | list[dict[str, Any]] | list[list[Any]] = observed_df
observed_source = "backend_timerange"
if data_source == "simulation":
if not simulation_scheme_name:
raise ValueError("模拟方案模式必须提供 simulation_scheme_name。")
observed_df = _build_observed_pressure_from_simulation(
network=network,
sensor_nodes=scada_sensor_nodes,
scada_start=scada_start,
scada_end=scada_end,
simulation_scheme_name=simulation_scheme_name,
simulation_scheme_type=simulation_scheme_type,
)
observed_input = observed_df
observed_source = "simulation_scheme_timerange"
else:
observed_df = _build_observed_pressure_from_scada(
network=network,
sensor_nodes=scada_sensor_nodes,
scada_start=scada_start,
scada_end=scada_end,
)
observed_input = observed_df
observed_source = "backend_timerange"
else:
if observed_pressure_data is None:
raise ValueError(
@@ -114,6 +131,15 @@ def run_burst_detection(
"rows": rows,
"summary": _build_detection_summary(result_df),
}
if data_source == "simulation":
payload["data_source"] = "simulation"
payload["simulation_scheme"] = {
"name": simulation_scheme_name,
"type": simulation_scheme_type,
}
else:
payload["data_source"] = "monitoring"
if use_scada_source:
payload["scada_window"] = {
"start": _to_datetime(scada_start).isoformat(),
@@ -133,6 +159,84 @@ def run_burst_detection(
return payload
def _build_observed_pressure_from_simulation(
*,
network: str,
sensor_nodes: list[str],
scada_start: datetime | str | None,
scada_end: datetime | str | None,
simulation_scheme_name: str | None,
simulation_scheme_type: str | None = None,
) -> pd.DataFrame:
if scada_start is None or scada_end is None:
raise ValueError("使用模拟方案查询时必须同时提供 scada_start 与 scada_end。")
start_dt = _to_datetime(scada_start)
end_dt = _to_datetime(scada_end)
if start_dt >= end_dt:
raise ValueError("SCADA 时间窗非法:scada_start 必须早于 scada_end。")
# Reuse burst_location logic partially here or call internal queries directly
# For burst detection, we need time series for all sensor nodes.
# Check for missing nodes in simulation result if needed, but InternalQueries handles some of it.
# We assume sensor_nodes are valid pressure nodes.
scheme_type = simulation_scheme_type or "burst_analysis"
simulation_data = InternalQueries.query_scheme_simulation_by_ids_timerange(
db_name=network,
scheme_type=scheme_type,
scheme_name=simulation_scheme_name,
element_ids=sensor_nodes,
start_time=start_dt.isoformat(),
end_time=end_dt.isoformat(),
element_type="node",
field="pressure",
)
# simulation_data is {sensor_id: [{time, value}, ...]}
# Convert to DataFrame: index=time, columns=sensor_ids
data_dict = {}
timestamps = set()
for sensor_id in sensor_nodes:
records = simulation_data.get(sensor_id, [])
if not records:
continue
# Convert records to Series with time index
ts_values = []
ts_index = []
for r in records:
if r.get("value") is not None:
ts_values.append(float(r["value"]))
ts_index.append(pd.to_datetime(r["time"]))
if ts_values:
s = pd.Series(ts_values, index=ts_index)
data_dict[sensor_id] = s
timestamps.update(ts_index)
if not data_dict:
raise ValueError("指定时间窗内未查询到模拟压力数据。")
observation_df = pd.DataFrame(data_dict)
# Handle missing timestamps if any (though simulation usually has uniform steps)
# Forward fill or interpolate might be needed if steps differ, but typically they align.
observation_df = observation_df.sort_index()
# Fill NaN if any missing points for some sensors
observation_df = observation_df.fillna(method="ffill").fillna(method="bfill")
if observation_df.empty:
raise ValueError("模拟压力数据无法构建观测矩阵。")
return observation_df
def list_burst_detection_schemes(
network: str,
query_date: datetime | str | None = None,