Get all scada elements
This commit is contained in:
@@ -128,5 +128,5 @@ from .s30_scada_device_data import get_scada_device_data_schema, get_scada_devic
|
||||
from .del_cmd import clean_scada_device_data
|
||||
|
||||
from .s31_scada_element import SCADA_ELEMENT_STATUS_OFFLINE, SCADA_ELEMENT_STATUS_ONLINE
|
||||
from .s31_scada_element import get_scada_element_schema, get_scada_element, set_scada_element, add_scada_element, delete_scada_element
|
||||
from .s31_scada_element import get_scada_element_schema, get_scada_element, set_scada_element, add_scada_element, delete_scada_element, get_scada_elements
|
||||
from .del_cmd import clean_scada_element
|
||||
|
||||
@@ -22,6 +22,14 @@ def _check_model_id(name: str, cs: ChangeSet) -> bool:
|
||||
return is_node(name, model_id) or is_link(name, model_id)
|
||||
|
||||
|
||||
def get_scada_elements(name: str) -> list[str]:
|
||||
result : list[str] = []
|
||||
rows = read_all(name, 'select id from scada_element order by id')
|
||||
for row in rows:
|
||||
result.append(str(row['id']))
|
||||
return result
|
||||
|
||||
|
||||
def get_scada_element_schema(name: str) -> dict[str, dict[str, Any]]:
|
||||
return { 'id' : {'type': 'str' , 'optional': False , 'readonly': True },
|
||||
'x' : {'type': 'float' , 'optional': False , 'readonly': False},
|
||||
|
||||
@@ -4798,6 +4798,8 @@ class TestApi:
|
||||
add_scada_device(p, ChangeSet({'id': 'sd0', 'name': 'device0', 'address': 'x0', 'sd_type': SCADA_DEVICE_TYPE_PRESSURE}))
|
||||
add_scada_device(p, ChangeSet({'id': 'sd1', 'name': 'device1', 'address': 'x1', 'sd_type': SCADA_DEVICE_TYPE_FLOW}))
|
||||
|
||||
assert get_scada_elements(p) == []
|
||||
|
||||
add_scada_element(p, ChangeSet({'id': 'sm0', 'x': 0.0, 'y': 1.0, 'device_id': 'sd0', 'model_id': 'j0', 'status': SCADA_ELEMENT_STATUS_OFFLINE}))
|
||||
sm = get_scada_element(p, 'sm0')
|
||||
assert sm == {}
|
||||
@@ -4822,6 +4824,11 @@ class TestApi:
|
||||
assert sm['model_id'] == 'p1'
|
||||
assert sm['status'] == SCADA_ELEMENT_STATUS_ONLINE
|
||||
|
||||
elements = get_scada_elements(p)
|
||||
assert len(elements) == 2
|
||||
assert elements[0] == 'sm0'
|
||||
assert elements[1] == 'sm1'
|
||||
|
||||
set_scada_element(p, ChangeSet({'id': 'sm0', 'x': 1.0, 'y': 2.0, 'device_id': 'sd1', 'model_id': 'p1', 'status': SCADA_ELEMENT_STATUS_ONLINE}))
|
||||
sm = get_scada_element(p, 'sm0')
|
||||
assert sm['id'] == 'sm0'
|
||||
|
||||
@@ -901,6 +901,9 @@ def clean_scada_device_data(name: str) -> ChangeSet:
|
||||
def get_scada_element_schema(name: str) -> dict[str, dict[str, Any]]:
|
||||
return api.get_scada_element_schema(name)
|
||||
|
||||
def get_scada_elements(name: str) -> list[str]:
|
||||
return api.get_scada_elements(name)
|
||||
|
||||
def get_scada_element(name: str, id: str) -> dict[str, Any]:
|
||||
return api.get_scada_element(name, id)
|
||||
|
||||
|
||||
Reference in New Issue
Block a user