Files
TJWaterServerBinary/app/api/v1/endpoints/network/junctions.py
2026-01-21 18:19:48 +08:00

112 lines
4.0 KiB
Python

from typing import Any, List, Dict, Union

from fastapi import APIRouter, HTTPException, Request

from app.services.tjnetwork import *
# Router on which every junction endpoint in this module is registered.
router = APIRouter()
@router.get("/getjunctionschema")
async def fast_get_junction_schema(network: str) -> dict[str, dict[str, Any]]:
    """Return the junction schema for ``network``.

    Thin HTTP wrapper: the value produced by ``get_junction_schema(network)``
    is handed back unchanged.
    """
    schema = get_junction_schema(network)
    return schema
@router.post("/addjunction/", response_model=None)
async def fastapi_add_junction(
    network: str,
    junction: str,
    x: float,
    y: float,
    z: float,
) -> ChangeSet:
    """Add a junction to ``network`` through ``add_junction``.

    Args:
        network: Network name, forwarded unchanged to ``add_junction``.
        junction: Sent as the ``id`` entry of the change set.
        x: Sent as the ``x`` entry.
        y: Sent as the ``y`` entry.
        z: Sent under the ``elevation`` key (not ``z``).

    Returns:
        Whatever ``add_junction`` returns.
    """
    change = ChangeSet({"id": junction, "x": x, "y": y, "elevation": z})
    return add_junction(network, change)
@router.post("/deletejunction/", response_model=None)
async def fastapi_delete_junction(network: str, junction: str) -> ChangeSet:
    """Delete a junction from ``network`` through ``delete_junction``.

    The change set passed on contains only the junction's ``id``.

    Returns:
        Whatever ``delete_junction`` returns.
    """
    removal = ChangeSet({"id": junction})
    return delete_junction(network, removal)
@router.get("/getjunctionelevation/")
async def fastapi_get_junction_elevation(network: str, junction: str) -> float:
    """Return the ``elevation`` entry of the record from ``get_junction``."""
    return get_junction(network, junction)["elevation"]
@router.get("/getjunctionx/")
async def fastapi_get_junction_x(network: str, junction: str) -> float:
    """Return the ``x`` entry of the record from ``get_junction``."""
    record = get_junction(network, junction)
    x_value = record["x"]
    return x_value
@router.get("/getjunctiony/")
async def fastapi_get_junction_y(network: str, junction: str) -> float:
    """Return the ``y`` entry of the record from ``get_junction``."""
    record = get_junction(network, junction)
    y_value = record["y"]
    return y_value
@router.get("/getjunctioncoord/")
async def fastapi_get_junction_coord(network: str, junction: str) -> dict[str, float]:
    """Return the junction's coordinates as ``{"x": ..., "y": ...}``.

    Both values are read from the record returned by ``get_junction``.
    """
    record = get_junction(network, junction)
    return {"x": record["x"], "y": record["y"]}
@router.get("/getjunctiondemand/")
async def fastapi_get_junction_demand(network: str, junction: str) -> float:
    """Return the ``demand`` entry of the record from ``get_junction``."""
    return get_junction(network, junction)["demand"]
@router.get("/getjunctionpattern/")
async def fastapi_get_junction_pattern(network: str, junction: str) -> str:
    """Return the ``pattern`` entry of the record from ``get_junction``."""
    record = get_junction(network, junction)
    pattern_value = record["pattern"]
    return pattern_value
@router.post("/setjunctionelevation/", response_model=None)
async def fastapi_set_junction_elevation(
    network: str,
    junction: str,
    elevation: float,
) -> ChangeSet:
    """Submit a new ``elevation`` for one junction to ``set_junction``.

    The change set carries the junction ``id`` and the ``elevation`` value.

    Returns:
        Whatever ``set_junction`` returns.
    """
    change = ChangeSet({"id": junction, "elevation": elevation})
    return set_junction(network, change)
@router.post("/setjunctionx/", response_model=None)
async def fastapi_set_junction_x(network: str, junction: str, x: float) -> ChangeSet:
    """Submit a new ``x`` value for one junction to ``set_junction``.

    Returns:
        Whatever ``set_junction`` returns.
    """
    update = {"id": junction, "x": x}
    change = ChangeSet(update)
    return set_junction(network, change)
@router.post("/setjunctiony/", response_model=None)
async def fastapi_set_junction_y(network: str, junction: str, y: float) -> ChangeSet:
    """Submit a new ``y`` value for one junction to ``set_junction``.

    Returns:
        Whatever ``set_junction`` returns.
    """
    update = {"id": junction, "y": y}
    change = ChangeSet(update)
    return set_junction(network, change)
@router.post("/setjunctioncoord/", response_model=None)
async def fastapi_set_junction_coord(
    network: str,
    junction: str,
    x: float,
    y: float,
) -> ChangeSet:
    """Submit new ``x`` and ``y`` values for one junction in a single change set.

    Returns:
        Whatever ``set_junction`` returns.
    """
    change = ChangeSet({"id": junction, "x": x, "y": y})
    return set_junction(network, change)
@router.post("/setjunctiondemand/", response_model=None)
async def fastapi_set_junction_demand(
    network: str,
    junction: str,
    demand: float,
) -> ChangeSet:
    """Submit a new ``demand`` for one junction to ``set_junction``.

    Returns:
        Whatever ``set_junction`` returns.
    """
    change = ChangeSet({"id": junction, "demand": demand})
    return set_junction(network, change)
@router.post("/setjunctionpattern/", response_model=None)
async def fastapi_set_junction_pattern(
    network: str,
    junction: str,
    pattern: str,
) -> ChangeSet:
    """Submit a new ``pattern`` for one junction to ``set_junction``.

    Returns:
        Whatever ``set_junction`` returns.
    """
    change = ChangeSet({"id": junction, "pattern": pattern})
    return set_junction(network, change)
@router.get("/getjunctionproperties/")
async def fastapi_get_junction_properties(network: str, junction: str) -> dict[str, Any]:
    """Return the full record that ``get_junction`` yields for one junction."""
    record = get_junction(network, junction)
    return record
@router.get("/getalljunctionproperties/")
async def fastapi_get_all_junction_properties(network: str) -> list[dict[str, Any]]:
    """Return the result of ``get_all_junctions`` for ``network``, unchanged."""
    # NOTE: an earlier revision cached this query result (Redis) for
    # performance; that logic was removed when the module was split, so no
    # caching happens here. Re-add or import it if it is needed again.
    return get_all_junctions(network)
@router.post("/setjunctionproperties/", response_model=None)
async def fastapi_set_junction_properties(
    network: str, junction: str, req: Request
) -> ChangeSet:
    """Update several properties of one junction from a JSON request body.

    The body must be a JSON object. Its members are merged with the junction
    id and passed to ``set_junction`` wrapped in a ``ChangeSet``.

    Args:
        network: Network name, forwarded unchanged to ``set_junction``.
        junction: Id of the junction to update; always wins over any ``id``
            member present in the body.
        req: Raw request whose JSON body carries the properties to set.

    Returns:
        Whatever ``set_junction`` returns.

    Raises:
        HTTPException: 400 when the body is not valid JSON or is not a JSON
            object.
    """
    try:
        props = await req.json()
    except ValueError as exc:
        # json.JSONDecodeError (and UnicodeDecodeError) subclass ValueError;
        # report a client error instead of an unhandled 500.
        raise HTTPException(
            status_code=400, detail="Request body must be valid JSON"
        ) from exc

    if not isinstance(props, dict):
        # A JSON list/scalar cannot be merged into the change set.
        raise HTTPException(
            status_code=400, detail="Request body must be a JSON object"
        )

    ps = {"id": junction} | props
    # The junction addressed by the request is authoritative: do not let an
    # "id" member in the body redirect the update to another junction.
    ps["id"] = junction
    return set_junction(network, ChangeSet(ps))