from .database import * CURVE_TYPE_PUMP = 'PUMP' CURVE_TYPE_EFFICIENCY = 'EFFICIENCY' CURVE_TYPE_VOLUME = 'VOLUME' CURVE_TYPE_HEADLOSS = 'HEADLOSS' def get_curve_schema(name: str) -> dict[str, dict[str, Any]]: return { 'id' : {'type': 'str' , 'optional': False , 'readonly': True }, 'c_type' : {'type': 'str' , 'optional': False , 'readonly': False}, 'coords' : {'type': 'list' , 'optional': False , 'readonly': False, 'element': { 'x' : {'type': 'float' , 'optional': False , 'readonly': False }, 'y' : {'type': 'float' , 'optional': False , 'readonly': False } }}} def get_curve(name: str, id: str) -> dict[str, Any]: c_one = read(name, f"select * from _curve where id = '{id}'") cus = read_all(name, f"select * from curves where id = '{id}' order by _order") cs = [] for r in cus: cs.append({ 'x': float(r['x']), 'y': float(r['y']) }) d = {} d['id'] = id d['c_type'] = c_one['type'] d['coords'] = cs return d def set_curve_cmd(name: str, cs: ChangeSet) -> DbChangeSet: id = cs.operations[0]['id'] f_id = f"'{id}'" old = get_curve(name, id) old_f_type = f"'{old['c_type']}'" new = { 'id': id } if 'coords' in cs.operations[0]: new['coords'] = cs.operations[0]['coords'] else: new['coords'] = old['coords'] if 'c_type' in cs.operations[0]: new['c_type'] = cs.operations[0]['c_type'] else: new['c_type'] = old['c_type'] new_f_type = f"'{new['c_type']}'" # TODO: transaction ? redo_sql = f"delete from curves where id = {f_id};" redo_sql += f"\nupdate _curve set type = {new_f_type} where id = {f_id};" for xy in new['coords']: f_x, f_y = xy['x'], xy['y'] redo_sql += f"\ninsert into curves (id, x, y) values ({f_id}, {f_x}, {f_y});" undo_sql = f"delete from curves where id = {f_id};" undo_sql += f"\nupdate _curve set type = {old_f_type} where id = {f_id};" for xy in old['coords']: f_x, f_y = xy['x'], xy['y'] undo_sql += f"\ninsert into curves (id, x, y) values ({f_id}, {f_x}, {f_y});" redo_cs = g_update_prefix | { 'type': 'curve' } | new undo_cs = g_update_prefix | { 'type': 'curve' } | old return DbChangeSet(redo_sql, undo_sql, [redo_cs], [undo_cs]) def set_curve(name: str, cs: ChangeSet) -> ChangeSet: return execute_command(name, set_curve_cmd(name, cs)) def add_curve_cmd(name: str, cs: ChangeSet) -> DbChangeSet: id = cs.operations[0]['id'] f_id = f"'{id}'" new = { 'id': id, 'c_type': cs.operations[0]['c_type'], 'coords': [] } new_f_type = f"'{new['c_type']}'" # TODO: transaction ? redo_sql = f"insert into _curve (id, type) values ({f_id}, {new_f_type});" for xy in cs.operations[0]['coords']: x, y = float(xy['x']), float(xy['y']) f_x, f_y = x, y redo_sql += f"\ninsert into curves (id, x, y) values ({f_id}, {f_x}, {f_y});" new['coords'].append({ 'x': x, 'y': y }) undo_sql = f"delete from curves where id = {f_id};" undo_sql += f"\ndelete from _curve where id = {f_id};" redo_cs = g_add_prefix | { 'type': 'curve' } | new undo_cs = g_delete_prefix | { 'type': 'curve' } | { 'id' : id } return DbChangeSet(redo_sql, undo_sql, [redo_cs], [undo_cs]) def add_curve(name: str, cs: ChangeSet) -> ChangeSet: return execute_command(name, add_curve_cmd(name, cs)) def delete_curve_cmd(name: str, cs: ChangeSet) -> DbChangeSet: id = cs.operations[0]['id'] f_id = f"'{id}'" old = get_curve(name, id) old_f_type = f"'{old['c_type']}'" redo_sql = f"delete from curves where id = {f_id};" redo_sql += f"\ndelete from _curve where id = {f_id};" # TODO: transaction ? undo_sql = f"insert into _curve (id, type) values ({f_id}, {old_f_type});" for xy in old['coords']: f_x, f_y = xy['x'], xy['y'] undo_sql += f"\ninsert into curves (id, x, y) values ({f_id}, {f_x}, {f_y});" redo_cs = g_delete_prefix | { 'type': 'curve' } | { 'id' : id } undo_cs = g_add_prefix | { 'type': 'curve' } | old return DbChangeSet(redo_sql, undo_sql, [redo_cs], [undo_cs]) def delete_curve(name: str, cs: ChangeSet) -> ChangeSet: return execute_command(name, delete_curve_cmd(name, cs)) #-------------------------------------------------------------- # [EPANET2] # id x y #-------------------------------------------------------------- def inp_in_curve(section: list[str]) -> ChangeSet: types = {} descs = {} curves: dict[str, list[dict[str, float]]] = {} count = len(section) for i in range(0, count): if section[i].startswith(';'): # this is description type_plus_desc = section[i].removeprefix(';') type_plus_desc_tokens = type_plus_desc.split(':') next = i + 1 if next < count and section[next].startswith(';') == False: next_tokens = section[next].split() types[next_tokens[0]] = type_plus_desc_tokens[0].strip().upper() if len(type_plus_desc_tokens) > 1: descs[next_tokens[0]] = type_plus_desc_tokens[1].strip() continue tokens = section[i].split() if tokens[0] not in curves: curves[tokens[0]] = [] curves[tokens[0]].append({'x': float(tokens[1]), 'y': float(tokens[2])}) cs = ChangeSet() for id, coords in curves.items(): c_type = types[id] if id in types else CURVE_TYPE_PUMP cs.append(g_add_prefix | {'type': 'curve', 'id' : id, 'c_type': c_type, 'coords' : coords}) #print(descs) return cs def inp_out_curve(name: str) -> list[str]: lines = [] types = read_all(name, f"select * from _curve") for type in types: id = type['id'] # ;type: desc lines.append(f";{type['type']}:") objs = read_all(name, f"select * from curves where id = '{id}' order by _order") for obj in objs: id = obj['id'] x = obj['x'] y = obj['y'] lines.append(f'{id} {x} {y}') return lines