from .database import * from .s0_base import is_node from .s32_region_util import to_postgis_polygon from .s32_region import get_region def get_service_area_schema(name: str) -> dict[str, dict[str, Any]]: return { 'id' : {'type': 'str' , 'optional': False , 'readonly': True }, 'boundary' : {'type': 'tuple_list' , 'optional': False , 'readonly': False }, 'source' : {'type': 'str' , 'optional': False , 'readonly': False }, 'time_index' : {'type': 'int' , 'optional': False , 'readonly': False } } def get_service_area(name: str, id: str) -> dict[str, Any]: sa = get_region(name, id) if sa == {}: return {} r = try_read(name, f"select * from region_sa where id = '{id}'") if r == None: return {} sa['source'] = r['source'] sa['nodes'] = list(eval(r['nodes'])) sa['time_index'] = r['time_index'] return sa def _set_service_area(name: str, cs: ChangeSet) -> DbChangeSet: id = cs.operations[0]['id'] new_boundary = cs.operations[0]['boundary'] old_boundary = get_region(name, id)['boundary'] new_source = cs.operations[0]['source'] f_new_source = f"'{new_source}'" new_nodes = cs.operations[0]['nodes'] str_new_nodes = str(new_nodes).replace("'", "''") new_time_index = cs.operations[0]['time_index'] old = get_service_area(name, id) old_source = old['source'] f_old_source = f"'{old_source}'" old_nodes = old['nodes'] str_old_nodes = str(old_nodes).replace("'", "''") old_time_index = old['time_index'] redo_sql = f"update region set boundary = st_geomfromtext('{to_postgis_polygon(new_boundary)}') where id = '{id}';" redo_sql += f"update region_sa set time_index = {new_time_index}, source = {f_new_source}, nodes = '{str_new_nodes}' where id = '{id}';" undo_sql = f"update region_sa set time_index = {old_time_index}, source = {f_old_source}, nodes = '{str_old_nodes}' where id = '{id}';" undo_sql += f"update region set boundary = st_geomfromtext('{to_postgis_polygon(old_boundary)}') where id = '{id}';" redo_cs = g_update_prefix | { 'type': 'service_area', 'id': id, 'boundary': new_boundary, 'time_index': new_time_index, 'source': new_source, 'nodes': new_nodes } undo_cs = g_update_prefix | { 'type': 'service_area', 'id': id, 'boundary': old_boundary, 'time_index': old_time_index, 'source': old_source, 'nodes': old_nodes } return DbChangeSet(redo_sql, undo_sql, [redo_cs], [undo_cs]) def set_service_area(name: str, cs: ChangeSet) -> ChangeSet: ops = cs.operations if len(cs.operations) == 0: return ChangeSet() op = ops[0] if 'id' not in op: return ChangeSet() sa = get_service_area(name, op['id']) if sa == {}: return ChangeSet() if 'boundary' not in op: op['boundary'] = sa['boundary'] else: b = op['boundary'] if len(b) < 4 or b[0] != b[-1]: return ChangeSet() if 'time_index' not in op: op['time_index'] = sa['time_index'] if 'source' not in op: op['source'] = sa['source'] if not is_node(name, op['source']): return ChangeSet() if 'nodes' not in op: op['nodes'] = sa['nodes'] else: for node in op['nodes']: if not is_node(name, node): return ChangeSet() return execute_command(name, _set_service_area(name, cs)) def _add_service_area(name: str, cs: ChangeSet) -> DbChangeSet: id = cs.operations[0]['id'] boundary = cs.operations[0]['boundary'] time_index = cs.operations[0]['time_index'] source = cs.operations[0]['source'] f_source = f"'{source}'" nodes = cs.operations[0]['nodes'] str_nodes = str(nodes).replace("'", "''") redo_sql = f"insert into region (id, boundary, r_type) values ('{id}', '{to_postgis_polygon(boundary)}', 'SA');" redo_sql += f"insert into region_sa (id, time_index, source, nodes) values ('{id}', {time_index}, {f_source}, '{str_nodes}');" undo_sql = f"delete from region_sa where id = '{id}';" undo_sql += f"delete from region where id = '{id}';" redo_cs = g_add_prefix | { 'type': 'service_area', 'id': id, 'boundary': boundary, 'time_index': time_index, 'source': source, 'nodes': nodes } undo_cs = g_delete_prefix | { 'type': 'service_area', 'id': id } return DbChangeSet(redo_sql, undo_sql, [redo_cs], [undo_cs]) def add_service_area(name: str, cs: ChangeSet) -> ChangeSet: ops = cs.operations if len(cs.operations) == 0: return ChangeSet() op = ops[0] if 'id' not in op: return ChangeSet() sa = get_service_area(name, op['id']) if sa != {}: return ChangeSet() if 'boundary' not in op: return ChangeSet() else: b = op['boundary'] if len(b) < 4 or b[0] != b[-1]: return ChangeSet() if 'time_index' not in op: return ChangeSet() if 'source' not in op: return ChangeSet() if not is_node(name, op['source']): return ChangeSet() if 'nodes' not in op: op['nodes'] = [] else: for node in op['nodes']: if not is_node(name, node): return ChangeSet() return execute_command(name, _add_service_area(name, cs)) def _delete_service_area(name: str, cs: ChangeSet) -> DbChangeSet: id = cs.operations[0]['id'] sa = get_service_area(name, id) boundary = sa['boundary'] time_index = sa['time_index'] source = sa['source'] f_source = f"'{source}'" nodes = sa['nodes'] str_nodes = str(nodes).replace("'", "''") redo_sql = f"delete from region_sa where id = '{id}';" redo_sql += f"delete from region where id = '{id}';" undo_sql = f"insert into region (id, boundary, r_type) values ('{id}', '{to_postgis_polygon(boundary)}', 'SA');" undo_sql += f"insert into region_sa (id, time_index, source, nodes) values ('{id}', {time_index}, {f_source}, '{str_nodes}');" redo_cs = g_delete_prefix | { 'type': 'service_area', 'id': id } undo_cs = g_add_prefix | { 'type': 'service_area', 'id': id, 'boundary': boundary, 'time_index': time_index, 'source': source, 'nodes': nodes } return DbChangeSet(redo_sql, undo_sql, [redo_cs], [undo_cs]) def delete_service_area(name: str, cs: ChangeSet) -> ChangeSet: ops = cs.operations if len(cs.operations) == 0: return ChangeSet() op = ops[0] if 'id' not in op: return ChangeSet() sa = get_service_area(name, op['id']) if sa == {}: return ChangeSet() return execute_command(name, _delete_service_area(name, cs)) def get_all_service_area_ids(name: str) -> list[str]: ids = [] for row in read_all(name, f"select id from region_sa"): ids.append(row['id']) return ids def get_all_service_areas(name: str) -> list[dict[str, Any]]: result = [] for id in get_all_service_area_ids(name): result.append(get_service_area(name, id)) return result