Files
TJWaterServer/api/s12_curves.py
2024-01-13 15:02:22 +08:00

187 lines
6.2 KiB
Python

from .database import *
CURVE_TYPE_PUMP = 'PUMP'
CURVE_TYPE_EFFICIENCY = 'EFFICIENCY'
CURVE_TYPE_VOLUME = 'VOLUME'
CURVE_TYPE_HEADLOSS = 'HEADLOSS'
curve_types = [CURVE_TYPE_PUMP, CURVE_TYPE_EFFICIENCY, CURVE_TYPE_VOLUME, CURVE_TYPE_HEADLOSS]
def get_curve_schema(name: str) -> dict[str, dict[str, Any]]:
return { 'id' : {'type': 'str' , 'optional': False , 'readonly': True },
'c_type' : {'type': 'str' , 'optional': False , 'readonly': False},
'coords' : {'type': 'list' , 'optional': False , 'readonly': False,
'element': { 'x' : {'type': 'float' , 'optional': False , 'readonly': False },
'y' : {'type': 'float' , 'optional': False , 'readonly': False } }}}
def get_curve(name: str, id: str) -> dict[str, Any]:
c_one = try_read(name, f"select * from _curve where id = '{id}'")
if c_one == None:
return {}
cus = read_all(name, f"select * from curves where id = '{id}' order by _order")
cs = []
for r in cus:
cs.append({ 'x': float(r['x']), 'y': float(r['y']) })
d = {}
d['id'] = id
d['c_type'] = c_one['type']
d['coords'] = cs
return d
def _set_curve(name: str, cs: ChangeSet) -> DbChangeSet:
id = cs.operations[0]['id']
f_id = f"'{id}'"
old = get_curve(name, id)
old_f_type = f"'{old['c_type']}'"
new = { 'id': id }
if 'coords' in cs.operations[0]:
new['coords'] = cs.operations[0]['coords']
else:
new['coords'] = old['coords']
if 'c_type' in cs.operations[0]:
new['c_type'] = cs.operations[0]['c_type']
else:
new['c_type'] = old['c_type']
new_f_type = f"'{new['c_type']}'"
# TODO: transaction ?
redo_sql = f"delete from curves where id = {f_id};"
redo_sql += f"\nupdate _curve set type = {new_f_type} where id = {f_id};"
for xy in new['coords']:
f_x, f_y = xy['x'], xy['y']
redo_sql += f"\ninsert into curves (id, x, y) values ({f_id}, {f_x}, {f_y});"
undo_sql = f"delete from curves where id = {f_id};"
undo_sql += f"\nupdate _curve set type = {old_f_type} where id = {f_id};"
for xy in old['coords']:
f_x, f_y = xy['x'], xy['y']
undo_sql += f"\ninsert into curves (id, x, y) values ({f_id}, {f_x}, {f_y});"
redo_cs = g_update_prefix | { 'type': 'curve' } | new
undo_cs = g_update_prefix | { 'type': 'curve' } | old
return DbChangeSet(redo_sql, undo_sql, [redo_cs], [undo_cs])
def set_curve(name: str, cs: ChangeSet) -> ChangeSet:
if 'id' not in cs.operations[0]:
return ChangeSet()
if get_curve(name, cs.operations[0]['id']) == {}:
return ChangeSet()
return execute_command(name, _set_curve(name, cs))
def _add_curve(name: str, cs: ChangeSet) -> DbChangeSet:
id = cs.operations[0]['id']
f_id = f"'{id}'"
new = { 'id': id, 'c_type': cs.operations[0]['c_type'], 'coords': [] }
new_f_type = f"'{new['c_type']}'"
# TODO: transaction ?
redo_sql = f"insert into _curve (id, type) values ({f_id}, {new_f_type});"
for xy in cs.operations[0]['coords']:
x, y = float(xy['x']), float(xy['y'])
f_x, f_y = x, y
redo_sql += f"\ninsert into curves (id, x, y) values ({f_id}, {f_x}, {f_y});"
new['coords'].append({ 'x': x, 'y': y })
undo_sql = f"delete from curves where id = {f_id};"
undo_sql += f"\ndelete from _curve where id = {f_id};"
redo_cs = g_add_prefix | { 'type': 'curve' } | new
undo_cs = g_delete_prefix | { 'type': 'curve' } | { 'id' : id }
return DbChangeSet(redo_sql, undo_sql, [redo_cs], [undo_cs])
def add_curve(name: str, cs: ChangeSet) -> ChangeSet:
if 'id' not in cs.operations[0]:
return ChangeSet()
if get_curve(name, cs.operations[0]['id']) != {}:
return ChangeSet()
return execute_command(name, _add_curve(name, cs))
def _delete_curve(name: str, cs: ChangeSet) -> DbChangeSet:
id = cs.operations[0]['id']
f_id = f"'{id}'"
old = get_curve(name, id)
old_f_type = f"'{old['c_type']}'"
redo_sql = f"delete from curves where id = {f_id};"
redo_sql += f"\ndelete from _curve where id = {f_id};"
# TODO: transaction ?
undo_sql = f"insert into _curve (id, type) values ({f_id}, {old_f_type});"
for xy in old['coords']:
f_x, f_y = xy['x'], xy['y']
undo_sql += f"\ninsert into curves (id, x, y) values ({f_id}, {f_x}, {f_y});"
redo_cs = g_delete_prefix | { 'type': 'curve' } | { 'id' : id }
undo_cs = g_add_prefix | { 'type': 'curve' } | old
return DbChangeSet(redo_sql, undo_sql, [redo_cs], [undo_cs])
def delete_curve(name: str, cs: ChangeSet) -> ChangeSet:
if 'id' not in cs.operations[0]:
return ChangeSet()
if get_curve(name, cs.operations[0]['id']) == {}:
return ChangeSet()
return execute_command(name, _delete_curve(name, cs))
#--------------------------------------------------------------
# [EPA2][IN][OUT]
# ;type: desc
# id x y
#--------------------------------------------------------------
#--------------------------------------------------------------
# [EPA3][IN][OUT]
# id type
# id x y
#--------------------------------------------------------------
def inp_in_curve(line: str) -> str:
tokens = line.split()
return str(f"insert into curves (id, x, y) values ('{tokens[0]}', {float(tokens[1])}, {float(tokens[2])});")
def inp_out_curve(name: str) -> list[str]:
lines = []
types = read_all(name, f"select * from _curve")
for type in types:
id = type['id']
# ;type: desc
lines.append(f";{type['type']}:")
objs = read_all(name, f"select * from curves where id = '{id}' order by _order")
for obj in objs:
id = obj['id']
x = obj['x']
y = obj['y']
lines.append(f'{id} {x} {y}')
return lines
def inp_out_curve_v3(name: str) -> list[str]:
lines = []
types = read_all(name, f"select * from _curve")
for type in types:
id = type['id']
# id type
lines.append(f"{id} {type['type']}")
objs = read_all(name, f"select * from curves where id = '{id}' order by _order")
for obj in objs:
id = obj['id']
x = obj['x']
y = obj['y']
lines.append(f'{id} {x} {y}')
return lines