import ctypes import platform import os import math from typing import Any from .s0_base import get_node_links, get_link_nodes, is_pipe from .s5_pipes import get_pipe from .database import read, try_read, read_all, write from .s24_coordinates import node_has_coord, get_node_coord def from_postgis_polygon(polygon: str) -> list[tuple[float, float]]: boundary = polygon.lower().removeprefix('polygon((').removesuffix('))').split(',') xys = [] for pt in boundary: xy = pt.split(' ') xys.append((float(xy[0]), float(xy[1]))) return xys def to_postgis_polygon(boundary: list[tuple[float, float]]) -> str: polygon = '' for pt in boundary: polygon += f'{pt[0]} {pt[1]},' return f'polygon(({polygon[:-1]}))' def to_postgis_linestring(boundary: list[tuple[float, float]]) -> str: line = '' for pt in boundary: line += f'{pt[0]} {pt[1]},' return f'linestring({line[:-1]})' def get_nodes_in_boundary(name: str, boundary: list[tuple[float, float]]) -> list[str]: api = 'get_nodes_in_boundary' write(name, f"delete from temp_region where id = '{api}'") write(name, f"insert into temp_region (id, boundary) values ('{api}', '{to_postgis_polygon(boundary)}')") nodes: list[str] = [] for row in read_all(name, f"select c.node from coordinates as c, temp_region as r where ST_Intersects(c.coord, r.boundary) and r.id = '{api}'"): nodes.append(row['node']) write(name, f"delete from temp_region where id = '{api}'") return nodes def _get_links_on_boundary(name: str, nodes: list[str]) -> list[str]: links: list[str] = [] for node in nodes: node_links = get_node_links(name, node) for link in node_links: if link in links: continue link_nodes = get_link_nodes(name, link) if link_nodes[0] in nodes and link_nodes[1] not in nodes: links.append(link) elif link_nodes[0] not in nodes and link_nodes[1] in nodes: links.append(link) return links # if region is general or wda => get_nodes_in_boundary # if region is dma, sa or vd => get stored nodes in table def get_nodes_in_region(name: str, region_id: str) -> list[str]: nodes: list[str] = [] row = try_read(name, f"select r_type from region where id = '{region_id}'") if row == None: return nodes r_type = str(row['r_type']) if r_type == 'DMA' or r_type == 'SA' or r_type == 'VD': table = '' if r_type == 'DMA': table = 'region_dma' elif r_type == 'SA': table = 'region_sa' elif r_type == 'VD': table = 'region_vd' if table != '': row = try_read(name, f"select nodes from {table} where id = '{region_id}'") if row != None: nodes = eval(str(row['nodes'])) if nodes == []: for row in read_all(name, f"select c.node from coordinates as c, region as r where ST_Intersects(c.coord, r.boundary) and r.id = '{region_id}'"): nodes.append(row['node']) return nodes def get_links_on_region_boundary(name: str, region_id: str) -> list[str]: nodes = get_nodes_in_region(name, region_id) print(nodes) return _get_links_on_boundary(name, nodes) def calculate_convex_hull(name: str, nodes: list[str]) -> list[tuple[float, float]]: write(name, f'delete from temp_node') for node in nodes: write(name, f"insert into temp_node values ('{node}')") # TODO: check none polygon = read(name, f'select st_astext(st_convexhull(st_collect(array(select coord from coordinates where node in (select * from temp_node))))) as boundary' )['boundary'] write(name, f'delete from temp_node') return from_postgis_polygon(polygon) def _verify_platform(): _platform = platform.system() if _platform != "Windows": raise Exception(f'Platform {_platform} unsupported (not yet)') def _normal(v: tuple[float, float]) -> tuple[float, float]: l = math.sqrt(v[0] * v[0] + v[1] * v[1]) return (v[0] / l, v[1] / l) def _angle(v: tuple[float, float]) -> float: if v[0] >= 0 and v[1] >= 0: return math.asin(v[1]) elif v[0] <= 0 and v[1] >= 0: return math.pi - math.asin(v[1]) elif v[0] <= 0 and v[1] <= 0: return math.asin(-v[1]) + math.pi elif v[0] >= 0 and v[1] <= 0: return math.pi * 2 - math.asin(-v[1]) return 0 def _angle_of_node_link(node: str, link: str, nodes, links) -> float: n1 = node n2 = links[link]['node1'] if n1 == links[link]['node2'] else links[link]['node2'] x1, y1 = nodes[n1]['x'], nodes[n1]['y'] x2, y2 = nodes[n2]['x'], nodes[n2]['y'] if y1 == y2: v = ((x2 - x1) / abs(x2 - x1), 0.0) else: v = _normal((x2 - x1, y2 - y1)) return _angle(v) class Topology: def __init__(self, db: str, nodes: list[str]) -> None: self._nodes: dict[str, Any] = {} self._max_x_node = '' self._node_list: list[str] = [] for node in nodes: if not node_has_coord(db, node): continue if get_node_links(db, node) == 0: continue self._nodes[node] = get_node_coord(db, node) | { 'links': [] } self._node_list.append(node) if self._max_x_node == '' or self._nodes[node]['x'] > self._nodes[self._max_x_node]['x']: self._max_x_node = node self._links: dict[str, Any] = {} self._link_list: list[str] = [] for node in self._nodes: for link in get_node_links(db, node): candidate = True link_nodes = get_link_nodes(db, link) for link_node in link_nodes: if link_node not in self._nodes: candidate = False break if candidate: length = get_pipe(db, link)['length'] if is_pipe(db, link) else 0.0 self._links[link] = { 'node1' : link_nodes[0], 'node2' : link_nodes[1], 'length' : length } self._link_list.append(link) if link not in self._nodes[link_nodes[0]]['links']: self._nodes[link_nodes[0]]['links'].append(link) if link not in self._nodes[link_nodes[1]]['links']: self._nodes[link_nodes[1]]['links'].append(link) def nodes(self): return self._nodes def node_list(self): return self._node_list def max_x_node(self): return self._max_x_node def links(self): return self._links def link_list(self): return self._link_list def _calculate_boundary(cursor: str, t_nodes: dict[str, Any], t_links: dict[str, Any]) -> tuple[list[str], dict[str, list[str]], list[tuple[float, float]]]: in_angle = 0 vertices: list[str] = [] path: dict[str, list[str]] = {} while True: # prevent duplicated node if len(vertices) > 0 and cursor == vertices[-1]: break # prevent duplicated path if len(vertices) >= 3 and vertices[0] == vertices[-1] and vertices[1] == cursor: break vertices.append(cursor) sorted_links = [] overlapped_link = '' for link in t_nodes[cursor]['links']: angle = _angle_of_node_link(cursor, link, t_nodes, t_links) if angle == in_angle: overlapped_link = link continue sorted_links.append((angle, link)) # work into a branch, return if len(sorted_links) == 0: path[overlapped_link] = [] cursor = vertices[-2] in_angle = _angle_of_node_link(cursor, overlapped_link, t_nodes, t_links) continue sorted_links = sorted(sorted_links, key=lambda s:s[0]) out_link = sorted_links[0][1] for angle, link in sorted_links: if angle > in_angle: out_link = link break path[out_link] = [] cursor = t_links[out_link]['node1'] if cursor == t_links[out_link]['node2'] else t_links[out_link]['node2'] in_angle = _angle_of_node_link(cursor, out_link, t_nodes, t_links) boundary: list[tuple[float, float]] = [] for node in vertices: boundary.append((t_nodes[node]['x'], t_nodes[node]['y'])) return (vertices, path, boundary) def _collect_new_links(in_links: dict[str, list[str]], t_nodes: dict[str, Any], t_links: dict[str, Any], new_nodes: dict[str, Any], new_links: dict[str, Any]) -> tuple[dict[str, Any], dict[str, Any]]: for link, pts in in_links.items(): node1 = t_links[link]['node1'] node2 = t_links[link]['node2'] x1, x2 = t_nodes[node1]['x'], t_nodes[node2]['x'] y1, y2 = t_nodes[node1]['y'], t_nodes[node2]['y'] if node1 not in new_nodes: new_nodes[node1] = { 'x': x1, 'y': y1, 'links': [] } if node2 not in new_nodes: new_nodes[node2] = { 'x': x2, 'y': y2, 'links': [] } x_delta = x2 - x1 y_delta = y2 - y1 use_x = abs(x_delta) > abs(y_delta) if len(pts) == 0: new_links[link] = t_links[link] else: sorted_nodes: list[tuple[float, str]] = [] sorted_nodes.append((0.0, node1)) sorted_nodes.append((1.0, node2)) i = 0 for pt in pts: x, y = new_nodes[pt]['x'], new_nodes[pt]['y'] percent = ((x - x1) / x_delta) if use_x else ((y - y1) / y_delta) sorted_nodes.append((percent, pt)) i += 1 sorted_nodes = sorted(sorted_nodes, key=lambda s:s[0]) for i in range(1, len(sorted_nodes)): l = sorted_nodes[i - 1][1] r = sorted_nodes[i][1] new_link = f'LINK_[{l}]_[{r}]' new_links[new_link] = { 'node1': l, 'node2': r } return (new_nodes, new_links) def calculate_boundary(name: str, nodes: list[str], accurate = False) -> list[tuple[float, float]]: topology = Topology(name, nodes) t_nodes = topology.nodes() t_links = topology.links() vertices, path, boundary = _calculate_boundary(topology.max_x_node(), t_nodes, t_links) if not accurate: return boundary api = 'calculate_boundary' write(name, f"delete from temp_region where id = '{api}'") # use linestring instead of polygon to reduce strict limitation # TODO: linestring can not work well write(name, f"insert into temp_region (id, boundary) values ('{api}', '{to_postgis_polygon(boundary)}')") write(name, f'delete from temp_node') for node in nodes: write(name, f"insert into temp_node values ('{node}')") for row in read_all(name, f"select n.node from coordinates as c, temp_node as n, temp_region as r where c.node = n.node and ST_Intersects(c.coord, r.boundary) and r.id = '{api}'"): node = row['node'] write(name, f"delete from temp_node where node = '{node}'") outside_nodes: list[str] = [] for row in read_all(name, "select node from temp_node"): outside_nodes.append(row['node']) # no outside nodes, return if len(outside_nodes) == 0: write(name, f'delete from temp_node') write(name, f"delete from temp_region where id = '{api}'") return boundary new_nodes: dict[str, Any] = {} new_links: dict[str, Any] = {} boundary_links: dict[str, list[str]] = {} write(name, "delete from temp_link_2") for node in outside_nodes: for link in t_nodes[node]['links']: node1 = t_links[link]['node1'] node2 = t_links[link]['node2'] if node1 in outside_nodes and node2 not in outside_nodes and node2 not in vertices and link: if link not in boundary: boundary_links[link] = [] line = f"LINESTRING({t_nodes[node1]['x']} {t_nodes[node1]['y']}, {t_nodes[node2]['x']} {t_nodes[node2]['y']})" write(name, f"insert into temp_link_2 values ('{link}', '{line}')") if node2 in outside_nodes and node1 not in outside_nodes and node1 not in vertices: if link not in boundary: boundary_links[link] = [] line = f"LINESTRING({t_nodes[node1]['x']} {t_nodes[node1]['y']}, {t_nodes[node2]['x']} {t_nodes[node2]['y']})" write(name, f"insert into temp_link_2 values ('{link}', '{line}')") if node1 in outside_nodes and node2 in outside_nodes: x1, x2 = t_nodes[node1]['x'], t_nodes[node2]['x'] y1, y2 = t_nodes[node1]['y'], t_nodes[node2]['y'] if node1 not in new_nodes: new_nodes[node1] = { 'x': x1, 'y': y1, 'links': [] } if node2 not in new_nodes: new_nodes[node2] = { 'x': x2, 'y': y2, 'links': [] } if link not in new_links: new_links[link] = t_links[link] # no boundary links, return if len(boundary_links) == 0: write(name, "delete from temp_link_2") write(name, f'delete from temp_node') write(name, f"delete from temp_region where id = '{api}'") return boundary write(name, "delete from temp_link_1") for link, _ in path.items(): node1 = t_links[link]['node1'] node2 = t_links[link]['node2'] line = f"LINESTRING({t_nodes[node1]['x']} {t_nodes[node1]['y']}, {t_nodes[node2]['x']} {t_nodes[node2]['y']})" write(name, f"insert into temp_link_1 (link, geom) values ('{link}', '{line}')") has_intersection = False for row in read_all(name, f"select l1.link as l, l2.link as r, st_astext(st_intersection(l1.geom, l2.geom)) as p from temp_link_1 as l1, temp_link_2 as l2 where st_intersects(l1.geom, l2.geom)"): has_intersection = True link1, link2, pt = str(row['l']), str(row['r']), str(row['p']) pts = pt.lower().removeprefix('point(').removesuffix(')').split(' ') xy = (float(pts[0]), float(pts[1])) new_node = f'NODE_[{link1}]_[{link2}]' new_nodes[new_node] = { 'x': xy[0], 'y': xy[1], 'links': [] } path[link1].append(new_node) boundary_links[link2].append(new_node) # no intersection, return if not has_intersection: write(name, "delete from temp_link_1") write(name, "delete from temp_link_2") write(name, 'delete from temp_node') write(name, f"delete from temp_region where id = '{api}'") return boundary new_nodes, new_links = _collect_new_links(path, t_nodes, t_links, new_nodes, new_links) new_nodes, new_links = _collect_new_links(boundary_links, t_nodes, t_links, new_nodes, new_links) for link, values in new_links.items(): new_nodes[values['node1']]['links'].append(link) new_nodes[values['node2']]['links'].append(link) _, _, boundary = _calculate_boundary(topology.max_x_node(), new_nodes, new_links) write(name, "delete from temp_link_1") write(name, "delete from temp_link_2") write(name, 'delete from temp_node') write(name, f"delete from temp_region where id = '{api}'") return boundary ''' # CClipper2.dll # int inflate_paths(double* path, size_t size, double delta, int jt, int et, double miter_limit, int precision, double arc_tolerance, double** out_path, size_t* out_size); # int simplify_paths(double* path, size_t size, double epsilon, int is_closed_path, double** out_path, size_t* out_size); # void free_paths(double** paths); ''' def inflate_boundary(name: str, boundary: list[tuple[float, float]], delta: float = 0.5) -> list[tuple[float, float]]: if boundary[0] == boundary[-1]: del(boundary[-1]) lib = ctypes.CDLL(os.path.join(os.getcwd(), 'api', 'CClipper2.dll')) c_size = ctypes.c_size_t(len(boundary) * 2) c_path = (ctypes.c_double * c_size.value)() i = 0 for xy in boundary: c_path[i] = xy[0] i += 1 c_path[i] = xy[1] i += 1 c_delta = ctypes.c_double(delta) JoinType_Square, JoinType_Round, JoinType_Miter = 0, 1, 2 c_jt = ctypes.c_int(JoinType_Square) EndType_Polygon, EndType_Joined, EndType_Butt, EndType_Square, EndType_Round = 0, 1, 2, 3, 4 c_et = ctypes.c_int(EndType_Polygon) c_miter_limit = ctypes.c_double(2.0) c_precision = ctypes.c_int(2) c_arc_tolerance = ctypes.c_double(0.0) c_out_path = ctypes.POINTER(ctypes.c_double)() c_out_size = ctypes.c_size_t(0) lib.inflate_paths(c_path, c_size, c_delta, c_jt, c_et, c_miter_limit, c_precision, c_arc_tolerance, ctypes.byref(c_out_path), ctypes.byref(c_out_size)) if c_out_size.value == 0: lib.free_paths(ctypes.byref(c_out_path)) return [] # TODO: simplify_paths :) result: list[tuple[float, float]] = [] for i in range(0, c_out_size.value, 2): result.append((c_out_path[i], c_out_path[i + 1])) result.append(result[0]) lib.free_paths(ctypes.byref(c_out_path)) return result def inflate_region(name: str, region_id: str, delta: float = 0.5) -> list[tuple[float, float]]: r = try_read(name, f"select id, st_astext(boundary) as boundary_geom from region where id = '{region_id}'") if r == None: return [] boundary = from_postgis_polygon(str(r['boundary_geom'])) return inflate_boundary(name, boundary, delta) if __name__ == '__main__': _verify_platform()