Support SA
This commit is contained in:
217
api/s34_sa.py
Normal file
217
api/s34_sa.py
Normal file
@@ -0,0 +1,217 @@
|
||||
from .database import *
|
||||
from .s0_base import is_node
|
||||
from .s32_region_util import to_postgis_polygon
|
||||
from .s32_region import get_region
|
||||
|
||||
def get_service_area_schema(name: str) -> dict[str, dict[str, Any]]:
|
||||
return { 'id' : {'type': 'str' , 'optional': False , 'readonly': True },
|
||||
'boundary' : {'type': 'tuple_list' , 'optional': False , 'readonly': False },
|
||||
'source' : {'type': 'str' , 'optional': False , 'readonly': False },
|
||||
'time_index' : {'type': 'int' , 'optional': False , 'readonly': False } }
|
||||
|
||||
def get_service_area(name: str, id: str) -> dict[str, Any]:
|
||||
sa = get_region(name, id)
|
||||
if sa == {}:
|
||||
return {}
|
||||
r = try_read(name, f"select * from region_sa where id = '{id}'")
|
||||
if r == None:
|
||||
return {}
|
||||
sa['source'] = r['source']
|
||||
sa['nodes'] = list(eval(r['nodes']))
|
||||
sa['time_index'] = r['time_index']
|
||||
return sa
|
||||
|
||||
def _set_service_area(name: str, cs: ChangeSet) -> DbChangeSet:
|
||||
id = cs.operations[0]['id']
|
||||
|
||||
new_boundary = cs.operations[0]['boundary']
|
||||
old_boundary = get_region(name, id)['boundary']
|
||||
|
||||
new_source = cs.operations[0]['source']
|
||||
f_new_source = f"'{new_source}'"
|
||||
|
||||
new_nodes = cs.operations[0]['nodes']
|
||||
str_new_nodes = str(new_nodes).replace("'", "''")
|
||||
|
||||
new_time_index = cs.operations[0]['time_index']
|
||||
|
||||
old = get_service_area(name, id)
|
||||
old_source = old['source']
|
||||
f_old_source = f"'{old_source}'"
|
||||
|
||||
old_nodes = old['nodes']
|
||||
str_old_nodes = str(old_nodes).replace("'", "''")
|
||||
|
||||
old_time_index = old['time_index']
|
||||
|
||||
redo_sql = f"update region set boundary = st_geomfromtext('{to_postgis_polygon(new_boundary)}') where id = '{id}';"
|
||||
redo_sql += f"update region_sa set time_index = {new_time_index}, source = {f_new_source}, nodes = '{str_new_nodes}' where id = '{id}';"
|
||||
|
||||
undo_sql = f"update region_sa set time_index = {old_time_index}, source = {f_old_source}, nodes = '{str_old_nodes}' where id = '{id}';"
|
||||
undo_sql += f"update region set boundary = st_geomfromtext('{to_postgis_polygon(old_boundary)}') where id = '{id}';"
|
||||
|
||||
redo_cs = g_update_prefix | { 'type': 'service_area', 'id': id, 'boundary': new_boundary, 'time_index': new_time_index, 'source': new_source, 'nodes': new_nodes }
|
||||
undo_cs = g_update_prefix | { 'type': 'service_area', 'id': id, 'boundary': old_boundary, 'time_index': old_time_index, 'source': old_source, 'nodes': old_nodes }
|
||||
|
||||
return DbChangeSet(redo_sql, undo_sql, [redo_cs], [undo_cs])
|
||||
|
||||
|
||||
def set_service_area(name: str, cs: ChangeSet) -> ChangeSet:
|
||||
ops = cs.operations
|
||||
|
||||
if len(cs.operations) == 0:
|
||||
return ChangeSet()
|
||||
|
||||
op = ops[0]
|
||||
|
||||
if 'id' not in op:
|
||||
return ChangeSet()
|
||||
|
||||
sa = get_service_area(name, op['id'])
|
||||
if sa == {}:
|
||||
return ChangeSet()
|
||||
|
||||
if 'boundary' not in op:
|
||||
op['boundary'] = sa['boundary']
|
||||
else:
|
||||
b = op['boundary']
|
||||
if len(b) < 4 or b[0] != b[-1]:
|
||||
return ChangeSet()
|
||||
|
||||
if 'time_index' not in op:
|
||||
op['time_index'] = sa['time_index']
|
||||
|
||||
if 'source' not in op:
|
||||
op['source'] = sa['source']
|
||||
|
||||
if not is_node(name, op['source']):
|
||||
return ChangeSet()
|
||||
|
||||
if 'nodes' not in op:
|
||||
op['nodes'] = sa['nodes']
|
||||
else:
|
||||
for node in op['nodes']:
|
||||
if not is_node(name, node):
|
||||
return ChangeSet()
|
||||
|
||||
return execute_command(name, _set_service_area(name, cs))
|
||||
|
||||
|
||||
def _add_service_area(name: str, cs: ChangeSet) -> DbChangeSet:
|
||||
id = cs.operations[0]['id']
|
||||
|
||||
boundary = cs.operations[0]['boundary']
|
||||
|
||||
time_index = cs.operations[0]['time_index']
|
||||
|
||||
source = cs.operations[0]['source']
|
||||
f_source = f"'{source}'"
|
||||
|
||||
nodes = cs.operations[0]['nodes']
|
||||
str_nodes = str(nodes).replace("'", "''")
|
||||
|
||||
redo_sql = f"insert into region (id, boundary, r_type) values ('{id}', '{to_postgis_polygon(boundary)}', 'SA');"
|
||||
redo_sql += f"insert into region_sa (id, time_index, source, nodes) values ('{id}', {time_index}, {f_source}, '{str_nodes}');"
|
||||
|
||||
undo_sql = f"delete from region_sa where id = '{id}';"
|
||||
undo_sql += f"delete from region where id = '{id}';"
|
||||
|
||||
redo_cs = g_add_prefix | { 'type': 'service_area', 'id': id, 'boundary': boundary, 'time_index': time_index, 'source': source, 'nodes': nodes }
|
||||
undo_cs = g_delete_prefix | { 'type': 'service_area', 'id': id }
|
||||
|
||||
return DbChangeSet(redo_sql, undo_sql, [redo_cs], [undo_cs])
|
||||
|
||||
|
||||
def add_service_area(name: str, cs: ChangeSet) -> ChangeSet:
|
||||
ops = cs.operations
|
||||
|
||||
if len(cs.operations) == 0:
|
||||
return ChangeSet()
|
||||
|
||||
op = ops[0]
|
||||
|
||||
if 'id' not in op:
|
||||
return ChangeSet()
|
||||
|
||||
sa = get_service_area(name, op['id'])
|
||||
if sa != {}:
|
||||
return ChangeSet()
|
||||
|
||||
if 'boundary' not in op:
|
||||
return ChangeSet()
|
||||
else:
|
||||
b = op['boundary']
|
||||
if len(b) < 4 or b[0] != b[-1]:
|
||||
return ChangeSet()
|
||||
|
||||
if 'time_index' not in op:
|
||||
return ChangeSet()
|
||||
|
||||
if 'source' not in op:
|
||||
return ChangeSet()
|
||||
|
||||
if not is_node(name, op['source']):
|
||||
return ChangeSet()
|
||||
|
||||
if 'nodes' not in op:
|
||||
op['nodes'] = []
|
||||
else:
|
||||
for node in op['nodes']:
|
||||
if not is_node(name, node):
|
||||
return ChangeSet()
|
||||
|
||||
return execute_command(name, _add_service_area(name, cs))
|
||||
|
||||
|
||||
def _delete_service_area(name: str, cs: ChangeSet) -> DbChangeSet:
|
||||
id = cs.operations[0]['id']
|
||||
sa = get_service_area(name, id)
|
||||
boundary = sa['boundary']
|
||||
time_index = sa['time_index']
|
||||
source = sa['source']
|
||||
f_source = f"'{source}'"
|
||||
nodes = sa['nodes']
|
||||
str_nodes = str(nodes).replace("'", "''")
|
||||
|
||||
redo_sql = f"delete from region_sa where id = '{id}';"
|
||||
redo_sql += f"delete from region where id = '{id}';"
|
||||
|
||||
undo_sql = f"insert into region (id, boundary, r_type) values ('{id}', '{to_postgis_polygon(boundary)}', 'SA');"
|
||||
undo_sql += f"insert into region_sa (id, time_index, source, nodes) values ('{id}', {time_index}, {f_source}, '{str_nodes}');"
|
||||
|
||||
redo_cs = g_delete_prefix | { 'type': 'service_area', 'id': id }
|
||||
undo_cs = g_add_prefix | { 'type': 'service_area', 'id': id, 'boundary': boundary, 'time_index': time_index, 'source': source, 'nodes': nodes }
|
||||
|
||||
return DbChangeSet(redo_sql, undo_sql, [redo_cs], [undo_cs])
|
||||
|
||||
|
||||
def delete_service_area(name: str, cs: ChangeSet) -> ChangeSet:
|
||||
ops = cs.operations
|
||||
|
||||
if len(cs.operations) == 0:
|
||||
return ChangeSet()
|
||||
|
||||
op = ops[0]
|
||||
|
||||
if 'id' not in op:
|
||||
return ChangeSet()
|
||||
|
||||
sa = get_service_area(name, op['id'])
|
||||
if sa == {}:
|
||||
return ChangeSet()
|
||||
|
||||
return execute_command(name, _delete_service_area(name, cs))
|
||||
|
||||
|
||||
def get_all_service_area_ids(name: str) -> list[str]:
|
||||
ids = []
|
||||
for row in read_all(name, f"select id from region_sa"):
|
||||
ids.append(row['id'])
|
||||
return ids
|
||||
|
||||
|
||||
def get_all_service_areas(name: str) -> list[dict[str, Any]]:
|
||||
result = []
|
||||
for id in get_all_service_area_ids(name):
|
||||
result.append(get_service_area(name, id))
|
||||
return result
|
||||
Reference in New Issue
Block a user