from typing import Any from psycopg.rows import dict_row, Row from .connection import g_conn_dict as conn API_ADD = 'add' API_UPDATE = 'update' API_DELETE = 'delete' g_add_prefix = { 'operation': API_ADD } g_update_prefix = { 'operation': API_UPDATE } g_delete_prefix = { 'operation': API_DELETE } class ChangeSet: def __init__(self, ps: dict[str, Any] | None = None): self.operations : list[dict[str, Any]] = [] if ps != None: self.append(ps) @staticmethod def from_list(ps: list[dict[str, Any]]): cs = ChangeSet() for _cs in ps: cs.append(_cs) return cs def add(self, ps: dict[str, Any]): self.operations.append(g_add_prefix | ps) return self def update(self, ps: dict[str, Any]): self.operations.append(g_update_prefix | ps) return self def delete(self, ps: dict[str, Any]): self.operations.append(g_delete_prefix | ps) return self def append(self, ps: dict[str, Any]): self.operations.append(ps) return self def merge(self, cs): if len(cs.operations) > 0: self.operations += cs.operations return self def dump(self): for op in self.operations: print(op) def compress(self): return self class DbChangeSet: def __init__(self, redo_sql: str, undo_sql: str, redo_cs: list[dict[str, Any]], undo_cs: list[dict[str, Any]]) -> None: self.redo_sql = redo_sql self.undo_sql = undo_sql self.redo_cs = redo_cs self.undo_cs = undo_cs @staticmethod def from_list(css): redo_sql_s : list[str] = [] undo_sql_s : list[str] = [] redo_cs_s : list[dict[str, Any]] = [] undo_cs_s : list[dict[str, Any]] = [] for r in css: redo_sql_s.append(r.redo_sql) undo_sql_s.append(r.undo_sql) redo_cs_s += r.redo_cs r.undo_cs.reverse() # reverse again... undo_cs_s += r.undo_cs redo_sql = '\n'.join(redo_sql_s) undo_sql_s.reverse() undo_sql = '\n'.join(undo_sql_s) undo_cs_s.reverse() return DbChangeSet(redo_sql, undo_sql, redo_cs_s, undo_cs_s) def read(name: str, sql: str) -> Row: with conn[name].cursor(row_factory=dict_row) as cur: cur.execute(sql) row = cur.fetchone() if row == None: raise Exception(sql) return row def read_all(name: str, sql: str) -> list[Row]: with conn[name].cursor(row_factory=dict_row) as cur: cur.execute(sql) return cur.fetchall() def try_read(name: str, sql: str) -> Row | None: with conn[name].cursor(row_factory=dict_row) as cur: cur.execute(sql) return cur.fetchone() def write(name: str, sql: str) -> None: with conn[name].cursor() as cur: cur.execute(sql) def get_current_operation(name: str) -> int: return int(read(name, 'select id from current_operation')['id']) def execute_command(name: str, command: DbChangeSet) -> ChangeSet: op_table = read(name, "select * from operation_table")['option'] write(name, command.redo_sql) parent = get_current_operation(name) redo_sql = command.redo_sql.replace("'", "''") undo_sql = command.undo_sql.replace("'", "''") redo_cs_str = str(command.redo_cs).replace("'", "''") undo_cs_str = str(command.undo_cs).replace("'", "''") write(name, f"insert into {op_table} (id, redo, undo, parent, redo_cs, undo_cs) values (default, '{redo_sql}', '{undo_sql}', {parent}, '{redo_cs_str}', '{undo_cs_str}')") if op_table == 'operation': current = read(name, 'select max(id) as id from operation')['id'] write(name, f"update current_operation set id = {current}") return ChangeSet.from_list(command.redo_cs) def execute_undo(name: str, discard: bool = False) -> ChangeSet: row = read(name, f'select * from operation where id = {get_current_operation(name)}') write(name, row['undo']) # update foreign key write(name, f"update current_operation set id = {row['parent']} where id = {row['id']}") if discard: # update foreign key write(name, f"update operation set redo_child = null where id = {row['parent']}") # on delete cascade => child & snapshot write(name, f"delete from operation where id = {row['id']}") else: write(name, f"update operation set redo_child = {row['id']} where id = {row['parent']}") e = eval(row['undo_cs']) return ChangeSet.from_list(e) def execute_redo(name: str) -> ChangeSet: row = read(name, f'select * from operation where id = {get_current_operation(name)}') if row['redo_child'] == None: return ChangeSet() row = read(name, f"select * from operation where id = {row['redo_child']}") write(name, row['redo']) write(name, f"update current_operation set id = {row['id']} where id = {row['parent']}") e = eval(row['redo_cs']) return ChangeSet.from_list(e) def list_snapshot(name: str) -> list[tuple[int, str]]: rows = read_all(name, f'select * from snapshot_operation order by id') result = [] for row in rows: result.append((int(row['id']), str(row['tag']))) return result def have_snapshot(name: str, tag: str) -> bool: return try_read(name, f"select id from snapshot_operation where tag = '{tag}'") != None def have_snapshot_for_operation(name: str, operation: int) -> bool: return try_read(name, f"select id from snapshot_operation where id = {operation}") != None def have_snapshot_for_current_operation(name: str) -> bool: return have_snapshot_for_operation(name, get_current_operation(name)) def take_snapshot_for_operation(name: str, operation: int, tag: str) -> None: if tag == None or tag == '': return None write(name, f"insert into snapshot_operation (id, tag) values ({operation}, '{tag}')") def take_snapshot_for_current_operation(name: str, tag: str) -> None: take_snapshot_for_operation(name, get_current_operation(name), tag) # deprecated ! use take_snapshot_for_current_operation instead def take_snapshot(name: str, tag: str) -> None: take_snapshot_for_current_operation(name, tag) def update_snapshot(name: str, operation: int, tag: str) -> None: if tag == None or tag == '': return None if have_snapshot_for_operation(name, operation): write(name, f"update snapshot_operation set tag = '{tag}' where id = {operation}") else: take_snapshot_for_operation(name, operation, tag) def update_snapshot_for_current_operation(name: str, tag: str) -> None: return update_snapshot(name, get_current_operation(name), tag) def delete_snapshot(name: str, tag: str) -> None: write(name, f"delete from snapshot_operation where tag = '{tag}'") def delete_snapshot_by_operation(name: str, operation: int) -> None: write(name, f"delete from snapshot_operation where id = {operation}") def get_operation_by_snapshot(name: str, tag: str) -> int | None: row = try_read(name, f"select id from snapshot_operation where tag = '{tag}'") return int(row['id']) if row != None else None def get_snapshot_by_operation(name: str, operation: int) -> str | None: row = try_read(name, f"select tag from snapshot_operation where id = {operation}") return str(row['tag']) if row != None else None def _get_parents(name: str, id: int) -> list[int]: ids = [id] while ids[-1] != 0: row = read(name, f'select parent from operation where id = {ids[-1]}') ids.append(int(row['parent'])) return ids def pick_operation(name: str, operation: int, discard: bool) -> ChangeSet: target = operation curr = get_current_operation(name) curr_parents = _get_parents(name, curr) target_parents = _get_parents(name, target) change = ChangeSet() if target in curr_parents: for _ in range(curr_parents.index(target)): change.merge(execute_undo(name, discard)) elif curr in target_parents: target_parents.reverse() curr_index = target_parents.index(curr) for i in range(curr_index, len(target_parents) - 1): write(name, f"update operation set redo_child = '{target_parents[i + 1]}' where id = '{target_parents[i]}'") change.merge(execute_redo(name)) else: ancestor_index = -1 while curr_parents[ancestor_index] == target_parents[ancestor_index]: ancestor_index -= 1 ancestor = curr_parents[ancestor_index + 1] for _ in range(curr_parents.index(ancestor)): change.merge(execute_undo(name, discard)) target_parents.reverse() curr_index = target_parents.index(ancestor) for i in range(curr_index, len(target_parents) - 1): write(name, f"update operation set redo_child = '{target_parents[i + 1]}' where id = '{target_parents[i]}'") change.merge(execute_redo(name)) return change.compress() def pick_snapshot(name: str, tag: str, discard: bool) -> ChangeSet: if not have_snapshot(name, tag): return ChangeSet() target = int(read(name, f"select id from snapshot_operation where tag = '{tag}'")['id']) return pick_operation(name, target, discard) def _get_change_set(name: str, operation: int, undo: bool) -> ChangeSet: row = read(name, f'select * from operation where id = {operation}') field= 'undo_cs' if undo else 'redo_cs' return ChangeSet.from_list(eval(row[field])) def sync_with_server(name: str, operation: int) -> ChangeSet: fr = operation to = get_current_operation(name) fr_parents = _get_parents(name, fr) to_parents = _get_parents(name, to) change = ChangeSet() if fr in to_parents: index = to_parents.index(fr) - 1 while index >= 0: change.merge(_get_change_set(name, to_parents[index], False)) #redo index -= 1 elif to in fr_parents: index = 0 while index <= fr_parents.index(to) - 1: change.merge(_get_change_set(name, fr_parents[index], True)) index += 1 else: ancestor_index = -1 while fr_parents[ancestor_index] == to_parents[ancestor_index]: ancestor_index -= 1 ancestor = fr_parents[ancestor_index + 1] index = 0 while index <= fr_parents.index(ancestor) - 1: change.merge(_get_change_set(name, fr_parents[index], True)) index += 1 index = to_parents.index(ancestor) - 1 while index >= 0: change.merge(_get_change_set(name, to_parents[index], False)) index -= 1 return change.compress() def get_restore_operation(name: str) -> int: return read(name, f'select * from restore_operation')['id'] def set_restore_operation(name: str, operation: int) -> None: write(name, f'update restore_operation set id = {operation}') def set_restore_operation_to_current(name: str) -> None: return set_restore_operation(name, get_current_operation(name)) def restore(name: str, discard: bool) -> ChangeSet: op = get_restore_operation(name) return pick_operation(name, op, discard)