from .operation import *

CURVE_TYPE_PUMP = 'PUMP'
CURVE_TYPE_EFFICIENCY = 'EFFICIENCY'
CURVE_TYPE_VOLUME = 'VOLUME'
CURVE_TYPE_HEADLOSS = 'HEADLOSS'


def _sql_quote(value: Any) -> str:
    """Return *value* as a single-quoted SQL string literal.

    Embedded single quotes are doubled (standard SQL escaping) so that an
    id or type containing ``'`` cannot terminate the literal early.
    """
    return "'" + str(value).replace("'", "''") + "'"


def _normalize_coords(coords) -> list[dict[str, float]]:
    """Return *coords* as a new list of ``{'x': float, 'y': float}`` dicts.

    Each element only needs to support ``['x']`` / ``['y']`` lookup.
    Raises whatever ``float()`` raises for a non-numeric value.
    """
    return [{'x': float(xy['x']), 'y': float(xy['y'])} for xy in coords]


def _coord_inserts(f_id: str, coords) -> list[str]:
    """Build one ``insert into curves`` statement per point in *coords*.

    *f_id* must already be a quoted SQL literal. Statements are emitted in
    list order.
    NOTE(review): get_curve reads points back ``order by _order``; this
    presumably relies on the table assigning ``_order`` on insert - confirm.
    """
    return [
        f"insert into curves (id, x, y) values ({f_id}, {xy['x']}, {xy['y']});"
        for xy in coords
    ]


def get_curve_schema(name: str) -> dict[str, dict[str, Any]]:
    """Return the property schema of a curve.

    *name* is accepted for signature parity with the other accessors but is
    not used; the schema is a constant.
    """
    return {
        'id': {'type': 'str', 'optional': False, 'readonly': True},
        'c_type': {'type': 'str', 'optional': False, 'readonly': False},
        'coords': {
            'type': 'list',
            'optional': False,
            'readonly': False,
            'element': {
                'x': {'type': 'float', 'optional': False, 'readonly': False},
                'y': {'type': 'float', 'optional': False, 'readonly': False},
            },
        },
    }


def get_curve(name: str, id: str) -> dict[str, Any]:
    """Read curve *id* and return ``{'id', 'c_type', 'coords'}``.

    The type comes from the ``_curve`` row and the points from the
    ``curves`` rows, ordered by ``_order`` and converted to floats.
    """
    # `id` shadows the builtin, but the parameter name is part of the
    # public signature, so it is kept.
    f_id = _sql_quote(id)
    header = read(name, f"select * from _curve where id = {f_id}")
    rows = read_all(name, f"select * from curves where id = {f_id} order by _order")
    return {
        'id': id,
        'c_type': header['type'],
        'coords': _normalize_coords(rows),
    }


def set_curve_cache(name: str, cs: ChangeSet) -> SqlChangeSet:
    """Build the redo/undo SQL and change-sets for updating a curve.

    Only the first operation of *cs* is used. ``coords`` and ``c_type`` are
    optional in it; a missing one keeps the curve's current value.
    Caller-supplied coordinates are coerced to float, matching
    add_curve_cache, so only numeric values reach the SQL text.
    """
    op = cs.operations[0]
    curve_id = op['id']
    f_id = _sql_quote(curve_id)

    old = get_curve(name, curve_id)

    new = {'id': curve_id}
    new['coords'] = _normalize_coords(op['coords']) if 'coords' in op else old['coords']
    new['c_type'] = op['c_type'] if 'c_type' in op else old['c_type']

    # TODO: transaction ?
    # Both directions replace the full point list: delete every point,
    # update the type, then re-insert the points.
    redo_sql = "\n".join([
        f"delete from curves where id = {f_id};",
        f"update _curve set type = {_sql_quote(new['c_type'])} where id = {f_id};",
        *_coord_inserts(f_id, new['coords']),
    ])
    undo_sql = "\n".join([
        f"delete from curves where id = {f_id};",
        f"update _curve set type = {_sql_quote(old['c_type'])} where id = {f_id};",
        *_coord_inserts(f_id, old['coords']),
    ])

    redo_cs = g_update_prefix | {'type': 'curve'} | new
    undo_cs = g_update_prefix | {'type': 'curve'} | old

    return SqlChangeSet(redo_sql, undo_sql, redo_cs, undo_cs)


def set_curve(name: str, cs: ChangeSet) -> ChangeSet:
    """Update a curve by executing the command built by set_curve_cache."""
    return execute_command(name, set_curve_cache(name, cs))


def add_curve_cache(name: str, cs: ChangeSet) -> SqlChangeSet:
    """Build the redo/undo SQL and change-sets for creating a curve.

    Only the first operation of *cs* is used; it must carry ``id``,
    ``c_type`` and ``coords``. Coordinates are coerced to float.
    """
    op = cs.operations[0]
    curve_id = op['id']
    f_id = _sql_quote(curve_id)

    new = {
        'id': curve_id,
        'c_type': op['c_type'],
        'coords': _normalize_coords(op['coords']),
    }

    # TODO: transaction ?
    redo_sql = "\n".join([
        f"insert into _curve (id, type) values ({f_id}, {_sql_quote(new['c_type'])});",
        *_coord_inserts(f_id, new['coords']),
    ])
    # Undo removes the points before the `_curve` row.
    undo_sql = "\n".join([
        f"delete from curves where id = {f_id};",
        f"delete from _curve where id = {f_id};",
    ])

    redo_cs = g_add_prefix | {'type': 'curve'} | new
    undo_cs = g_delete_prefix | {'type': 'curve'} | {'id': curve_id}

    return SqlChangeSet(redo_sql, undo_sql, redo_cs, undo_cs)


def add_curve(name: str, cs: ChangeSet) -> ChangeSet:
    """Create a curve by executing the command built by add_curve_cache."""
    return execute_command(name, add_curve_cache(name, cs))


def delete_curve_cache(name: str, cs: ChangeSet) -> SqlChangeSet:
    """Build the redo/undo SQL and change-sets for deleting a curve.

    Only the first operation of *cs* is used. The current curve is read
    first so that undo can re-create it with its type and points.
    """
    curve_id = cs.operations[0]['id']
    f_id = _sql_quote(curve_id)

    old = get_curve(name, curve_id)

    # Redo removes the points before the `_curve` row.
    redo_sql = "\n".join([
        f"delete from curves where id = {f_id};",
        f"delete from _curve where id = {f_id};",
    ])

    # TODO: transaction ?
    undo_sql = "\n".join([
        f"insert into _curve (id, type) values ({f_id}, {_sql_quote(old['c_type'])});",
        *_coord_inserts(f_id, old['coords']),
    ])

    redo_cs = g_delete_prefix | {'type': 'curve'} | {'id': curve_id}
    undo_cs = g_add_prefix | {'type': 'curve'} | old

    return SqlChangeSet(redo_sql, undo_sql, redo_cs, undo_cs)


def delete_curve(name: str, cs: ChangeSet) -> ChangeSet:
    """Delete a curve by executing the command built by delete_curve_cache."""
    return execute_command(name, delete_curve_cache(name, cs))