from .database import *
from .s0_base import get_node_links


def _polygon_to_nodes(polygon: str) -> list[tuple[float, float]]:
    """Parse the WKT text of a convex hull into a list of (x, y) tuples.

    Besides ``POLYGON((x y,x y,...))`` this also accepts the degenerate
    results a convex hull can produce: ``POINT(x y)`` for a single node and
    ``LINESTRING(x y,x y)`` for two or collinear nodes. ``None``, an empty
    string, or an ``... EMPTY`` geometry yields an empty list.

    Only the first two ordinates of every point are used.
    """
    if not polygon:
        return []

    start = polygon.find('(')
    end = polygon.rfind(')')
    if start == -1 or end < start:
        # No coordinate list at all, e.g. 'POLYGON EMPTY'.
        return []

    # Drop the remaining ring parentheses of POLYGON((...)).
    coordinates = polygon[start + 1:end].strip('()')

    xys: list[tuple[float, float]] = []
    for pt in coordinates.split(','):
        xy = pt.split()
        if not xy:
            continue
        xys.append((float(xy[0]), float(xy[1])))
    return xys


def _insert_edges(name: str, node_index: dict[str, int], links: Any, cost_key: Any = None) -> None:
    """Insert one vd_graph edge per row of *links*.

    Each row must provide 'node1' and 'node2'. The edge cost is read from
    ``row[cost_key]`` when *cost_key* is given, otherwise it is 0.0.
    """
    for link in links:
        source = node_index[str(link['node1'])]
        target = node_index[str(link['node2'])]
        cost = float(link[cost_key]) if cost_key is not None else 0.0
        write(name, f"insert into vd_graph (source, target, cost) values ({source}, {target}, {cost})")


def calculate_virtual_district(name: str, centers: list[str]) -> dict[str, Any]:
    """Partition the network nodes into districts around the given centers.

    A temporary ``vd_graph`` edge table is built from pipes (cost = length),
    pumps and valves (cost = 0). Every non-isolated node is then assigned to
    the center with the smallest undirected shortest-path cost, and the
    convex hull of each district's node coordinates is computed.

    Args:
        name: First argument forwarded to every ``read``/``read_all``/``write``
            call (presumably the project/database name - confirm).
        centers: Node ids to use as district centers.

    Returns:
        A dict with two keys:
        ``'virtual_districts'``: list of dicts with ``'center'``, ``'nodes'``
        (ids assigned to that center) and ``'boundary'`` (list of (x, y)
        tuples; may hold fewer than three points, or be empty, for
        degenerate districts).
        ``'isolated_nodes'``: ids of nodes for which ``get_node_links``
        returned nothing.

    Raises:
        KeyError: If a center (or a link end node) is not a non-isolated node.

    Nodes that cannot reach any center are left out of every district.
    """
    write(name, 'drop table if exists vd_graph')
    write(name, 'create table vd_graph (id serial, source integer, target integer, cost numeric)')

    # Map node name to a 1-based integer index; isolated nodes get no index.
    next_index = 0
    isolated_nodes: list[str] = []
    node_index: dict[str, int] = {}
    for row in read_all(name, 'select id from _node'):
        node = str(row['id'])
        if not get_node_links(name, node):
            isolated_nodes.append(node)
            continue
        next_index += 1
        node_index[node] = next_index

    # Build topology graph.
    _insert_edges(name, node_index, read_all(name, 'select node1, node2, length from pipes'), 'length')
    _insert_edges(name, node_index, read_all(name, 'select node1, node2 from pumps'))
    _insert_edges(name, node_index, read_all(name, 'select node1, node2 from valves'))

    # Dijkstra distance: keep, per node, the closest center seen so far.
    # NOTE(review): a center is never assigned to its own district because of
    # the `node == center` skip below - confirm this is intended.
    node_distance: dict[str, dict[str, Any]] = {}
    for center in centers:
        for node, index in node_index.items():
            if node == center:
                continue

            row = read(name, f"select max(agg_cost) as distance from pgr_dijkstraCost('select id, source, target, cost from vd_graph', {index}, {node_index[center]}, false)")
            if row is None or row['distance'] is None:
                # max() over zero rows is NULL: no path between node and center.
                continue

            distance = float(row['distance'])
            best = node_distance.get(node)
            if best is None or distance < best['distance']:
                node_distance[node] = {'center': center, 'distance': distance}

    # Destroy topology graph.
    write(name, 'drop table vd_graph')

    # Reorganize the distance result: center -> list of assigned nodes.
    center_node: dict[str, list[str]] = {}
    for node, value in node_distance.items():
        center_node.setdefault(value['center'], []).append(node)

    # NOTE: the original code had the statements persisting districts into a
    # `virtual_district` table commented out; results are only returned.
    # NOTE(review): `center` and `node` are interpolated directly into SQL
    # (including a table name); ids with quotes or characters not valid in an
    # unquoted identifier will break these statements - confirm ids are
    # sanitized upstream.
    vds: list[dict[str, Any]] = []
    for center, nodes in center_node.items():
        write(name, f'drop table if exists vd_{center}')
        write(name, f'create table vd_{center} (node varchar(32) primary key references _node(id))')
        for node in nodes:
            write(name, f"insert into vd_{center} values ('{node}')")

        row = read(name, f'select st_astext(st_convexhull(st_collect(array(select coord from coordinates where node in (select * from vd_{center}))))) as boundary')
        # The hull is NULL when no coordinates match, and a POINT/LINESTRING
        # for one/two/collinear nodes; _polygon_to_nodes handles all of them.
        boundary = row['boundary'] if row is not None else None

        vds.append({'center': center, 'nodes': nodes, 'boundary': _polygon_to_nodes(boundary)})

        write(name, f'drop table vd_{center}')

    return {'virtual_districts': vds, 'isolated_nodes': isolated_nodes}