from fastapi import APIRouter, Request
from typing import Any, List, Dict, Union
from app.services.tjnetwork import *

router = APIRouter()

############################################################
# type
############################################################
# Every endpoint in this section is a thin GET wrapper: the query parameters
# are forwarded unchanged to the same-named helper (presumably supplied by the
# star import from app.services.tjnetwork) and the helper's result is returned
# as the response.


@router.get("/isnode/")
async def fastapi_is_node(network: str, node: str) -> bool:
    """Return the result of ``is_node(network, node)``."""
    return is_node(network, node)


@router.get("/isjunction/")
async def fastapi_is_junction(network: str, node: str) -> bool:
    """Return the result of ``is_junction(network, node)``."""
    return is_junction(network, node)


@router.get("/isreservoir/")
async def fastapi_is_reservoir(network: str, node: str) -> bool:
    """Return the result of ``is_reservoir(network, node)``."""
    return is_reservoir(network, node)


@router.get("/istank/")
async def fastapi_is_tank(network: str, node: str) -> bool:
    """Return the result of ``is_tank(network, node)``."""
    return is_tank(network, node)


@router.get("/islink/")
async def fastapi_is_link(network: str, link: str) -> bool:
    """Return the result of ``is_link(network, link)``."""
    return is_link(network, link)


@router.get("/ispipe/")
async def fastapi_is_pipe(network: str, link: str) -> bool:
    """Return the result of ``is_pipe(network, link)``."""
    return is_pipe(network, link)


@router.get("/ispump/")
async def fastapi_is_pump(network: str, link: str) -> bool:
    """Return the result of ``is_pump(network, link)``."""
    return is_pump(network, link)


@router.get("/isvalve/")
async def fastapi_is_valve(network: str, link: str) -> bool:
    """Return the result of ``is_valve(network, link)``."""
    return is_valve(network, link)


@router.get("/getnodetype/")
async def fastapi_get_node_type(network: str, node: str) -> str:
    """Return the string produced by ``get_node_type(network, node)``."""
    return get_node_type(network, node)


@router.get("/getlinktype/")
async def fastapi_get_link_type(network: str, link: str) -> str:
    """Return the string produced by ``get_link_type(network, link)``."""
    return get_link_type(network, link)


@router.get("/getelementtype/")
async def fastapi_get_element_type(network: str, element: str) -> str:
    """Return the string produced by ``get_element_type(network, element)``."""
    return get_element_type(network, element)


@router.get("/getelementtypevalue/")
async def fastapi_get_element_type_value(network: str, element: str) -> int:
    """Return the integer produced by ``get_element_type_value(network, element)``."""
    return get_element_type_value(network, element)


@router.get("/getnodes/")
async def fastapi_get_nodes(network: str) -> list[str]:
    """Return the list of strings produced by ``get_nodes(network)``."""
    return get_nodes(network)


@router.get("/getlinks/")
async def fastapi_get_links(network: str) -> list[str]:
    """Return the list of strings produced by ``get_links(network)``."""
    return get_links(network)
# NOTE(review): this is a plain ``def`` while the endpoints around it are
# ``async def``. FastAPI runs plain ``def`` endpoints in a worker thread
# rather than on the event loop; confirm whether the difference is intended.
@router.get("/getnodelinks/")
def get_node_links_endpoint(network: str, node: str) -> list[str]:
    """Return the list of strings produced by ``get_node_links(network, node)``."""
    return get_node_links(network, node)


############################################################
# Node & Link properties
############################################################
@router.get("/getnodeproperties/")
async def fast_get_node_properties(network: str, node: str) -> dict[str, Any]:
    """Return the mapping produced by ``get_node_properties(network, node)``."""
    return get_node_properties(network, node)


@router.get("/getlinkproperties/")
async def fast_get_link_properties(network: str, link: str) -> dict[str, Any]:
    """Return the mapping produced by ``get_link_properties(network, link)``."""
    return get_link_properties(network, link)


@router.get("/getscadaproperties/")
async def fast_get_scada_properties(network: str, scada: str) -> dict[str, Any]:
    """Return the mapping produced by ``get_scada_info(network, scada)``."""
    return get_scada_info(network, scada)


@router.get("/getallscadaproperties/")
async def fast_get_all_scada_properties(network: str) -> list[dict[str, Any]]:
    """Return the list of mappings produced by ``get_all_scada_info(network)``."""
    return get_all_scada_info(network)


@router.get("/getelementpropertieswithtype/")
async def fast_get_element_properties_with_type(
    network: str, elementtype: str, element: str
) -> dict[str, Any]:
    """Return ``get_element_properties_with_type(network, elementtype, element)``."""
    return get_element_properties_with_type(network, elementtype, element)


@router.get("/getelementproperties/")
async def fast_get_element_properties(network: str, element: str) -> dict[str, Any]:
    """Return the mapping produced by ``get_element_properties(network, element)``."""
    return get_element_properties(network, element)


############################################################
# title 1.[TITLE]
############################################################
@router.get("/gettitleschema/")
async def fast_get_title_schema(network: str) -> dict[str, dict[str, Any]]:
    """Return the nested mapping produced by ``get_title_schema(network)``."""
    return get_title_schema(network)


@router.get("/gettitle/")
async def fast_get_title(network: str) -> dict[str, Any]:
    """Return the mapping produced by ``get_title(network)``."""
    return get_title(network)


# This endpoint reads a JSON request body and hands it to a setter, so POST is
# the appropriate verb: a body on a GET request has no defined semantics and
# may be dropped by clients or intermediaries. POST is therefore registered as
# well; GET stays registered so existing callers keep working. The decorators
# are stacked (instead of one multi-method route) so each verb is its own route.
@router.post("/settitle/", response_model=None)
@router.get("/settitle/", response_model=None)
async def fastapi_set_title(network: str, req: Request) -> ChangeSet:
    """Wrap the JSON request body in a ``ChangeSet`` and pass it to ``set_title``.

    Args:
        network: Query parameter forwarded to ``set_title`` unchanged.
        req: Raw request; its body is decoded with ``req.json()``.

    Returns:
        Whatever ``set_title`` returns. ``response_model=None`` keeps FastAPI
        from deriving a response model from the ``ChangeSet`` annotation.
    """
    props = await req.json()
    return set_title(network, ChangeSet(props))


############################################################
# status 10.[STATUS]
############################################################
# NOTE(review): this path has no trailing slash, unlike the other routes
# declared alongside it; confirm whether that is deliberate.
@router.get("/getstatusschema")
async def fastapi_get_status_schema(network: str) -> dict[str, dict[str, Any]]:
    """Return the nested mapping produced by ``get_status_schema(network)``."""
    return get_status_schema(network)


@router.get("/getstatus/")
async def fastapi_get_status(network: str, link: str) -> dict[str, Any]:
    """Return the mapping produced by ``get_status(network, link)``."""
    return get_status(network, link)


@router.post("/setstatus/", response_model=None)
async def fastapi_set_status_properties(
    network: str, link: str, req: Request
) -> ChangeSet:
    """Merge ``link`` with the JSON request body and pass the result to ``set_status``.

    The merged mapping is wrapped in a ``ChangeSet`` before the call.
    """
    body = await req.json()
    # The body is the right-hand operand of the merge, so a "link" key in the
    # body takes precedence over the query parameter.
    return set_status(network, ChangeSet({"link": link} | body))


############################################################
# General Deletion
############################################################
@router.post("/deletenode/", response_model=None)
async def fastapi_delete_node(network: str, node: str) -> ChangeSet:
    """Delete ``node`` using the deleter that matches its kind.

    The junction, reservoir and tank predicates are tried in that order; the
    first one that accepts ``node`` selects the deleter, which receives a
    ``ChangeSet`` built from ``{"id": node}``. If none matches, an empty
    ``ChangeSet`` is returned.
    """
    by_kind = (
        (is_junction, delete_junction),
        (is_reservoir, delete_reservoir),
        (is_tank, delete_tank),
    )
    for matches, remove in by_kind:
        if matches(network, node):
            return remove(network, ChangeSet({"id": node}))
    return ChangeSet()  # Should probably raise error or return empty


@router.post("/deletelink/", response_model=None)
async def fastapi_delete_link(network: str, link: str) -> ChangeSet:
    """Delete ``link`` using the deleter that matches its kind.

    The pipe, pump and valve predicates are tried in that order; the first one
    that accepts ``link`` selects the deleter, which receives a ``ChangeSet``
    built from ``{"id": link}``. If none matches, an empty ``ChangeSet`` is
    returned.
    """
    by_kind = (
        (is_pipe, delete_pipe),
        (is_pump, delete_pump),
        (is_valve, delete_valve),
    )
    for matches, remove in by_kind:
        if matches(network, link):
            return remove(network, ChangeSet({"id": link}))
    return ChangeSet()