from .database import *
from .s0_base import *


# Measurement kinds.
SCADA_TYPE_PRESSURE = 'PRESSURE'
SCADA_TYPE_DEMAND = 'DEMAND'
SCADA_TYPE_QUALITY = 'QUALITY'
SCADA_TYPE_LEVEL = 'LEVEL'
SCADA_TYPE_FLOW = 'FLOW'

# Kinds of network-model object a SCADA element may be attached to.
SCADA_MODEL_TYPE_JUNCTION = 'JUNCTION'
SCADA_MODEL_TYPE_RESERVOIR = 'RESERVOIR'
SCADA_MODEL_TYPE_TANK = 'TANK'
SCADA_MODEL_TYPE_PIPE = 'PIPE'
SCADA_MODEL_TYPE_PUMP = 'PUMP'
SCADA_MODEL_TYPE_VALVE = 'VALVE'

SCADA_ELEMENT_STATUS_OFFLINE = 'OFF'
SCADA_ELEMENT_STATUS_ONLINE = 'ON'

_scada_model_types = [
    SCADA_MODEL_TYPE_JUNCTION,
    SCADA_MODEL_TYPE_RESERVOIR,
    SCADA_MODEL_TYPE_TANK,
    SCADA_MODEL_TYPE_PIPE,
    SCADA_MODEL_TYPE_PUMP,
    SCADA_MODEL_TYPE_VALVE,
]


def _check_model(name: str, cs: ChangeSet) -> bool:
    """Validate the model reference carried by the first operation of *cs*.

    Returns True when neither 'model_id' nor 'model_type' is present, False
    when only one of the two is present, and otherwise the result of the
    matching ``is_<type>(name, model_id)`` lookup. A 'model_type' that is not
    one of the SCADA_MODEL_TYPE_* constants (including None) yields False.
    """
    op = cs.operations[0]
    has_model_id = 'model_id' in op
    has_model_type = 'model_type' in op

    # The two keys must be supplied together or not at all.
    if has_model_id != has_model_type:
        return False
    if not has_model_id:
        return True

    _model_id = op['model_id']
    _model_type = op['model_type']

    if _model_type == SCADA_MODEL_TYPE_JUNCTION:
        return is_junction(name, _model_id)
    if _model_type == SCADA_MODEL_TYPE_RESERVOIR:
        return is_reservoir(name, _model_id)
    if _model_type == SCADA_MODEL_TYPE_TANK:
        return is_tank(name, _model_id)
    if _model_type == SCADA_MODEL_TYPE_PIPE:
        return is_pipe(name, _model_id)
    if _model_type == SCADA_MODEL_TYPE_PUMP:
        return is_pump(name, _model_id)
    if _model_type == SCADA_MODEL_TYPE_VALVE:
        return is_valve(name, _model_id)

    return False


def get_scada_element_schema(name: str) -> dict[str, dict[str, Any]]:
    """Return the property schema of a scada_element.

    Each entry maps a property name to its 'type', whether it is 'optional'
    and whether it is 'readonly'. *name* is not used by this function.
    """
    return {
        'id':         {'type': 'str',   'optional': False, 'readonly': True},
        'x':          {'type': 'float', 'optional': False, 'readonly': False},
        'y':          {'type': 'float', 'optional': False, 'readonly': False},
        'device_id':  {'type': 'str',   'optional': True,  'readonly': False},
        'model_id':   {'type': 'str',   'optional': True,  'readonly': False},
        'model_type': {'type': 'str',   'optional': True,  'readonly': False},
        'status':     {'type': 'str',   'optional': True,  'readonly': False},
    }


def get_scada_element(name: str, id: str) -> dict[str, Any]:
    """Read one scada_element row and return it as a plain dict.

    Returns an empty dict when ``try_read`` yields None. Nullable columns
    ('device_id', 'model_id', 'model_type') are returned as None when null;
    everything else is converted to ``str`` / ``float``.
    """
    # NOTE(review): *id* is interpolated into the SQL text unescaped. If ids
    # can come from untrusted callers this should be parameterised/escaped in
    # the DB layer - confirm what try_read supports.
    sm = try_read(name, f"select * from scada_element where id = '{id}'")
    if sm is None:
        return {}

    return {
        'id': str(sm['id']),
        'x': float(sm['x']),
        'y': float(sm['y']),
        'device_id': str(sm['device_id']) if sm['device_id'] is not None else None,
        'model_id': str(sm['model_id']) if sm['model_id'] is not None else None,
        'model_type': str(sm['model_type']) if sm['model_type'] is not None else None,
        'status': str(sm['status']),
    }


def _optional_str(source: dict[str, Any], key: str):
    """Return ``str(source[key])`` if *key* is present and not None, else None."""
    if key in source and source[key] is not None:
        return str(source[key])
    return None


def _sql_text(value) -> str:
    """Render an optional string as a quoted SQL literal, or ``null`` for None."""
    return f"'{value}'" if value is not None else 'null'


class ScadaModel(object):
    """A scada_element built from a dict, with SQL-ready ``f_*`` renderings.

    'id', 'x' and 'y' are required keys of *input*. 'device_id', 'model_id'
    and 'model_type' default to None when missing or None; 'status' defaults
    to SCADA_ELEMENT_STATUS_OFFLINE when missing or None.
    """

    def __init__(self, input: dict[str, Any]) -> None:
        self.type = 'scada_element'
        self.id = str(input['id'])
        self.x = float(input['x'])
        self.y = float(input['y'])
        self.device_id = _optional_str(input, 'device_id')
        self.model_id = _optional_str(input, 'model_id')
        self.model_type = _optional_str(input, 'model_type')

        status = _optional_str(input, 'status')
        self.status = status if status is not None else SCADA_ELEMENT_STATUS_OFFLINE

        # SQL fragments: text values are single-quoted, missing values are
        # rendered as the bare keyword null, numbers are used as-is.
        self.f_type = f"'{self.type}'"
        self.f_id = f"'{self.id}'"
        self.f_x = self.x
        self.f_y = self.y
        self.f_device_id = _sql_text(self.device_id)
        self.f_model_id = _sql_text(self.model_id)
        self.f_model_type = _sql_text(self.model_type)
        self.f_status = f"'{self.status}'"

    def as_dict(self) -> dict[str, Any]:
        """Return every property, including 'type', as a dict."""
        return {
            'type': self.type,
            'id': self.id,
            'x': self.x,
            'y': self.y,
            'device_id': self.device_id,
            'model_id': self.model_id,
            'model_type': self.model_type,
            'status': self.status,
        }

    def as_id_dict(self) -> dict[str, Any]:
        """Return only the identifying 'type' and 'id' entries."""
        return {'type': self.type, 'id': self.id}


def _set_scada_element(name: str, cs: ChangeSet) -> DbChangeSet:
    """Build the redo/undo DbChangeSet that updates an existing element.

    Only non-readonly schema properties present in the first operation of
    *cs* override the stored values; everything else keeps its current value.
    The caller is expected to have verified that the element exists.
    """
    new_dict = cs.operations[0]

    # Read the stored row once; the copy becomes the basis of the new state.
    current = get_scada_element(name, new_dict['id'])
    old = ScadaModel(current)
    raw_new = dict(current)

    for key, spec in get_scada_element_schema(name).items():
        if key in new_dict and not spec['readonly']:
            raw_new[key] = new_dict[key]

    new = ScadaModel(raw_new)

    redo_sql = (
        f"update scada_element set x = {new.f_x}, y = {new.f_y}, "
        f"device_id = {new.f_device_id}, model_id = {new.f_model_id}, "
        f"model_type = {new.f_model_type}, status = {new.f_status} "
        f"where id = {new.f_id};"
    )
    undo_sql = (
        f"update scada_element set x = {old.f_x}, y = {old.f_y}, "
        f"device_id = {old.f_device_id}, model_id = {old.f_model_id}, "
        f"model_type = {old.f_model_type}, status = {old.f_status} "
        f"where id = {old.f_id};"
    )

    redo_cs = g_update_prefix | new.as_dict()
    undo_cs = g_update_prefix | old.as_dict()

    return DbChangeSet(redo_sql, undo_sql, [redo_cs], [undo_cs])


def set_scada_element(name: str, cs: ChangeSet) -> ChangeSet:
    """Update an existing scada_element.

    Returns an empty ChangeSet when the element does not exist or its model
    reference fails ``_check_model``; otherwise returns the result of
    ``execute_command``.
    """
    if not get_scada_element(name, cs.operations[0]['id']):
        return ChangeSet()
    if not _check_model(name, cs):
        return ChangeSet()
    return execute_command(name, _set_scada_element(name, cs))


def _add_scada_element(name: str, cs: ChangeSet) -> DbChangeSet:
    """Build the redo/undo DbChangeSet that inserts a new element."""
    new = ScadaModel(cs.operations[0])

    redo_sql = (
        f"insert into scada_element (id, x, y, device_id, model_id, model_type, status) "
        f"values ({new.f_id}, {new.f_x}, {new.f_y}, {new.f_device_id}, "
        f"{new.f_model_id}, {new.f_model_type}, {new.f_status});"
    )
    undo_sql = f"delete from scada_element where id = {new.f_id};"

    redo_cs = g_add_prefix | new.as_dict()
    undo_cs = g_delete_prefix | new.as_id_dict()

    return DbChangeSet(redo_sql, undo_sql, [redo_cs], [undo_cs])


def add_scada_element(name: str, cs: ChangeSet) -> ChangeSet:
    """Insert a new scada_element.

    Returns an empty ChangeSet when an element with the same id already
    exists or its model reference fails ``_check_model``; otherwise returns
    the result of ``execute_command``.
    """
    if get_scada_element(name, cs.operations[0]['id']):
        return ChangeSet()
    if not _check_model(name, cs):
        return ChangeSet()
    return execute_command(name, _add_scada_element(name, cs))


def _delete_scada_element(name: str, cs: ChangeSet) -> DbChangeSet:
    """Build the redo/undo DbChangeSet that deletes an existing element.

    The undo statement re-inserts the row with the values read before the
    deletion. The caller is expected to have verified the element exists.
    """
    old = ScadaModel(get_scada_element(name, cs.operations[0]['id']))

    redo_sql = f"delete from scada_element where id = {old.f_id};"
    undo_sql = (
        f"insert into scada_element (id, x, y, device_id, model_id, model_type, status) "
        f"values ({old.f_id}, {old.f_x}, {old.f_y}, {old.f_device_id}, "
        f"{old.f_model_id}, {old.f_model_type}, {old.f_status});"
    )

    redo_cs = g_delete_prefix | old.as_id_dict()
    undo_cs = g_add_prefix | old.as_dict()

    return DbChangeSet(redo_sql, undo_sql, [redo_cs], [undo_cs])


def delete_scada_element(name: str, cs: ChangeSet) -> ChangeSet:
    """Delete a scada_element.

    Returns an empty ChangeSet when the element does not exist; otherwise
    returns the result of ``execute_command``.
    """
    if not get_scada_element(name, cs.operations[0]['id']):
        return ChangeSet()
    return execute_command(name, _delete_scada_element(name, cs))


def get_all_scada_element_ids(name: str) -> list[str]:
    """Return the ids of all scada_element rows, ordered by id."""
    rows = read_all(name, 'select id from scada_element order by id')
    return [str(row['id']) for row in rows]


#
# create table scada_element
# (
#     id text primary key
#   , x float8 not null
#   , y float8 not null
#   , device_id text references scada_device(id)
#   , model_id varchar(32) -- add constraint in API
#   , model_type scada_model_type
#   , status scada_element_status not null default 'OFF'
# );
#
# Returns a list; each item in the list is a dict whose entries look like
# 'id': 'abc'.
# The scada_model type is something like pressure, flow, etc.; it is
# determined by the Device.
def get_all_scada_elements(name: str) -> list[dict[str, Any]]:
    """Return every scada_element row as returned by ``read_all``, ordered by id."""
    return read_all(name, 'select * from scada_element order by id')