Support database region

This commit is contained in:
WQY\qiong
2023-04-29 17:51:34 +08:00
parent a3e8f693d9
commit d66087225f
5 changed files with 224 additions and 2 deletions

View File

@@ -132,4 +132,6 @@ from .s31_scada_element import SCADA_ELEMENT_STATUS_OFFLINE, SCADA_ELEMENT_STATU
from .s31_scada_element import get_scada_element_schema, get_scada_elements, get_scada_element, set_scada_element, add_scada_element, delete_scada_element
from .clean_api import clean_scada_element
from .s37_virtual_district import calculate_virtual_district
from .s33_region import get_region_schema, get_region, set_region, add_region, delete_region
from .s37_virtual_district import calculate_virtual_district

15
api/s32_region_util.py Normal file
View File

@@ -0,0 +1,15 @@
def from_postgis_polygon(polygon: str) -> list[tuple[float, float]]:
boundary = polygon.lower().removeprefix('polygon((').removesuffix('))').split(',')
xys = []
for pt in boundary:
xy = pt.split(' ')
xys.append((float(xy[0]), float(xy[1])))
return xys
def to_postgis_polygon(boundary: list[tuple[float, float]]) -> str:
polygon = ''
for pt in boundary:
polygon += f'{pt[0]} {pt[1]},'
return f'polygon(({polygon[:-1]}))'

81
api/s33_region.py Normal file
View File

@@ -0,0 +1,81 @@
from .database import *
from .s32_region_util import from_postgis_polygon, to_postgis_polygon
def get_region_schema(name: str) -> dict[str, dict[str, Any]]:
return { 'id' : {'type': 'str' , 'optional': False , 'readonly': True },
'boundary' : {'type': 'tuple_list' , 'optional': False , 'readonly': False} }
def get_region(name: str, id: str) -> dict[str, Any]:
r = try_read(name, f"select id, st_astext(boundary) as boundary_geom from region where id = '{id}'")
if r == None:
return {}
d = {}
d['id'] = str(r['id'])
d['boundary'] = from_postgis_polygon(str(r['boundary_geom']))
return d
def _set_region(name: str, cs: ChangeSet) -> DbChangeSet:
id = cs.operations[0]['id']
new = cs.operations[0]['boundary']
old = get_region(name, id)['boundary']
redo_sql = f"update region set boundary = st_geomfromtext('{to_postgis_polygon(new)}') where id = '{id}';"
undo_sql = f"update region set boundary = st_geomfromtext('{to_postgis_polygon(old)}') where id = '{id}';"
redo_cs = g_update_prefix | { 'type': 'region', 'id': id, 'boundary': new }
undo_cs = g_update_prefix | { 'type': 'region', 'id': id, 'boundary': old }
return DbChangeSet(redo_sql, undo_sql, [redo_cs], [undo_cs])
def set_region(name: str, cs: ChangeSet) -> ChangeSet:
if 'id' not in cs.operations[0] or 'boundary' not in cs.operations[0]:
return ChangeSet()
if len(cs.operations[0]['boundary']) < 3:
return ChangeSet()
if get_region(name, cs.operations[0]['id']) == {}:
return ChangeSet()
return execute_command(name, _set_region(name, cs))
def _add_region(name: str, cs: ChangeSet) -> DbChangeSet:
id = cs.operations[0]['id']
new = cs.operations[0]['boundary']
redo_sql = f"insert into region (id, boundary) values ('{id}', '{to_postgis_polygon(new)}');"
undo_sql = f"delete from region where id = '{id}';"
redo_cs = g_add_prefix | { 'type': 'region', 'id': id, 'boundary': new }
undo_cs = g_delete_prefix | { 'type': 'region', 'id': id }
return DbChangeSet(redo_sql, undo_sql, [redo_cs], [undo_cs])
def add_region(name: str, cs: ChangeSet) -> ChangeSet:
if 'id' not in cs.operations[0] or 'boundary' not in cs.operations[0]:
return ChangeSet()
if len(cs.operations[0]['boundary']) < 3:
return ChangeSet()
if get_region(name, cs.operations[0]['id']) != {}:
return ChangeSet()
return execute_command(name, _add_region(name, cs))
def _delete_region(name: str, cs: ChangeSet) -> DbChangeSet:
id = cs.operations[0]['id']
old = get_region(name, id)['boundary']
redo_sql = f"delete from region where id = '{id}';"
undo_sql = f"insert into region (id, boundary) values ('{id}', '{to_postgis_polygon(old)}');"
redo_cs = g_delete_prefix | { 'type': 'region', 'id': id }
undo_cs = g_add_prefix | { 'type': 'region', 'id': id, 'boundary': old }
return DbChangeSet(redo_sql, undo_sql, [redo_cs], [undo_cs])
def delete_region(name: str, cs: ChangeSet) -> ChangeSet:
if 'id' not in cs.operations[0]:
return ChangeSet()
if get_region(name, cs.operations[0]['id']) == {}:
return ChangeSet()
return execute_command(name, _delete_region(name, cs))