from psycopg.rows import dict_row from connection import g_conn_dict as conn def _get_current_operation(name: str) -> int: with conn[name].cursor(row_factory=dict_row) as cur: cur.execute(f"select id from current_operation") return int(cur.fetchone()['id']) def _update_current_operation(name: str, old_id: int, id: int) -> None: with conn[name].cursor() as cur: cur.execute(f"update current_operation set id = {id} where id = {old_id}") def _add_redo_undo(name: str, redo: str, undo: str) -> int: with conn[name].cursor(row_factory=dict_row) as cur: cur.execute("select max(id) from operation") parent = int(cur.fetchone()['max']) cur.execute(f"insert into operation (id, redo, undo, parent) values (default, '{redo}', '{undo}', {parent})") return parent + 1 # execute curr undo def _query_undo(name: str, id: str) -> dict[str, str]: with conn[name].cursor(row_factory=dict_row) as cur: cur.execute(f"select undo, parent from operation where id = {id}") return cur.fetchone() # execute next redo def _query_redo_child(name: str, id: str) -> str: with conn[name].cursor(row_factory=dict_row) as cur: cur.execute(f"select redo_child from operation where id = {id}") return cur.fetchone()['redo_child'] def _query_redo(name: str, id: str) -> dict[str, str]: with conn[name].cursor(row_factory=dict_row) as cur: cur.execute(f"select redo from operation where id = {id}") return cur.fetchone()['redo'] def _set_redo_child(name: str, id: str, child: str) -> None: with conn[name].cursor() as cur: cur.execute(f"update operation set redo_child = {child} where id = {id}") def _execute(name: str, sql: str) -> None: with conn[name].cursor() as cur: sql = sql.replace("\"", "\'") cur.execute(sql) def add_operation(name: str, redo: str, undo: str) -> None: curr = _add_redo_undo(name, redo, undo) old = _get_current_operation(name) _update_current_operation(name, old, curr) def execute_undo(name: str) -> None: curr = _get_current_operation(name) row = _query_undo(name, curr) undo = row['undo'] if undo == '': print("nothing to undo!") return parent = int(row['parent']) _set_redo_child(name, parent, curr) _execute(name, undo) _update_current_operation(name, curr, parent) def execute_redo(name: str) -> None: curr = _get_current_operation(name) redoChild = _query_redo_child(name, curr) if redoChild == None: print("nothing to redo!") return child = int(redoChild) redo = _query_redo(name, child) _execute(name, redo) _update_current_operation(name, curr, child)