from .database import *
from .s0_base import is_node
from .s32_region_util import to_postgis_polygon
from .s32_region import get_region


# NOTE(review): every statement in this module is built with f-strings. DMA
# ids and parent ids are interpolated without escaping (only the serialized
# node list has its single quotes doubled). If ids can come from untrusted
# callers this should move to parameterized queries -- confirm what the
# helpers exported by .database support.


def _sql_nullable(value) -> str:
    """Render *value* as a quoted SQL literal, or ``null`` when it is None."""
    return f"'{value}'" if value is not None else 'null'


def _sql_nodes(nodes) -> str:
    """Serialize *nodes* with ``str()`` and double single quotes for SQL."""
    return str(nodes).replace("'", "''")


def _is_closed_ring(boundary) -> bool:
    """Return True when *boundary* has at least 4 points and first == last."""
    return len(boundary) >= 4 and boundary[0] == boundary[-1]


def _all_nodes_exist(name: str, nodes) -> bool:
    """Return True when ``is_node`` accepts every entry of *nodes*."""
    return all(is_node(name, node) for node in nodes)


def get_district_metering_area_schema(name: str) -> dict[str, dict[str, Any]]:
    """Return the property schema of a district metering area (DMA).

    *name* is accepted for signature parity with the other accessors; the
    returned schema does not depend on it.
    """
    return {
        'id':       {'type': 'str',        'optional': False, 'readonly': True},
        'boundary': {'type': 'tuple_list', 'optional': False, 'readonly': False},
        'parent':   {'type': 'str',        'optional': True,  'readonly': False},
        'level':    {'type': 'int',        'optional': False, 'readonly': True},
    }


def get_district_metering_area(name: str, id: str) -> dict[str, Any]:
    """Read one DMA.

    Returns ``{}`` when ``get_region`` returns ``{}`` or when there is no
    ``region_dma`` row for *id*. Otherwise returns the dict produced by
    ``get_region`` extended with:

    * ``parent`` -- parent DMA id, or None for a top-level DMA;
    * ``nodes``  -- list decoded from the stored ``nodes`` text;
    * ``level``  -- 1 for a top-level DMA, plus 1 per ancestor.
    """
    dma = get_region(name, id)
    if dma == {}:
        return {}

    row = try_read(name, f"select * from region_dma where id = '{id}'")
    if row is None:
        return {}

    dma['parent'] = row['parent']
    # NOTE(security): eval() executes whatever text is stored in the column.
    # The writers in this module store str(list); if every writer does the
    # same, ast.literal_eval would be a safe drop-in -- confirm before changing.
    dma['nodes'] = list(eval(row['nodes']))

    # Walk up the parent chain counting ancestors. `seen` stops the walk if
    # the stored hierarchy contains a cycle, instead of looping forever.
    level = 1
    seen = {id}
    parent = dma['parent']
    while parent is not None and parent not in seen:
        seen.add(parent)
        parent = read(name, f"select parent from region_dma where id = '{parent}'")['parent']
        level += 1
    dma['level'] = level

    return dma


def _set_district_metering_area(name: str, cs: ChangeSet) -> DbChangeSet:
    """Build the redo/undo SQL and change sets for updating a DMA.

    Expects ``cs.operations[0]`` to already hold ``id``, ``boundary``,
    ``parent`` and ``nodes`` (``set_district_metering_area`` fills them in).
    """
    op = cs.operations[0]
    dma_id = op['id']

    new_boundary = op['boundary']
    new_parent = op['parent']
    new_nodes = op['nodes']

    # The dict returned here is the get_region() result plus DMA fields, so
    # it already carries the current boundary.
    old = get_district_metering_area(name, dma_id)
    old_boundary = old['boundary']
    old_parent = old['parent']
    old_nodes = old['nodes']

    redo_sql = f"update region set boundary = st_geomfromtext('{to_postgis_polygon(new_boundary)}') where id = '{dma_id}';"
    redo_sql += f"update region_dma set parent = {_sql_nullable(new_parent)}, nodes = '{_sql_nodes(new_nodes)}' where id = '{dma_id}';"

    undo_sql = f"update region_dma set parent = {_sql_nullable(old_parent)}, nodes = '{_sql_nodes(old_nodes)}' where id = '{dma_id}';"
    undo_sql += f"update region set boundary = st_geomfromtext('{to_postgis_polygon(old_boundary)}') where id = '{dma_id}';"

    redo_cs = g_update_prefix | {
        'type': 'district_metering_area',
        'id': dma_id,
        'boundary': new_boundary,
        'parent': new_parent,
        'nodes': new_nodes,
    }
    undo_cs = g_update_prefix | {
        'type': 'district_metering_area',
        'id': dma_id,
        'boundary': old_boundary,
        'parent': old_parent,
        'nodes': old_nodes,
    }

    return DbChangeSet(redo_sql, undo_sql, [redo_cs], [undo_cs])


def set_district_metering_area(name: str, cs: ChangeSet) -> ChangeSet:
    """Update an existing DMA from the first operation in *cs*.

    Missing ``boundary`` / ``parent`` / ``nodes`` keys are filled in (in
    place, on the operation dict) from the stored DMA. Returns an empty
    ``ChangeSet`` without touching the database when:

    * there is no operation, or it has no ``id``;
    * the DMA does not exist;
    * a supplied boundary has fewer than 4 points or is not closed;
    * the parent does not exist, is the DMA itself, or is one of its
      descendants (any of which would make the hierarchy cyclic);
    * a supplied node is rejected by ``is_node``.
    """
    ops = cs.operations
    if not ops:
        return ChangeSet()

    op = ops[0]
    if 'id' not in op:
        return ChangeSet()

    dma = get_district_metering_area(name, op['id'])
    if dma == {}:
        return ChangeSet()

    if 'boundary' not in op:
        op['boundary'] = dma['boundary']
    elif not _is_closed_ring(op['boundary']):
        return ChangeSet()

    if 'parent' not in op:
        op['parent'] = dma['parent']
    parent = op['parent']
    if parent is not None:
        # A DMA cannot be its own parent.
        if parent == op['id']:
            return ChangeSet()
        if get_district_metering_area(name, parent) == {}:
            return ChangeSet()
        # Re-parenting under one of its own descendants would close a loop
        # in the parent chain and hang every later ancestor walk.
        if is_descendant_of(name, parent, op['id']):
            return ChangeSet()

    if 'nodes' not in op:
        op['nodes'] = dma['nodes']
    elif not _all_nodes_exist(name, op['nodes']):
        return ChangeSet()

    return execute_command(name, _set_district_metering_area(name, cs))


def _add_district_metering_area(name: str, cs: ChangeSet) -> DbChangeSet:
    """Build the redo/undo SQL and change sets for inserting a DMA.

    Expects ``cs.operations[0]`` to hold ``id``, ``boundary``, ``parent``
    and ``nodes`` (``add_district_metering_area`` fills in the defaults).
    """
    op = cs.operations[0]
    dma_id = op['id']
    boundary = op['boundary']
    parent = op['parent']
    nodes = op['nodes']

    redo_sql = f"insert into region (id, boundary, r_type) values ('{dma_id}', '{to_postgis_polygon(boundary)}', 'DMA');"
    redo_sql += f"insert into region_dma (id, parent, nodes) values ('{dma_id}', {_sql_nullable(parent)}, '{_sql_nodes(nodes)}');"

    # Undo removes the region_dma row before the region row (reverse of redo).
    undo_sql = f"delete from region_dma where id = '{dma_id}';"
    undo_sql += f"delete from region where id = '{dma_id}';"

    redo_cs = g_add_prefix | {
        'type': 'district_metering_area',
        'id': dma_id,
        'boundary': boundary,
        'parent': parent,
        'nodes': nodes,
    }
    undo_cs = g_delete_prefix | {'type': 'district_metering_area', 'id': dma_id}

    return DbChangeSet(redo_sql, undo_sql, [redo_cs], [undo_cs])


def add_district_metering_area(name: str, cs: ChangeSet) -> ChangeSet:
    """Create a new DMA from the first operation in *cs*.

    ``parent`` defaults to None and ``nodes`` to ``[]`` (both written back
    onto the operation dict). Returns an empty ``ChangeSet`` without
    touching the database when:

    * there is no operation, or it has no ``id``;
    * a DMA with that id already exists;
    * ``boundary`` is missing, has fewer than 4 points or is not closed;
    * the parent is given but does not exist;
    * a supplied node is rejected by ``is_node``.
    """
    ops = cs.operations
    if not ops:
        return ChangeSet()

    op = ops[0]
    if 'id' not in op:
        return ChangeSet()

    if get_district_metering_area(name, op['id']) != {}:
        return ChangeSet()

    if 'boundary' not in op or not _is_closed_ring(op['boundary']):
        return ChangeSet()

    if 'parent' not in op:
        op['parent'] = None
    if op['parent'] is not None and get_district_metering_area(name, op['parent']) == {}:
        return ChangeSet()

    if 'nodes' not in op:
        op['nodes'] = []
    elif not _all_nodes_exist(name, op['nodes']):
        return ChangeSet()

    return execute_command(name, _add_district_metering_area(name, cs))


def _delete_district_metering_area(name: str, cs: ChangeSet) -> DbChangeSet:
    """Build the redo/undo SQL and change sets for deleting a DMA.

    The current DMA is read first so that undo can re-insert it unchanged.
    """
    dma_id = cs.operations[0]['id']

    dma = get_district_metering_area(name, dma_id)
    boundary = dma['boundary']
    parent = dma['parent']
    nodes = dma['nodes']

    redo_sql = f"delete from region_dma where id = '{dma_id}';"
    redo_sql += f"delete from region where id = '{dma_id}';"

    undo_sql = f"insert into region (id, boundary, r_type) values ('{dma_id}', '{to_postgis_polygon(boundary)}', 'DMA');"
    undo_sql += f"insert into region_dma (id, parent, nodes) values ('{dma_id}', {_sql_nullable(parent)}, '{_sql_nodes(nodes)}');"

    redo_cs = g_delete_prefix | {'type': 'district_metering_area', 'id': dma_id}
    undo_cs = g_add_prefix | {
        'type': 'district_metering_area',
        'id': dma_id,
        'boundary': boundary,
        'parent': parent,
        'nodes': nodes,
    }

    return DbChangeSet(redo_sql, undo_sql, [redo_cs], [undo_cs])


def _has_child(name: str, parent: str) -> bool:
    """Return True when at least one DMA row has *parent* as its parent."""
    return try_read(name, f"select * from region_dma where parent = '{parent}'") is not None


def is_descendant_of(name: str, descendant: str, ancestor: str) -> bool:
    """Return True when *ancestor* appears on *descendant*'s parent chain.

    The chain is followed upward from *descendant* (exclusive). Reaching the
    top yields a parent of None, which is also compared against *ancestor*,
    so passing ``ancestor=None`` returns True for any non-None *descendant*
    whose chain terminates. A cycle in the stored hierarchy ends the walk
    with False instead of looping forever.
    """
    seen = set()
    parent = descendant
    while parent is not None and parent not in seen:
        seen.add(parent)
        parent = read(name, f"select parent from region_dma where id = '{parent}'")['parent']
        if parent == ancestor:
            return True
    return False


def delete_district_metering_area(name: str, cs: ChangeSet) -> ChangeSet:
    """Delete the DMA named by the first operation in *cs*.

    Returns an empty ``ChangeSet`` without touching the database when there
    is no operation, it has no ``id``, the DMA does not exist, or the DMA
    still has child DMAs.
    """
    ops = cs.operations
    if not ops:
        return ChangeSet()

    op = ops[0]
    if 'id' not in op:
        return ChangeSet()

    dma = get_district_metering_area(name, op['id'])
    if dma == {}:
        return ChangeSet()

    # TODO: cascade ?
    if _has_child(name, dma['id']):
        return ChangeSet()

    return execute_command(name, _delete_district_metering_area(name, cs))


def get_all_district_metering_area_ids(name: str) -> list[str]:
    """Return the ``id`` of every row in ``region_dma``."""
    return [row['id'] for row in read_all(name, "select id from region_dma")]


def get_all_district_metering_areas(name: str) -> list[dict[str, Any]]:
    """Return ``get_district_metering_area`` for every DMA id."""
    return [
        get_district_metering_area(name, dma_id)
        for dma_id in get_all_district_metering_area_ids(name)
    ]