306 lines
9.1 KiB
Python
306 lines
9.1 KiB
Python
from typing import Any
|
|
from psycopg.rows import dict_row, Row
|
|
from .connection import g_conn_dict as conn
|
|
|
|
# Operation kinds stored under the 'operation' key of a change record.
API_ADD = 'add'
API_UPDATE = 'update'
API_DELETE = 'delete'

# Ready-made single-key dicts that ChangeSet unions with a record to tag it.
g_add_prefix = dict(operation=API_ADD)
g_update_prefix = dict(operation=API_UPDATE)
g_delete_prefix = dict(operation=API_DELETE)
|
|
|
|
|
|
class ChangeSet:
|
|
def __init__(self, ps: dict[str, Any] | None = None):
|
|
self.operations : list[dict[str, Any]] = []
|
|
if ps != None:
|
|
self.append(ps)
|
|
|
|
def add(self, ps: dict[str, Any]):
|
|
self.operations.append(g_add_prefix | ps)
|
|
return self
|
|
|
|
def update(self, ps: dict[str, Any]):
|
|
self.operations.append(g_update_prefix | ps)
|
|
return self
|
|
|
|
def delete(self, ps: dict[str, Any]):
|
|
self.operations.append(g_delete_prefix | ps)
|
|
return self
|
|
|
|
def append(self, ps: dict[str, Any]):
|
|
self.operations.append(ps)
|
|
return self
|
|
|
|
def merge(self, cs):
|
|
if len(cs.operations) > 0:
|
|
self.operations += cs.operations
|
|
return self
|
|
|
|
def compress(self):
|
|
return self
|
|
|
|
|
|
class SqlChangeSet:
    """Bundle of one command's redo/undo SQL and its redo/undo change records."""

    def __init__(self, redo_sql: str, undo_sql: str, redo_cs: dict[str, str], undo_cs: dict[str, str]) -> None:
        """Store the four values unchanged as same-named attributes."""
        # SQL text that applies the command / reverts it.
        self.redo_sql, self.undo_sql = redo_sql, undo_sql
        # Single change record describing the redo / the undo.
        self.redo_cs, self.undo_cs = redo_cs, undo_cs
|
|
|
|
class BatchSqlChangeSet:
    """Bundle of a batch's redo/undo SQL and its lists of redo/undo change records."""

    def __init__(self, redo_sql: str, undo_sql: str, redo_cs: list[dict[str, str]], undo_cs: list[dict[str, str]]) -> None:
        """Store the four values unchanged as same-named attributes."""
        # SQL text that applies the batch / reverts it.
        self.redo_sql, self.undo_sql = redo_sql, undo_sql
        # Lists of change records describing the redo / the undo.
        self.redo_cs, self.undo_cs = redo_cs, undo_cs
|
|
|
|
|
|
def read(name: str, sql: str) -> Row:
    """Execute *sql* on the connection stored under *name* and return its first row.

    The cursor uses ``dict_row``, so the returned row is keyed by column name.

    Raises:
        Exception: if the query produces no row; the exception argument is
            the SQL text that was executed.
    """
    with conn[name].cursor(row_factory=dict_row) as cur:
        cur.execute(sql)
        row = cur.fetchone()

    # Compare to the None singleton by identity (PEP 8), not with ``==``.
    if row is None:
        raise Exception(sql)
    return row
|
|
|
|
|
|
def read_all(name: str, sql: str) -> list[Row]:
    """Execute *sql* on the connection stored under *name* and return every row.

    Rows are produced by ``dict_row``, i.e. keyed by column name.
    """
    connection = conn[name]
    with connection.cursor(row_factory=dict_row) as cursor:
        cursor.execute(sql)
        rows = cursor.fetchall()
    return rows
|
|
|
|
|
|
def try_read(name: str, sql: str) -> Row | None:
    """Execute *sql* on the connection stored under *name* and return ``fetchone()``.

    Unlike ``read`` this does not raise when the result is ``None``; the
    value from ``fetchone()`` is handed back as-is.
    """
    connection = conn[name]
    with connection.cursor(row_factory=dict_row) as cursor:
        cursor.execute(sql)
        first_row = cursor.fetchone()
    return first_row
|
|
|
|
|
|
def write(name: str, sql: str) -> None:
    """Execute *sql* on the connection stored under *name*, ignoring any result."""
    connection = conn[name]
    with connection.cursor() as cursor:
        cursor.execute(sql)
|
|
|
|
|
|
def get_current_operation(name: str) -> int:
    """Return the ``id`` stored in the ``current_operation`` table as an int.

    Raises whatever ``read`` raises when the table yields no row.
    """
    current_row = read(name, 'select id from current_operation')
    return int(current_row['id'])
|
|
|
|
|
|
def execute_command(name: str, command: SqlChangeSet) -> ChangeSet:
    """Run a command's redo SQL and record it as the new current operation.

    Steps, in order: execute ``command.redo_sql``; insert a row into
    ``operation`` holding the redo/undo SQL, the previous current operation
    as ``parent`` and the ``str()`` of both change records; point
    ``current_operation`` at the highest ``operation.id``.

    Returns:
        A ChangeSet seeded with ``command.redo_cs``.
    """
    write(name, command.redo_sql)

    parent_id = get_current_operation(name)

    # The values are embedded in single-quoted SQL literals below, so every
    # single quote is doubled first.
    redo_text, undo_text, redo_cs_text, undo_cs_text = (
        piece.replace("'", "''")
        for piece in (command.redo_sql, command.undo_sql, str(command.redo_cs), str(command.undo_cs))
    )
    write(name, f"insert into operation (id, redo, undo, parent, redo_cs, undo_cs) values (default, '{redo_text}', '{undo_text}', {parent_id}, '{redo_cs_text}', '{undo_cs_text}')")

    # NOTE(review): assumes the row just inserted has the largest id - confirm
    # this holds if several writers can share one database.
    newest = read(name, 'select max(id) as id from operation')
    write(name, f"update current_operation set id = {newest['id']}")

    return ChangeSet(command.redo_cs)
|
|
|
|
|
|
def execute_batch(name: str, redo_sql: str, undo_sql: str, redo_cs_s: list[dict[str, Any]], undo_cs_s: list[dict[str, Any]]) -> ChangeSet:
    """Run a batch's redo SQL and record it as one new current operation.

    Steps, in order: execute *redo_sql*; insert a row into ``operation``
    holding the redo/undo SQL, the previous current operation as ``parent``
    and the ``str()`` of both record lists; point ``current_operation`` at
    the highest ``operation.id``.

    Returns:
        A ChangeSet containing every record of *redo_cs_s*, in order.
    """
    write(name, redo_sql)

    parent_id = get_current_operation(name)

    # The values are embedded in single-quoted SQL literals below, so every
    # single quote is doubled first.
    redo_text, undo_text, redo_cs_text, undo_cs_text = (
        piece.replace("'", "''")
        for piece in (redo_sql, undo_sql, str(redo_cs_s), str(undo_cs_s))
    )
    write(name, f"insert into operation (id, redo, undo, parent, redo_cs, undo_cs) values (default, '{redo_text}', '{undo_text}', {parent_id}, '{redo_cs_text}', '{undo_cs_text}')")

    newest = read(name, 'select max(id) as id from operation')
    write(name, f"update current_operation set id = {newest['id']}")

    batch = ChangeSet()
    for record in redo_cs_s:
        batch.append(record)

    return batch
|
|
|
|
|
|
def execute_undo(name: str, discard: bool = False) -> ChangeSet:
    """Undo the current operation and move ``current_operation`` to its parent.

    Args:
        name: key of the connection to operate on.
        discard: when true, the undone operation row is deleted and the
            parent's ``redo_child`` is cleared; otherwise the parent's
            ``redo_child`` is set to the undone operation so it can be redone.

    Returns:
        A ChangeSet built from the operation's stored ``undo_cs``: a single
        record when it evaluates to a dict, otherwise one record per element.
    """
    current_id = get_current_operation(name)
    op = read(name, f'select * from operation where id = {current_id}')

    write(name, op['undo'])

    op_id = op['id']
    parent_id = op['parent']

    # update foreign key
    write(name, f"update current_operation set id = {parent_id} where id = {op_id}")

    if not discard:
        write(name, f"update operation set redo_child = {op_id} where id = {parent_id}")
    else:
        # update foreign key
        write(name, f"update operation set redo_child = null where id = {parent_id}")
        # on delete cascade => child & snapshot
        write(name, f"delete from operation where id = {op_id}")

    # NOTE(review): eval() executes whatever text is stored in undo_cs;
    # ast.literal_eval may be a safer fit if only plain literals are ever
    # stored - confirm before switching.
    stored = eval(op['undo_cs'])
    if isinstance(stored, dict):
        return ChangeSet(stored)

    undone = ChangeSet()
    for record in stored:
        undone.append(record)

    return undone
|
|
|
|
|
|
def execute_redo(name: str) -> ChangeSet:
    """Redo the operation recorded as ``redo_child`` of the current operation.

    When the current operation has no ``redo_child`` nothing is executed and
    an empty ChangeSet is returned.  Otherwise the child's ``redo`` SQL is
    run and ``current_operation`` is moved from the child's parent to the
    child.

    Returns:
        A ChangeSet built from the child's stored ``redo_cs``: a single
        record when it evaluates to a dict, otherwise one record per element.
    """
    row = read(name, f'select * from operation where id = {get_current_operation(name)}')
    # Compare to the None singleton by identity (PEP 8), not with ``==``.
    if row['redo_child'] is None:
        return ChangeSet()

    row = read(name, f"select * from operation where id = {row['redo_child']}")
    write(name, row['redo'])

    write(name, f"update current_operation set id = {row['id']} where id = {row['parent']}")

    # NOTE(review): eval() executes whatever text is stored in redo_cs;
    # ast.literal_eval may be a safer fit if only plain literals are ever
    # stored - confirm before switching.
    e = eval(row['redo_cs'])
    if isinstance(e, dict):
        return ChangeSet(e)

    cs = ChangeSet()
    for _cs in e:
        cs.append(_cs)

    return cs
|
|
|
|
|
|
def have_snapshot(name: str, tag: str) -> bool:
    """Return whether ``snapshot_operation`` contains a row tagged *tag*.

    A ``None`` tag is reported as absent; ``take_snapshot`` never stores one.
    """
    if tag is None:
        return False

    # Double single quotes so a tag containing one cannot break out of the
    # SQL string literal (same escaping the operation inserts use).
    safe_tag = tag.replace("'", "''")

    # try_read, not read: read raises when no row matches, which would make
    # a False result impossible.
    return try_read(name, f"select id from snapshot_operation where tag = '{safe_tag}'") is not None
|
|
|
|
|
|
def take_snapshot(name: str, tag: str) -> int | None:
    """Tag the current operation with *tag* in ``snapshot_operation``.

    Returns:
        The id of the current operation that was tagged, or ``None`` when
        *tag* is ``None`` or empty (nothing is written in that case).
    """
    if tag is None or tag == '':
        return None

    current = get_current_operation(name)
    # Double single quotes so a tag containing one cannot break out of the
    # SQL string literal (same escaping the operation inserts use).
    safe_tag = tag.replace("'", "''")
    write(name, f"insert into snapshot_operation (id, tag) values ({current}, '{safe_tag}')")
    return current
|
|
|
|
|
|
def _get_parents(name: str, id: int) -> list[int]:
    """Return the chain of operation ids from *id* up to operation 0.

    The list starts with *id* itself, followed by each successive ``parent``
    read from the ``operation`` table, and ends with 0.  For ``id == 0`` the
    result is ``[0]`` and no query is made.
    """
    chain = [id]
    cursor_id = id
    while cursor_id != 0:
        parent_row = read(name, f'select parent from operation where id = {cursor_id}')
        cursor_id = int(parent_row['parent'])
        chain.append(cursor_id)
    return chain
|
|
|
|
|
|
def pick_operation(name: str, operation: int, discard: bool) -> ChangeSet:
    """Move the current operation to *operation* by undoing and/or redoing.

    Three cases, decided from the two parent chains:
      * *operation* is an ancestor of (or equal to) the current operation:
        undo step by step until it is reached;
      * the current operation is an ancestor of *operation*: redo down the
        target's chain, pointing each ``redo_child`` at the next step first;
      * otherwise: undo to the deepest operation shared by both chains, then
        redo down to *operation*.

    Args:
        name: key of the connection to operate on.
        operation: id of the operation to move to.
        discard: forwarded to every ``execute_undo`` call.

    Returns:
        The merged change sets of all undo/redo steps, passed through
        ``compress()``.
    """
    current_id = get_current_operation(name)

    current_chain = _get_parents(name, current_id)
    target_chain = _get_parents(name, operation)

    result = ChangeSet()

    def undo_steps(count: int) -> None:
        # Undo `count` operations starting from the current one.
        for _ in range(count):
            result.merge(execute_undo(name, discard))

    def redo_down_from(start_id: int) -> None:
        # Flip the target chain into root-to-target order, then redo each
        # step below `start_id`, steering redo_child onto that path first.
        target_chain.reverse()
        start = target_chain.index(start_id)
        for parent_id, child_id in zip(target_chain[start:], target_chain[start + 1:]):
            write(name, f"update operation set redo_child = '{child_id}' where id = '{parent_id}'")
            result.merge(execute_redo(name))

    if operation in current_chain:
        undo_steps(current_chain.index(operation))

    elif current_id in target_chain:
        redo_down_from(current_id)

    else:
        # Walk both chains backwards from their shared tail until they differ;
        # the element just after that point is the deepest common operation.
        offset = -1
        while current_chain[offset] == target_chain[offset]:
            offset -= 1
        ancestor = current_chain[offset + 1]

        undo_steps(current_chain.index(ancestor))
        redo_down_from(ancestor)

    return result.compress()
|
|
|
|
|
|
def pick_snapshot(name: str, tag: str, discard: bool) -> ChangeSet:
    """Move the current operation to the one tagged *tag*.

    Args:
        name: key of the connection to operate on.
        tag: snapshot tag to look up in ``snapshot_operation``.
        discard: forwarded to ``pick_operation``.

    Returns:
        The ChangeSet produced by ``pick_operation`` for the tagged
        operation, or an empty ChangeSet when no snapshot has that tag.
    """
    if tag is None:
        return ChangeSet()

    # Double single quotes so a tag containing one cannot break out of the
    # SQL string literal (same escaping the operation inserts use).
    safe_tag = tag.replace("'", "''")

    # One non-raising lookup serves as both the existence check and the id
    # fetch; read() would raise instead of letting the empty result through.
    row = try_read(name, f"select id from snapshot_operation where tag = '{safe_tag}'")
    if row is None:
        return ChangeSet()

    return pick_operation(name, int(row['id']), discard)
|
|
|
|
|
|
def _get_change_set(name: str, operation: int, undo: bool) -> ChangeSet:
    """Build a ChangeSet from the stored change records of one operation.

    Args:
        name: key of the connection to read from.
        operation: id of the row in the ``operation`` table.
        undo: read the ``undo_cs`` column when true, ``redo_cs`` otherwise.

    Returns:
        A ChangeSet with a single record when the stored value evaluates to
        a dict, otherwise one record per element of the evaluated value.
    """
    row = read(name, f'select * from operation where id = {operation}')

    # Both directions are decoded identically; only the column differs.
    column = 'undo_cs' if undo else 'redo_cs'

    # NOTE(review): eval() executes whatever text is stored in the column;
    # ast.literal_eval may be a safer fit if only plain literals are ever
    # stored - confirm before switching.
    e = eval(row[column])
    if isinstance(e, dict):
        return ChangeSet(e)

    cs = ChangeSet()
    for _cs in e:
        cs.append(_cs)

    return cs
|
|
|
|
|
|
def sync_with_server(name: str, operation: int) -> ChangeSet:
    """Collect the change records leading from *operation* to the current operation.

    Nothing is executed; stored change sets are only read and merged:
      * *operation* is an ancestor of (or equal to) the current operation:
        redo records of the steps in between, oldest first;
      * the current operation is an ancestor of *operation*: undo records
        from *operation* upwards, stopping before the current operation;
      * otherwise: undo records from *operation* up to (excluding) the
        deepest operation shared by both chains, then redo records from
        there down to the current operation.

    Returns:
        The merged ChangeSet, passed through ``compress()``.
    """
    # presumably `operation` is the id the caller last synced to - confirm
    # against callers.
    current_id = get_current_operation(name)

    from_chain = _get_parents(name, operation)
    to_chain = _get_parents(name, current_id)

    result = ChangeSet()

    if operation in to_chain:
        # Chain is ordered newest-first, so reverse to replay oldest-first.
        ahead = to_chain[:to_chain.index(operation)]
        for op_id in reversed(ahead):
            result.merge(_get_change_set(name, op_id, False))  # redo

    elif current_id in from_chain:
        for op_id in from_chain[:from_chain.index(current_id)]:
            result.merge(_get_change_set(name, op_id, True))

    else:
        # Walk both chains backwards from their shared tail until they differ;
        # the element just after that point is the deepest common operation.
        offset = -1
        while from_chain[offset] == to_chain[offset]:
            offset -= 1

        ancestor = from_chain[offset + 1]

        for op_id in from_chain[:from_chain.index(ancestor)]:
            result.merge(_get_change_set(name, op_id, True))

        for op_id in reversed(to_chain[:to_chain.index(ancestor)]):
            result.merge(_get_change_set(name, op_id, False))

    return result.compress()
|