Add scada_data test

This commit is contained in:
WQY\qiong
2023-02-10 01:45:23 +08:00
parent 5edd986ce6
commit 62a1303b8f

View File

@@ -3843,5 +3843,127 @@ class TestApi:
self.leave(p)
# 30 scada_data
def test_scada_data(self):
p = 'test_scada_data'
self.enter(p)
add_junction(p, ChangeSet({'id': 'j1', 'x': 0.0, 'y': 10.0, 'elevation': 20.0}))
add_scada_model(p, ChangeSet({'id': 'sm', 'x': 0.0, 'y': 10.0, 'device_id': 'sm_device', 'device_name': 'sm_device_name', 'address': 'xxx', 'sm_type': SCADA_TYPE_PRESSURE, 'model_id': 'j1'}))
sa = get_scada_data(p, 'sm_device')
assert sa['device_id'] == 'sm_device'
assert sa['data'] == []
set_scada_data(p, ChangeSet({'device_id': 'sm_device', 'data': [{ 'time': '2023-02-10 00:02:22', 'value': 100.0 }]}))
sa = get_scada_data(p, 'sm_device')
assert sa['device_id'] == 'sm_device'
assert len(sa['data']) == 1
assert sa['data'][0]['time'] == '2023-02-10 00:02:22'
assert sa['data'][0]['value'] == 100.0
set_scada_data(p, ChangeSet({'device_id': 'sm_device', 'data': [{ 'time': '2023-02-10 00:02:22', 'value': 100.0 }, { 'time': '2023-02-10 00:03:22', 'value': 200.0 }]}))
sa = get_scada_data(p, 'sm_device')
assert sa['device_id'] == 'sm_device'
assert len(sa['data']) == 2
assert sa['data'][0]['time'] == '2023-02-10 00:02:22'
assert sa['data'][0]['value'] == 100.0
assert sa['data'][1]['time'] == '2023-02-10 00:03:22'
assert sa['data'][1]['value'] == 200.0
set_scada_data(p, ChangeSet({'device_id': 'sm_device', 'data': []}))
sa = get_scada_data(p, 'sm_device')
assert sa['device_id'] == 'sm_device'
assert sa['data'] == []
self.leave(p)
def test_scada_data_op(self):
p = 'test_scada_data_op'
self.enter(p)
add_junction(p, ChangeSet({'id': 'j1', 'x': 0.0, 'y': 10.0, 'elevation': 20.0}))
add_scada_model(p, ChangeSet({'id': 'sm', 'x': 0.0, 'y': 10.0, 'device_id': 'sm_device', 'device_name': 'sm_device_name', 'address': 'xxx', 'sm_type': SCADA_TYPE_PRESSURE, 'model_id': 'j1'}))
cs = set_scada_data(p, ChangeSet({'device_id': 'sm_device', 'data': [{ 'time': '2023-02-10 00:02:22', 'value': 100.0 }]})).operations[0]
assert cs['operation'] == API_UPDATE
assert cs['type'] == 'scada_data'
assert cs['device_id'] == 'sm_device'
assert len(cs['data']) == 1
assert cs['data'][0]['time'] == '2023-02-10 00:02:22'
assert cs['data'][0]['value'] == 100.0
cs = execute_undo(p).operations[0]
assert cs['operation'] == API_UPDATE
assert cs['type'] == 'scada_data'
assert cs['device_id'] == 'sm_device'
assert cs['data'] == []
cs = execute_redo(p).operations[0]
assert cs['operation'] == API_UPDATE
assert cs['type'] == 'scada_data'
assert cs['device_id'] == 'sm_device'
assert len(cs['data']) == 1
assert cs['data'][0]['time'] == '2023-02-10 00:02:22'
assert cs['data'][0]['value'] == 100.0
cs = set_scada_data(p, ChangeSet({'device_id': 'sm_device', 'data': [{ 'time': '2023-02-10 00:02:22', 'value': 100.0 }, { 'time': '2023-02-10 00:03:22', 'value': 200.0 }]})).operations[0]
assert cs['operation'] == API_UPDATE
assert cs['type'] == 'scada_data'
assert cs['device_id'] == 'sm_device'
assert len(cs['data']) == 2
assert cs['data'][0]['time'] == '2023-02-10 00:02:22'
assert cs['data'][0]['value'] == 100.0
assert cs['data'][1]['time'] == '2023-02-10 00:03:22'
assert cs['data'][1]['value'] == 200.0
cs = execute_undo(p).operations[0]
assert cs['operation'] == API_UPDATE
assert cs['type'] == 'scada_data'
assert cs['device_id'] == 'sm_device'
assert len(cs['data']) == 1
assert cs['data'][0]['time'] == '2023-02-10 00:02:22'
assert cs['data'][0]['value'] == 100.0
cs = execute_redo(p).operations[0]
assert cs['operation'] == API_UPDATE
assert cs['type'] == 'scada_data'
assert cs['device_id'] == 'sm_device'
assert len(cs['data']) == 2
assert cs['data'][0]['time'] == '2023-02-10 00:02:22'
assert cs['data'][0]['value'] == 100.0
assert cs['data'][1]['time'] == '2023-02-10 00:03:22'
assert cs['data'][1]['value'] == 200.0
cs = set_scada_data(p, ChangeSet({'device_id': 'sm_device', 'data': []})).operations[0]
assert cs['operation'] == API_UPDATE
assert cs['type'] == 'scada_data'
assert cs['device_id'] == 'sm_device'
assert cs['data'] == []
cs = execute_undo(p).operations[0]
assert cs['operation'] == API_UPDATE
assert cs['type'] == 'scada_data'
assert cs['device_id'] == 'sm_device'
assert len(cs['data']) == 2
assert cs['data'][0]['time'] == '2023-02-10 00:02:22'
assert cs['data'][0]['value'] == 100.0
assert cs['data'][1]['time'] == '2023-02-10 00:03:22'
assert cs['data'][1]['value'] == 200.0
cs = execute_redo(p).operations[0]
assert cs['operation'] == API_UPDATE
assert cs['type'] == 'scada_data'
assert cs['device_id'] == 'sm_device'
assert cs['data'] == []
self.leave(p)
if __name__ == '__main__':
pytest.main()