Files
TJWaterServerBinary/app/api/v1/endpoints/burst_detection.py
T

69 lines
2.1 KiB
Python

from datetime import datetime
from typing import Any
from fastapi import APIRouter, Depends, HTTPException
from pydantic import BaseModel, Field
from app.auth.keycloak_dependencies import get_current_keycloak_username
from app.services.burst_detection import (
get_burst_detection_scheme_detail,
list_burst_detection_schemes,
run_burst_detection,
)
# Router on which the endpoint functions in this module register their routes.
router = APIRouter()
# Request body for the burst-detection endpoint.
#
# Documented with comments rather than a class docstring so that the JSON
# schema pydantic generates for this model stays exactly as it was.
class BurstDetectionRequest(BaseModel):
    # Network identifier; the only field without a default, hence required.
    network: str

    # Optional observed pressure data. Per the (runtime) description below,
    # three layouts are accepted: a column-oriented dict
    # {sensor_id: [values, ...]}, a list of per-timestep objects
    # [{sensor_id: value, ...}, ...], or a 2-D list [[t1_s1, t1_s2], ...].
    observed_pressure_data: (
        dict[str, list[Any]]
        | list[dict[str, Any]]
        | list[list[Any]]
        | None
    ) = Field(
        None,
        description=(
            "压力观测数据。支持列式字典 {sensor_id: [values,...]}、"
            "逐时刻对象数组 [{sensor_id: value,...}, ...]、"
            "或二维数组 [[t1_s1, t1_s2], [t2_s1, t2_s2], ...]。"
        ),
    )

    # Tuning values handed to the detection service.
    # NOTE(review): presumably samples per day (1440 = one per minute) —
    # confirm against the service implementation.
    points_per_day: int = 1440
    # NOTE(review): meaning of `mu` is not visible from this module — confirm.
    mu: int = 100
    # NOTE(review): presumably isolation-forest keyword parameters — confirm.
    iforest_params: dict[str, Any] | None = None

    # Optional time window.
    # NOTE(review): presumably bounds the SCADA data to load — confirm.
    scada_start: datetime | None = None
    scada_end: datetime | None = None

    # Optional sensor-node id list and optional scheme name.
    sensor_nodes: list[str] | None = None
    scheme_name: str | None = None
@router.post("/detect/")
async def detect_burst(
    data: BurstDetectionRequest,
    username: str = Depends(get_current_keycloak_username),
) -> dict[str, Any]:
    """Run burst detection for the submitted request.

    Every field of the request body is forwarded as a keyword argument to
    ``run_burst_detection``, together with the authenticated username.

    Raises:
        HTTPException: re-raised unchanged when the service raises one; any
            other exception is reported as HTTP 400 with its message as the
            detail.
    """
    # NOTE(review): the service call is synchronous and is executed directly
    # inside this coroutine — confirm it is cheap enough not to stall the
    # event loop, or offload it to a worker thread.
    try:
        return run_burst_detection(**data.model_dump(), username=username)
    except HTTPException:
        # Keep status codes chosen on purpose by lower layers instead of
        # flattening them into a generic 400.
        raise
    except Exception as exc:
        # Chain the cause so the original traceback is kept for logging.
        raise HTTPException(status_code=400, detail=str(exc)) from exc
@router.get("/schemes/")
async def query_burst_detection_schemes(
    network: str,
    query_date: datetime | None = None,
) -> list[dict[str, Any]]:
    """List burst-detection schemes for a network.

    Both parameters are passed straight through to
    ``list_burst_detection_schemes``; ``query_date`` is optional.

    Raises:
        HTTPException: re-raised unchanged when the service raises one; any
            other exception is reported as HTTP 400 with its message as the
            detail.
    """
    # NOTE(review): synchronous service call inside a coroutine — confirm it
    # does not block the event loop for long.
    try:
        return list_burst_detection_schemes(network=network, query_date=query_date)
    except HTTPException:
        # Do not rewrite an HTTP error that a lower layer raised deliberately.
        raise
    except Exception as exc:
        # Chain the cause so the original traceback is kept for logging.
        raise HTTPException(status_code=400, detail=str(exc)) from exc
@router.get("/schemes/{scheme_name}")
async def query_burst_detection_scheme_detail(
    network: str,
    scheme_name: str,
) -> dict[str, Any]:
    """Return the detail of one burst-detection scheme.

    ``scheme_name`` comes from the URL path and ``network`` from the query
    string; both are passed to ``get_burst_detection_scheme_detail``.

    Raises:
        HTTPException: re-raised unchanged when the service raises one; any
            other exception is reported as HTTP 400 with its message as the
            detail.
    """
    # NOTE(review): synchronous service call inside a coroutine — confirm it
    # does not block the event loop for long.
    try:
        return get_burst_detection_scheme_detail(
            network=network, scheme_name=scheme_name
        )
    except HTTPException:
        # Do not rewrite an HTTP error that a lower layer raised deliberately.
        raise
    except Exception as exc:
        # Chain the cause so the original traceback is kept for logging.
        raise HTTPException(status_code=400, detail=str(exc)) from exc