"""Junction read/add/update/delete helpers and INP line import/export."""

from .database import *
from .s0_base import *
from .s24_coordinates import *


def get_junction_schema(name: str) -> dict[str, dict[str, Any]]:
    """Return the property schema of a junction.

    Each entry maps a property name to its ``type``, whether it is
    ``optional`` and whether it is ``readonly``.  ``name`` is accepted for
    signature parity with the other accessors but is not used here.
    """
    return {
        'id':        {'type': 'str',      'optional': False, 'readonly': True},
        'x':         {'type': 'float',    'optional': False, 'readonly': False},
        'y':         {'type': 'float',    'optional': False, 'readonly': False},
        'elevation': {'type': 'float',    'optional': False, 'readonly': False},
        'links':     {'type': 'str_list', 'optional': False, 'readonly': True},
    }


def get_junction(name: str, id: str) -> dict[str, Any]:
    """Read one junction and return it as a plain dict.

    Returns an empty dict when ``try_read`` yields ``None`` for the id.
    Otherwise the result carries ``id``, ``x``, ``y``, ``elevation`` and
    ``links`` (the latter taken from ``get_node_links``).
    """
    # NOTE(review): the id is interpolated into the SQL text unescaped; an id
    # containing a single quote will break the statement. Confirm that ids are
    # validated upstream, or switch to parameters if try_read supports them.
    row = try_read(name, f"select * from junctions where id = '{id}'")
    if row is None:
        return {}

    coord = get_node_coord(name, id)
    return {
        'id': str(row['id']),
        'x': float(coord['x']),
        'y': float(coord['y']),
        'elevation': float(row['elevation']),
        'links': get_node_links(name, id),
    }


class Junction:
    """Typed view of a junction dict plus SQL-ready formatted fields.

    The ``f_*`` attributes are the values as they are spliced into SQL
    text: string fields wrapped in single quotes, numbers left as-is.
    """

    def __init__(self, input: dict[str, Any]) -> None:
        """Build from a dict holding ``id``, ``x``, ``y`` and ``elevation``.

        Raises ``KeyError`` when one of those keys is missing.
        """
        self.type = 'junction'
        self.id = str(input['id'])
        self.x = float(input['x'])
        self.y = float(input['y'])
        self.elevation = float(input['elevation'])

        # SQL literal forms (quoted but not escaped).
        self.f_type = f"'{self.type}'"
        self.f_id = f"'{self.id}'"
        self.f_elevation = self.elevation

    def as_dict(self) -> dict[str, Any]:
        """Return type, id, coordinates and elevation as a dict."""
        return {
            'type': self.type,
            'id': self.id,
            'x': self.x,
            'y': self.y,
            'elevation': self.elevation,
        }

    def as_id_dict(self) -> dict[str, Any]:
        """Return only the identifying fields (type and id)."""
        return {'type': self.type, 'id': self.id}


def _set_junction(name: str, cs: ChangeSet) -> DbChangeSet:
    """Build the redo/undo SQL and change entries for a junction update.

    Only keys that the schema marks as non-readonly and that are present in
    the first operation of ``cs`` are applied on top of the stored values.
    """
    operation = cs.operations[0]

    # Read the stored junction once; the updated copy starts from the same data.
    current = get_junction(name, operation['id'])
    old = Junction(current)

    updated = dict(current)
    for key, spec in get_junction_schema(name).items():
        if key in operation and not spec['readonly']:
            updated[key] = operation[key]
    new = Junction(updated)

    redo_sql = f"update junctions set elevation = {new.f_elevation} where id = {new.f_id};"
    redo_sql += f"\n{sql_update_coord(new.id, new.x, new.y)}"

    # Undo restores the coordinate first, then the elevation.
    undo_sql = sql_update_coord(old.id, old.x, old.y)
    undo_sql += f"\nupdate junctions set elevation = {old.f_elevation} where id = {old.f_id};"

    redo_cs = g_update_prefix | new.as_dict()
    undo_cs = g_update_prefix | old.as_dict()

    return DbChangeSet(redo_sql, undo_sql, [redo_cs], [undo_cs])


def set_junction(name: str, cs: ChangeSet) -> ChangeSet:
    """Update an existing junction described by the first operation of ``cs``.

    Returns an empty ``ChangeSet`` when the operation has no ``id`` or the
    junction does not exist; otherwise returns the result of
    ``execute_command``.
    """
    operation = cs.operations[0]
    if 'id' not in operation:
        return ChangeSet()
    if get_junction(name, operation['id']) == {}:
        return ChangeSet()
    return execute_command(name, _set_junction(name, cs))


def _add_junction(name: str, cs: ChangeSet) -> DbChangeSet:
    """Build the redo/undo SQL and change entries for inserting a junction."""
    new = Junction(cs.operations[0])

    # Insert order: node row, junction row, coordinate.
    redo_sql = f"insert into _node (id, type) values ({new.f_id}, {new.f_type});"
    redo_sql += f"\ninsert into junctions (id, elevation) values ({new.f_id}, {new.f_elevation});"
    redo_sql += f"\n{sql_insert_coord(new.id, new.x, new.y)}"

    # Undo removes the same rows in reverse order.
    undo_sql = sql_delete_coord(new.id)
    undo_sql += f"\ndelete from junctions where id = {new.f_id};"
    undo_sql += f"\ndelete from _node where id = {new.f_id};"

    redo_cs = g_add_prefix | new.as_dict()
    undo_cs = g_delete_prefix | new.as_id_dict()

    return DbChangeSet(redo_sql, undo_sql, [redo_cs], [undo_cs])


def add_junction(name: str, cs: ChangeSet) -> ChangeSet:
    """Add the junction described by the first operation of ``cs``.

    Returns an empty ``ChangeSet`` when the operation has no ``id`` or a
    junction with that id already exists; otherwise returns the result of
    ``execute_command``.
    """
    operation = cs.operations[0]
    if 'id' not in operation:
        return ChangeSet()
    if get_junction(name, operation['id']) != {}:
        return ChangeSet()
    return execute_command(name, _add_junction(name, cs))


def _delete_junction(name: str, cs: ChangeSet) -> DbChangeSet:
    """Build the redo/undo SQL and change entries for deleting a junction."""
    old = Junction(get_junction(name, cs.operations[0]['id']))

    # Delete order: coordinate, junction row, node row.
    redo_sql = sql_delete_coord(old.id)
    redo_sql += f"\ndelete from junctions where id = {old.f_id};"
    redo_sql += f"\ndelete from _node where id = {old.f_id};"

    # Undo re-creates the rows from the values read before deletion.
    undo_sql = f"insert into _node (id, type) values ({old.f_id}, {old.f_type});"
    undo_sql += f"\ninsert into junctions (id, elevation) values ({old.f_id}, {old.f_elevation});"
    undo_sql += f"\n{sql_insert_coord(old.id, old.x, old.y)}"

    redo_cs = g_delete_prefix | old.as_id_dict()
    undo_cs = g_add_prefix | old.as_dict()

    return DbChangeSet(redo_sql, undo_sql, [redo_cs], [undo_cs])


def delete_junction(name: str, cs: ChangeSet) -> ChangeSet:
    """Delete the junction named by the first operation of ``cs``.

    Returns an empty ``ChangeSet`` when the operation has no ``id`` or the
    junction does not exist; otherwise returns the result of
    ``execute_command``.
    """
    operation = cs.operations[0]
    if 'id' not in operation:
        return ChangeSet()
    if get_junction(name, operation['id']) == {}:
        return ChangeSet()
    return execute_command(name, _delete_junction(name, cs))


#--------------------------------------------------------------
# [EPA2]
# [IN]
#   id  elev.  (demand)  (demand pattern)  ;desc
# [OUT]
#   id  elev.  ;desc
#--------------------------------------------------------------
# [EPA3]
# [IN]
#   id  elev.  (demand)  (demand pattern)
# [OUT]
#   id  elev.  *  *  minpressure  fullpressure
#--------------------------------------------------------------


def inp_in_junction(line: str, demand_outside: bool) -> str:
    """Translate one junction line of an INP file into insert statements.

    The line is ``id elev [demand] [pattern] [;desc]``; ``*`` stands for an
    omitted demand or pattern.  Everything from the first ``;`` onward is
    treated as the description and ignored, so descriptions may contain
    whitespace.  A ``demands`` row is emitted only when a demand is given
    and ``demand_outside`` is false.

    Raises ``IndexError`` when the id or elevation is missing and
    ``ValueError`` when elevation or demand is not a number.
    """
    # Cut the description off before tokenizing so its words are never
    # mistaken for the demand or pattern columns.
    data, _, _description = line.partition(';')
    tokens = data.split()
    count = len(tokens)

    junction_id = str(tokens[0])
    elevation = float(tokens[1])
    demand = float(tokens[2]) if count >= 3 and tokens[2] != '*' else None
    pattern = str(tokens[3]) if count >= 4 and tokens[3] != '*' else None
    pattern_sql = f"'{pattern}'" if pattern is not None else 'null'

    sql = f"insert into _node (id, type) values ('{junction_id}', 'junction');insert into junctions (id, elevation) values ('{junction_id}', {elevation});"
    if demand is not None and not demand_outside:
        sql += f"insert into demands (junction, demand, pattern) values ('{junction_id}', {demand}, {pattern_sql});"

    return sql


def inp_out_junction(name: str) -> list[str]:
    """Return one ``id elevation ;`` line per row of the junctions table."""
    rows = read_all(name, 'select * from junctions')
    # The description column is always written as a bare ';'.
    return [f"{row['id']} {row['elevation']} ;" for row in rows]