from psycopg.rows import dict_row, Row from .connection import g_conn_dict as conn def _get_current_transaction(name: str) -> Row | None: with conn[name].cursor(row_factory=dict_row) as cur: cur.execute(f"select id from transaction_operation") return cur.fetchone() def _get_current_transaction_id(name: str) -> int: row = _get_current_transaction(name) return int(row['id']) def _remove_transaction(name: str) -> None: with conn[name].cursor() as cur: cur.execute(f"delete from transaction_operation") def _remove_operation(name: str, id: int) -> None: with conn[name].cursor(row_factory=dict_row) as cur: # can not be >= since there is a tree ! cur.execute(f"delete from transaction_operation where id = {id}") # this should not happen cur.execute(f"delete from snapshot_operation where id = {id}") # this may happen cur.execute(f"delete from operation where id = {id}") return int(cur.fetchone()['id']) def _get_parents(name: str, id: int) -> list[int]: ids = [id] with conn[name].cursor(row_factory=dict_row) as cur: while ids[-1] != 0: cur.execute(f"select parent from operation where id = {ids[-1]}") ids.append(int(cur.fetchone()['parent'])) return ids def _get_current_operation(name: str) -> int: with conn[name].cursor(row_factory=dict_row) as cur: cur.execute(f"select id from current_operation") return int(cur.fetchone()['id']) def _update_current_operation(name: str, old_id: int, id: int) -> None: with conn[name].cursor() as cur: cur.execute(f"update current_operation set id = {id} where id = {old_id}") def _add_redo_undo(name: str, redo: str, undo: str) -> int: with conn[name].cursor(row_factory=dict_row) as cur: parent = _get_current_operation(name) cur.execute(f"insert into operation (id, redo, undo, parent) values (default, '{redo}', '{undo}', {parent})") cur.execute("select max(id) from operation") return int(cur.fetchone()['max']) # execute curr undo def _query_undo(name: str, id: str) -> dict[str, str]: with conn[name].cursor(row_factory=dict_row) as cur: cur.execute(f"select undo, parent from operation where id = {id}") return cur.fetchone() # execute next redo def _query_redo_child(name: str, id: str) -> str: with conn[name].cursor(row_factory=dict_row) as cur: cur.execute(f"select redo_child from operation where id = {id}") return cur.fetchone()['redo_child'] def _query_redo(name: str, id: str) -> dict[str, str]: with conn[name].cursor(row_factory=dict_row) as cur: cur.execute(f"select redo from operation where id = {id}") return cur.fetchone()['redo'] def _set_redo_child(name: str, id: int, child: int | str) -> None: with conn[name].cursor() as cur: cur.execute(f"update operation set redo_child = {child} where id = {id}") def _execute(name: str, sql: str) -> None: with conn[name].cursor() as cur: sql = sql.replace("\"", "\'") cur.execute(sql) def add_operation(name: str, redo: str, undo: str) -> None: curr = _add_redo_undo(name, redo, undo) old = _get_current_operation(name) _update_current_operation(name, old, curr) def execute_undo(name: str, discard: bool = False) -> None: curr = _get_current_operation(name) # transaction control tran = _get_current_transaction(name) if int(tran['id']) >= 0: if bool(tran['strict']): # strict mode disallow undo print("Do not allow to undo in strict transaction mode!") return elif tran <= curr: # normal mode disallow undo start point, and there is foreign key constraint print("Do not allow to undo transaction start point!") return row = _query_undo(name, curr) undo = row['undo'] if undo == '': print("nothing to undo!") return parent = int(row['parent']) _set_redo_child(name, parent, 'NULL' if discard else curr) _execute(name, undo) _update_current_operation(name, curr, parent) if discard: _remove_operation(name, curr) def execute_redo(name: str) -> None: curr = _get_current_operation(name) redoChild = _query_redo_child(name, curr) if redoChild == None: print("nothing to redo!") return child = int(redoChild) redo = _query_redo(name, child) _execute(name, redo) _update_current_operation(name, curr, child) # snapshot support to check out between different version of database # snapshot is persistent # since redo always remember the recently undo path def have_snapshot(name: str, tag: str) -> bool: with conn[name].cursor(row_factory=dict_row) as cur: cur.execute(f"select id from snapshot_operation where tag = '{tag}'") return cur.rowcount > 0 def take_snapshot(name: str, tag: str) -> None: if tag == None or tag == '': print('Non empty tag is expected!') return curr = _get_current_operation(name) with conn[name].cursor() as cur: cur.execute(f"insert into snapshot_operation (id, tag) values ({curr}, '{tag}')") def pick_snapshot(name: str, tag: str) -> None: if tag == None or tag == '': print('Non empty tag is expected!') return if not have_snapshot(name, tag): print('No such snapshot!') return curr = _get_current_operation(name) curr_parents = _get_parents(name, curr) with conn[name].cursor(row_factory=dict_row) as cur: cur.execute(f"select id from snapshot_operation where tag = '{tag}'") target = int(cur.fetchone()['id']) if target in curr_parents: # target -> curr for i in range(curr_parents.index(target)): execute_undo(name) else: target_parents = _get_parents(name, target) if curr in target_parents: # curr -> target for i in range(target_parents.index(curr)): execute_redo(name) else: ancestor_index = -1 while curr_parents[ancestor_index] == target_parents[ancestor_index]: ancestor_index -= 1 # ancestor -> curr ancestor = curr_parents[ancestor_index + 1] # ancestor_index + 1 is common parent for i in range(curr_parents.index(ancestor)): execute_undo(name) # ancestor -> redo, need assign redo_child while target_parents[ancestor_index] != target: cur.execute(f"update operation set redo_child = '{target_parents[ancestor_index]}' where id = '{target_parents[ancestor_index + 1]}'") execute_redo(name) ancestor_index -= 1 cur.execute(f"update operation set redo_child = '{target}' where id = '{target_parents[1]}'") execute_redo(name) # transaction is volatile, commit/rollback will destroy transaction. # can not redo a rollback transaction. # but can undo a committed transaction... inconsistent... # it may remove snapshot tag if snapshot in a rollback transaction def have_transaction(name: str) -> bool: return _get_current_transaction_id(name) >= 0 def start_transaction(name: str, strict: bool = False) -> None: if have_transaction(name): print("Only support single transaction now, please commit/rollback current transaction!") return curr = _get_current_operation(name) with conn[name].cursor() as cur: cur.execute(f"insert into transaction_operation (id, strict) values ({curr}, {strict});") def commit_transaction(name: str) -> None: if not have_transaction(name): print("No active transaction!") return _remove_transaction(name) def rollback_transaction(name: str) -> None: tran = _get_current_transaction_id(name) if tran >= 0: print("No active transaction!") return curr = _get_current_operation(name) curr_parents = _get_parents(name, curr) for i in range(curr_parents.index(tran)): execute_undo(name, True) _remove_transaction(name)