221 lines
8.2 KiB
Python
221 lines
8.2 KiB
Python
from psycopg.rows import dict_row, Row
|
|
from .connection import g_conn_dict as conn
|
|
|
|
def _get_current_transaction(name: str) -> Row | None:
    """Fetch the first row of ``transaction_operation``.

    The connection is looked up in ``conn`` by *name*. The row is built by
    the ``dict_row`` factory; ``None`` is returned when the table is empty.
    """
    query = "select * from transaction_operation"
    with conn[name].cursor(row_factory=dict_row) as cursor:
        cursor.execute(query)
        record = cursor.fetchone()
    return record
|
|
|
|
def _get_current_transaction_id(name: str) -> int:
    """Return the ``id`` column of the current transaction row as an int.

    No check is made for a missing row: when ``_get_current_transaction``
    returns ``None`` the subscript below raises ``TypeError``.
    """
    transaction = _get_current_transaction(name)
    raw_id = transaction['id']
    return int(raw_id)
|
|
|
|
def _remove_transaction(name: str) -> None:
    """Delete every row of ``transaction_operation`` on connection *name*."""
    statement = "delete from transaction_operation"
    with conn[name].cursor() as cursor:
        cursor.execute(statement)
|
|
|
|
def _remove_operation(name: str, id: int) -> None:
    """Delete operation *id* and the rows in other tables that carry its id.

    Rows are removed from ``transaction_operation``, ``snapshot_operation``
    and finally ``operation``, in that order. Only the exact id is matched:
    operations form a tree, so a ``>=`` range delete would also hit
    unrelated branches.
    """
    statements = (
        # this should not happen
        f"delete from transaction_operation where id = {id}",
        # this may happen
        f"delete from snapshot_operation where id = {id}",
        f"delete from operation where id = {id}",
    )
    with conn[name].cursor(row_factory=dict_row) as cursor:
        for statement in statements:
            cursor.execute(statement)
|
|
|
|
def _get_parents(name: str, id: int) -> list[int]:
    """Return the chain of operation ids from *id* up to the id ``0``.

    The list starts with *id* itself; every following element is the
    ``parent`` column of the one before it. The walk stops as soon as ``0``
    has been appended (or immediately when *id* is already ``0``).
    """
    chain = [id]
    node = id
    with conn[name].cursor(row_factory=dict_row) as cursor:
        while node != 0:
            cursor.execute(f"select parent from operation where id = {node}")
            node = int(cursor.fetchone()['parent'])
            chain.append(node)
    return chain
|
|
|
|
def _get_current_operation(name: str) -> int:
    """Return the ``id`` stored in the first ``current_operation`` row."""
    query = "select id from current_operation"
    with conn[name].cursor(row_factory=dict_row) as cursor:
        cursor.execute(query)
        record = cursor.fetchone()
        return int(record['id'])
|
|
|
|
def _update_current_operation(name: str, old_id: int, id: int) -> None:
    """Rewrite ``current_operation.id`` from *old_id* to *id*.

    Only rows whose id currently equals *old_id* are touched.
    """
    statement = f"update current_operation set id = {id} where id = {old_id}"
    with conn[name].cursor() as cursor:
        cursor.execute(statement)
|
|
|
|
def _add_redo_undo(name: str, redo: str, undo: str) -> int:
    """Insert a new operation under the current one and return its id.

    Args:
        name: key of the connection in ``conn``.
        redo: SQL text stored in the ``redo`` column.
        undo: SQL text stored in the ``undo`` column.

    Returns:
        The id the database assigned to the inserted row.
    """
    parent = _get_current_operation(name)
    with conn[name].cursor(row_factory=dict_row) as cur:
        # redo/undo are arbitrary SQL text, so they are bound as parameters
        # instead of being spliced into a quoted literal (a single quote in
        # either one would otherwise break the statement).
        # RETURNING hands back the id of exactly this row; reading
        # max(id) afterwards could pick up a row inserted by someone else.
        cur.execute(
            "insert into operation (id, redo, undo, parent)"
            " values (default, %s, %s, %s) returning id",
            (redo, undo, parent),
        )
        return int(cur.fetchone()['id'])
|
|
|
|
# execute curr undo
|
|
def _query_undo(name: str, id: str) -> dict[str, str]:
    """Fetch the ``undo`` and ``parent`` columns of operation *id*.

    Returns whatever ``fetchone()`` yields with the ``dict_row`` factory,
    which is ``None`` when no operation has that id.
    """
    query = f"select undo, parent from operation where id = {id}"
    with conn[name].cursor(row_factory=dict_row) as cursor:
        cursor.execute(query)
        record = cursor.fetchone()
    return record
|
|
|
|
# execute next redo
|
|
def _query_redo_child(name: str, id: str) -> str:
    """Return the ``redo_child`` column of operation *id*.

    NOTE(review): the annotation says ``str``, but ``execute_redo`` compares
    the result with ``None`` and converts it with ``int()`` -- the column is
    presumably a nullable id; confirm against the schema.
    """
    query = f"select redo_child from operation where id = {id}"
    with conn[name].cursor(row_factory=dict_row) as cursor:
        cursor.execute(query)
        record = cursor.fetchone()
        return record['redo_child']
|
|
|
|
def _query_redo(name: str, id: int | str) -> str:
    """Return the ``redo`` column of operation *id*.

    Args:
        name: key of the connection in ``conn``.
        id: id of the operation to look up.

    Returns:
        The value of the ``redo`` column (the caller passes it to
        ``_execute`` as SQL text).

    Raises:
        TypeError: if no operation has that id (``fetchone()`` is ``None``).
    """
    with conn[name].cursor(row_factory=dict_row) as cur:
        # Bind the id instead of formatting it into the statement.
        cur.execute("select redo from operation where id = %s", (id,))
        return cur.fetchone()['redo']
|
|
|
|
def _set_redo_child(name: str, id: int, child: int | str) -> None:
    """Set the ``redo_child`` column of operation *id* to *child*.

    *child* is interpolated into the statement verbatim; that is how
    ``execute_undo`` clears the column, by passing the string ``'NULL'``.
    """
    statement = f"update operation set redo_child = {child} where id = {id}"
    with conn[name].cursor() as cursor:
        cursor.execute(statement)
|
|
|
|
def _execute(name: str, sql: str) -> None:
    """Run *sql* after turning every double quote into a single quote."""
    statement = sql.replace("\"", "\'")
    with conn[name].cursor() as cursor:
        cursor.execute(statement)
|
|
|
|
def add_operation(name: str, redo: str, undo: str) -> None:
    """Record a new redo/undo pair and make it the current operation.

    The pair is stored through ``_add_redo_undo``; ``current_operation`` is
    then switched from the id it held to the id of the new operation.
    """
    new_id = _add_redo_undo(name, redo, undo)
    previous_id = _get_current_operation(name)
    _update_current_operation(name, previous_id, new_id)
|
|
|
|
def execute_undo(name: str, discard: bool = False) -> None:
    """Undo the current operation and step back to its parent.

    Args:
        name: key of the connection in ``conn``.
        discard: when true, the undone operation is deleted afterwards and
            the parent's ``redo_child`` is cleared, so it cannot be redone.

    The call is refused (a message is printed, nothing changes) when a strict
    transaction is active, when the current operation is at or before the
    transaction start point, or when the operation's ``undo`` text is empty.

    NOTE(review): ``abort_transaction`` calls this with ``discard=True``
    while the transaction row still exists, so in strict mode those calls are
    refused here as well -- confirm that this is intended.
    """
    curr = _get_current_operation(name)

    # transaction control: one lookup is enough, a missing row means "no
    # active transaction".
    tran = _get_current_transaction(name)
    if tran is not None and int(tran['id']) >= 0:
        if bool(tran['strict']):  # strict mode disallows undo
            print("Do not allow to undo in strict transaction mode!")
            return
        # normal mode: the start point itself must not be undone (the
        # original note also mentions a foreign key constraint on it)
        if int(tran['id']) >= curr:
            print("Do not allow to undo transaction start point!")
            return

    row = _query_undo(name, curr)
    undo = row['undo']
    if undo == '':
        print("nothing to undo!")
        return

    parent = int(row['parent'])
    # Remember the undone operation as the parent's redo target, unless it is
    # about to be discarded.
    _set_redo_child(name, parent, 'NULL' if discard else curr)

    _execute(name, undo)
    _update_current_operation(name, curr, parent)

    if discard:
        _remove_operation(name, curr)
|
|
|
|
def execute_redo(name: str) -> None:
    """Redo the operation recorded as ``redo_child`` of the current one.

    Prints a message and does nothing when no redo child is recorded.
    Otherwise the child's ``redo`` SQL is executed and the child becomes the
    current operation.
    """
    current = _get_current_operation(name)
    pending = _query_redo_child(name, current)
    if pending is None:
        print("nothing to redo!")
        return

    next_id = int(pending)
    _execute(name, _query_redo(name, next_id))
    _update_current_operation(name, current, next_id)
|
|
|
|
# Snapshot support: check out different versions of the database.
# Snapshots are persistent,
# since redo always remembers the most recently undone path.
|
|
|
|
def have_snapshot(name: str, tag: str) -> bool:
    """Return whether ``snapshot_operation`` has a row tagged *tag*.

    Args:
        name: key of the connection in ``conn``.
        tag: snapshot tag to look for.
    """
    with conn[name].cursor(row_factory=dict_row) as cur:
        # The tag is free text, so it is bound as a parameter; a quote in it
        # would otherwise break (or alter) the statement.
        cur.execute("select id from snapshot_operation where tag = %s", (tag,))
        return cur.fetchone() is not None
|
|
|
|
def take_snapshot(name: str, tag: str) -> None:
    """Tag the current operation with *tag* in ``snapshot_operation``.

    Args:
        name: key of the connection in ``conn``.
        tag: non-empty snapshot tag; an empty or ``None`` tag is rejected
            with a printed message.
    """
    if not tag:
        print('Non empty tag is expected!')
        return

    curr = _get_current_operation(name)

    with conn[name].cursor() as cur:
        # Bound parameters keep tags containing quotes from breaking the SQL.
        cur.execute(
            "insert into snapshot_operation (id, tag) values (%s, %s)",
            (curr, tag),
        )
|
|
|
|
def pick_snapshot(name: str, tag: str) -> None:
    """Move the current operation to the one tagged *tag*.

    The operation tree is walked in two phases: undo from the current
    operation up to the deepest ancestor it shares with the target, then redo
    from that ancestor down to the target.

    Args:
        name: key of the connection in ``conn``.
        tag: snapshot tag to check out; an empty or ``None`` tag and an
            unknown tag are both rejected with a printed message.
    """
    if not tag:
        print('Non empty tag is expected!')
        return

    with conn[name].cursor(row_factory=dict_row) as cur:
        # The tag is free text, so it is bound as a parameter.
        cur.execute("select id from snapshot_operation where tag = %s", (tag,))
        row = cur.fetchone()
    if row is None:
        print('No such snapshot!')
        return
    target = int(row['id'])

    curr_parents = _get_parents(name, _get_current_operation(name))
    target_parents = _get_parents(name, target)

    # Both chains end at the root id 0, so they share a common tail; its
    # length locates the deepest common ancestor.
    shared = 0
    for mine, theirs in zip(reversed(curr_parents), reversed(target_parents)):
        if mine != theirs:
            break
        shared += 1

    undo_steps = len(curr_parents) - shared
    ancestor = curr_parents[undo_steps]

    # current -> ancestor
    for _ in range(undo_steps):
        execute_undo(name)
    if _get_current_operation(name) != ancestor:
        # execute_undo refused a step (e.g. transaction rules); redoing from
        # the wrong node would replay unrelated operations.
        print('Can not reach the snapshot: undo was refused!')
        return

    # ancestor -> target. redo_child is assigned at every step because it
    # only remembers the most recently undone branch, which is not
    # necessarily the one leading to the target.
    parent = ancestor
    for child in reversed(target_parents[:len(target_parents) - shared]):
        _set_redo_child(name, parent, child)
        execute_redo(name)
        parent = child
|
|
|
|
# A transaction is volatile: commit/abort destroys it.
# An aborted transaction cannot be redone,
# but a committed transaction can still be undone... which is inconsistent.
# Aborting may remove a snapshot if it was taken inside the aborted transaction.
|
|
|
|
def have_transaction(name: str) -> bool:
    """Return whether ``transaction_operation`` currently holds any row."""
    query = "select * from transaction_operation"
    with conn[name].cursor(row_factory=dict_row) as cursor:
        cursor.execute(query)
        found = cursor.rowcount
    return found > 0
|
|
|
|
def start_transaction(name: str, strict: bool = False) -> None:
    """Open a transaction whose start point is the current operation.

    A row ``(current operation id, strict)`` is inserted into
    ``transaction_operation``. Only one transaction is supported: when one
    already exists a message is printed and nothing is inserted.
    """
    if have_transaction(name):
        print("Only support single transaction now, please commit/abort current transaction!")
        return

    curr = _get_current_operation(name)
    statement = f"insert into transaction_operation (id, strict) values ({curr}, {strict});"
    with conn[name].cursor() as cursor:
        cursor.execute(statement)
|
|
|
|
def commit_transaction(name: str) -> None:
    """Close the active transaction, keeping every operation made in it.

    Prints a message when there is no active transaction.
    """
    if have_transaction(name):
        _remove_transaction(name)
    else:
        print("No active transaction!")
|
|
|
|
def abort_transaction(name: str) -> None:
    """Undo and discard back to the transaction start, then close it.

    ``execute_undo(name, True)`` is called once for every operation between
    the current one and the transaction start point; the transaction row is
    removed afterwards. Prints a message when there is no active transaction.

    NOTE(review): the transaction row still exists while the undo calls run,
    and ``execute_undo`` refuses to undo when the transaction is strict --
    so aborting a strict transaction appears to undo nothing. Confirm whether
    that is intended.
    """
    if not have_transaction(name):
        print("No active transaction!")
        return

    start_id = _get_current_transaction_id(name)
    ancestry = _get_parents(name, _get_current_operation(name))

    # The position of the start point in the ancestry is the number of
    # operations performed since the transaction began.
    remaining = ancestry.index(start_id)
    while remaining > 0:
        execute_undo(name, True)
        remaining -= 1

    _remove_transaction(name)
|