Files
TJWaterServerBinary/app/api/v1/endpoints/misc.py
T

65 lines
1.9 KiB
Python

from typing import Any
from fastapi import APIRouter, Query
from fastapi.responses import JSONResponse
from fastapi import status
from pydantic import BaseModel
from app.services.tjnetwork import (
get_all_sensor_placements,
get_all_burst_locate_results,
)
router = APIRouter()
@router.get("/getjson/", summary="获取JSON示例", description="获取JSON格式响应示例")
async def fastapi_get_json():
"""
获取JSON示例
返回示例JSON格式的响应
"""
return JSONResponse(
status_code=status.HTTP_400_BAD_REQUEST,
content={
"code": 400,
"message": "this is message",
"data": 123,
},
)
@router.get("/getallsensorplacements/", summary="获取所有传感器位置", description="获取网络中所有传感器的放置位置信息")
async def fastapi_get_all_sensor_placements(network: str = Query(..., description="管网名称(或数据库名称)")) -> list[dict[Any, Any]]:
"""
获取所有传感器位置
返回网络中所有传感器的放置位置及其配置信息
"""
return get_all_sensor_placements(network)
@router.get("/getallburstlocateresults/", summary="获取所有爆管定位结果", description="获取网络中所有爆管定位的分析结果")
async def fastapi_get_all_burst_locate_results(network: str = Query(..., description="管网名称(或数据库名称)")) -> list[dict[Any, Any]]:
"""
获取所有爆管定位结果
返回网络中所有的爆管定位分析结果
"""
return get_all_burst_locate_results(network)
class Item(BaseModel):
"""测试数据模型"""
str_info: str
@router.post("/test_dict/", summary="测试字典处理", description="测试处理字典类型数据")
async def fastapi_test_dict(data: Item) -> dict[str, str]:
"""
测试字典处理
接收Item模型,返回其字典格式
"""
item = data.dict()
return item