from typing import Any import random from fastapi import APIRouter, Query from fastapi.responses import JSONResponse from fastapi import status from pydantic import BaseModel from app.services.tjnetwork import ( get_all_sensor_placements, get_all_burst_locate_results, ) router = APIRouter() @router.get("/getjson/", summary="获取JSON示例", description="获取JSON格式响应示例") async def fastapi_get_json(): """ 获取JSON示例 返回示例JSON格式的响应 """ return JSONResponse( status_code=status.HTTP_400_BAD_REQUEST, content={ "code": 400, "message": "this is message", "data": 123, }, ) @router.get("/getallsensorplacements/", summary="获取所有传感器位置", description="获取网络中所有传感器的放置位置信息") async def fastapi_get_all_sensor_placements(network: str = Query(..., description="管网名称(或数据库名称)")) -> list[dict[Any, Any]]: """ 获取所有传感器位置 返回网络中所有传感器的放置位置及其配置信息 """ return get_all_sensor_placements(network) @router.get("/getallburstlocateresults/", summary="获取所有爆管定位结果", description="获取网络中所有爆管定位的分析结果") async def fastapi_get_all_burst_locate_results(network: str = Query(..., description="管网名称(或数据库名称)")) -> list[dict[Any, Any]]: """ 获取所有爆管定位结果 返回网络中所有的爆管定位分析结果 """ return get_all_burst_locate_results(network) class Item(BaseModel): """测试数据模型""" str_info: str @router.post("/test_dict/", summary="测试字典处理", description="测试处理字典类型数据") async def fastapi_test_dict(data: Item) -> dict[str, str]: """ 测试字典处理 接收Item模型,返回其字典格式 """ item = data.dict() return item @router.get("/getrealtimedata/", summary="获取实时数据", description="获取实时监测数据") async def fastapi_get_realtimedata(): """ 获取实时数据 返回随机生成的实时监测数据示例 """ data = [random.randint(0, 100) for _ in range(100)] return data @router.get("/getsimulationresult/", summary="获取模拟结果", description="获取仿真计算结果") async def fastapi_get_simulationresult(): """ 获取仿真结果 返回随机生成的仿真计算结果示例 """ data = [random.randint(0, 100) for _ in range(100)] return data