138 lines
4.6 KiB
Python
138 lines
4.6 KiB
Python
from .database import *
|
|
|
|
|
|
def get_label_schema(name: str) -> dict[str, dict[str, Any]]:
|
|
return { 'x' : {'type': 'float' , 'optional': False , 'readonly': False},
|
|
'y' : {'type': 'float' , 'optional': False , 'readonly': False},
|
|
'label' : {'type': 'str' , 'optional': False , 'readonly': False},
|
|
'node' : {'type': 'str' , 'optional': True , 'readonly': False} }
|
|
|
|
|
|
def get_label(name: str, x: float, y: float) -> dict[str, Any]:
|
|
d = {}
|
|
d['x'] = x
|
|
d['y'] = y
|
|
l = try_read(name, f'select * from labels where x = {x} and y = {y}')
|
|
if l == None:
|
|
d['label'] = None
|
|
d['node'] = None
|
|
else:
|
|
d['label'] = str(l['label'])
|
|
d['node'] = str(l['node']) if l['node'] != None else None
|
|
return d
|
|
|
|
|
|
class Label(object):
|
|
def __init__(self, input: dict[str, Any]) -> None:
|
|
self.type = 'label'
|
|
self.x = float(input['x'])
|
|
self.y = float(input['y'])
|
|
self.label = str(input['label'])
|
|
self.node = str(input['node']) if 'node' in input and input['node'] != None else None
|
|
|
|
self.f_type = f"'{self.type}'"
|
|
self.f_x = self.x
|
|
self.f_y = self.y
|
|
self.f_label = f"'{self.label}'"
|
|
self.f_node = f"'{self.node}'" if self.node != None else 'null'
|
|
|
|
def as_dict(self) -> dict[str, Any]:
|
|
return { 'type': self.type, 'x': self.x, 'y': self.y, 'label': self.label, 'node': self.node }
|
|
|
|
def as_xy_dict(self) -> dict[str, Any]:
|
|
return { 'type': self.type, 'x': self.x, 'y': self.y }
|
|
|
|
|
|
def _set_label(name: str, cs: ChangeSet) -> DbChangeSet:
|
|
old = Label(get_label(name, cs.operations[0]['x'], cs.operations[0]['y']))
|
|
raw_new = get_label(name, cs.operations[0]['x'], cs.operations[0]['y'])
|
|
|
|
new_dict = cs.operations[0]
|
|
schema = get_label_schema(name)
|
|
for key, value in schema.items():
|
|
if key in new_dict and not value['readonly']:
|
|
raw_new[key] = new_dict[key]
|
|
new = Label(raw_new)
|
|
|
|
redo_sql = f"update labels set label = {new.f_label}, node = {new.f_node} where x = {new.f_x} and y = {new.f_y};"
|
|
undo_sql = f"update labels set label = {old.f_label}, node = {old.f_node} where x = {old.f_x} and y = {old.f_y};"
|
|
|
|
redo_cs = g_update_prefix | new.as_dict()
|
|
undo_cs = g_update_prefix | old.as_dict()
|
|
|
|
return DbChangeSet(redo_sql, undo_sql, [redo_cs], [undo_cs])
|
|
|
|
|
|
def set_label(name: str, cs: ChangeSet) -> ChangeSet:
|
|
return execute_command(name, _set_label(name, cs))
|
|
|
|
|
|
def _add_label(name: str, cs: ChangeSet) -> DbChangeSet:
|
|
new = Label(cs.operations[0])
|
|
|
|
redo_sql = f"insert into labels (x, y, label, node) values ({new.f_x}, {new.f_y}, {new.f_label}, {new.f_node});"
|
|
undo_sql = f"delete from labels where x = {new.f_x} and y = {new.f_y};"
|
|
|
|
redo_cs = g_add_prefix | new.as_dict()
|
|
undo_cs = g_delete_prefix | new.as_xy_dict()
|
|
|
|
return DbChangeSet(redo_sql, undo_sql, [redo_cs], [undo_cs])
|
|
|
|
|
|
def add_label(name: str, cs: ChangeSet) -> ChangeSet:
|
|
return execute_command(name, _add_label(name, cs))
|
|
|
|
|
|
def _delete_label(name: str, cs: ChangeSet) -> DbChangeSet:
|
|
old = Label(get_label(name, cs.operations[0]['x'], cs.operations[0]['y']))
|
|
|
|
redo_sql = f"delete from labels where x = {old.f_x} and y = {old.f_y};"
|
|
undo_sql = f"insert into labels (x, y, label, node) values ({old.f_x}, {old.f_y}, {old.f_label}, {old.f_node});"
|
|
|
|
redo_cs = g_delete_prefix | old.as_xy_dict()
|
|
undo_cs = g_add_prefix | old.as_dict()
|
|
|
|
return DbChangeSet(redo_sql, undo_sql, [redo_cs], [undo_cs])
|
|
|
|
|
|
def delete_label(name: str, cs: ChangeSet) -> ChangeSet:
|
|
return execute_command(name, _delete_label(name, cs))
|
|
|
|
|
|
def inp_in_label(line: str) -> str:
|
|
tokens = line.split()
|
|
|
|
num = len(tokens)
|
|
has_desc = tokens[-1].startswith(';')
|
|
num_without_desc = (num - 1) if has_desc else num
|
|
|
|
x = float(tokens[0])
|
|
y = float(tokens[1])
|
|
label = str(tokens[2])
|
|
node = str(tokens[3]) if num >= 4 else None
|
|
node = f"'{node}'" if node != None else 'null'
|
|
|
|
return str(f"insert into labels (x, y, label, node) values ({x}, {y}, '{label}', {node});")
|
|
|
|
|
|
def inp_out_label(name: str) -> list[str]:
|
|
lines = []
|
|
objs = read_all(name, 'select * from labels')
|
|
for obj in objs:
|
|
x = obj['x']
|
|
y = obj['y']
|
|
label = obj['label']
|
|
node = obj['node'] if obj['node'] != None else ''
|
|
lines.append(f'{x} {y} {label} {node}')
|
|
return lines
|
|
|
|
|
|
def unset_label_by_node(name: str, node: str) -> ChangeSet:
|
|
cs = ChangeSet()
|
|
|
|
rows = read_all(name, f"select x, y from labels where node = '{node}'")
|
|
for row in rows:
|
|
cs.append(g_update_prefix | {'type': 'label', 'x': row['x'], 'y': row['y'], 'node': None})
|
|
|
|
return cs
|