Files
TJWaterServer/api/database.py
2023-03-22 19:53:26 +08:00

342 lines
11 KiB
Python

from typing import Any
from psycopg.rows import dict_row, Row
from .connection import g_conn_dict as conn
API_ADD = 'add'
API_UPDATE = 'update'
API_DELETE = 'delete'
g_add_prefix = { 'operation': API_ADD }
g_update_prefix = { 'operation': API_UPDATE }
g_delete_prefix = { 'operation': API_DELETE }
class ChangeSet:
def __init__(self, ps: dict[str, Any] | None = None):
self.operations : list[dict[str, Any]] = []
if ps != None:
self.append(ps)
@staticmethod
def from_list(ps: list[dict[str, Any]]):
cs = ChangeSet()
for _cs in ps:
cs.append(_cs)
return cs
def add(self, ps: dict[str, Any]):
self.operations.append(g_add_prefix | ps)
return self
def update(self, ps: dict[str, Any]):
self.operations.append(g_update_prefix | ps)
return self
def delete(self, ps: dict[str, Any]):
self.operations.append(g_delete_prefix | ps)
return self
def append(self, ps: dict[str, Any]):
self.operations.append(ps)
return self
def merge(self, cs):
if len(cs.operations) > 0:
self.operations += cs.operations
return self
def dump(self):
for op in self.operations:
print(op)
def compress(self):
return self
class DbChangeSet:
def __init__(self, redo_sql: str, undo_sql: str, redo_cs: list[dict[str, Any]], undo_cs: list[dict[str, Any]]) -> None:
self.redo_sql = redo_sql
self.undo_sql = undo_sql
self.redo_cs = redo_cs
self.undo_cs = undo_cs
@staticmethod
def from_list(css):
redo_sql_s : list[str] = []
undo_sql_s : list[str] = []
redo_cs_s : list[dict[str, Any]] = []
undo_cs_s : list[dict[str, Any]] = []
for r in css:
redo_sql_s.append(r.redo_sql)
undo_sql_s.append(r.undo_sql)
redo_cs_s += r.redo_cs
r.undo_cs.reverse() # reverse again...
undo_cs_s += r.undo_cs
redo_sql = '\n'.join(redo_sql_s)
undo_sql_s.reverse()
undo_sql = '\n'.join(undo_sql_s)
undo_cs_s.reverse()
return DbChangeSet(redo_sql, undo_sql, redo_cs_s, undo_cs_s)
def read(name: str, sql: str) -> Row:
with conn[name].cursor(row_factory=dict_row) as cur:
cur.execute(sql)
row = cur.fetchone()
if row == None:
raise Exception(sql)
return row
def read_all(name: str, sql: str) -> list[Row]:
with conn[name].cursor(row_factory=dict_row) as cur:
cur.execute(sql)
return cur.fetchall()
def try_read(name: str, sql: str) -> Row | None:
with conn[name].cursor(row_factory=dict_row) as cur:
cur.execute(sql)
return cur.fetchone()
def write(name: str, sql: str) -> None:
with conn[name].cursor() as cur:
cur.execute(sql)
def get_current_operation(name: str) -> int:
return int(read(name, 'select id from current_operation')['id'])
def execute_command(name: str, command: DbChangeSet) -> ChangeSet:
write(name, command.redo_sql)
parent = get_current_operation(name)
redo_sql = command.redo_sql.replace("'", "''")
undo_sql = command.undo_sql.replace("'", "''")
redo_cs_str = str(command.redo_cs).replace("'", "''")
undo_cs_str = str(command.undo_cs).replace("'", "''")
write(name, f"insert into operation (id, redo, undo, parent, redo_cs, undo_cs) values (default, '{redo_sql}', '{undo_sql}', {parent}, '{redo_cs_str}', '{undo_cs_str}')")
current = read(name, 'select max(id) as id from operation')['id']
write(name, f"update current_operation set id = {current}")
return ChangeSet.from_list(command.redo_cs)
def execute_undo(name: str, discard: bool = False) -> ChangeSet:
row = read(name, f'select * from operation where id = {get_current_operation(name)}')
write(name, row['undo'])
# update foreign key
write(name, f"update current_operation set id = {row['parent']} where id = {row['id']}")
if discard:
# update foreign key
write(name, f"update operation set redo_child = null where id = {row['parent']}")
# on delete cascade => child & snapshot
write(name, f"delete from operation where id = {row['id']}")
else:
write(name, f"update operation set redo_child = {row['id']} where id = {row['parent']}")
e = eval(row['undo_cs'])
return ChangeSet.from_list(e)
def execute_redo(name: str) -> ChangeSet:
row = read(name, f'select * from operation where id = {get_current_operation(name)}')
if row['redo_child'] == None:
return ChangeSet()
row = read(name, f"select * from operation where id = {row['redo_child']}")
write(name, row['redo'])
write(name, f"update current_operation set id = {row['id']} where id = {row['parent']}")
e = eval(row['redo_cs'])
return ChangeSet.from_list(e)
def list_snapshot(name: str) -> list[tuple[int, str]]:
rows = read_all(name, f'select * from operation order by id')
result = []
for row in rows:
result.append((int(row['id']), str(row['tag'])))
return result
def have_snapshot(name: str, tag: str) -> bool:
return try_read(name, f"select id from snapshot_operation where tag = '{tag}'") != None
def have_snapshot_for_operation(name: str, operation: int) -> bool:
return try_read(name, f"select id from snapshot_operation where id = {operation}") != None
def have_snapshot_for_current_operation(name: str) -> bool:
return have_snapshot_for_operation(name, get_current_operation(name))
def take_snapshot_for_operation(name: str, operation: int, tag: str) -> None:
if tag == None or tag == '':
return None
write(name, f"insert into snapshot_operation (id, tag) values ({operation}, '{tag}')")
def take_snapshot_for_current_operation(name: str, tag: str) -> None:
take_snapshot_for_operation(name, get_current_operation(name), tag)
# deprecated ! use take_snapshot_for_current_operation instead
def take_snapshot(name: str, tag: str) -> None:
take_snapshot_for_current_operation(name, tag)
def update_snapshot(name: str, operation: int, tag: str) -> None:
if tag == None or tag == '':
return None
if have_snapshot_for_operation(name, operation):
write(name, f"update snapshot_operation set tag = '{tag}' where id = {operation}")
else:
take_snapshot_for_operation(name, operation, tag)
def update_snapshot_for_current_operation(name: str, tag: str) -> None:
return update_snapshot(name, get_current_operation(name), tag)
def delete_snapshot(name: str, tag: str) -> None:
write(name, f"delete from snapshot_operation where tag = '{tag}'")
def delete_snapshot_by_operation(name: str, operation: int) -> None:
write(name, f"delete from snapshot_operation where id = {operation}")
def get_operation_by_snapshot(name: str, tag: str) -> int | None:
row = try_read(name, f"select id from snapshot_operation where tag = '{tag}'")
return int(row['id']) if row != None else None
def get_snapshot_by_operation(name: str, operation: int) -> str | None:
row = try_read(name, f"select tag from snapshot_operation where id = {operation}")
return str(row['tag']) if row != None else None
def _get_parents(name: str, id: int) -> list[int]:
ids = [id]
while ids[-1] != 0:
row = read(name, f'select parent from operation where id = {ids[-1]}')
ids.append(int(row['parent']))
return ids
def pick_operation(name: str, operation: int, discard: bool) -> ChangeSet:
target = operation
curr = get_current_operation(name)
curr_parents = _get_parents(name, curr)
target_parents = _get_parents(name, target)
change = ChangeSet()
if target in curr_parents:
for _ in range(curr_parents.index(target)):
change.merge(execute_undo(name, discard))
elif curr in target_parents:
target_parents.reverse()
curr_index = target_parents.index(curr)
for i in range(curr_index, len(target_parents) - 1):
write(name, f"update operation set redo_child = '{target_parents[i + 1]}' where id = '{target_parents[i]}'")
change.merge(execute_redo(name))
else:
ancestor_index = -1
while curr_parents[ancestor_index] == target_parents[ancestor_index]:
ancestor_index -= 1
ancestor = curr_parents[ancestor_index + 1]
for _ in range(curr_parents.index(ancestor)):
change.merge(execute_undo(name, discard))
target_parents.reverse()
curr_index = target_parents.index(ancestor)
for i in range(curr_index, len(target_parents) - 1):
write(name, f"update operation set redo_child = '{target_parents[i + 1]}' where id = '{target_parents[i]}'")
change.merge(execute_redo(name))
return change.compress()
def pick_snapshot(name: str, tag: str, discard: bool) -> ChangeSet:
if not have_snapshot(name, tag):
return ChangeSet()
target = int(read(name, f"select id from snapshot_operation where tag = '{tag}'")['id'])
return pick_operation(name, target, discard)
def _get_change_set(name: str, operation: int, undo: bool) -> ChangeSet:
row = read(name, f'select * from operation where id = {operation}')
field= 'undo_cs' if undo else 'redo_cs'
return ChangeSet.from_list(eval(row[field]))
def sync_with_server(name: str, operation: int) -> ChangeSet:
fr = operation
to = get_current_operation(name)
fr_parents = _get_parents(name, fr)
to_parents = _get_parents(name, to)
change = ChangeSet()
if fr in to_parents:
index = to_parents.index(fr) - 1
while index >= 0:
change.merge(_get_change_set(name, to_parents[index], False)) #redo
index -= 1
elif to in fr_parents:
index = 0
while index <= fr_parents.index(to) - 1:
change.merge(_get_change_set(name, fr_parents[index], True))
index += 1
else:
ancestor_index = -1
while fr_parents[ancestor_index] == to_parents[ancestor_index]:
ancestor_index -= 1
ancestor = fr_parents[ancestor_index + 1]
index = 0
while index <= fr_parents.index(ancestor) - 1:
change.merge(_get_change_set(name, fr_parents[index], True))
index += 1
index = to_parents.index(ancestor) - 1
while index >= 0:
change.merge(_get_change_set(name, to_parents[index], False))
index -= 1
return change.compress()
def get_restore_operation(name: str) -> int:
return read(name, f'select * from restore_operation')['id']
def set_restore_operation(name: str, operation: int) -> None:
write(name, f'update restore_operation set id = {operation}')
def set_restore_operation_to_current(name: str) -> None:
return set_restore_operation(name, get_current_operation(name))