from .database import *


# Device type labels. Presumably the values stored in the `sd_type` column --
# TODO confirm against callers; nothing in this module validates against them.
SCADA_DEVICE_TYPE_PRESSURE = 'PRESSURE'
SCADA_DEVICE_TYPE_DEMAND = 'DEMAND'
SCADA_DEVICE_TYPE_QUALITY = 'QUALITY'
SCADA_DEVICE_TYPE_LEVEL = 'LEVEL'
SCADA_DEVICE_TYPE_FLOW = 'FLOW'
SCADA_DEVICE_TYPE_UNKNOWN = 'UNKNOWN'


def _sql_quote(value: Any) -> str:
    """Return ``str(value)`` as a single-quoted SQL string literal.

    Embedded single quotes are doubled (standard SQL escaping) so that a
    value such as ``O'Brien`` neither breaks the statement nor lets the
    caller inject SQL. The redo/undo statements built in this module are
    handed around as plain strings, so bound parameters are not an option
    here.
    NOTE(review): assumes the backend treats backslashes literally inside
    ordinary string literals (the SQL-standard behaviour) -- confirm.
    """
    return "'" + str(value).replace("'", "''") + "'"


def _sql_nullable(value: Any) -> str:
    """Return a quoted SQL literal for *value*, or ``null`` when it is None."""
    return 'null' if value is None else _sql_quote(value)


def _opt_str(value: Any) -> Any:
    """Return ``str(value)``, passing ``None`` through unchanged."""
    return None if value is None else str(value)


def get_scada_device_schema(name: str) -> dict[str, dict[str, Any]]:
    """Describe the editable properties of a SCADA device.

    *name* is accepted for signature parity with the other accessors in
    this module but is not used. Each entry maps a property name to its
    ``type``, whether it is ``optional`` and whether it is ``readonly``.
    """
    return {
        'id':      {'type': 'str', 'optional': False, 'readonly': True},
        'name':    {'type': 'str', 'optional': True,  'readonly': False},
        'address': {'type': 'str', 'optional': True,  'readonly': False},
        'sd_type': {'type': 'str', 'optional': True,  'readonly': False},
    }


def get_scada_device(name: str, id: str) -> dict[str, Any]:
    """Fetch one SCADA device row as a plain dict.

    *name* is passed straight through to ``try_read`` (presumably it
    selects the project/database -- confirm in ``.database``).

    Returns ``{}`` when ``try_read`` yields ``None``; otherwise a dict with
    ``id``, ``name``, ``address`` and ``sd_type``, each converted to ``str``
    (nullable columns stay ``None``).
    """
    row = try_read(
        name,
        f"select * from scada_device where id = {_sql_quote(id)}",
    )
    if row is None:
        return {}

    return {
        'id': str(row['id']),
        'name': _opt_str(row['name']),
        'address': _opt_str(row['address']),
        'sd_type': _opt_str(row['sd_type']),
    }


class ScadaDevice(object):
    """In-memory view of a SCADA device plus its SQL-literal renderings.

    The plain attributes (``id``, ``name``, ``address``, ``sd_type``) hold
    the values as strings or ``None``. The ``f_*`` attributes hold the same
    values rendered as SQL literals, ready to be spliced into a statement:
    quoted and escaped strings, or ``null`` for missing optional values.
    """

    def __init__(self, input: dict[str, Any]) -> None:
        """Build the device from *input*.

        ``input['id']`` is required (``KeyError`` if absent); ``name``,
        ``address`` and ``sd_type`` are optional and default to ``None``
        when missing or ``None``.
        """
        self.type = 'scada_device'
        self.id = str(input['id'])
        self.name = _opt_str(input.get('name'))
        self.address = _opt_str(input.get('address'))
        self.sd_type = _opt_str(input.get('sd_type'))

        # SQL-literal forms; quotes inside the values are escaped so the
        # generated redo/undo statements stay well-formed.
        self.f_type = _sql_quote(self.type)
        self.f_id = _sql_quote(self.id)
        self.f_name = _sql_nullable(self.name)
        self.f_address = _sql_nullable(self.address)
        self.f_sd_type = _sql_nullable(self.sd_type)

    def as_dict(self) -> dict[str, Any]:
        """Return every property, including the ``type`` tag."""
        return {
            'type': self.type,
            'id': self.id,
            'name': self.name,
            'address': self.address,
            'sd_type': self.sd_type,
        }

    def as_id_dict(self) -> dict[str, Any]:
        """Return only the identifying pair: ``type`` and ``id``."""
        return {'type': self.type, 'id': self.id}


def _set_scada_device(name: str, cs: ChangeSet) -> DbChangeSet:
    """Build the redo/undo change set for updating an existing device.

    Only the first operation in *cs* is considered. Properties present in
    that operation and not marked ``readonly`` in the schema override the
    stored values; everything else is kept. The device must already exist:
    an empty lookup result makes ``ScadaDevice`` raise ``KeyError``.
    """
    operation = cs.operations[0]

    # Read the stored row once; the copy is the base for the new state.
    current = get_scada_device(name, operation['id'])
    old = ScadaDevice(current)

    merged = dict(current)
    for key, spec in get_scada_device_schema(name).items():
        if not spec['readonly'] and key in operation:
            merged[key] = operation[key]
    new = ScadaDevice(merged)

    redo_sql = f"update scada_device set name = {new.f_name}, address = {new.f_address}, sd_type = {new.f_sd_type} where id = {new.f_id};"
    undo_sql = f"update scada_device set name = {old.f_name}, address = {old.f_address}, sd_type = {old.f_sd_type} where id = {old.f_id};"

    redo_cs = g_update_prefix | new.as_dict()
    undo_cs = g_update_prefix | old.as_dict()

    return DbChangeSet(redo_sql, undo_sql, [redo_cs], [undo_cs])


def set_scada_device(name: str, cs: ChangeSet) -> ChangeSet:
    """Update an existing device described by the first operation of *cs*.

    Returns an empty ``ChangeSet`` when no device with that id exists;
    otherwise returns whatever ``execute_command`` returns.
    """
    if not get_scada_device(name, cs.operations[0]['id']):
        return ChangeSet()
    # NOTE(review): meaning of the trailing False is defined by
    # execute_command in .database -- not visible from this module.
    return execute_command(name, _set_scada_device(name, cs), False)


def _add_scada_device(name: str, cs: ChangeSet) -> DbChangeSet:
    """Build the redo/undo change set for inserting a new device.

    Only the first operation in *cs* is considered. *name* is unused here.
    """
    new = ScadaDevice(cs.operations[0])

    redo_sql = f"insert into scada_device (id, name, address, sd_type) values ({new.f_id}, {new.f_name}, {new.f_address}, {new.f_sd_type});"
    undo_sql = f"delete from scada_device where id = {new.f_id};"

    redo_cs = g_add_prefix | new.as_dict()
    undo_cs = g_delete_prefix | new.as_id_dict()

    return DbChangeSet(redo_sql, undo_sql, [redo_cs], [undo_cs])


def add_scada_device(name: str, cs: ChangeSet) -> ChangeSet:
    """Insert the device described by the first operation of *cs*.

    Returns an empty ``ChangeSet`` when a device with that id already
    exists; otherwise returns whatever ``execute_command`` returns.
    """
    if get_scada_device(name, cs.operations[0]['id']):
        return ChangeSet()
    return execute_command(name, _add_scada_device(name, cs), False)


def _delete_scada_device(name: str, cs: ChangeSet) -> DbChangeSet:
    """Build the redo/undo change set for deleting an existing device.

    The stored row is captured first so the undo statement can re-insert
    it. The device must already exist: an empty lookup result makes
    ``ScadaDevice`` raise ``KeyError``.
    """
    old = ScadaDevice(get_scada_device(name, cs.operations[0]['id']))

    redo_sql = f"delete from scada_device where id = {old.f_id};"
    undo_sql = f"insert into scada_device (id, name, address, sd_type) values ({old.f_id}, {old.f_name}, {old.f_address}, {old.f_sd_type});"

    redo_cs = g_delete_prefix | old.as_id_dict()
    undo_cs = g_add_prefix | old.as_dict()

    return DbChangeSet(redo_sql, undo_sql, [redo_cs], [undo_cs])


def delete_scada_device(name: str, cs: ChangeSet) -> ChangeSet:
    """Delete the device named by the first operation of *cs*.

    Returns an empty ``ChangeSet`` when no device with that id exists;
    otherwise returns whatever ``execute_command`` returns.
    """
    if not get_scada_device(name, cs.operations[0]['id']):
        return ChangeSet()
    return execute_command(name, _delete_scada_device(name, cs), False)


def get_all_scada_device_ids(name: str) -> list[str]:
    """Return the ids of all SCADA devices as strings, ordered by id."""
    rows = read_all(name, 'select id from scada_device order by id')
    return [str(row['id']) for row in rows]


def get_all_scada_devices(name: str) -> list[dict[str, Any]]:
    """Return all SCADA device rows, ordered by id.

    Rows are returned exactly as ``read_all`` provides them; unlike
    ``get_scada_device`` no per-field ``str`` conversion is applied.
    """
    return read_all(name, 'select * from scada_device order by id')