diff --git a/app/algorithms/burst_detection/burst_detector.py b/app/algorithms/burst_detection/burst_detector.py index cf5f43a..530ae6b 100644 --- a/app/algorithms/burst_detection/burst_detector.py +++ b/app/algorithms/burst_detection/burst_detector.py @@ -121,7 +121,7 @@ class BurstDetector: if total_points < self.points_per_day * 2: raise ValueError("至少需要 2 天的观测数据才能执行爆管侦测。") if total_points % self.points_per_day != 0: - raise ValueError("观测数据长度必须能被 points_per_day 整除,以便按天切分。") + raise ValueError("观测数据长度必须能被每日采样点数整除,以便按天切分。") day_count = total_points // self.points_per_day high_freq_features = np.zeros((day_count, sensor_count), dtype=float) diff --git a/app/api/v1/endpoints/burst_detection.py b/app/api/v1/endpoints/burst_detection.py index b102e08..149343f 100644 --- a/app/api/v1/endpoints/burst_detection.py +++ b/app/api/v1/endpoints/burst_detection.py @@ -33,6 +33,9 @@ class BurstDetectionRequest(BaseModel): scada_end: datetime | None = None sensor_nodes: list[str] | None = None scheme_name: str | None = None + data_source: str = "monitoring" + simulation_scheme_name: str | None = None + simulation_scheme_type: str | None = None @router.post("/detect/") diff --git a/app/services/burst_detection.py b/app/services/burst_detection.py index bdd297a..59baf32 100644 --- a/app/services/burst_detection.py +++ b/app/services/burst_detection.py @@ -34,6 +34,9 @@ def run_burst_detection( scada_end: datetime | str | None = None, sensor_nodes: list[str] | None = None, scheme_name: str | None = None, + data_source: str = "monitoring", + simulation_scheme_name: str | None = None, + simulation_scheme_type: str | None = None, ) -> dict[str, Any]: """ 运行爆管侦测服务入口。 @@ -77,14 +80,28 @@ def run_burst_detection( if selected_sensor_nodes is not None else _get_pressure_sensor_nodes(network) ) - observed_df = _build_observed_pressure_from_scada( - network=network, - sensor_nodes=scada_sensor_nodes, - scada_start=scada_start, - scada_end=scada_end, - ) - observed_input: pd.DataFrame | 
def _build_observed_pressure_from_simulation(
    *,
    network: str,
    sensor_nodes: list[str],
    scada_start: datetime | str | None,
    scada_end: datetime | str | None,
    simulation_scheme_name: str | None,
    simulation_scheme_type: str | None = None,
) -> pd.DataFrame:
    """Build the observed-pressure matrix from stored simulation scheme results.

    Queries node pressure for ``sensor_nodes`` within the requested time
    window and pivots the result into a DataFrame indexed by timestamp with
    one column per sensor that returned at least one non-null value.

    Args:
        network: Database name passed to the simulation query as ``db_name``.
        sensor_nodes: Node ids whose pressure series are requested.
        scada_start: Inclusive start of the time window (datetime or string
            accepted by ``_to_datetime``).
        scada_end: End of the time window; must be later than ``scada_start``.
        simulation_scheme_name: Name of the simulation scheme to read from.
        simulation_scheme_type: Scheme type; falls back to
            ``"burst_analysis"`` when empty or ``None``.

    Returns:
        A DataFrame sorted by timestamp. Gaps where a sensor has no value at
        a timestamp present for another sensor are forward-filled, then
        back-filled.

    Raises:
        ValueError: If either window bound is missing, the window is empty or
            inverted, or no usable pressure values were returned.
    """
    if scada_start is None or scada_end is None:
        raise ValueError("使用模拟方案查询时必须同时提供 scada_start 与 scada_end。")

    start_dt = _to_datetime(scada_start)
    end_dt = _to_datetime(scada_end)
    if start_dt >= end_dt:
        raise ValueError("SCADA 时间窗非法:scada_start 必须早于 scada_end。")

    scheme_type = simulation_scheme_type or "burst_analysis"

    # Expected result shape: {sensor_id: [{"time": ..., "value": ...}, ...]}.
    simulation_data = InternalQueries.query_scheme_simulation_by_ids_timerange(
        db_name=network,
        scheme_type=scheme_type,
        scheme_name=simulation_scheme_name,
        element_ids=sensor_nodes,
        start_time=start_dt.isoformat(),
        end_time=end_dt.isoformat(),
        element_type="node",
        field="pressure",
    )

    series_by_sensor: dict[str, pd.Series] = {}
    for sensor_id in sensor_nodes:
        # A missing key and an explicit None/empty list both mean "no data".
        records = simulation_data.get(sensor_id) or []
        points = [
            (pd.to_datetime(record["time"]), float(record["value"]))
            for record in records
            if record.get("value") is not None
        ]
        if not points:
            # Sensors without any usable value are left out of the matrix.
            continue
        series_by_sensor[sensor_id] = pd.Series(
            [value for _, value in points],
            index=[moment for moment, _ in points],
        )

    if not series_by_sensor:
        raise ValueError("指定时间窗内未查询到模拟压力数据。")

    # Building from a dict of Series aligns every sensor on the union of
    # timestamps; sort so the fills below propagate in chronological order.
    observation_df = pd.DataFrame(series_by_sensor).sort_index()

    # DataFrame.ffill()/bfill() replace the deprecated fillna(method=...)
    # spelling and produce the same result.
    observation_df = observation_df.ffill().bfill()

    if observation_df.empty:
        raise ValueError("模拟压力数据无法构建观测矩阵。")

    return observation_df