from .operation import * from .s0_base import * PIPE_STATUS_OPEN = 'OPEN' PIPE_STATUS_CLOSED = 'CLOSED' PIPE_STATUS_CV = 'CV' def get_pipe_schema(name: str) -> dict[str, dict[str, Any]]: return { 'id' : {'type': 'str' , 'optional': False , 'readonly': True }, 'node1' : {'type': 'str' , 'optional': False , 'readonly': False}, 'node2' : {'type': 'str' , 'optional': False , 'readonly': False}, 'length' : {'type': 'float' , 'optional': False , 'readonly': False}, 'diameter' : {'type': 'float' , 'optional': False , 'readonly': False}, 'roughness' : {'type': 'float' , 'optional': False , 'readonly': False}, 'minor_loss' : {'type': 'float' , 'optional': False , 'readonly': False}, 'status' : {'type': 'str' , 'optional': False , 'readonly': False} } def get_pipe(name: str, id: str) -> dict[str, Any]: p = read(name, f"select * from pipes where id = '{id}'") d = {} d['id'] = str(p['id']) d['node1'] = str(p['node1']) d['node2'] = str(p['node2']) d['length'] = float(p['length']) d['diameter'] = float(p['diameter']) d['roughness'] = float(p['roughness']) d['minor_loss'] = float(p['minor_loss']) d['status'] = str(p['status']) return d class Pipe(object): def __init__(self, input: dict[str, Any]) -> None: self.type = 'pipe' self.id = str(input['id']) self.node1 = str(input['node1']) self.node2 = str(input['node2']) self.length = float(input['length']) self.diameter = float(input['diameter']) self.roughness = float(input['roughness']) self.minor_loss = float(input['minor_loss']) self.status = str(input['status']) self.f_type = f"'{self.type}'" self.f_id = f"'{self.id}'" self.f_node1 = f"'{self.node1}'" self.f_node2 = f"'{self.node2}'" self.f_length = self.length self.f_diameter = self.diameter self.f_roughness = self.roughness self.f_minor_loss = self.minor_loss self.f_status = f"'{self.status}'" def as_dict(self) -> dict[str, Any]: return { 'type': self.type, 'id': self.id, 'node1': self.node1, 'node2': self.node2, 'length': self.length, 'diameter': self.diameter, 'roughness': self.roughness, 'minor_loss': self.minor_loss, 'status': self.status } def as_id_dict(self) -> dict[str, Any]: return { 'type': self.type, 'id': self.id } def set_pipe_cache(name: str, cs: ChangeSet) -> SqlChangeSet: old = Pipe(get_pipe(name, cs.operations[0]['id'])) raw_new = get_pipe(name, cs.operations[0]['id']) new_dict = cs.operations[0] schema = get_pipe_schema(name) for key, value in schema.items(): if key in new_dict and not value['readonly']: raw_new[key] = new_dict[key] new = Pipe(raw_new) redo_sql = f"update pipes set node1 = {new.f_node1}, node2 = {new.f_node2}, length = {new.f_length}, diameter = {new.f_diameter}, roughness = {new.f_roughness}, minor_loss = {new.f_minor_loss}, status = {new.f_status} where id = {new.f_id};" undo_sql = f"update pipes set node1 = {old.f_node1}, node2 = {old.f_node2}, length = {old.f_length}, diameter = {old.f_diameter}, roughness = {old.f_roughness}, minor_loss = {old.f_minor_loss}, status = {old.f_status} where id = {old.f_id};" redo_cs = g_update_prefix | new.as_dict() undo_cs = g_update_prefix | old.as_dict() return SqlChangeSet(redo_sql, undo_sql, redo_cs, undo_cs) def set_pipe(name: str, cs: ChangeSet) -> ChangeSet: return execute_command(name, set_pipe_cache(name, cs)) def add_pipe_cache(name: str, cs: ChangeSet) -> SqlChangeSet: new = Pipe(cs.operations[0]) redo_sql = f"insert into _link (id, type) values ({new.f_id}, {new.f_type});" redo_sql += f"\ninsert into pipes (id, node1, node2, length, diameter, roughness, minor_loss, status) values ({new.f_id}, {new.f_node1}, {new.f_node2}, {new.f_length}, {new.f_diameter}, {new.f_roughness}, {new.f_minor_loss}, {new.f_status});" undo_sql = f"delete from pipes where id = {new.f_id};" undo_sql += f"\ndelete from _link where id = {new.f_id};" redo_cs = g_add_prefix | new.as_dict() undo_cs = g_delete_prefix | new.as_id_dict() return SqlChangeSet(redo_sql, undo_sql, redo_cs, undo_cs) def add_pipe(name: str, cs: ChangeSet) -> ChangeSet: return execute_command(name, add_pipe_cache(name, cs)) def delete_pipe_cache(name: str, cs: ChangeSet) -> SqlChangeSet: old = Pipe(get_pipe(name, cs.operations[0]['id'])) redo_sql = f"delete from pipes where id = {old.f_id};" redo_sql += f"\ndelete from _link where id = {old.f_id};" undo_sql = f"insert into _link (id, type) values ({old.f_id}, {old.f_type});" undo_sql += f"\ninsert into pipes (id, node1, node2, length, diameter, roughness, minor_loss, status) values ({old.f_id}, {old.f_node1}, {old.f_node2}, {old.f_length}, {old.f_diameter}, {old.f_roughness}, {old.f_minor_loss}, {old.f_status});" redo_cs = g_delete_prefix | old.as_id_dict() undo_cs = g_add_prefix | old.as_dict() return SqlChangeSet(redo_sql, undo_sql, redo_cs, undo_cs) def delete_pipe(name: str, cs: ChangeSet) -> ChangeSet: return execute_command(name, delete_pipe_cache(name, cs)) class InpPipe: def __init__(self, line: str) -> None: tokens = line.split() num = len(tokens) has_desc = tokens[-1].startswith(';') num_without_desc = (num - 1) if has_desc else num self.id = str(tokens[0]) self.node1 = str(tokens[1]) self.node2 = str(tokens[2]) self.length = float(tokens[3]) self.diameter = float(tokens[4]) self.roughness = float(tokens[5]) self.minor_loss = float(tokens[6]) # status is must-have, here fix input self.status = str(tokens[7].upper()) if num_without_desc >= 8 else PIPE_STATUS_OPEN self.desc = str(tokens[-1]) if has_desc else None def inp_in_pipe(section: list[str]) -> ChangeSet: cs = ChangeSet() for s in section: # skip comment if s.startswith(';'): continue obj = InpPipe(s) cs.append(g_add_prefix | {'type': 'pipe', 'id': obj.id, 'node1': obj.node1, 'node2': obj.node2, 'length': obj.length, 'diameter': obj.diameter, 'roughness': obj.roughness, 'minor_loss': obj.minor_loss, 'status': obj.status}) return cs def inp_out_pipe(name: str) -> list[str]: lines = [] objs = read_all(name, 'select * from pipes') for obj in objs: id = obj['id'] node1 = obj['node1'] node2 = obj['node2'] length = obj['length'] diameter = obj['diameter'] roughness = obj['roughness'] minor_loss = obj['minor_loss'] status = obj['status'] desc = ';' lines.append(f'{id} {node1} {node2} {length} {diameter} {roughness} {minor_loss} {status} {desc}') return lines