Merge branch 'master' of https://e.coding.net/tjwater/tjwatercloud/TJWaterServer
This commit is contained in:
@@ -1,57 +1,60 @@
|
||||
from .database import *
|
||||
from .s23_options_util import get_option_schema, generate_v3
|
||||
|
||||
|
||||
def _inp_in_option(section: list[str]) -> ChangeSet:
|
||||
if len(section) <= 0:
|
||||
return ChangeSet()
|
||||
|
||||
cs = g_update_prefix | { 'type' : 'option' }
|
||||
for s in section:
|
||||
if s.startswith(';'):
|
||||
continue
|
||||
|
||||
tokens = s.strip().split()
|
||||
if tokens[0].upper() == 'PATTERN': # can not upper id
|
||||
value = tokens[1] if len(tokens) > 1 else ''
|
||||
cs |= { 'PATTERN' : value }
|
||||
elif tokens[0].upper() == 'QUALITY': # can not upper trace node
|
||||
value = tokens[1] if len(tokens) > 1 else ''
|
||||
if len(tokens) > 2:
|
||||
value += f' {tokens[2]}'
|
||||
cs |= { 'QUALITY' : value }
|
||||
else:
|
||||
line = s.upper().strip()
|
||||
for key in get_option_schema('').keys():
|
||||
if line.startswith(key):
|
||||
value = line.removeprefix(key).strip()
|
||||
cs |= { key : value }
|
||||
|
||||
result = ChangeSet(cs)
|
||||
result.merge(generate_v3(result))
|
||||
return result
|
||||
|
||||
|
||||
def inp_in_option(section: list[str]) -> str:
|
||||
sql = ''
|
||||
result = _inp_in_option(section)
|
||||
for op in result.operations:
|
||||
for key in op.keys():
|
||||
if key == 'operation' or key == 'type':
|
||||
continue
|
||||
if op['type'] == 'option':
|
||||
sql += f"update options set value = '{op[key]}' where key = '{key}';"
|
||||
else:
|
||||
sql += f"update options_v3 set value = '{op[key]}' where key = '{key}';"
|
||||
return sql
|
||||
|
||||
|
||||
def inp_out_option(name: str) -> list[str]:
|
||||
lines = []
|
||||
objs = read_all(name, f"select * from options")
|
||||
for obj in objs:
|
||||
key = obj['key']
|
||||
value = obj['value']
|
||||
if str(value).strip() != '':
|
||||
lines.append(f'{key} {value}')
|
||||
return lines
|
||||
from .database import *
|
||||
from .s23_options_util import get_option_schema, generate_v3
|
||||
|
||||
|
||||
def _inp_in_option(section: list[str]) -> ChangeSet:
|
||||
if len(section) <= 0:
|
||||
return ChangeSet()
|
||||
|
||||
cs = g_update_prefix | { 'type' : 'option' }
|
||||
for s in section:
|
||||
if s.startswith(';'):
|
||||
continue
|
||||
|
||||
tokens = s.strip().split()
|
||||
if tokens[0].upper() == 'PATTERN': # can not upper id
|
||||
value = tokens[1] if len(tokens) > 1 else ''
|
||||
cs |= { 'PATTERN' : value }
|
||||
elif tokens[0].upper() == 'QUALITY': # can not upper trace node
|
||||
value = tokens[1] if len(tokens) > 1 else ''
|
||||
if len(tokens) > 2:
|
||||
value += f' {tokens[2]}'
|
||||
cs |= { 'QUALITY' : value }
|
||||
else:
|
||||
line = s.upper().strip()
|
||||
for key in get_option_schema('').keys():
|
||||
if line.startswith(key):
|
||||
value = line.removeprefix(key).strip()
|
||||
cs |= { key : value }
|
||||
|
||||
result = ChangeSet(cs)
|
||||
result.merge(generate_v3(result))
|
||||
return result
|
||||
|
||||
|
||||
def inp_in_option(section: list[str]) -> str:
|
||||
sql = ''
|
||||
result = _inp_in_option(section)
|
||||
for op in result.operations:
|
||||
for key in op.keys():
|
||||
if key == 'operation' or key == 'type':
|
||||
continue
|
||||
if op['type'] == 'option':
|
||||
sql += f"update options set value = '{op[key]}' where key = '{key}';"
|
||||
else:
|
||||
sql += f"update options_v3 set value = '{op[key]}' where key = '{key}';"
|
||||
return sql
|
||||
|
||||
|
||||
def inp_out_option(name: str) -> list[str]:
|
||||
lines = []
|
||||
objs = read_all(name, f"select * from options")
|
||||
for obj in objs:
|
||||
key = obj['key']
|
||||
# release version does not support new keys and has error message
|
||||
if key == 'HTOL' or key == 'QTOL' or key == 'RQTOL':
|
||||
continue
|
||||
value = obj['value']
|
||||
if str(value).strip() != '':
|
||||
lines.append(f'{key} {value}')
|
||||
return lines
|
||||
|
||||
@@ -164,6 +164,9 @@ def calculate_boundary(name: str, nodes: list[str]) -> list[tuple[float, float]]
|
||||
# work into a branch, return
|
||||
if len(sorted_links) == 0:
|
||||
cursor = paths[-2]
|
||||
if cursor == topology.max_x_node():
|
||||
paths.append(cursor)
|
||||
break
|
||||
in_angle = in_angle = _angle_of_node_link(cursor, overlapped_link, t_nodes, t_links)
|
||||
continue
|
||||
|
||||
|
||||
@@ -41,7 +41,7 @@ def generate_sub_district_metering_area(name: str, dma: str, part_count: int = 1
|
||||
for nodes in calculate_district_metering_area_for_region(name, dma, part_count, part_type):
|
||||
boundary = calculate_boundary(name, nodes)
|
||||
boundary = inflate_boundary(name, boundary, inflate_delta)
|
||||
cs.add({ 'type': 'district_metering_area', 'id': f"DMA_{level}_{i}", 'boundary': boundary, 'parent': dma, 'nodes': nodes })
|
||||
cs.add({ 'type': 'district_metering_area', 'id': f"DMA_[{dma}]_{level}_{i}", 'boundary': boundary, 'parent': dma, 'nodes': nodes })
|
||||
i += 1
|
||||
|
||||
return execute_batch_command(name, cs)
|
||||
|
||||
@@ -6097,24 +6097,81 @@ class TestApi:
|
||||
|
||||
cs = generate_district_metering_area(p, 3).operations
|
||||
assert len(cs) == 3
|
||||
assert cs[0]['operation'] == API_ADD
|
||||
assert cs[0]['type'] == 'district_metering_area'
|
||||
assert cs[0]['id'] == 'DMA_1_1'
|
||||
assert cs[1]['operation'] == API_ADD
|
||||
assert cs[1]['type'] == 'district_metering_area'
|
||||
assert cs[1]['id'] == 'DMA_1_2'
|
||||
assert cs[2]['operation'] == API_ADD
|
||||
assert cs[2]['type'] == 'district_metering_area'
|
||||
assert cs[2]['id'] == 'DMA_1_3'
|
||||
|
||||
dmas = get_all_district_metering_area_ids(p)
|
||||
assert len(dmas) == 3
|
||||
assert dmas[0] == 'DMA_1_1'
|
||||
assert dmas[1] == 'DMA_1_2'
|
||||
assert dmas[2] == 'DMA_1_3'
|
||||
|
||||
cs = generate_district_metering_area(p, 3).operations
|
||||
assert len(cs) == 6
|
||||
assert cs[0]['operation'] == API_DELETE
|
||||
assert cs[0]['type'] == 'district_metering_area'
|
||||
assert cs[0]['id'] == 'DMA_1_1'
|
||||
assert cs[1]['operation'] == API_DELETE
|
||||
assert cs[1]['type'] == 'district_metering_area'
|
||||
assert cs[1]['id'] == 'DMA_1_2'
|
||||
assert cs[2]['operation'] == API_DELETE
|
||||
assert cs[2]['type'] == 'district_metering_area'
|
||||
assert cs[2]['id'] == 'DMA_1_3'
|
||||
assert cs[3]['operation'] == API_ADD
|
||||
assert cs[3]['type'] == 'district_metering_area'
|
||||
assert cs[3]['id'] == 'DMA_1_1'
|
||||
assert cs[4]['operation'] == API_ADD
|
||||
assert cs[4]['type'] == 'district_metering_area'
|
||||
assert cs[4]['id'] == 'DMA_1_2'
|
||||
assert cs[5]['operation'] == API_ADD
|
||||
assert cs[5]['type'] == 'district_metering_area'
|
||||
assert cs[5]['id'] == 'DMA_1_3'
|
||||
|
||||
dmas = get_all_district_metering_area_ids(p)
|
||||
assert len(dmas) == 3
|
||||
assert dmas[0] == 'DMA_1_1'
|
||||
assert dmas[1] == 'DMA_1_2'
|
||||
assert dmas[2] == 'DMA_1_3'
|
||||
|
||||
cs = generate_sub_district_metering_area(p, 'DMA_1_1', 2).operations
|
||||
assert len(cs) == 2
|
||||
assert cs[0]['operation'] == API_ADD
|
||||
assert cs[0]['type'] == 'district_metering_area'
|
||||
assert cs[0]['id'] == 'DMA_[DMA_1_1]_2_1'
|
||||
assert cs[1]['operation'] == API_ADD
|
||||
assert cs[1]['type'] == 'district_metering_area'
|
||||
assert cs[1]['id'] == 'DMA_[DMA_1_1]_2_2'
|
||||
|
||||
cs = generate_sub_district_metering_area(p, 'DMA_1_2', 3).operations
|
||||
assert len(cs) == 2
|
||||
assert cs[0]['operation'] == API_ADD
|
||||
assert cs[0]['type'] == 'district_metering_area'
|
||||
assert cs[0]['id'] == 'DMA_[DMA_1_2]_2_2'
|
||||
assert cs[1]['operation'] == API_ADD
|
||||
assert cs[1]['type'] == 'district_metering_area'
|
||||
assert cs[1]['id'] == 'DMA_[DMA_1_2]_2_3'
|
||||
|
||||
cs = generate_sub_district_metering_area(p, 'DMA_1_3', 2).operations
|
||||
assert len(cs) == 2
|
||||
assert cs[0]['operation'] == API_ADD
|
||||
assert cs[0]['type'] == 'district_metering_area'
|
||||
assert cs[0]['id'] == 'DMA_[DMA_1_3]_2_1'
|
||||
assert cs[1]['operation'] == API_ADD
|
||||
assert cs[1]['type'] == 'district_metering_area'
|
||||
assert cs[1]['id'] == 'DMA_[DMA_1_3]_2_2'
|
||||
|
||||
dmas = get_all_district_metering_area_ids(p)
|
||||
assert len(dmas) == 5
|
||||
assert len(dmas) == 9
|
||||
|
||||
cs = generate_district_metering_area(p, 3).operations
|
||||
assert len(cs) == 8
|
||||
assert len(cs) == 12
|
||||
|
||||
dmas = get_all_district_metering_area_ids(p)
|
||||
assert len(dmas) == 3
|
||||
|
||||
Reference in New Issue
Block a user