186 lines
6.4 KiB
Python
186 lines
6.4 KiB
Python
from .database import *
|
|
|
|
CURVE_TYPE_PUMP = 'PUMP'
|
|
CURVE_TYPE_EFFICIENCY = 'EFFICIENCY'
|
|
CURVE_TYPE_VOLUME = 'VOLUME'
|
|
CURVE_TYPE_HEADLOSS = 'HEADLOSS'
|
|
|
|
def get_curve_schema(name: str) -> dict[str, dict[str, Any]]:
|
|
return { 'id' : {'type': 'str' , 'optional': False , 'readonly': True },
|
|
'c_type' : {'type': 'str' , 'optional': False , 'readonly': False},
|
|
'coords' : {'type': 'list' , 'optional': False , 'readonly': False,
|
|
'element': { 'x' : {'type': 'float' , 'optional': False , 'readonly': False },
|
|
'y' : {'type': 'float' , 'optional': False , 'readonly': False } }}}
|
|
|
|
|
|
def get_curve(name: str, id: str) -> dict[str, Any]:
|
|
c_one = read(name, f"select * from _curve where id = '{id}'")
|
|
cus = read_all(name, f"select * from curves where id = '{id}' order by _order")
|
|
cs = []
|
|
for r in cus:
|
|
cs.append({ 'x': float(r['x']), 'y': float(r['y']) })
|
|
d = {}
|
|
d['id'] = id
|
|
d['c_type'] = c_one['type']
|
|
d['coords'] = cs
|
|
return d
|
|
|
|
|
|
def set_curve_cmd(name: str, cs: ChangeSet) -> DbChangeSet:
|
|
id = cs.operations[0]['id']
|
|
f_id = f"'{id}'"
|
|
|
|
old = get_curve(name, id)
|
|
old_f_type = f"'{old['c_type']}'"
|
|
|
|
new = { 'id': id }
|
|
if 'coords' in cs.operations[0]:
|
|
new['coords'] = cs.operations[0]['coords']
|
|
else:
|
|
new['coords'] = old['coords']
|
|
if 'c_type' in cs.operations[0]:
|
|
new['c_type'] = cs.operations[0]['c_type']
|
|
else:
|
|
new['c_type'] = old['c_type']
|
|
new_f_type = f"'{new['c_type']}'"
|
|
|
|
# TODO: transaction ?
|
|
redo_sql = f"delete from curves where id = {f_id};"
|
|
redo_sql += f"\nupdate _curve set type = {new_f_type} where id = {f_id};"
|
|
for xy in new['coords']:
|
|
f_x, f_y = xy['x'], xy['y']
|
|
redo_sql += f"\ninsert into curves (id, x, y) values ({f_id}, {f_x}, {f_y});"
|
|
|
|
undo_sql = f"delete from curves where id = {f_id};"
|
|
undo_sql += f"\nupdate _curve set type = {old_f_type} where id = {f_id};"
|
|
for xy in old['coords']:
|
|
f_x, f_y = xy['x'], xy['y']
|
|
undo_sql += f"\ninsert into curves (id, x, y) values ({f_id}, {f_x}, {f_y});"
|
|
|
|
redo_cs = g_update_prefix | { 'type': 'curve' } | new
|
|
undo_cs = g_update_prefix | { 'type': 'curve' } | old
|
|
|
|
return DbChangeSet(redo_sql, undo_sql, [redo_cs], [undo_cs])
|
|
|
|
|
|
def set_curve(name: str, cs: ChangeSet) -> ChangeSet:
|
|
return execute_command(name, set_curve_cmd(name, cs))
|
|
|
|
|
|
def add_curve_cmd(name: str, cs: ChangeSet) -> DbChangeSet:
|
|
id = cs.operations[0]['id']
|
|
f_id = f"'{id}'"
|
|
|
|
new = { 'id': id, 'c_type': cs.operations[0]['c_type'], 'coords': [] }
|
|
new_f_type = f"'{new['c_type']}'"
|
|
|
|
# TODO: transaction ?
|
|
redo_sql = f"insert into _curve (id, type) values ({f_id}, {new_f_type});"
|
|
for xy in cs.operations[0]['coords']:
|
|
x, y = float(xy['x']), float(xy['y'])
|
|
f_x, f_y = x, y
|
|
redo_sql += f"\ninsert into curves (id, x, y) values ({f_id}, {f_x}, {f_y});"
|
|
new['coords'].append({ 'x': x, 'y': y })
|
|
|
|
undo_sql = f"delete from curves where id = {f_id};"
|
|
undo_sql += f"\ndelete from _curve where id = {f_id};"
|
|
|
|
redo_cs = g_add_prefix | { 'type': 'curve' } | new
|
|
undo_cs = g_delete_prefix | { 'type': 'curve' } | { 'id' : id }
|
|
|
|
return DbChangeSet(redo_sql, undo_sql, [redo_cs], [undo_cs])
|
|
|
|
|
|
def add_curve(name: str, cs: ChangeSet) -> ChangeSet:
|
|
return execute_command(name, add_curve_cmd(name, cs))
|
|
|
|
|
|
def delete_curve_cmd(name: str, cs: ChangeSet) -> DbChangeSet:
|
|
id = cs.operations[0]['id']
|
|
f_id = f"'{id}'"
|
|
|
|
old = get_curve(name, id)
|
|
old_f_type = f"'{old['c_type']}'"
|
|
|
|
redo_sql = f"delete from curves where id = {f_id};"
|
|
redo_sql += f"\ndelete from _curve where id = {f_id};"
|
|
|
|
# TODO: transaction ?
|
|
undo_sql = f"insert into _curve (id, type) values ({f_id}, {old_f_type});"
|
|
for xy in old['coords']:
|
|
f_x, f_y = xy['x'], xy['y']
|
|
undo_sql += f"\ninsert into curves (id, x, y) values ({f_id}, {f_x}, {f_y});"
|
|
|
|
redo_cs = g_delete_prefix | { 'type': 'curve' } | { 'id' : id }
|
|
undo_cs = g_add_prefix | { 'type': 'curve' } | old
|
|
|
|
return DbChangeSet(redo_sql, undo_sql, [redo_cs], [undo_cs])
|
|
|
|
|
|
def delete_curve(name: str, cs: ChangeSet) -> ChangeSet:
|
|
return execute_command(name, delete_curve_cmd(name, cs))
|
|
|
|
|
|
#--------------------------------------------------------------
|
|
# [EPA2][IN][OUT]
|
|
# ;type: desc
|
|
# id x y
|
|
#--------------------------------------------------------------
|
|
# [EPA3][IN][OUT]
|
|
# id type
|
|
# id x y
|
|
#--------------------------------------------------------------
|
|
def inp_in_curve(section: list[str]) -> ChangeSet:
|
|
types = {}
|
|
descs = {}
|
|
curves: dict[str, list[dict[str, float]]] = {}
|
|
|
|
count = len(section)
|
|
for i in range(0, count):
|
|
if section[i].startswith(';'):
|
|
# this is description
|
|
type_plus_desc = section[i].removeprefix(';')
|
|
type_plus_desc_tokens = type_plus_desc.split(':')
|
|
next = i + 1
|
|
if next < count and section[next].startswith(';') == False:
|
|
next_tokens = section[next].split()
|
|
types[next_tokens[0]] = type_plus_desc_tokens[0].strip().upper()
|
|
if len(type_plus_desc_tokens) > 1:
|
|
descs[next_tokens[0]] = type_plus_desc_tokens[1].strip()
|
|
continue
|
|
|
|
tokens = section[i].split()
|
|
|
|
# for EPA3
|
|
if tokens[1] == 'PUMP' or tokens[1] == 'EFFICIENCY' or tokens[1] == 'VOLUME' or tokens[1] == 'HEADLOSS':
|
|
types[tokens[0]] = tokens[1]
|
|
continue
|
|
|
|
if tokens[0] not in curves:
|
|
curves[tokens[0]] = []
|
|
curves[tokens[0]].append({'x': float(tokens[1]), 'y': float(tokens[2])})
|
|
|
|
cs = ChangeSet()
|
|
for id, coords in curves.items():
|
|
c_type = types[id] if id in types else CURVE_TYPE_PUMP
|
|
cs.append(g_add_prefix | {'type': 'curve', 'id' : id, 'c_type': c_type, 'coords' : coords})
|
|
|
|
#print(descs)
|
|
return cs
|
|
|
|
|
|
def inp_out_curve(name: str) -> list[str]:
|
|
lines = []
|
|
types = read_all(name, f"select * from _curve")
|
|
for type in types:
|
|
id = type['id']
|
|
# ;type: desc
|
|
lines.append(f";{type['type']}:")
|
|
objs = read_all(name, f"select * from curves where id = '{id}' order by _order")
|
|
for obj in objs:
|
|
id = obj['id']
|
|
x = obj['x']
|
|
y = obj['y']
|
|
lines.append(f'{id} {x} {y}')
|
|
return lines
|