409 lines
16 KiB
Python
409 lines
16 KiB
Python
import ctypes
|
|
import platform
|
|
import os
|
|
import math
|
|
from typing import Any
|
|
from .s0_base import get_node_links, get_link_nodes, is_pipe
|
|
from .s5_pipes import get_pipe
|
|
from .database import read, try_read, read_all, write
|
|
from .s24_coordinates import node_has_coord, get_node_coord
|
|
|
|
|
|
def from_postgis_polygon(polygon: str) -> list[tuple[float, float]]:
|
|
boundary = polygon.lower().removeprefix('polygon((').removesuffix('))').split(',')
|
|
xys = []
|
|
for pt in boundary:
|
|
xy = pt.split(' ')
|
|
xys.append((float(xy[0]), float(xy[1])))
|
|
return xys
|
|
|
|
|
|
def to_postgis_polygon(boundary: list[tuple[float, float]]) -> str:
|
|
polygon = ''
|
|
for pt in boundary:
|
|
polygon += f'{pt[0]} {pt[1]},'
|
|
return f'polygon(({polygon[:-1]}))'
|
|
|
|
|
|
def to_postgis_linestring(boundary: list[tuple[float, float]]) -> str:
|
|
line = ''
|
|
for pt in boundary:
|
|
line += f'{pt[0]} {pt[1]},'
|
|
return f'linestring({line[:-1]})'
|
|
|
|
|
|
def get_nodes_in_boundary(name: str, boundary: list[tuple[float, float]]) -> list[str]:
|
|
api = 'get_nodes_in_boundary'
|
|
write(name, f"delete from temp_region where id = '{api}'")
|
|
write(name, f"insert into temp_region (id, boundary) values ('{api}', '{to_postgis_polygon(boundary)}')")
|
|
|
|
nodes: list[str] = []
|
|
for row in read_all(name, f"select c.node from coordinates as c, temp_region as r where ST_Intersects(c.coord, r.boundary) and r.id = '{api}'"):
|
|
nodes.append(row['node'])
|
|
|
|
write(name, f"delete from temp_region where id = '{api}'")
|
|
|
|
return nodes
|
|
|
|
|
|
def get_nodes_in_region(name: str, region_id: str) -> list[str]:
|
|
nodes: list[str] = []
|
|
for row in read_all(name, f"select c.node from coordinates as c, region as r where ST_Intersects(c.coord, r.boundary) and r.id = '{region_id}'"):
|
|
nodes.append(row['node'])
|
|
return nodes
|
|
|
|
|
|
def calculate_convex_hull(name: str, nodes: list[str]) -> list[tuple[float, float]]:
|
|
write(name, f'delete from temp_node')
|
|
for node in nodes:
|
|
write(name, f"insert into temp_node values ('{node}')")
|
|
|
|
# TODO: check none
|
|
polygon = read(name, f'select st_astext(st_convexhull(st_collect(array(select coord from coordinates where node in (select * from temp_node))))) as boundary' )['boundary']
|
|
write(name, f'delete from temp_node')
|
|
|
|
return from_postgis_polygon(polygon)
|
|
|
|
|
|
def _verify_platform():
|
|
_platform = platform.system()
|
|
if _platform != "Windows":
|
|
raise Exception(f'Platform {_platform} unsupported (not yet)')
|
|
|
|
|
|
def _normal(v: tuple[float, float]) -> tuple[float, float]:
|
|
l = math.sqrt(v[0] * v[0] + v[1] * v[1])
|
|
return (v[0] / l, v[1] / l)
|
|
|
|
|
|
def _angle(v: tuple[float, float]) -> float:
|
|
if v[0] >= 0 and v[1] >= 0:
|
|
return math.asin(v[1])
|
|
elif v[0] <= 0 and v[1] >= 0:
|
|
return math.pi - math.asin(v[1])
|
|
elif v[0] <= 0 and v[1] <= 0:
|
|
return math.asin(-v[1]) + math.pi
|
|
elif v[0] >= 0 and v[1] <= 0:
|
|
return math.pi * 2 - math.asin(-v[1])
|
|
return 0
|
|
|
|
|
|
def _angle_of_node_link(node: str, link: str, nodes, links) -> float:
|
|
n1 = node
|
|
n2 = links[link]['node1'] if n1 == links[link]['node2'] else links[link]['node2']
|
|
x1, y1 = nodes[n1]['x'], nodes[n1]['y']
|
|
x2, y2 = nodes[n2]['x'], nodes[n2]['y']
|
|
v = _normal((x2 - x1, y2 - y1))
|
|
return _angle(v)
|
|
|
|
|
|
class Topology:
|
|
def __init__(self, db: str, nodes: list[str]) -> None:
|
|
self._nodes: dict[str, Any] = {}
|
|
self._max_x_node = ''
|
|
self._node_list: list[str] = []
|
|
for node in nodes:
|
|
if not node_has_coord(db, node):
|
|
continue
|
|
if get_node_links(db, node) == 0:
|
|
continue
|
|
self._nodes[node] = get_node_coord(db, node) | { 'links': [] }
|
|
self._node_list.append(node)
|
|
if self._max_x_node == '' or self._nodes[node]['x'] > self._nodes[self._max_x_node]['x']:
|
|
self._max_x_node = node
|
|
|
|
self._links: dict[str, Any] = {}
|
|
self._link_list: list[str] = []
|
|
for node in self._nodes:
|
|
for link in get_node_links(db, node):
|
|
candidate = True
|
|
link_nodes = get_link_nodes(db, link)
|
|
for link_node in link_nodes:
|
|
if link_node not in self._nodes:
|
|
candidate = False
|
|
break
|
|
if candidate:
|
|
length = get_pipe(db, link)['length'] if is_pipe(db, link) else 0.0
|
|
self._links[link] = { 'node1' : link_nodes[0], 'node2' : link_nodes[1], 'length' : length }
|
|
self._link_list.append(link)
|
|
if link not in self._nodes[link_nodes[0]]['links']:
|
|
self._nodes[link_nodes[0]]['links'].append(link)
|
|
if link not in self._nodes[link_nodes[1]]['links']:
|
|
self._nodes[link_nodes[1]]['links'].append(link)
|
|
|
|
def nodes(self):
|
|
return self._nodes
|
|
|
|
def node_list(self):
|
|
return self._node_list
|
|
|
|
def max_x_node(self):
|
|
return self._max_x_node
|
|
|
|
def links(self):
|
|
return self._links
|
|
|
|
def link_list(self):
|
|
return self._link_list
|
|
|
|
|
|
def _calculate_boundary(name: str, cursor: str, t_nodes: dict[str, Any], t_links: dict[str, Any]) -> tuple[list[str], dict[str, list[str]], list[tuple[float, float]]]:
|
|
in_angle = 0
|
|
|
|
vertices: list[str] = []
|
|
path: dict[str, list[str]] = {}
|
|
while True:
|
|
# prevent duplicated node
|
|
if len(vertices) > 0 and cursor == vertices[-1]:
|
|
break
|
|
|
|
# prevent duplicated path
|
|
if len(vertices) >= 3 and vertices[0] == vertices[-1] and vertices[1] == cursor:
|
|
break
|
|
|
|
vertices.append(cursor)
|
|
|
|
sorted_links = []
|
|
overlapped_link = ''
|
|
for link in t_nodes[cursor]['links']:
|
|
angle = _angle_of_node_link(cursor, link, t_nodes, t_links)
|
|
if angle == in_angle:
|
|
overlapped_link = link
|
|
continue
|
|
sorted_links.append((angle, link))
|
|
|
|
# work into a branch, return
|
|
if len(sorted_links) == 0:
|
|
path[overlapped_link] = []
|
|
cursor = vertices[-2]
|
|
in_angle = _angle_of_node_link(cursor, overlapped_link, t_nodes, t_links)
|
|
continue
|
|
|
|
sorted_links = sorted(sorted_links, key=lambda s:s[0])
|
|
out_link = sorted_links[0][1]
|
|
for angle, link in sorted_links:
|
|
if angle > in_angle:
|
|
out_link = link
|
|
break
|
|
|
|
path[out_link] = []
|
|
cursor = t_links[out_link]['node1'] if cursor == t_links[out_link]['node2'] else t_links[out_link]['node2']
|
|
in_angle = _angle_of_node_link(cursor, out_link, t_nodes, t_links)
|
|
|
|
boundary: list[tuple[float, float]] = []
|
|
for node in vertices:
|
|
boundary.append((t_nodes[node]['x'], t_nodes[node]['y']))
|
|
|
|
return (vertices, path, boundary)
|
|
|
|
|
|
def _collect_new_links(in_links: dict[str, list[str]], t_nodes: dict[str, Any], t_links: dict[str, Any], new_nodes: dict[str, Any], new_links: dict[str, Any]) -> tuple[dict[str, Any], dict[str, Any]]:
|
|
for link, pts in in_links.items():
|
|
node1 = t_links[link]['node1']
|
|
node2 = t_links[link]['node2']
|
|
x1, x2 = t_nodes[node1]['x'], t_nodes[node2]['x']
|
|
y1, y2 = t_nodes[node1]['y'], t_nodes[node2]['y']
|
|
|
|
if node1 not in new_nodes:
|
|
new_nodes[node1] = { 'x': x1, 'y': y1, 'links': [] }
|
|
if node2 not in new_nodes:
|
|
new_nodes[node2] = { 'x': x2, 'y': y2, 'links': [] }
|
|
|
|
x_delta = x2 - x1
|
|
y_delta = y2 - y1
|
|
use_x = abs(x_delta) > abs(y_delta)
|
|
|
|
if len(pts) == 0:
|
|
new_links[link] = t_links[link]
|
|
else:
|
|
sorted_nodes: list[tuple[float, str]] = []
|
|
sorted_nodes.append((0.0, node1))
|
|
sorted_nodes.append((1.0, node2))
|
|
i = 0
|
|
for pt in pts:
|
|
x, y = new_nodes[pt]['x'], new_nodes[pt]['y']
|
|
percent = ((x - x1) / x_delta) if use_x else ((y - y1) / y_delta)
|
|
sorted_nodes.append((percent, pt))
|
|
i += 1
|
|
sorted_nodes = sorted(sorted_nodes, key=lambda s:s[0])
|
|
|
|
for i in range(1, len(sorted_nodes)):
|
|
l = sorted_nodes[i - 1][1]
|
|
r = sorted_nodes[i][1]
|
|
new_link = f'LINK_[{l}]_[{r}]'
|
|
new_links[new_link] = { 'node1': l, 'node2': r }
|
|
|
|
return (new_nodes, new_links)
|
|
|
|
|
|
def calculate_boundary(name: str, nodes: list[str]) -> list[tuple[float, float]]:
|
|
topology = Topology(name, nodes)
|
|
t_nodes = topology.nodes()
|
|
t_links = topology.links()
|
|
|
|
vertices, path, boundary = _calculate_boundary(name, topology.max_x_node(), t_nodes, t_links)
|
|
|
|
#return boundary
|
|
|
|
api = 'calculate_boundary'
|
|
write(name, f"delete from temp_region where id = '{api}'")
|
|
# use linestring instead of polygon to reduce strict limitation
|
|
write(name, f"insert into temp_region (id, boundary) values ('{api}', '{to_postgis_linestring(boundary)}')")
|
|
|
|
write(name, f'delete from temp_node')
|
|
for node in nodes:
|
|
write(name, f"insert into temp_node values ('{node}')")
|
|
|
|
for row in read_all(name, f"select n.node from coordinates as c, temp_node as n, temp_region as r where c.node = n.node and ST_Intersects(c.coord, r.boundary) and r.id = '{api}'"):
|
|
node = row['node']
|
|
write(name, f"delete from temp_node where node = '{node}'")
|
|
|
|
outside_nodes: list[str] = []
|
|
for row in read_all(name, "select node from temp_node"):
|
|
outside_nodes.append(row['node'])
|
|
|
|
# no outside nodes, return
|
|
if len(outside_nodes) == 0:
|
|
write(name, f'delete from temp_node')
|
|
write(name, f"delete from temp_region where id = '{api}'")
|
|
return boundary
|
|
|
|
new_nodes: dict[str, Any] = {}
|
|
new_links: dict[str, Any] = {}
|
|
|
|
boundary_links: dict[str, list[str]] = {}
|
|
write(name, "delete from temp_link_2")
|
|
for node in outside_nodes:
|
|
for link in t_nodes[node]['links']:
|
|
node1 = t_links[link]['node1']
|
|
node2 = t_links[link]['node2']
|
|
if node1 in outside_nodes and node2 not in outside_nodes and node2 not in vertices and link:
|
|
if link not in boundary:
|
|
boundary_links[link] = []
|
|
line = f"LINESTRING({t_nodes[node1]['x']} {t_nodes[node1]['y']}, {t_nodes[node2]['x']} {t_nodes[node2]['y']})"
|
|
write(name, f"insert into temp_link_2 values ('{link}', '{line}')")
|
|
if node2 in outside_nodes and node1 not in outside_nodes and node1 not in vertices:
|
|
if link not in boundary:
|
|
boundary_links[link] = []
|
|
line = f"LINESTRING({t_nodes[node1]['x']} {t_nodes[node1]['y']}, {t_nodes[node2]['x']} {t_nodes[node2]['y']})"
|
|
write(name, f"insert into temp_link_2 values ('{link}', '{line}')")
|
|
if node1 in outside_nodes and node2 in outside_nodes:
|
|
x1, x2 = t_nodes[node1]['x'], t_nodes[node2]['x']
|
|
y1, y2 = t_nodes[node1]['y'], t_nodes[node2]['y']
|
|
if node1 not in new_nodes:
|
|
new_nodes[node1] = { 'x': x1, 'y': y1, 'links': [] }
|
|
if node2 not in new_nodes:
|
|
new_nodes[node2] = { 'x': x2, 'y': y2, 'links': [] }
|
|
if link not in new_links:
|
|
new_links[link] = t_links[link]
|
|
|
|
# no boundary links, return
|
|
if len(boundary_links) == 0:
|
|
write(name, "delete from temp_link_2")
|
|
write(name, f'delete from temp_node')
|
|
write(name, f"delete from temp_region where id = '{api}'")
|
|
return boundary
|
|
|
|
write(name, "delete from temp_link_1")
|
|
for link, _ in path.items():
|
|
node1 = t_links[link]['node1']
|
|
node2 = t_links[link]['node2']
|
|
line = f"LINESTRING({t_nodes[node1]['x']} {t_nodes[node1]['y']}, {t_nodes[node2]['x']} {t_nodes[node2]['y']})"
|
|
write(name, f"insert into temp_link_1 (link, geom) values ('{link}', '{line}')")
|
|
|
|
has_intersection = False
|
|
for row in read_all(name, f"select l1.link as l, l2.link as r, st_astext(st_intersection(l1.geom, l2.geom)) as p from temp_link_1 as l1, temp_link_2 as l2 where st_intersects(l1.geom, l2.geom)"):
|
|
has_intersection = True
|
|
|
|
link1, link2, pt = str(row['l']), str(row['r']), str(row['p'])
|
|
pts = pt.lower().removeprefix('point(').removesuffix(')').split(' ')
|
|
xy = (float(pts[0]), float(pts[1]))
|
|
|
|
new_node = f'NODE_[{link1}]_[{link2}]'
|
|
new_nodes[new_node] = { 'x': xy[0], 'y': xy[1], 'links': [] }
|
|
|
|
path[link1].append(new_node)
|
|
boundary_links[link2].append(new_node)
|
|
|
|
if not has_intersection:
|
|
write(name, "delete from temp_link_1")
|
|
write(name, "delete from temp_link_2")
|
|
write(name, 'delete from temp_node')
|
|
write(name, f"delete from temp_region where id = '{api}'")
|
|
return boundary
|
|
|
|
new_nodes, new_links = _collect_new_links(path, t_nodes, t_links, new_nodes, new_links)
|
|
new_nodes, new_links = _collect_new_links(boundary_links, t_nodes, t_links, new_nodes, new_links)
|
|
|
|
for link, values in new_links.items():
|
|
new_nodes[values['node1']]['links'].append(link)
|
|
new_nodes[values['node2']]['links'].append(link)
|
|
|
|
_, _, boundary = _calculate_boundary(name, topology.max_x_node(), new_nodes, new_links)
|
|
|
|
write(name, "delete from temp_link_1")
|
|
write(name, "delete from temp_link_2")
|
|
write(name, 'delete from temp_node')
|
|
write(name, f"delete from temp_region where id = '{api}'")
|
|
|
|
return boundary
|
|
|
|
|
|
'''
|
|
# CClipper2.dll
|
|
# int inflate_paths(double* path, size_t size, double delta, int jt, int et, double miter_limit, int precision, double arc_tolerance, double** out_path, size_t* out_size);
|
|
# int simplify_paths(double* path, size_t size, double epsilon, int is_closed_path, double** out_path, size_t* out_size);
|
|
# void free_paths(double** paths);
|
|
'''
|
|
def inflate_boundary(name: str, boundary: list[tuple[float, float]], delta: float = 0.5) -> list[tuple[float, float]]:
|
|
if boundary[0] == boundary[-1]:
|
|
del(boundary[-1])
|
|
|
|
lib = ctypes.CDLL(os.path.join(os.getcwd(), 'api', 'CClipper2.dll'))
|
|
|
|
c_size = ctypes.c_size_t(len(boundary) * 2)
|
|
c_path = (ctypes.c_double * c_size.value)()
|
|
i = 0
|
|
for xy in boundary:
|
|
c_path[i] = xy[0]
|
|
i += 1
|
|
c_path[i] = xy[1]
|
|
i += 1
|
|
c_delta = ctypes.c_double(delta)
|
|
JoinType_Square, JoinType_Round, JoinType_Miter = 0, 1, 2
|
|
c_jt = ctypes.c_int(JoinType_Square)
|
|
EndType_Polygon, EndType_Joined, EndType_Butt, EndType_Square, EndType_Round = 0, 1, 2, 3, 4
|
|
c_et = ctypes.c_int(EndType_Polygon)
|
|
c_miter_limit = ctypes.c_double(2.0)
|
|
c_precision = ctypes.c_int(2)
|
|
c_arc_tolerance = ctypes.c_double(0.0)
|
|
c_out_path = ctypes.POINTER(ctypes.c_double)()
|
|
c_out_size = ctypes.c_size_t(0)
|
|
|
|
lib.inflate_paths(c_path, c_size, c_delta, c_jt, c_et, c_miter_limit, c_precision, c_arc_tolerance, ctypes.byref(c_out_path), ctypes.byref(c_out_size))
|
|
if c_out_size.value == 0:
|
|
lib.free_paths(ctypes.byref(c_out_path))
|
|
return []
|
|
|
|
# TODO: simplify_paths :)
|
|
|
|
result: list[tuple[float, float]] = []
|
|
for i in range(0, c_out_size.value, 2):
|
|
result.append((c_out_path[i], c_out_path[i + 1]))
|
|
result.append(result[0])
|
|
|
|
lib.free_paths(ctypes.byref(c_out_path))
|
|
return result
|
|
|
|
|
|
def inflate_region(name: str, region_id: str, delta: float = 0.5) -> list[tuple[float, float]]:
|
|
r = try_read(name, f"select id, st_astext(boundary) as boundary_geom from region where id = '{region_id}'")
|
|
if r == None:
|
|
return []
|
|
boundary = from_postgis_polygon(str(r['boundary_geom']))
|
|
return inflate_boundary(name, boundary, delta)
|
|
|
|
|
|
if __name__ == '__main__':
|
|
_verify_platform()
|