Add more algorithms, such as path, inflate
This commit is contained in:
@@ -132,7 +132,7 @@ from .s31_scada_element import SCADA_ELEMENT_STATUS_OFFLINE, SCADA_ELEMENT_STATU
|
||||
from .s31_scada_element import get_scada_element_schema, get_scada_elements, get_scada_element, set_scada_element, add_scada_element, delete_scada_element
|
||||
from .clean_api import clean_scada_element
|
||||
|
||||
from .s32_region_util import get_nodes_in_boundary, get_nodes_in_region, calculate_convex_hull
|
||||
from .s32_region_util import get_nodes_in_boundary, get_nodes_in_region, calculate_convex_hull, calculate_boundary, inflate_boundary, inflate_region
|
||||
|
||||
from .s33_region import get_region_schema, get_region, set_region, add_region, delete_region
|
||||
|
||||
|
||||
@@ -1,4 +1,12 @@
|
||||
from .database import read, read_all, write
|
||||
import ctypes
|
||||
import platform
|
||||
import os
|
||||
import math
|
||||
import collections
|
||||
from .s0_base import get_node_links, get_link_nodes
|
||||
from .database import read, try_read, read_all, write
|
||||
from .s24_coordinates import node_has_coord, get_node_coord
|
||||
|
||||
|
||||
def from_postgis_polygon(polygon: str) -> list[tuple[float, float]]:
|
||||
boundary = polygon.lower().removeprefix('polygon((').removesuffix('))').split(',')
|
||||
@@ -30,9 +38,9 @@ def get_nodes_in_boundary(name: str, boundary: list[tuple[float, float]]) -> lis
|
||||
return nodes
|
||||
|
||||
|
||||
def get_nodes_in_region(name: str, id: str) -> list[str]:
|
||||
def get_nodes_in_region(name: str, region_id: str) -> list[str]:
|
||||
nodes: list[str] = []
|
||||
for row in read_all(name, f"select c.node from coordinates as c, region as r where ST_Intersects(c.coord, r.boundary) and r.id = '{id}'"):
|
||||
for row in read_all(name, f"select c.node from coordinates as c, region as r where ST_Intersects(c.coord, r.boundary) and r.id = '{region_id}'"):
|
||||
nodes.append(row['node'])
|
||||
return nodes
|
||||
|
||||
@@ -47,3 +55,163 @@ def calculate_convex_hull(name: str, nodes: list[str]) -> list[tuple[float, floa
|
||||
write(name, f'delete from temp_node')
|
||||
|
||||
return from_postgis_polygon(polygon)
|
||||
|
||||
|
||||
def _verify_platform():
|
||||
_platform = platform.system()
|
||||
if _platform != "Windows":
|
||||
raise Exception(f'Platform {_platform} unsupported (not yet)')
|
||||
|
||||
|
||||
def _normal(v: tuple[float, float]) -> tuple[float, float]:
|
||||
l = math.sqrt(v[0] * v[0] + v[1] * v[1])
|
||||
return (v[0] / l, v[1] / l)
|
||||
|
||||
|
||||
def _angle(v: tuple[float, float]) -> float:
|
||||
if v[0] >= 0 and v[1] >= 0:
|
||||
return math.asin(v[1])
|
||||
elif v[0] <= 0 and v[1] >= 0:
|
||||
return math.asin(v[1]) + math.pi * 0.5
|
||||
elif v[0] <= 0 and v[1] <= 0:
|
||||
return math.asin(-v[1]) + math.pi
|
||||
elif v[0] >= 0 and v[1] <= 0:
|
||||
return math.asin(-v[1]) + math.pi * 1.5
|
||||
return 0
|
||||
|
||||
|
||||
def _angle_of_node_link(node: str, link: str, nodes, links) -> float:
|
||||
n1 = node
|
||||
n2 = links[link]['node1'] if n1 == links[link]['node2'] else links[link]['node2']
|
||||
x1, y1 = nodes[n1]['x'], nodes[n1]['y']
|
||||
x2, y2 = nodes[n2]['x'], nodes[n2]['y']
|
||||
v = _normal((x2 - x1, y2 - y1))
|
||||
return _angle(v)
|
||||
|
||||
|
||||
def calculate_boundary(name: str, nodes: list[str]) -> list[tuple[float, float]]:
|
||||
new_nodes = {}
|
||||
max_x_node = ''
|
||||
for node in nodes:
|
||||
if not node_has_coord(name, node):
|
||||
continue
|
||||
if get_node_links(name, node) == 0:
|
||||
continue
|
||||
new_nodes[node] = get_node_coord(name, node) | { 'links': [] }
|
||||
if max_x_node == '' or new_nodes[node]['x'] > new_nodes[max_x_node]['x']:
|
||||
max_x_node = node
|
||||
|
||||
new_links = {}
|
||||
for node in new_nodes:
|
||||
for link in get_node_links(name, node):
|
||||
candidate = True
|
||||
link_nodes = get_link_nodes(name, link)
|
||||
for link_node in link_nodes:
|
||||
if link_node not in new_nodes:
|
||||
candidate = False
|
||||
break
|
||||
if candidate:
|
||||
new_links[link] = { 'node1' : link_nodes[0], 'node2' : link_nodes[1] }
|
||||
if link not in new_nodes[link_nodes[0]]['links']:
|
||||
new_nodes[link_nodes[0]]['links'].append(link)
|
||||
if link not in new_nodes[link_nodes[1]]['links']:
|
||||
new_nodes[link_nodes[1]]['links'].append(link)
|
||||
|
||||
cursor = max_x_node
|
||||
in_angle = 0
|
||||
|
||||
paths: list[str] = []
|
||||
while True:
|
||||
paths.append(cursor)
|
||||
sorted_links = []
|
||||
overlapped_link = ''
|
||||
for link in new_nodes[cursor]['links']:
|
||||
angle = _angle_of_node_link(cursor, link, new_nodes, new_links)
|
||||
if angle == in_angle:
|
||||
overlapped_link = link
|
||||
continue
|
||||
sorted_links.append((angle, link))
|
||||
|
||||
# work into a branch, return
|
||||
if len(sorted_links) == 0:
|
||||
cursor = paths[-2]
|
||||
in_angle = in_angle = _angle_of_node_link(cursor, overlapped_link, new_nodes, new_links)
|
||||
continue
|
||||
|
||||
sorted_links = sorted(sorted_links, key=lambda s:s[0])
|
||||
out_link = sorted_links[0][1]
|
||||
for angle, link in sorted_links:
|
||||
if angle > in_angle:
|
||||
out_link = link
|
||||
break
|
||||
|
||||
cursor = new_links[out_link]['node1'] if cursor == new_links[out_link]['node2'] else new_links[out_link]['node2']
|
||||
# end up trip :)
|
||||
if cursor == max_x_node:
|
||||
paths.append(cursor)
|
||||
break
|
||||
|
||||
in_angle = _angle_of_node_link(cursor, out_link, new_nodes, new_links)
|
||||
|
||||
boundary: list[tuple[float, float]] = []
|
||||
for node in paths:
|
||||
boundary.append((new_nodes[node]['x'], new_nodes[node]['y']))
|
||||
|
||||
return boundary
|
||||
|
||||
|
||||
'''
|
||||
# CClipper2.dll
|
||||
# int inflate_paths(double* path, size_t size, double delta, int jt, int et, double miter_limit, int precision, double arc_tolerance, double** out_path, size_t* out_size);
|
||||
# int simplify_paths(double* path, size_t size, double epsilon, int is_closed_path, double** out_path, size_t* out_size);
|
||||
# void free_paths(double** paths);
|
||||
'''
|
||||
def inflate_boundary(name: str, boundary: list[tuple[float, float]], delta: float = 0.5) -> list[tuple[float, float]]:
|
||||
if boundary[0] == boundary[-1]:
|
||||
del(boundary[-1])
|
||||
|
||||
lib = ctypes.CDLL(os.path.join(os.getcwd(), 'api', 'CClipper2.dll'))
|
||||
|
||||
c_size = ctypes.c_size_t(len(boundary) * 2)
|
||||
c_path = (ctypes.c_double * c_size.value)()
|
||||
i = 0
|
||||
for xy in boundary:
|
||||
c_path[i] = xy[0]
|
||||
i += 1
|
||||
c_path[i] = xy[1]
|
||||
i += 1
|
||||
c_delta = ctypes.c_double(delta)
|
||||
c_jt = ctypes.c_int(0)
|
||||
c_et = ctypes.c_int(0)
|
||||
c_miter_limit = ctypes.c_double(2.0)
|
||||
c_precision = ctypes.c_int(2)
|
||||
c_arc_tolerance = ctypes.c_double(0.0)
|
||||
c_out_path = ctypes.POINTER(ctypes.c_double)()
|
||||
c_out_size = ctypes.c_size_t(0)
|
||||
|
||||
lib.inflate_paths(c_path, c_size, c_delta, c_jt, c_et, c_miter_limit, c_precision, c_arc_tolerance, ctypes.byref(c_out_path), ctypes.byref(c_out_size))
|
||||
if c_out_size.value == 0:
|
||||
lib.free_paths(ctypes.byref(c_out_path))
|
||||
return []
|
||||
|
||||
# TODO: simplify_paths :)
|
||||
|
||||
result: list[tuple[float, float]] = []
|
||||
for i in range(0, c_out_size.value, 2):
|
||||
result.append((c_out_path[i], c_out_path[i + 1]))
|
||||
result.append(result[0])
|
||||
|
||||
lib.free_paths(ctypes.byref(c_out_path))
|
||||
return result
|
||||
|
||||
|
||||
def inflate_region(name: str, region_id: str, delta: float = 0.5) -> list[tuple[float, float]]:
|
||||
r = try_read(name, f"select id, st_astext(boundary) as boundary_geom from region where id = '{region_id}'")
|
||||
if r == None:
|
||||
return []
|
||||
boundary = from_postgis_polygon(str(r['boundary_geom']))
|
||||
return inflate_boundary(name, boundary, delta)
|
||||
|
||||
|
||||
if __name__ == '__main__':
|
||||
_verify_platform()
|
||||
|
||||
14
tjnetwork.py
14
tjnetwork.py
@@ -942,12 +942,21 @@ def clean_scada_element(name: str) -> ChangeSet:
|
||||
def get_nodes_in_boundary(name: str, boundary: list[tuple[float, float]]) -> list[str]:
|
||||
return api.get_nodes_in_boundary(name, boundary)
|
||||
|
||||
def get_nodes_in_region(name: str, id: str) -> list[str]:
|
||||
return api.get_nodes_in_region(name, id)
|
||||
def get_nodes_in_region(name: str, region_id: str) -> list[str]:
|
||||
return api.get_nodes_in_region(name, region_id)
|
||||
|
||||
def calculate_convex_hull(name: str, nodes: list[str]) -> list[tuple[float, float]]:
|
||||
return api.calculate_convex_hull(name, nodes)
|
||||
|
||||
def calculate_boundary(name: str, nodes: list[str]) -> list[tuple[float, float]]:
|
||||
return api.calculate_boundary(name, nodes)
|
||||
|
||||
def inflate_boundary(name: str, boundary: list[tuple[float, float]], delta: float = 0.5) -> list[tuple[float, float]]:
|
||||
return api.inflate_boundary(name, boundary, delta)
|
||||
|
||||
def inflate_region(name: str, region_id: str, delta: float = 0.5) -> list[tuple[float, float]]:
|
||||
return api.inflate_region(name, region_id, delta)
|
||||
|
||||
|
||||
############################################################
|
||||
# general_region 33
|
||||
@@ -989,5 +998,6 @@ def delete_region(name: str, cs: ChangeSet) -> ChangeSet:
|
||||
# virtual_district 37
|
||||
############################################################
|
||||
|
||||
# parent is whole network
|
||||
def calculate_virtual_district(name: str, centers: list[str]) -> dict[str, Any]:
|
||||
return api.calculate_virtual_district(name, centers)
|
||||
|
||||
Reference in New Issue
Block a user