123 lines
4.8 KiB
Python
123 lines
4.8 KiB
Python
from .database import *
|
|
|
|
|
|
# Known SCADA device type identifiers.
# NOTE(review): presumably these are the values stored in the ``sd_type``
# column of ``scada_device``; nothing in this module validates against them.
SCADA_DEVICE_TYPE_PRESSURE = 'PRESSURE'
SCADA_DEVICE_TYPE_DEMAND = 'DEMAND'
SCADA_DEVICE_TYPE_QUALITY = 'QUALITY'
SCADA_DEVICE_TYPE_LEVEL = 'LEVEL'
SCADA_DEVICE_TYPE_FLOW = 'FLOW'
|
|
|
|
|
|
def get_scada_device_schema(name: str) -> dict[str, dict[str, Any]]:
    """Return the field schema describing a SCADA device record.

    Each key is a field name and each value is a descriptor with the keys
    ``type``, ``optional`` and ``readonly``. ``id`` is mandatory and
    read-only; ``name``, ``address`` and ``sd_type`` are optional and
    writable.

    Args:
        name: Unused by this function; kept so the signature matches the
            other accessors in this module.

    Returns:
        A freshly built schema dict in the order id, name, address, sd_type.
    """
    schema: dict[str, dict[str, Any]] = {
        'id': {'type': 'str', 'optional': False, 'readonly': True},
    }
    # The remaining fields all share the same descriptor shape.
    for field in ('name', 'address', 'sd_type'):
        schema[field] = {'type': 'str', 'optional': True, 'readonly': False}
    return schema
|
|
|
|
|
|
def get_scada_device(name: str, id: str) -> dict[str, Any]:
    """Fetch one row of ``scada_device`` by id and return it as a plain dict.

    Args:
        name: Passed through to ``try_read`` as its first argument
            (presumably the project/database name -- confirm against
            ``.database``).
        id: Identifier of the device to look up.

    Returns:
        ``{}`` when ``try_read`` yields ``None``; otherwise a dict with the
        keys ``id``, ``name``, ``address`` and ``sd_type``. ``id`` is always
        a string; the other three are strings or ``None`` when the stored
        value is ``None``.
    """
    # The id is embedded in a quoted SQL literal, so double any embedded
    # single quote (standard SQL escaping); otherwise an id containing "'"
    # would terminate the literal early and corrupt the statement.
    quoted_id = str(id).replace("'", "''")
    sm = try_read(name, f"select * from scada_device where id = '{quoted_id}'")
    if sm is None:
        return {}

    def _text(column: str) -> Any:
        """Return the column as ``str``, keeping ``None`` as ``None``."""
        value = sm[column]
        return None if value is None else str(value)

    return {
        'id': str(sm['id']),
        'name': _text('name'),
        'address': _text('address'),
        'sd_type': _text('sd_type'),
    }
|
|
|
|
|
|
class ScadaDevice(object):
    """One SCADA device record plus ready-to-embed SQL literal renderings.

    Plain attributes (``type``, ``id``, ``name``, ``address``, ``sd_type``)
    hold the Python values. The ``f_*`` attributes hold the same values
    rendered as SQL literal text -- a single-quoted string, or the bare word
    ``null`` -- for direct interpolation into the statements built elsewhere
    in this module.
    """

    def __init__(self, input: dict[str, Any]) -> None:
        """Build the record from a dict.

        Args:
            input: Must contain ``id``. ``name``, ``address`` and ``sd_type``
                are optional; a missing key and an explicit ``None`` are
                treated the same way.

        Raises:
            KeyError: If ``input`` has no ``id`` key.
        """
        self.type = 'scada_device'
        self.id = str(input['id'])
        self.name = self._optional_str(input, 'name')
        self.address = self._optional_str(input, 'address')
        self.sd_type = self._optional_str(input, 'sd_type')

        self.f_type = self._sql_literal(self.type)
        self.f_id = self._sql_literal(self.id)
        self.f_name = self._sql_literal(self.name)
        self.f_address = self._sql_literal(self.address)
        self.f_sd_type = self._sql_literal(self.sd_type)

    @staticmethod
    def _optional_str(source: dict[str, Any], key: str) -> Any:
        """Return ``str(source[key])``, or ``None`` if absent or ``None``."""
        value = source.get(key)
        return None if value is None else str(value)

    @staticmethod
    def _sql_literal(value: Any) -> str:
        """Render ``value`` as a quoted SQL string literal, or ``null``.

        Embedded single quotes are doubled (standard SQL escaping) so that a
        value such as ``O'Brien`` cannot terminate the literal early.
        """
        if value is None:
            return 'null'
        escaped = str(value).replace("'", "''")
        return f"'{escaped}'"

    def as_dict(self) -> dict[str, Any]:
        """Return every field of the record, including ``type``, as a dict."""
        return {
            'type': self.type,
            'id': self.id,
            'name': self.name,
            'address': self.address,
            'sd_type': self.sd_type,
        }

    def as_id_dict(self) -> dict[str, Any]:
        """Return only the identifying fields: ``type`` and ``id``."""
        return {'type': self.type, 'id': self.id}
|
|
|
|
|
|
def _set_scada_device(name: str, cs: ChangeSet) -> DbChangeSet:
    """Build the redo/undo change set for updating an existing SCADA device.

    Only the first entry of ``cs.operations`` is used. Fields it supplies
    that the schema marks as writable are overlaid on the stored row; the
    read-only ``id`` is never overwritten.

    Args:
        name: Passed through to ``get_scada_device`` and
            ``get_scada_device_schema``.
        cs: Change set whose first operation carries ``id`` and the fields
            to change.

    Returns:
        A ``DbChangeSet`` whose redo SQL writes the new values and whose
        undo SQL restores the previous ones, with the matching change-set
        entries.

    Raises:
        KeyError: If the device does not exist. The lookup then returns
            ``{}`` and ``ScadaDevice`` requires ``id``; the public wrapper
            checks existence first.
    """
    new_dict = cs.operations[0]

    # Read the stored row once and derive both "old" and "new" from the same
    # snapshot. The original queried twice, which cost an extra round-trip
    # and could see two different states.
    current = get_scada_device(name, new_dict['id'])
    old = ScadaDevice(current)

    raw_new = dict(current)
    schema = get_scada_device_schema(name)
    for key, value in schema.items():
        if key in new_dict and not value['readonly']:
            raw_new[key] = new_dict[key]
    new = ScadaDevice(raw_new)

    redo_sql = f"update scada_device set name = {new.f_name}, address = {new.f_address}, sd_type = {new.f_sd_type} where id = {new.f_id};"
    undo_sql = f"update scada_device set name = {old.f_name}, address = {old.f_address}, sd_type = {old.f_sd_type} where id = {old.f_id};"

    redo_cs = g_update_prefix | new.as_dict()
    undo_cs = g_update_prefix | old.as_dict()

    return DbChangeSet(redo_sql, undo_sql, [redo_cs], [undo_cs])
|
|
|
|
|
|
def set_scada_device(name: str, cs: ChangeSet) -> ChangeSet:
    """Update an existing SCADA device described by ``cs.operations[0]``.

    Returns an empty ``ChangeSet`` without touching anything when no device
    with the given id exists; otherwise returns whatever
    ``execute_command`` returns for the generated update.
    """
    target_id = cs.operations[0]['id']
    existing = get_scada_device(name, target_id)
    if not existing:
        # Nothing to update.
        return ChangeSet()
    command = _set_scada_device(name, cs)
    return execute_command(name, command)
|
|
|
|
|
|
def _add_scada_device(name: str, cs: ChangeSet) -> DbChangeSet:
    """Build the redo/undo change set for inserting a new SCADA device.

    The device is taken from ``cs.operations[0]``. Redo inserts the row and
    undo deletes it again by id. ``name`` is not used here.
    """
    device = ScadaDevice(cs.operations[0])

    insert_sql = (
        "insert into scada_device (id, name, address, sd_type) "
        f"values ({device.f_id}, {device.f_name}, {device.f_address}, {device.f_sd_type});"
    )
    remove_sql = f"delete from scada_device where id = {device.f_id};"

    redo_entry = g_add_prefix | device.as_dict()
    undo_entry = g_delete_prefix | device.as_id_dict()

    return DbChangeSet(insert_sql, remove_sql, [redo_entry], [undo_entry])
|
|
|
|
|
|
def add_scada_device(name: str, cs: ChangeSet) -> ChangeSet:
    """Insert the SCADA device described by ``cs.operations[0]``.

    Returns an empty ``ChangeSet`` without inserting when a device with the
    same id already exists; otherwise returns whatever ``execute_command``
    returns for the generated insert.
    """
    requested_id = cs.operations[0]['id']
    existing = get_scada_device(name, requested_id)
    if existing:
        # The id is already taken.
        return ChangeSet()
    command = _add_scada_device(name, cs)
    return execute_command(name, command)
|
|
|
|
|
|
def _delete_scada_device(name: str, cs: ChangeSet) -> DbChangeSet:
    """Build the redo/undo change set for deleting an existing SCADA device.

    The stored row for ``cs.operations[0]['id']`` is loaded first so that
    undo can re-insert it with its previous values. Redo deletes the row by
    id.
    """
    stored = get_scada_device(name, cs.operations[0]['id'])
    device = ScadaDevice(stored)

    remove_sql = f"delete from scada_device where id = {device.f_id};"
    restore_sql = (
        "insert into scada_device (id, name, address, sd_type) "
        f"values ({device.f_id}, {device.f_name}, {device.f_address}, {device.f_sd_type});"
    )

    redo_entry = g_delete_prefix | device.as_id_dict()
    undo_entry = g_add_prefix | device.as_dict()

    return DbChangeSet(remove_sql, restore_sql, [redo_entry], [undo_entry])
|
|
|
|
|
|
def delete_scada_device(name: str, cs: ChangeSet) -> ChangeSet:
    """Delete the SCADA device whose id is given in ``cs.operations[0]``.

    Returns an empty ``ChangeSet`` without deleting when no such device
    exists; otherwise returns whatever ``execute_command`` returns for the
    generated delete.
    """
    target_id = cs.operations[0]['id']
    existing = get_scada_device(name, target_id)
    if not existing:
        # Nothing to delete.
        return ChangeSet()
    command = _delete_scada_device(name, cs)
    return execute_command(name, command)
|
|
|
|
|
|
def get_all_scada_device_ids(name: str) -> list[str]:
    """Return the ids of all rows in ``scada_device`` as strings.

    The query orders the rows by id; each ``id`` value is converted with
    ``str``.
    """
    rows = read_all(name, 'select id from scada_device order by id')
    return [str(row['id']) for row in rows]
|
|
|
|
|
|
def get_all_scada_devices(name: str) -> list[dict[str, Any]]:
    """Return every row of ``scada_device``, ordered by id.

    The rows are returned exactly as ``read_all`` produces them; unlike
    ``get_scada_device`` no per-field string conversion is applied.
    """
    devices = read_all(name, 'select * from scada_device order by id')
    return devices
|