Files
TJWaterServer/api/s33_dma.py
2023-05-16 21:30:23 +08:00

231 lines
7.6 KiB
Python

from .database import *
from .s0_base import is_node
from .s32_region_util import to_postgis_polygon
from .s32_region import get_region
def get_district_metering_area_schema(name: str) -> dict[str, dict[str, Any]]:
    """Describe the fields of a district metering area (DMA) record.

    ``name`` is accepted but not consulted; the returned schema is constant.

    Returns:
        A mapping from field name to its ``type``, ``optional`` and
        ``readonly`` attributes.
    """
    # NOTE(review): the accessors in this module also read and write a
    # 'nodes' field that is not listed here - confirm whether that is intended.
    def field(type_name: str, optional: bool, readonly: bool) -> dict[str, Any]:
        return {'type': type_name, 'optional': optional, 'readonly': readonly}

    return {
        'id': field('str', False, True),
        'boundary': field('tuple_list', False, False),
        'parent': field('str', True, False),
        'level': field('int', False, True),
    }
def get_district_metering_area(name: str, id: str) -> dict[str, Any]:
    """Load one district metering area (DMA) by id.

    Starts from the record returned by ``get_region`` and adds the
    DMA-specific fields read from the ``region_dma`` table:

    * ``parent`` - value of the ``parent`` column (``None`` for a root DMA).
    * ``nodes``  - list parsed from the text stored in the ``nodes`` column.
    * ``level``  - 1 for a DMA without parent, plus one for every ancestor.

    Args:
        name: Project/database name passed through to the query helpers.
        id: Id of the DMA to load.

    Returns:
        The populated dict, or ``{}`` when ``get_region`` yields ``{}`` or
        when no ``region_dma`` row exists for ``id``.
    """
    # Function-scope import keeps this block self-contained.
    import ast

    dma = get_region(name, id)
    if dma == {}:
        return {}

    # NOTE(review): ids are interpolated straight into the SQL text here and
    # in the loop below; callers must only pass trusted ids.
    row = try_read(name, f"select * from region_dma where id = '{id}'")
    if row is None:
        return {}

    dma['parent'] = row['parent']
    # The writers in this module store str(<list>) in this column.
    # literal_eval parses that literal without executing arbitrary code,
    # which a plain eval() of database content would allow.
    dma['nodes'] = list(ast.literal_eval(row['nodes']))

    # Walk up the parent chain, counting one level per ancestor. The
    # visited set stops the walk if the stored chain is cyclic instead of
    # looping forever.
    level = 1
    ancestor = dma['parent']
    visited = {id}
    while ancestor is not None and ancestor not in visited:
        visited.add(ancestor)
        ancestor = read(name, f"select parent from region_dma where id = '{ancestor}'")['parent']
        level += 1
    dma['level'] = level
    return dma
def _set_district_metering_area(name: str, cs: ChangeSet) -> DbChangeSet:
    """Build the redo/undo pair for updating one DMA.

    Reads ``id``, ``boundary``, ``parent`` and ``nodes`` from the first
    operation of ``cs`` and pairs them with the currently stored values so
    that the update can be reverted.

    Returns:
        A ``DbChangeSet`` holding the redo SQL, the undo SQL and the matching
        redo/undo change records.
    """
    op = cs.operations[0]
    dma_id = op['id']
    new_boundary = op['boundary']
    old_boundary = get_region(name, dma_id)['boundary']
    new_parent = op['parent']
    new_nodes = op['nodes']

    current = get_district_metering_area(name, dma_id)
    old_parent = current['parent']
    old_nodes = current['nodes']

    def quote_parent(parent) -> str:
        # A missing parent is written as SQL null, otherwise as a quoted id.
        return 'null' if parent is None else f"'{parent}'"

    def quote_nodes(nodes) -> str:
        # The list is stored as its str() form; double the single quotes so
        # the text survives inside an SQL string literal.
        return str(nodes).replace("'", "''")

    def region_update(boundary) -> str:
        return f"update region set boundary = st_geomfromtext('{to_postgis_polygon(boundary)}') where id = '{dma_id}';"

    def dma_update(parent, nodes) -> str:
        return f"update region_dma set parent = {quote_parent(parent)}, nodes = '{quote_nodes(nodes)}' where id = '{dma_id}';"

    def change_record(boundary, parent, nodes):
        return g_update_prefix | {
            'type': 'district_metering_area',
            'id': dma_id,
            'boundary': boundary,
            'parent': parent,
            'nodes': nodes,
        }

    # Redo touches region first, undo touches region_dma first.
    redo_sql = region_update(new_boundary) + dma_update(new_parent, new_nodes)
    undo_sql = dma_update(old_parent, old_nodes) + region_update(old_boundary)

    redo_cs = change_record(new_boundary, new_parent, new_nodes)
    undo_cs = change_record(old_boundary, old_parent, old_nodes)
    return DbChangeSet(redo_sql, undo_sql, [redo_cs], [undo_cs])
def set_district_metering_area(name: str, cs: ChangeSet) -> ChangeSet:
    """Validate and apply an update to an existing DMA.

    Only the first operation of ``cs`` is used. ``boundary``, ``parent`` and
    ``nodes`` that are absent from the operation are filled in (in place)
    from the stored DMA, so the operation is complete before it is turned
    into SQL.

    An empty ``ChangeSet`` is returned, and nothing is executed, when:

    * ``cs`` has no operations or the operation has no ``id``;
    * no DMA with that id exists;
    * a supplied ``boundary`` has fewer than 4 points or is not closed
      (first point differs from last);
    * the parent does not exist;
    * a changed parent is the DMA itself or one of its descendants;
    * a supplied node fails ``is_node``.

    Otherwise returns the result of ``execute_command``.
    """
    ops = cs.operations
    if not ops:
        return ChangeSet()

    op = ops[0]
    if 'id' not in op:
        return ChangeSet()

    dma = get_district_metering_area(name, op['id'])
    if dma == {}:
        return ChangeSet()

    if 'boundary' not in op:
        op['boundary'] = dma['boundary']
    else:
        boundary = op['boundary']
        if len(boundary) < 4 or boundary[0] != boundary[-1]:
            return ChangeSet()

    if 'parent' not in op:
        op['parent'] = dma['parent']
    parent = op['parent']
    if parent is not None:
        if get_district_metering_area(name, parent) == {}:
            return ChangeSet()
        # Re-parenting a DMA under itself or under one of its own
        # descendants would make the parent chain cyclic, which the ancestor
        # walks in this module assume never happens. Only checked when the
        # parent actually changes, so untouched data is handled as before.
        if parent != dma['parent']:
            if parent == op['id'] or is_descendant_of(name, parent, op['id']):
                return ChangeSet()

    if 'nodes' not in op:
        op['nodes'] = dma['nodes']
    elif not all(is_node(name, node) for node in op['nodes']):
        return ChangeSet()

    return execute_command(name, _set_district_metering_area(name, cs))
def _add_district_metering_area(name: str, cs: ChangeSet) -> DbChangeSet:
    """Build the redo/undo pair for inserting one DMA.

    Reads ``id``, ``boundary``, ``parent`` and ``nodes`` from the first
    operation of ``cs``. Redo inserts into ``region`` and then
    ``region_dma``; undo deletes from ``region_dma`` and then ``region``.

    Returns:
        A ``DbChangeSet`` with the SQL and the add/delete change records.
    """
    op = cs.operations[0]
    dma_id, boundary, parent, nodes = op['id'], op['boundary'], op['parent'], op['nodes']

    # A missing parent is written as SQL null, otherwise as a quoted id.
    parent_sql = 'null' if parent is None else f"'{parent}'"
    # Nodes are stored as the str() of the list, with single quotes doubled
    # so the text survives inside an SQL string literal.
    nodes_sql = str(nodes).replace("'", "''")

    redo_sql = "".join([
        f"insert into region (id, boundary, r_type) values ('{dma_id}', '{to_postgis_polygon(boundary)}', 'DMA');",
        f"insert into region_dma (id, parent, nodes) values ('{dma_id}', {parent_sql}, '{nodes_sql}');",
    ])
    undo_sql = "".join([
        f"delete from region_dma where id = '{dma_id}';",
        f"delete from region where id = '{dma_id}';",
    ])

    redo_cs = g_add_prefix | {
        'type': 'district_metering_area',
        'id': dma_id,
        'boundary': boundary,
        'parent': parent,
        'nodes': nodes,
    }
    undo_cs = g_delete_prefix | {'type': 'district_metering_area', 'id': dma_id}
    return DbChangeSet(redo_sql, undo_sql, [redo_cs], [undo_cs])
def add_district_metering_area(name: str, cs: ChangeSet) -> ChangeSet:
    """Validate and insert a new DMA.

    Only the first operation of ``cs`` is used. ``parent`` defaults to
    ``None`` and ``nodes`` to ``[]`` when absent; both defaults are written
    back into the operation.

    An empty ``ChangeSet`` is returned, and nothing is executed, when:

    * ``cs`` has no operations or the operation has no ``id``;
    * a DMA with that id already exists;
    * ``boundary`` is missing, has fewer than 4 points, or is not closed;
    * a non-``None`` parent does not exist;
    * a supplied node fails ``is_node``.

    Otherwise returns the result of ``execute_command``.
    """
    rejected = ChangeSet

    if not cs.operations:
        return rejected()
    op = cs.operations[0]

    if 'id' not in op:
        return rejected()
    if get_district_metering_area(name, op['id']) != {}:
        return rejected()

    if 'boundary' not in op:
        return rejected()
    ring = op['boundary']
    is_closed_ring = len(ring) >= 4 and ring[0] == ring[-1]
    if not is_closed_ring:
        return rejected()

    if 'parent' not in op:
        op['parent'] = None
    if op['parent'] is not None:
        if get_district_metering_area(name, op['parent']) == {}:
            return rejected()

    if 'nodes' in op:
        for candidate in op['nodes']:
            if not is_node(name, candidate):
                return rejected()
    else:
        op['nodes'] = []

    return execute_command(name, _add_district_metering_area(name, cs))
def _delete_district_metering_area(name: str, cs: ChangeSet) -> DbChangeSet:
    """Build the redo/undo pair for deleting one DMA.

    Takes the id from the first operation of ``cs`` and loads the stored DMA
    so that undo can re-insert it. Redo deletes from ``region_dma`` and then
    ``region``; undo inserts into ``region`` and then ``region_dma``.

    Returns:
        A ``DbChangeSet`` with the SQL and the delete/add change records.
    """
    dma_id = cs.operations[0]['id']
    stored = get_district_metering_area(name, dma_id)
    boundary, parent, nodes = stored['boundary'], stored['parent'], stored['nodes']

    # A missing parent is written as SQL null, otherwise as a quoted id.
    parent_sql = 'null' if parent is None else f"'{parent}'"
    # Double the single quotes of the list's str() form for the SQL literal.
    nodes_sql = str(nodes).replace("'", "''")

    redo_sql = "".join([
        f"delete from region_dma where id = '{dma_id}';",
        f"delete from region where id = '{dma_id}';",
    ])
    undo_sql = "".join([
        f"insert into region (id, boundary, r_type) values ('{dma_id}', '{to_postgis_polygon(boundary)}', 'DMA');",
        f"insert into region_dma (id, parent, nodes) values ('{dma_id}', {parent_sql}, '{nodes_sql}');",
    ])

    redo_cs = g_delete_prefix | {'type': 'district_metering_area', 'id': dma_id}
    undo_cs = g_add_prefix | {
        'type': 'district_metering_area',
        'id': dma_id,
        'boundary': boundary,
        'parent': parent,
        'nodes': nodes,
    }
    return DbChangeSet(redo_sql, undo_sql, [redo_cs], [undo_cs])
def _has_child(name: str, parent: str) -> bool:
    """Return True when a ``region_dma`` row names ``parent`` as its parent."""
    child_row = try_read(name, f"select * from region_dma where parent = '{parent}'")
    return child_row is not None
def is_descendant_of(name: str, descendant: str, ancestor: str) -> bool:
    """Report whether ``ancestor`` appears in the parent chain of ``descendant``.

    Follows the ``parent`` column of ``region_dma`` upwards from
    ``descendant``. ``descendant`` itself is never compared with
    ``ancestor``; only the parents reached from it are.

    Returns:
        True as soon as a parent equal to ``ancestor`` is read, False once
        the walk starts from ``None``.
    """
    current = descendant
    while True:
        if current is None:
            return False
        current = read(name, f"select parent from region_dma where id = '{current}'")['parent']
        if current == ancestor:
            return True
def delete_district_metering_area(name: str, cs: ChangeSet) -> ChangeSet:
    """Validate and delete an existing DMA.

    Only the first operation of ``cs`` is used. An empty ``ChangeSet`` is
    returned, and nothing is executed, when ``cs`` has no operations, the
    operation has no ``id``, the DMA does not exist, or another DMA still
    names it as parent. Otherwise returns the result of ``execute_command``.
    """
    if not cs.operations:
        return ChangeSet()

    op = cs.operations[0]
    if 'id' not in op:
        return ChangeSet()

    target = get_district_metering_area(name, op['id'])
    if target == {}:
        return ChangeSet()

    #TODO: cascade ?
    # Until cascading is decided, a DMA that still has children is kept.
    if _has_child(name, target['id']):
        return ChangeSet()

    return execute_command(name, _delete_district_metering_area(name, cs))
def get_all_district_metering_area_ids(name: str) -> list[str]:
    """Return the ``id`` of every row in ``region_dma``, in query order."""
    rows = read_all(name, "select id from region_dma")
    return [row['id'] for row in rows]
def get_all_district_metering_areas(name: str) -> list[dict[str, Any]]:
    """Return the full record of every DMA.

    Looks up all ids with ``get_all_district_metering_area_ids`` and loads
    each one through ``get_district_metering_area``, preserving id order.
    """
    return [
        get_district_metering_area(name, dma_id)
        for dma_id in get_all_district_metering_area_ids(name)
    ]