Refine scada api
This commit is contained in:
120
api/s29_scada_device.py
Normal file
120
api/s29_scada_device.py
Normal file
@@ -0,0 +1,120 @@
|
||||
from .database import *
|
||||
|
||||
|
||||
SCADA_DEVICE_TYPE_PRESSURE = 'PRESSURE'
|
||||
SCADA_DEVICE_TYPE_DEMAND = 'DEMAND'
|
||||
SCADA_DEVICE_TYPE_QUALITY = 'QUALITY'
|
||||
SCADA_DEVICE_TYPE_LEVEL = 'LEVEL'
|
||||
SCADA_DEVICE_TYPE_FLOW = 'FLOW'
|
||||
|
||||
|
||||
def get_scada_device_schema(name: str) -> dict[str, dict[str, Any]]:
|
||||
return { 'id' : {'type': 'str', 'optional': False, 'readonly': True },
|
||||
'name' : {'type': 'str', 'optional': True , 'readonly': False},
|
||||
'address': {'type': 'str', 'optional': True , 'readonly': False},
|
||||
'sd_type': {'type': 'str', 'optional': True , 'readonly': False}}
|
||||
|
||||
|
||||
def get_scada_device(name: str, id: str) -> dict[str, Any]:
|
||||
sm = try_read(name, f"select * from scada_device where id = '{id}'")
|
||||
if sm == None:
|
||||
return {}
|
||||
d = {}
|
||||
d['id'] = str(sm['id'])
|
||||
d['name'] = str(sm['name']) if sm['name'] != None else None
|
||||
d['address'] = str(sm['address']) if sm['address'] != None else None
|
||||
d['sd_type'] = str(sm['type']) if sm['type'] != None else None
|
||||
return d
|
||||
|
||||
|
||||
class ScadaDevice(object):
|
||||
def __init__(self, input: dict[str, Any]) -> None:
|
||||
self.type = 'scada_device'
|
||||
self.id = str(input['id'])
|
||||
self.name = str(input['name']) if 'name' in input and input['name'] != None else None
|
||||
self.address = str(input['address']) if 'address' in input and input['address'] != None else None
|
||||
self.sd_type = str(input['sd_type']) if 'sd_type' in input and input['sd_type'] != None else None
|
||||
|
||||
self.f_type = f"'{self.type}'"
|
||||
self.f_id = f"'{self.id}'"
|
||||
self.f_name = f"'{self.name}'" if self.name != None else 'null'
|
||||
self.f_address = f"'{self.address}'" if self.address != None else 'null'
|
||||
self.f_sd_type = f"'{self.sd_type}'" if self.sd_type != None else 'null'
|
||||
|
||||
def as_dict(self) -> dict[str, Any]:
|
||||
return { 'type': self.type, 'id': self.id, 'name': self.name, 'address': self.address, 'sd_type': self.sd_type }
|
||||
|
||||
def as_id_dict(self) -> dict[str, Any]:
|
||||
return { 'type': self.type, 'id': self.id }
|
||||
|
||||
|
||||
def set_scada_device_cmd(name: str, cs: ChangeSet) -> DbChangeSet:
|
||||
old = ScadaDevice(get_scada_device(name, cs.operations[0]['id']))
|
||||
raw_new = get_scada_device(name, cs.operations[0]['id'])
|
||||
|
||||
new_dict = cs.operations[0]
|
||||
schema = get_scada_device_schema(name)
|
||||
for key, value in schema.items():
|
||||
if key in new_dict and not value['readonly']:
|
||||
raw_new[key] = new_dict[key]
|
||||
new = ScadaDevice(raw_new)
|
||||
|
||||
redo_sql = f"update scada_device set name = {new.f_name}, address = {new.f_address}, type = {new.f_sd_type} where id = {new.f_id};"
|
||||
undo_sql = f"update scada_device set name = {old.f_name}, address = {old.f_address}, type = {old.f_sd_type} where id = {old.f_id};"
|
||||
|
||||
redo_cs = g_update_prefix | new.as_dict()
|
||||
undo_cs = g_update_prefix | old.as_dict()
|
||||
|
||||
return DbChangeSet(redo_sql, undo_sql, [redo_cs], [undo_cs])
|
||||
|
||||
|
||||
def set_scada_device(name: str, cs: ChangeSet) -> ChangeSet:
|
||||
if get_scada_device(name, cs.operations[0]['id']) == {}:
|
||||
return ChangeSet()
|
||||
return execute_command(name, set_scada_device_cmd(name, cs))
|
||||
|
||||
|
||||
def add_scada_device_cmd(name: str, cs: ChangeSet) -> DbChangeSet:
|
||||
new = ScadaDevice(cs.operations[0])
|
||||
|
||||
redo_sql = f"insert into scada_device (id, name, address, type) values ({new.f_id}, {new.f_name}, {new.f_address}, {new.f_sd_type});"
|
||||
undo_sql = f"delete from scada_device where id = {new.f_id};"
|
||||
|
||||
redo_cs = g_add_prefix | new.as_dict()
|
||||
undo_cs = g_delete_prefix | new.as_id_dict()
|
||||
|
||||
return DbChangeSet(redo_sql, undo_sql, [redo_cs], [undo_cs])
|
||||
|
||||
|
||||
def add_scada_device(name: str, cs: ChangeSet) -> ChangeSet:
|
||||
if get_scada_device(name, cs.operations[0]['id']) != {}:
|
||||
return ChangeSet()
|
||||
return execute_command(name, add_scada_device_cmd(name, cs))
|
||||
|
||||
|
||||
def delete_scada_device_cmd(name: str, cs: ChangeSet) -> DbChangeSet:
|
||||
old = ScadaDevice(get_scada_device(name, cs.operations[0]['id']))
|
||||
|
||||
redo_sql = f"delete from scada_device where id = {old.f_id};"
|
||||
undo_sql = f"insert into scada_device (id, name, address, type) values ({old.f_id}, {old.f_name}, {old.f_address}, {old.f_sd_type});"
|
||||
|
||||
redo_cs = g_delete_prefix | old.as_id_dict()
|
||||
undo_cs = g_add_prefix | old.as_dict()
|
||||
|
||||
return DbChangeSet(redo_sql, undo_sql, [redo_cs], [undo_cs])
|
||||
|
||||
|
||||
def delete_scada_device(name: str, cs: ChangeSet) -> ChangeSet:
|
||||
if get_scada_device(name, cs.operations[0]['id']) == {}:
|
||||
return ChangeSet()
|
||||
return execute_command(name, delete_scada_device_cmd(name, cs))
|
||||
|
||||
|
||||
def clean_scada_device_cmd(name: str) -> ChangeSet:
|
||||
cs = ChangeSet()
|
||||
|
||||
rows = read_all(name, 'select * from scada_device')
|
||||
for row in rows:
|
||||
cs.delete({ 'type': 'scada_device', 'id': row['id'] })
|
||||
|
||||
return cs
|
||||
Reference in New Issue
Block a user