Files
TJWaterServerBinary/app/api/v1/endpoints/network/general.py
2026-01-22 18:20:18 +08:00

163 lines
5.6 KiB
Python

from fastapi import APIRouter, Request
from typing import Any, List, Dict, Union
from app.services.tjnetwork import *
router = APIRouter()
############################################################
# type
############################################################
@router.get("/isnode/")
async def fastapi_is_node(network: str, node: str) -> bool:
    """Handle GET ``/isnode/`` by delegating to ``is_node``.

    Args:
        network: Forwarded unchanged as the first argument.
        node: Forwarded unchanged as the second argument.

    Returns:
        The value produced by ``is_node(network, node)``.
    """
    answer = is_node(network, node)
    return answer
@router.get("/isjunction/")
async def fastapi_is_junction(network: str, node: str) -> bool:
    """Handle GET ``/isjunction/`` by delegating to ``is_junction``.

    Args:
        network: Forwarded unchanged as the first argument.
        node: Forwarded unchanged as the second argument.

    Returns:
        The value produced by ``is_junction(network, node)``.
    """
    answer = is_junction(network, node)
    return answer
@router.get("/isreservoir/")
async def fastapi_is_reservoir(network: str, node: str) -> bool:
    """Handle GET ``/isreservoir/`` by delegating to ``is_reservoir``.

    Args:
        network: Forwarded unchanged as the first argument.
        node: Forwarded unchanged as the second argument.

    Returns:
        The value produced by ``is_reservoir(network, node)``.
    """
    answer = is_reservoir(network, node)
    return answer
@router.get("/istank/")
async def fastapi_is_tank(network: str, node: str) -> bool:
    """Handle GET ``/istank/`` by delegating to ``is_tank``.

    Args:
        network: Forwarded unchanged as the first argument.
        node: Forwarded unchanged as the second argument.

    Returns:
        The value produced by ``is_tank(network, node)``.
    """
    answer = is_tank(network, node)
    return answer
@router.get("/islink/")
async def fastapi_is_link(network: str, link: str) -> bool:
    """Handle GET ``/islink/`` by delegating to ``is_link``.

    Args:
        network: Forwarded unchanged as the first argument.
        link: Forwarded unchanged as the second argument.

    Returns:
        The value produced by ``is_link(network, link)``.
    """
    answer = is_link(network, link)
    return answer
@router.get("/ispipe/")
async def fastapi_is_pipe(network: str, link: str) -> bool:
    """Handle GET ``/ispipe/`` by delegating to ``is_pipe``.

    Args:
        network: Forwarded unchanged as the first argument.
        link: Forwarded unchanged as the second argument.

    Returns:
        The value produced by ``is_pipe(network, link)``.
    """
    answer = is_pipe(network, link)
    return answer
@router.get("/ispump/")
async def fastapi_is_pump(network: str, link: str) -> bool:
    """Handle GET ``/ispump/`` by delegating to ``is_pump``.

    Args:
        network: Forwarded unchanged as the first argument.
        link: Forwarded unchanged as the second argument.

    Returns:
        The value produced by ``is_pump(network, link)``.
    """
    answer = is_pump(network, link)
    return answer
@router.get("/isvalve/")
async def fastapi_is_valve(network: str, link: str) -> bool:
    """Handle GET ``/isvalve/`` by delegating to ``is_valve``.

    Args:
        network: Forwarded unchanged as the first argument.
        link: Forwarded unchanged as the second argument.

    Returns:
        The value produced by ``is_valve(network, link)``.
    """
    answer = is_valve(network, link)
    return answer
@router.get("/getnodetype/")
async def fastapi_get_node_type(network: str, node: str) -> str:
    """Handle GET ``/getnodetype/`` by delegating to ``get_node_type``.

    Args:
        network: Forwarded unchanged as the first argument.
        node: Forwarded unchanged as the second argument.

    Returns:
        The value produced by ``get_node_type(network, node)``.
    """
    node_type = get_node_type(network, node)
    return node_type
@router.get("/getlinktype/")
async def fastapi_get_link_type(network: str, link: str) -> str:
    """Handle GET ``/getlinktype/`` by delegating to ``get_link_type``.

    Args:
        network: Forwarded unchanged as the first argument.
        link: Forwarded unchanged as the second argument.

    Returns:
        The value produced by ``get_link_type(network, link)``.
    """
    link_type = get_link_type(network, link)
    return link_type
@router.get("/getelementtype/")
async def fastapi_get_element_type(network: str, element: str) -> str:
    """Handle GET ``/getelementtype/`` by delegating to ``get_element_type``.

    Args:
        network: Forwarded unchanged as the first argument.
        element: Forwarded unchanged as the second argument.

    Returns:
        The value produced by ``get_element_type(network, element)``.
    """
    element_type = get_element_type(network, element)
    return element_type
@router.get("/getelementtypevalue/")
async def fastapi_get_element_type_value(network: str, element: str) -> int:
    """Handle GET ``/getelementtypevalue/`` via ``get_element_type_value``.

    Args:
        network: Forwarded unchanged as the first argument.
        element: Forwarded unchanged as the second argument.

    Returns:
        The value produced by ``get_element_type_value(network, element)``.
    """
    type_value = get_element_type_value(network, element)
    return type_value
@router.get("/getnodes/")
async def fastapi_get_nodes(network: str) -> list[str]:
    """Handle GET ``/getnodes/`` by delegating to ``get_nodes``.

    Args:
        network: Forwarded unchanged as the only argument.

    Returns:
        The value produced by ``get_nodes(network)``.
    """
    nodes = get_nodes(network)
    return nodes
@router.get("/getlinks/")
async def fastapi_get_links(network: str) -> list[str]:
    """Handle GET ``/getlinks/`` by delegating to ``get_links``.

    Args:
        network: Forwarded unchanged as the only argument.

    Returns:
        The value produced by ``get_links(network)``.
    """
    links = get_links(network)
    return links
@router.get("/getnodelinks/")
def get_node_links_endpoint(network: str, node: str) -> list[str]:
    """Handle GET ``/getnodelinks/`` by delegating to ``get_node_links``.

    This handler is declared as a plain (non-async) function.

    Args:
        network: Forwarded unchanged as the first argument.
        node: Forwarded unchanged as the second argument.

    Returns:
        The value produced by ``get_node_links(network, node)``.
    """
    attached_links = get_node_links(network, node)
    return attached_links
############################################################
# Node & Link properties
############################################################
@router.get("/getnodeproperties/")
async def fast_get_node_properties(network: str, node: str) -> dict[str, Any]:
    """Handle GET ``/getnodeproperties/`` via ``get_node_properties``.

    Args:
        network: Forwarded unchanged as the first argument.
        node: Forwarded unchanged as the second argument.

    Returns:
        The value produced by ``get_node_properties(network, node)``.
    """
    properties = get_node_properties(network, node)
    return properties
@router.get("/getlinkproperties/")
async def fast_get_link_properties(network: str, link: str) -> dict[str, Any]:
    """Handle GET ``/getlinkproperties/`` via ``get_link_properties``.

    Args:
        network: Forwarded unchanged as the first argument.
        link: Forwarded unchanged as the second argument.

    Returns:
        The value produced by ``get_link_properties(network, link)``.
    """
    properties = get_link_properties(network, link)
    return properties
@router.get("/getscadaproperties/")
async def fast_get_scada_properties(network: str, scada: str) -> dict[str, Any]:
    """Handle GET ``/getscadaproperties/`` by delegating to ``get_scada_info``.

    Args:
        network: Forwarded unchanged as the first argument.
        scada: Forwarded unchanged as the second argument.

    Returns:
        The value produced by ``get_scada_info(network, scada)``.
    """
    scada_info = get_scada_info(network, scada)
    return scada_info
@router.get("/getallscadaproperties/")
async def fast_get_all_scada_properties(network: str) -> list[dict[str, Any]]:
    """Handle GET ``/getallscadaproperties/`` via ``get_all_scada_info``.

    Args:
        network: Forwarded unchanged as the only argument.

    Returns:
        The value produced by ``get_all_scada_info(network)``.
    """
    all_scada_info = get_all_scada_info(network)
    return all_scada_info
@router.get("/getelementpropertieswithtype/")
async def fast_get_element_properties_with_type(
    network: str, elementtype: str, element: str
) -> dict[str, Any]:
    """Handle GET ``/getelementpropertieswithtype/``.

    Delegates to ``get_element_properties_with_type``, passing the three
    parameters positionally in the order ``network``, ``elementtype``,
    ``element``.

    Returns:
        The value produced by the delegated call.
    """
    properties = get_element_properties_with_type(network, elementtype, element)
    return properties
@router.get("/getelementproperties/")
async def fast_get_element_properties(network: str, element: str) -> dict[str, Any]:
    """Handle GET ``/getelementproperties/`` via ``get_element_properties``.

    Args:
        network: Forwarded unchanged as the first argument.
        element: Forwarded unchanged as the second argument.

    Returns:
        The value produced by ``get_element_properties(network, element)``.
    """
    properties = get_element_properties(network, element)
    return properties
############################################################
# title 1.[TITLE]
############################################################
@router.get("/gettitleschema/")
async def fast_get_title_schema(network: str) -> dict[str, dict[str, Any]]:
    """Handle GET ``/gettitleschema/`` by delegating to ``get_title_schema``.

    Args:
        network: Forwarded unchanged as the only argument.

    Returns:
        The value produced by ``get_title_schema(network)``.
    """
    schema = get_title_schema(network)
    return schema
@router.get("/gettitle/")
async def fast_get_title(network: str) -> dict[str, Any]:
    """Handle GET ``/gettitle/`` by delegating to ``get_title``.

    Args:
        network: Forwarded unchanged as the only argument.

    Returns:
        The value produced by ``get_title(network)``.
    """
    title = get_title(network)
    return title
@router.get("/settitle/", response_model=None)
@router.post("/settitle/", response_model=None)
async def fastapi_set_title(network: str, req: Request) -> ChangeSet:
    """Apply the JSON request body to ``network`` through ``set_title``.

    The handler reads a JSON request body, so it is registered for POST as
    well as GET. A body on a GET request has no defined semantics and some
    HTTP clients and intermediaries refuse to send or forward one; POST gives
    callers a reliable way to deliver the payload. The GET registration is
    kept so existing callers keep working.

    Args:
        network: Forwarded unchanged as the first argument of ``set_title``.
        req: Incoming request whose JSON body is wrapped in a ``ChangeSet``.

    Returns:
        The value produced by ``set_title(network, ChangeSet(<body>))``.
        ``response_model=None`` stops FastAPI from deriving a response model
        from the ``ChangeSet`` return annotation.
    """
    props = await req.json()
    return set_title(network, ChangeSet(props))
############################################################
# status 10.[STATUS]
############################################################
@router.get("/getstatusschema")
async def fastapi_get_status_schema(network: str) -> dict[str, dict[str, Any]]:
    """Handle GET ``/getstatusschema`` by delegating to ``get_status_schema``.

    Args:
        network: Forwarded unchanged as the only argument.

    Returns:
        The value produced by ``get_status_schema(network)``.
    """
    # NOTE(review): this path is registered without a trailing slash.
    schema = get_status_schema(network)
    return schema
@router.get("/getstatus/")
async def fastapi_get_status(network: str, link: str) -> dict[str, Any]:
    """Handle GET ``/getstatus/`` by delegating to ``get_status``.

    Args:
        network: Forwarded unchanged as the first argument.
        link: Forwarded unchanged as the second argument.

    Returns:
        The value produced by ``get_status(network, link)``.
    """
    status = get_status(network, link)
    return status
@router.post("/setstatus/", response_model=None)
async def fastapi_set_status_properties(
    network: str, link: str, req: Request
) -> ChangeSet:
    """Handle POST ``/setstatus/`` by delegating to ``set_status``.

    The JSON request body is merged onto ``{"link": link}`` and the merged
    mapping is wrapped in a ``ChangeSet``.

    Args:
        network: Forwarded unchanged as the first argument of ``set_status``.
        link: Seeds the ``"link"`` key of the merged mapping.
        req: Incoming request whose JSON body supplies the remaining keys.

    Returns:
        The value produced by ``set_status(network, ChangeSet(<merged>))``.
    """
    body = await req.json()
    # With ``|`` the right-hand operand wins on key clashes, so a "link" key
    # in the body replaces the value taken from the ``link`` parameter.
    merged = {"link": link} | body
    change = ChangeSet(merged)
    return set_status(network, change)
############################################################
# General Deletion
############################################################
@router.post("/deletenode/", response_model=None)
async def fastapi_delete_node(network: str, node: str) -> ChangeSet:
    """Delete ``node`` using the deleter that matches the first true predicate.

    ``is_junction``, ``is_reservoir`` and ``is_tank`` are tried in that order.
    The first one that holds selects ``delete_junction``, ``delete_reservoir``
    or ``delete_tank`` respectively, which is called with a ``ChangeSet``
    built from ``{"id": node}``.

    Returns:
        The selected deleter's result, or ``ChangeSet()`` constructed with no
        arguments when none of the predicates holds.
    """
    # Tuple order encodes the junction -> reservoir -> tank precedence.
    dispatch = (
        (is_junction, delete_junction),
        (is_reservoir, delete_reservoir),
        (is_tank, delete_tank),
    )
    payload = {"id": node}
    for matches, remove in dispatch:
        if matches(network, node):
            return remove(network, ChangeSet(payload))
    # Should probably raise an error instead of returning a bare ChangeSet.
    return ChangeSet()
@router.post("/deletelink/", response_model=None)
async def fastapi_delete_link(network: str, link: str) -> ChangeSet:
    """Delete ``link`` using the deleter that matches the first true predicate.

    ``is_pipe``, ``is_pump`` and ``is_valve`` are tried in that order. The
    first one that holds selects ``delete_pipe``, ``delete_pump`` or
    ``delete_valve`` respectively, which is called with a ``ChangeSet`` built
    from ``{"id": link}``.

    Returns:
        The selected deleter's result, or ``ChangeSet()`` constructed with no
        arguments when none of the predicates holds.
    """
    # Tuple order encodes the pipe -> pump -> valve precedence.
    dispatch = (
        (is_pipe, delete_pipe),
        (is_pump, delete_pump),
        (is_valve, delete_valve),
    )
    payload = {"id": link}
    for matches, remove in dispatch:
        if matches(network, link):
            return remove(network, ChangeSet(payload))
    return ChangeSet()