170 lines
6.0 KiB
Python
170 lines
6.0 KiB
Python
from typing import Any
|
|
from fastapi import APIRouter, Request
|
|
from app.native.api import ChangeSet
|
|
from app.services.tjnetwork import (
|
|
get_scada_info,
|
|
get_all_scada_info,
|
|
get_scada_device_schema,
|
|
get_scada_device,
|
|
set_scada_device,
|
|
add_scada_device,
|
|
delete_scada_device,
|
|
clean_scada_device,
|
|
get_all_scada_device_ids,
|
|
get_all_scada_devices,
|
|
get_scada_device_data_schema,
|
|
get_scada_device_data,
|
|
set_scada_device_data,
|
|
add_scada_device_data,
|
|
delete_scada_device_data,
|
|
clean_scada_device_data,
|
|
get_scada_element_schema,
|
|
get_scada_element,
|
|
set_scada_element,
|
|
add_scada_element,
|
|
delete_scada_element,
|
|
clean_scada_element,
|
|
get_all_scada_elements,
|
|
get_scada_element_schema,
|
|
get_scada_info_schema,
|
|
)
|
|
|
|
router = APIRouter()
|
|
|
|
@router.get("/getscadaproperties/")
|
|
async def fast_get_scada_properties(network: str, scada: str) -> dict[str, Any]:
|
|
return get_scada_info(network, scada)
|
|
|
|
@router.get("/getallscadaproperties/")
|
|
async def fast_get_all_scada_properties(network: str) -> list[dict[str, Any]]:
|
|
return get_all_scada_info(network)
|
|
|
|
|
|
############################################################
|
|
# scada_device 29
|
|
############################################################
|
|
|
|
@router.get("/getscadadeviceschema/")
|
|
async def fastapi_get_scada_device_schema(network: str) -> dict[str, dict[str, Any]]:
|
|
return get_scada_device_schema(network)
|
|
|
|
@router.get("/getscadadevice/")
|
|
async def fastapi_get_scada_device(network: str, id: str) -> dict[str, Any]:
|
|
return get_scada_device(network, id)
|
|
|
|
@router.post("/setscadadevice/", response_model=None)
|
|
async def fastapi_set_scada_device(network: str, req: Request) -> ChangeSet:
|
|
props = await req.json()
|
|
return set_scada_device(network, ChangeSet(props))
|
|
|
|
@router.post("/addscadadevice/", response_model=None)
|
|
async def fastapi_add_scada_device(network: str, req: Request) -> ChangeSet:
|
|
props = await req.json()
|
|
return add_scada_device(network, ChangeSet(props))
|
|
|
|
@router.post("/deletescadadevice/", response_model=None)
|
|
async def fastapi_delete_scada_device(network: str, req: Request) -> ChangeSet:
|
|
props = await req.json()
|
|
return delete_scada_device(network, ChangeSet(props))
|
|
|
|
@router.post("/cleanscadadevice/", response_model=None)
|
|
async def fastapi_clean_scada_device(network: str) -> ChangeSet:
|
|
return clean_scada_device(network)
|
|
|
|
@router.get("/getallscadadeviceids/")
|
|
async def fastapi_get_all_scada_device_ids(network: str) -> list[str]:
|
|
return get_all_scada_device_ids(network)
|
|
|
|
@router.get("/getallscadadevices/")
|
|
async def fastapi_get_all_scada_devices(network: str) -> list[dict[str, Any]]:
|
|
return get_all_scada_devices(network)
|
|
|
|
|
|
############################################################
|
|
# scada_device_data 30
|
|
############################################################
|
|
|
|
@router.get("/getscadadevicedataschema/")
|
|
async def fastapi_get_scada_device_data_schema(
|
|
network: str,
|
|
) -> dict[str, dict[str, Any]]:
|
|
return get_scada_device_data_schema(network)
|
|
|
|
@router.get("/getscadadevicedata/")
|
|
async def fastapi_get_scada_device_data(network: str, device_id: str) -> dict[str, Any]:
|
|
return get_scada_device_data(network, device_id)
|
|
|
|
@router.post("/setscadadevicedata/", response_model=None)
|
|
async def fastapi_set_scada_device_data(network: str, req: Request) -> ChangeSet:
|
|
props = await req.json()
|
|
return set_scada_device_data(network, ChangeSet(props))
|
|
|
|
@router.post("/addscadadevicedata/", response_model=None)
|
|
async def fastapi_add_scada_device_data(network: str, req: Request) -> ChangeSet:
|
|
props = await req.json()
|
|
return add_scada_device_data(network, ChangeSet(props))
|
|
|
|
@router.post("/deletescadadevicedata/", response_model=None)
|
|
async def fastapi_delete_scada_device_data(network: str, req: Request) -> ChangeSet:
|
|
props = await req.json()
|
|
return delete_scada_device_data(network, ChangeSet(props))
|
|
|
|
@router.post("/cleanscadadevicedata/", response_model=None)
|
|
async def fastapi_clean_scada_device_data(network: str) -> ChangeSet:
|
|
return clean_scada_device_data(network)
|
|
|
|
|
|
############################################################
|
|
# scada_element 31
|
|
############################################################
|
|
|
|
@router.get("/getscadaelementschema/")
|
|
async def fastapi_get_scada_element_schema(
|
|
network: str,
|
|
) -> dict[str, dict[str, Any]]:
|
|
return get_scada_element_schema(network)
|
|
|
|
@router.get("/getscadaelements/")
|
|
async def fastapi_get_scada_elements(network: str) -> list[dict[str, Any]]:
|
|
return get_all_scada_elements(network)
|
|
|
|
@router.get("/getscadaelement/")
|
|
async def fastapi_get_scada_element(network: str, id: str) -> dict[str, Any]:
|
|
return get_scada_element(network, id)
|
|
|
|
@router.post("/setscadaelement/", response_model=None)
|
|
async def fastapi_set_scada_element(network: str, req: Request) -> ChangeSet:
|
|
props = await req.json()
|
|
return set_scada_element(network, ChangeSet(props))
|
|
|
|
@router.post("/addscadaelement/", response_model=None)
|
|
async def fastapi_add_scada_element(network: str, req: Request) -> ChangeSet:
|
|
props = await req.json()
|
|
return add_scada_element(network, ChangeSet(props))
|
|
|
|
@router.post("/deletescadaelement/", response_model=None)
|
|
async def fastapi_delete_scada_element(network: str, req: Request) -> ChangeSet:
|
|
props = await req.json()
|
|
return delete_scada_element(network, ChangeSet(props))
|
|
|
|
@router.post("/cleanscadaelement/", response_model=None)
|
|
async def fastapi_clean_scada_element(network: str) -> ChangeSet:
|
|
return clean_scada_element(network)
|
|
|
|
|
|
############################################################
|
|
# scada_info 38
|
|
############################################################
|
|
|
|
@router.get("/getscadainfoschema/")
|
|
async def fastapi_get_scada_info_schema(network: str) -> dict[str, dict[str, Any]]:
|
|
return get_scada_info_schema(network)
|
|
|
|
@router.get("/getscadainfo/")
|
|
async def fastapi_get_scada_info(network: str, id: str) -> dict[str, Any]:
|
|
return get_scada_info(network, id)
|
|
|
|
@router.get("/getallscadainfo/")
|
|
async def fastapi_get_all_scada_info(network: str) -> list[dict[str, Any]]:
|
|
return get_all_scada_info(network)
|