Support DMA

This commit is contained in:
WQY\qiong
2023-05-14 13:20:54 +08:00
parent 1bc73f5bd5
commit 58fd285d25
9 changed files with 637 additions and 96 deletions

View File

@@ -142,8 +142,13 @@ from .s32_region_util import get_nodes_in_boundary, get_nodes_in_region, calcula
from .s32_region import get_region_schema, get_region, set_region, add_region, delete_region
from .s33_dma import PARTITION_TYPE_RB, PARTITION_TYPE_KWAY
from .s33_dma import calculate_district_metering_area_for_nodes, calculate_district_metering_area_for_region, calculate_district_metering_area_for_network
from .s33_dma_cal import PARTITION_TYPE_RB, PARTITION_TYPE_KWAY
from .s33_dma_cal import calculate_district_metering_area_for_nodes, calculate_district_metering_area_for_region, calculate_district_metering_area_for_network
from .s33_dma import get_district_metering_area_schema, get_district_metering_area, set_district_metering_area, add_district_metering_area, delete_district_metering_area
from .s33_dma import get_all_district_metering_area_ids, get_all_district_metering_areas
from .s33_dma_gen import generate_district_metering_area, generate_sub_district_metering_area
from .s34_sa import calculate_service_area

View File

@@ -31,6 +31,7 @@ from .s29_scada_device import set_scada_device, add_scada_device, delete_scada_d
from .s30_scada_device_data import set_scada_device_data, add_scada_device_data, delete_scada_device_data
from .s31_scada_element import set_scada_element, add_scada_element, delete_scada_element
from .s32_region import set_region, add_region, delete_region
from .s33_dma import set_district_metering_area, add_district_metering_area, delete_district_metering_area
from .batch_api_cs import rewrite_batch_api
@@ -109,6 +110,8 @@ def _execute_add_command(name: str, cs: ChangeSet) -> ChangeSet:
return add_scada_element(name, cs)
elif type == s32_region:
return add_region(name, cs)
elif type == s33_dma:
return add_district_metering_area(name, cs)
return ChangeSet()
@@ -190,6 +193,8 @@ def _execute_update_command(name: str, cs: ChangeSet) -> ChangeSet:
return set_scada_element(name, cs)
elif type == s32_region:
return set_region(name, cs)
elif type == s33_dma:
return set_district_metering_area(name, cs)
return ChangeSet()
@@ -269,6 +274,8 @@ def _execute_delete_command(name: str, cs: ChangeSet) -> ChangeSet:
return delete_scada_element(name, cs)
elif type == s32_region:
return delete_region(name, cs)
elif type == s33_dma:
return delete_district_metering_area(name, cs)
return ChangeSet()

View File

@@ -1,97 +1,230 @@
import ctypes
import os
from .database import *
from .s0_base import get_nodes
from .s32_region_util import get_nodes_in_region
from .s32_region_util import Topology
from .s0_base import is_node
from .s32_region_util import to_postgis_polygon
from .s32_region import get_region
def get_district_metering_area_schema(name: str) -> dict[str, dict[str, Any]]:
return { 'id' : {'type': 'str' , 'optional': False , 'readonly': True },
'boundary' : {'type': 'tuple_list' , 'optional': False , 'readonly': False },
'parent' : {'type': 'str' , 'optional': True , 'readonly': False },
'level' : {'type': 'str' , 'optional': False , 'readonly': True } }
PARTITION_TYPE_RB = 0
PARTITION_TYPE_KWAY = 1
def get_district_metering_area(name: str, id: str) -> dict[str, Any]:
dma = get_region(name, id)
if dma == {}:
return {}
r = try_read(name, f"select * from region_dma where id = '{id}'")
if r == None:
return {}
dma['parent'] = r['parent']
dma['nodes'] = list(eval(r['nodes']))
dma['level'] = 1
if dma['parent'] != None:
parent = dma['parent']
while parent != None:
parent = read(name, f"select parent from region_dma where id = '{parent}'")['parent']
dma['level'] += 1
return dma
# no update for nodes
def _set_district_metering_area(name: str, cs: ChangeSet) -> DbChangeSet:
id = cs.operations[0]['id']
new_boundary = cs.operations[0]['boundary']
old_boundary = get_region(name, id)['boundary']
new_parent = cs.operations[0]['parent']
f_new_parent = f"'{new_parent}'" if new_parent != None else 'null'
new_nodes = cs.operations[0]['nodes']
str_new_nodes = str(new_nodes).replace("'", "''")
old = get_district_metering_area(name, id)
old_parent = old['parent']
f_old_parent = f"'{old_parent}'" if old_parent != None else 'null'
old_nodes = old['nodes']
str_old_nodes = str(old_nodes).replace("'", "''")
redo_sql = f"update region set boundary = st_geomfromtext('{to_postgis_polygon(new_boundary)}') where id = '{id}';"
redo_sql += f"update region_dma set parent = {f_new_parent}, nodes = '{str_new_nodes}' where id = '{id}';"
undo_sql = f"update region_dma set parent = {f_old_parent}, nodes = '{str_old_nodes}' where id = '{id}';"
undo_sql += f"update region set boundary = st_geomfromtext('{to_postgis_polygon(old_boundary)}') where id = '{id}';"
redo_cs = g_update_prefix | { 'type': 'district_metering_area', 'id': id, 'boundary': new_boundary, 'parent': new_parent, 'nodes': new_nodes }
undo_cs = g_update_prefix | { 'type': 'district_metering_area', 'id': id, 'boundary': old_boundary, 'parent': old_parent, 'nodes': old_nodes }
return DbChangeSet(redo_sql, undo_sql, [redo_cs], [undo_cs])
def calculate_district_metering_area_for_nodes(name: str, nodes: list[str], part_count: int = 1, part_type: int = PARTITION_TYPE_RB) -> list[list[str]]:
if part_type != PARTITION_TYPE_RB and part_type != PARTITION_TYPE_KWAY:
return []
if part_count <= 0:
return []
elif part_count == 1:
return [nodes]
def set_district_metering_area(name: str, cs: ChangeSet) -> ChangeSet:
ops = cs.operations
lib = ctypes.CDLL(os.path.join(os.getcwd(), 'api', 'CMetis.dll'))
if len(cs.operations) == 0:
return ChangeSet()
op = ops[0]
METIS_NOPTIONS = 40
c_options = (ctypes.c_int64 * METIS_NOPTIONS)()
if 'id' not in op:
return ChangeSet()
dma = get_district_metering_area(name, op['id'])
if dma == {}:
return ChangeSet()
METIS_OK = 1
result = lib.set_default_options(c_options)
if result != METIS_OK:
return []
if 'boundary' not in op:
op['boundary'] = dma['boundary']
else:
b = op['boundary']
if len(b) < 4 or b[0] != b[-1]:
return ChangeSet()
METIS_OPTION_PTYPE , METIS_OPTION_CONTIG = 0, 13
c_options[METIS_OPTION_PTYPE] = part_type
c_options[METIS_OPTION_CONTIG] = 1
if 'parent' not in op:
op['parent'] = dma['parent']
topology = Topology(name, nodes)
t_nodes = topology.nodes()
t_links = topology.links()
t_node_list = topology.node_list()
t_link_list = topology.link_list()
if op['parent'] != None and get_district_metering_area(name, op['parent']) == {}:
return ChangeSet()
nedges = len(t_link_list) * 2
if 'nodes' not in op:
op['nodes'] = dma['nodes']
else:
for node in op['nodes']:
if not is_node(name, node):
return ChangeSet()
c_nvtxs = ctypes.c_int64(len(t_node_list))
c_ncon = ctypes.c_int64(1)
c_xadj = (ctypes.c_int64 * (c_nvtxs.value + 1))()
c_adjncy = (ctypes.c_int64 * nedges)()
c_vwgt = (ctypes.c_int64 * (c_ncon.value * c_nvtxs.value))()
c_adjwgt = (ctypes.c_int64 * nedges)()
c_vsize = (ctypes.c_int64 * c_nvtxs.value)()
c_xadj[0] = 0
l, n = 0, 0
c_xadj_i = 1
for node in t_node_list:
links = t_nodes[node]['links']
for link in links:
node1 = t_links[link]['node1']
node2 = t_links[link]['node2']
c_adjncy[l] = t_node_list.index(node2) if node2 != node else t_node_list.index(node1)
c_adjwgt[l] = 1
l += 1
if len(links) > 0:
c_xadj[c_xadj_i] = l # adjncy.size()
c_xadj_i += 1
c_vwgt[n] = 1
c_vsize[n] = 1
n += 1
part_func = lib.part_graph_recursive if part_type == PARTITION_TYPE_RB else lib.part_graph_kway
c_nparts = ctypes.c_int64(part_count)
c_tpwgts = ctypes.POINTER(ctypes.c_double)()
c_ubvec = ctypes.POINTER(ctypes.c_double)()
c_out_edgecut = ctypes.c_int64(0)
c_out_part = (ctypes.c_int64 * c_nvtxs.value)()
result = part_func(ctypes.byref(c_nvtxs), ctypes.byref(c_ncon), c_xadj, c_adjncy, c_vwgt, c_vsize, c_adjwgt, ctypes.byref(c_nparts), c_tpwgts, c_ubvec, c_options, ctypes.byref(c_out_edgecut), c_out_part)
if result != METIS_OK:
return []
dmas : list[list[str]]= []
for i in range(part_count):
dmas.append([])
for i in range(c_nvtxs.value):
dmas[c_out_part[i]].append(t_node_list[i])
return dmas
return execute_command(name, _set_district_metering_area(name, cs))
def calculate_district_metering_area_for_region(name: str, region: str, part_count: int = 1, part_type: int = PARTITION_TYPE_RB) -> list[list[str]]:
nodes = get_nodes_in_region(name, region)
return calculate_district_metering_area_for_nodes(name, nodes, part_count, part_type)
def _add_district_metering_area(name: str, cs: ChangeSet) -> DbChangeSet:
id = cs.operations[0]['id']
boundary = cs.operations[0]['boundary']
parent = cs.operations[0]['parent']
f_parent = f"'{parent}'" if parent != None else 'null'
nodes = cs.operations[0]['nodes']
str_nodes = str(nodes).replace("'", "''")
redo_sql = f"insert into region (id, boundary, r_type) values ('{id}', '{to_postgis_polygon(boundary)}', 'DMA');"
redo_sql += f"insert into region_dma (id, parent, nodes) values ('{id}', {f_parent}, '{str_nodes}');"
undo_sql = f"delete from region_dma where id = '{id}';"
undo_sql += f"delete from region where id = '{id}';"
redo_cs = g_add_prefix | { 'type': 'district_metering_area', 'id': id, 'boundary': boundary, 'parent': parent, 'nodes': nodes }
undo_cs = g_delete_prefix | { 'type': 'district_metering_area', 'id': id }
return DbChangeSet(redo_sql, undo_sql, [redo_cs], [undo_cs])
def calculate_district_metering_area_for_network(name: str, part_count: int = 1, part_type: int = PARTITION_TYPE_RB) -> list[list[str]]:
nodes = get_nodes(name)
return calculate_district_metering_area_for_nodes(name, nodes, part_count, part_type)
def add_district_metering_area(name: str, cs: ChangeSet) -> ChangeSet:
ops = cs.operations
if len(cs.operations) == 0:
return ChangeSet()
op = ops[0]
if 'id' not in op:
return ChangeSet()
dma = get_district_metering_area(name, op['id'])
if dma != {}:
return ChangeSet()
if 'boundary' not in op:
return ChangeSet()
else:
b = op['boundary']
if len(b) < 4 or b[0] != b[-1]:
return ChangeSet()
if 'parent' not in op:
op['parent'] = None
if op['parent'] != None and get_district_metering_area(name, op['parent']) == {}:
return ChangeSet()
if 'nodes' not in op:
op['nodes'] = []
else:
for node in op['nodes']:
if not is_node(name, node):
return ChangeSet()
return execute_command(name, _add_district_metering_area(name, cs))
def _delete_district_metering_area(name: str, cs: ChangeSet) -> DbChangeSet:
id = cs.operations[0]['id']
dma = get_district_metering_area(name, id)
boundary = dma['boundary']
parent = dma['parent']
f_parent = f"'{parent}'" if parent != None else 'null'
nodes = dma['nodes']
str_nodes = str(nodes).replace("'", "''")
redo_sql = f"delete from region_dma where id = '{id}';"
redo_sql += f"delete from region where id = '{id}';"
undo_sql = f"insert into region (id, boundary, r_type) values ('{id}', '{to_postgis_polygon(boundary)}', 'DMA');"
undo_sql += f"insert into region_dma (id, parent, nodes) values ('{id}', {f_parent}, '{str_nodes}');"
redo_cs = g_delete_prefix | { 'type': 'district_metering_area', 'id': id }
undo_cs = g_add_prefix | { 'type': 'district_metering_area', 'id': id, 'boundary': boundary, 'parent': parent, 'nodes': nodes }
return DbChangeSet(redo_sql, undo_sql, [redo_cs], [undo_cs])
def _has_child(name: str, parent: str) -> bool:
return try_read(name, f"select * from region_dma where parent = '{parent}'") != None
def is_descendant_of(name: str, descendant: str, ancestor: str) -> bool:
parent = descendant
while parent != None:
parent = read(name, f"select parent from region_dma where id = '{parent}'")['parent']
if parent == ancestor:
return True
return False
def delete_district_metering_area(name: str, cs: ChangeSet) -> ChangeSet:
ops = cs.operations
if len(cs.operations) == 0:
return ChangeSet()
op = ops[0]
if 'id' not in op:
return ChangeSet()
dma = get_district_metering_area(name, op['id'])
if dma == {}:
return ChangeSet()
#TODO: cascade ?
if _has_child(name, dma['id']):
return ChangeSet()
return execute_command(name, _delete_district_metering_area(name, cs))
def get_all_district_metering_area_ids(name: str) -> list[str]:
ids = []
for row in read_all(name, f"select id from region_dma"):
ids.append(row['id'])
return ids
def get_all_district_metering_areas(name: str) -> list[dict[str, Any]]:
result = []
for id in get_all_district_metering_area_ids(name):
result.append(get_district_metering_area(name, id))
return result

98
api/s33_dma_cal.py Normal file
View File

@@ -0,0 +1,98 @@
import ctypes
import os
from .database import *
from .s0_base import get_nodes
from .s32_region_util import get_nodes_in_region, to_postgis_polygon
from .s32_region_util import Topology
from .s32_region import get_region
PARTITION_TYPE_RB = 0
PARTITION_TYPE_KWAY = 1
def calculate_district_metering_area_for_nodes(name: str, nodes: list[str], part_count: int = 1, part_type: int = PARTITION_TYPE_RB) -> list[list[str]]:
if part_type != PARTITION_TYPE_RB and part_type != PARTITION_TYPE_KWAY:
return []
if part_count <= 0:
return []
elif part_count == 1:
return [nodes]
lib = ctypes.CDLL(os.path.join(os.getcwd(), 'api', 'CMetis.dll'))
METIS_NOPTIONS = 40
c_options = (ctypes.c_int64 * METIS_NOPTIONS)()
METIS_OK = 1
result = lib.set_default_options(c_options)
if result != METIS_OK:
return []
METIS_OPTION_PTYPE , METIS_OPTION_CONTIG = 0, 13
c_options[METIS_OPTION_PTYPE] = part_type
c_options[METIS_OPTION_CONTIG] = 1
topology = Topology(name, nodes)
t_nodes = topology.nodes()
t_links = topology.links()
t_node_list = topology.node_list()
t_link_list = topology.link_list()
nedges = len(t_link_list) * 2
c_nvtxs = ctypes.c_int64(len(t_node_list))
c_ncon = ctypes.c_int64(1)
c_xadj = (ctypes.c_int64 * (c_nvtxs.value + 1))()
c_adjncy = (ctypes.c_int64 * nedges)()
c_vwgt = (ctypes.c_int64 * (c_ncon.value * c_nvtxs.value))()
c_adjwgt = (ctypes.c_int64 * nedges)()
c_vsize = (ctypes.c_int64 * c_nvtxs.value)()
c_xadj[0] = 0
l, n = 0, 0
c_xadj_i = 1
for node in t_node_list:
links = t_nodes[node]['links']
for link in links:
node1 = t_links[link]['node1']
node2 = t_links[link]['node2']
c_adjncy[l] = t_node_list.index(node2) if node2 != node else t_node_list.index(node1)
c_adjwgt[l] = 1
l += 1
if len(links) > 0:
c_xadj[c_xadj_i] = l # adjncy.size()
c_xadj_i += 1
c_vwgt[n] = 1
c_vsize[n] = 1
n += 1
part_func = lib.part_graph_recursive if part_type == PARTITION_TYPE_RB else lib.part_graph_kway
c_nparts = ctypes.c_int64(part_count)
c_tpwgts = ctypes.POINTER(ctypes.c_double)()
c_ubvec = ctypes.POINTER(ctypes.c_double)()
c_out_edgecut = ctypes.c_int64(0)
c_out_part = (ctypes.c_int64 * c_nvtxs.value)()
result = part_func(ctypes.byref(c_nvtxs), ctypes.byref(c_ncon), c_xadj, c_adjncy, c_vwgt, c_vsize, c_adjwgt, ctypes.byref(c_nparts), c_tpwgts, c_ubvec, c_options, ctypes.byref(c_out_edgecut), c_out_part)
if result != METIS_OK:
return []
dmas : list[list[str]]= []
for i in range(part_count):
dmas.append([])
for i in range(c_nvtxs.value):
dmas[c_out_part[i]].append(t_node_list[i])
return dmas
def calculate_district_metering_area_for_region(name: str, region: str, part_count: int = 1, part_type: int = PARTITION_TYPE_RB) -> list[list[str]]:
nodes = get_nodes_in_region(name, region)
return calculate_district_metering_area_for_nodes(name, nodes, part_count, part_type)
def calculate_district_metering_area_for_network(name: str, part_count: int = 1, part_type: int = PARTITION_TYPE_RB) -> list[list[str]]:
nodes = get_nodes(name)
return calculate_district_metering_area_for_nodes(name, nodes, part_count, part_type)

47
api/s33_dma_gen.py Normal file
View File

@@ -0,0 +1,47 @@
from .s32_region_util import calculate_boundary, inflate_boundary
from .s33_dma_cal import *
from .s33_dma import get_all_district_metering_area_ids, get_all_district_metering_areas, get_district_metering_area, is_descendant_of
from .batch_exe import execute_batch_command
def generate_district_metering_area(name: str, part_count: int = 1, part_type: int = PARTITION_TYPE_RB) -> ChangeSet:
cs = ChangeSet()
dmas = get_all_district_metering_areas(name)
max_level = 0
for dma in dmas:
if dma['level'] > max_level:
max_level = dma['level']
while max_level > 0:
for dma in dmas:
if dma['level'] == max_level:
cs.delete({ 'type': 'district_metering_area', 'id': dma['id'] })
max_level -= 1
i = 1
for nodes in calculate_district_metering_area_for_network(name, part_count, part_type):
boundary = calculate_boundary(name, nodes)
boundary = inflate_boundary(name, boundary)
cs.add({ 'type': 'district_metering_area', 'id': f"DMA_1_{i}", 'boundary': boundary, 'parent': None, 'nodes': nodes })
i += 1
return execute_batch_command(name, cs)
def generate_sub_district_metering_area(name: str, dma: str, part_count: int = 1, part_type: int = PARTITION_TYPE_RB) -> ChangeSet:
cs = ChangeSet()
for id in get_all_district_metering_area_ids(name):
if is_descendant_of(name, id, dma):
cs.delete({ 'type': 'district_metering_area', 'id': id })
level = get_district_metering_area(name, dma)['level'] + 1
i = 1
for nodes in calculate_district_metering_area_for_region(name, dma, part_count, part_type):
boundary = calculate_boundary(name, nodes)
boundary = inflate_boundary(name, boundary)
cs.add({ 'type': 'district_metering_area', 'id': f"DMA_{level}_{i}", 'boundary': boundary, 'parent': dma, 'nodes': nodes })
i += 1
return execute_batch_command(name, cs)

View File

@@ -34,6 +34,7 @@ s29_scada_device = 'scada_device'
s30_scada_device_data = 'scada_device_data'
s31_scada_element = 'scada_element'
s32_region = 'region'
s33_dma = 'district_metering_area'
TITLE = 'TITLE'
JUNCTIONS = 'JUNCTIONS'

View File

@@ -2,5 +2,5 @@ create table region_dma
(
id text primary key references region(id)
, parent text --references region_dma(id)
, nodes text not null default ''
, nodes text not null
);

View File

@@ -5871,6 +5871,257 @@ class TestApi:
self.leave(p)
def test_district_metering_area(self):
p = 'test_district_metering_area'
self.enter(p)
dma = get_district_metering_area(p, 'dma')
assert dma == {}
add_district_metering_area(p, ChangeSet({'id': 'dma', 'boundary': [(0.0, 0.0), (1.0, 0.0), (1.0, 1.0)]}))
dma = get_district_metering_area(p, 'dma')
assert dma == {}
add_district_metering_area(p, ChangeSet({'id': 'dma', 'boundary': [(0.0, 0.0), (1.0, 0.0), (1.0, 1.0), (0.0, 0.0)], 'parent': '1'}))
dma = get_district_metering_area(p, 'dma')
assert dma == {}
add_district_metering_area(p, ChangeSet({'id': 'dma', 'boundary': [(0.0, 0.0), (1.0, 0.0), (1.0, 1.0), (0.0, 0.0)]}))
dma = get_district_metering_area(p, 'dma')
assert dma['id'] == 'dma'
assert dma['boundary'] == [(0.0, 0.0), (1.0, 0.0), (1.0, 1.0), (0.0, 0.0)]
assert dma['parent'] == None
assert dma['nodes'] == []
assert dma['level'] == 1
set_district_metering_area(p, ChangeSet({'id': 'dma', 'boundary': [(0.0, 0.0), (1.0, 0.0), (1.0, 1.0), (0.0, 0.0)], 'parent': '1'}))
dma = get_district_metering_area(p, 'dma')
assert dma['id'] == 'dma'
assert dma['boundary'] == [(0.0, 0.0), (1.0, 0.0), (1.0, 1.0), (0.0, 0.0)]
assert dma['parent'] == None
assert dma['nodes'] == []
assert dma['level'] == 1
add_district_metering_area(p, ChangeSet({'id': 'd0', 'boundary': [(0.0, 0.0), (1.0, 0.0), (2.0, 2.0), (0.0, 0.0)]}))
set_district_metering_area(p, ChangeSet({'id': 'dma', 'boundary': [(0.0, 0.0), (1.0, 0.0), (1.0, 1.0), (0.0, 0.0)], 'parent': 'd0'}))
dma = get_district_metering_area(p, 'dma')
assert dma['id'] == 'dma'
assert dma['boundary'] == [(0.0, 0.0), (1.0, 0.0), (1.0, 1.0), (0.0, 0.0)]
assert dma['parent'] == 'd0'
assert dma['nodes'] == []
assert dma['level'] == 2
set_district_metering_area(p, ChangeSet({'id': 'dma', 'boundary': [(0.0, 0.0), (1.0, 0.0), (1.0, 1.0), (0.0, 0.0)], 'nodes': ['1']}))
dma = get_district_metering_area(p, 'dma')
assert dma['id'] == 'dma'
assert dma['boundary'] == [(0.0, 0.0), (1.0, 0.0), (1.0, 1.0), (0.0, 0.0)]
assert dma['parent'] == 'd0'
assert dma['nodes'] == []
assert dma['level'] == 2
add_junction(p, ChangeSet({'id': 'j0', 'x': 0.0, 'y': 10.0, 'elevation': 20.0}))
set_district_metering_area(p, ChangeSet({'id': 'dma', 'boundary': [(0.0, 0.0), (1.0, 0.0), (1.0, 1.0), (0.0, 0.0)], 'nodes': ['j0']}))
dma = get_district_metering_area(p, 'dma')
assert dma['id'] == 'dma'
assert dma['boundary'] == [(0.0, 0.0), (1.0, 0.0), (1.0, 1.0), (0.0, 0.0)]
assert dma['parent'] == 'd0'
assert dma['nodes'] == ['j0']
assert dma['level'] == 2
delete_district_metering_area(p, ChangeSet({'id': 'dma'}))
dma = get_district_metering_area(p, 'dma')
assert dma == {}
add_district_metering_area(p, ChangeSet({'id': 'dma', 'boundary': [(0.0, 0.0), (1.0, 0.0), (1.0, 1.0), (0.0, 0.0)], 'parent': 'd0', 'nodes': ['j0']}))
dma = get_district_metering_area(p, 'dma')
assert dma['id'] == 'dma'
assert dma['boundary'] == [(0.0, 0.0), (1.0, 0.0), (1.0, 1.0), (0.0, 0.0)]
assert dma['parent'] == 'd0'
assert dma['nodes'] == ['j0']
assert dma['level'] == 2
delete_district_metering_area(p, ChangeSet({'id': 'd0'}))
dma = get_district_metering_area(p, 'd0')
assert dma != {}
delete_district_metering_area(p, ChangeSet({'id': 'dma'}))
dma = get_district_metering_area(p, 'dma')
assert dma == {}
delete_district_metering_area(p, ChangeSet({'id': 'd0'}))
dma = get_district_metering_area(p, 'd0')
assert dma == {}
self.leave(p)
def test_district_metering_area_op(self):
p = 'test_district_metering_area_op'
self.enter(p)
cs = add_district_metering_area(p, ChangeSet({'id': 'dma', 'boundary': [(0.0, 0.0), (1.0, 0.0), (1.0, 1.0)]})).operations
assert len(cs) == 0
cs = add_district_metering_area(p, ChangeSet({'id': 'dma', 'boundary': [(0.0, 0.0), (1.0, 0.0), (1.0, 1.0), (0.0, 0.0)], 'parent': '1'})).operations
assert len(cs) == 0
cs = add_district_metering_area(p, ChangeSet({'id': 'dma', 'boundary': [(0.0, 0.0), (1.0, 0.0), (1.0, 1.0), (0.0, 0.0)]})).operations[0]
assert cs['operation'] == API_ADD
assert cs['type'] == 'district_metering_area'
assert cs['id'] == 'dma'
assert cs['boundary'] == [(0.0, 0.0), (1.0, 0.0), (1.0, 1.0), (0.0, 0.0)]
assert cs['parent'] == None
assert cs['nodes'] == []
cs = execute_undo(p).operations[0]
assert cs['operation'] == API_DELETE
assert cs['type'] == 'district_metering_area'
assert cs['id'] == 'dma'
cs = execute_redo(p).operations[0]
assert cs['operation'] == API_ADD
assert cs['type'] == 'district_metering_area'
assert cs['id'] == 'dma'
assert cs['boundary'] == [(0.0, 0.0), (1.0, 0.0), (1.0, 1.0), (0.0, 0.0)]
assert cs['parent'] == None
assert cs['nodes'] == []
cs = set_district_metering_area(p, ChangeSet({'id': 'dma', 'boundary': [(0.0, 0.0), (1.0, 0.0), (1.0, 1.0), (0.0, 0.0)], 'parent': '1'})).operations
assert len(cs) == 0
add_district_metering_area(p, ChangeSet({'id': 'd0', 'boundary': [(0.0, 0.0), (1.0, 0.0), (2.0, 2.0), (0.0, 0.0)]}))
cs = set_district_metering_area(p, ChangeSet({'id': 'dma', 'boundary': [(0.0, 0.0), (1.0, 0.0), (1.0, 1.0), (0.0, 0.0)], 'parent': 'd0'})).operations[0]
assert cs['operation'] == API_UPDATE
assert cs['type'] == 'district_metering_area'
assert cs['id'] == 'dma'
assert cs['boundary'] == [(0.0, 0.0), (1.0, 0.0), (1.0, 1.0), (0.0, 0.0)]
assert cs['parent'] == 'd0'
assert cs['nodes'] == []
cs = execute_undo(p).operations[0]
assert cs['operation'] == API_UPDATE
assert cs['type'] == 'district_metering_area'
assert cs['id'] == 'dma'
assert cs['boundary'] == [(0.0, 0.0), (1.0, 0.0), (1.0, 1.0), (0.0, 0.0)]
assert cs['parent'] == None
assert cs['nodes'] == []
cs = execute_redo(p).operations[0]
assert cs['operation'] == API_UPDATE
assert cs['type'] == 'district_metering_area'
assert cs['id'] == 'dma'
assert cs['boundary'] == [(0.0, 0.0), (1.0, 0.0), (1.0, 1.0), (0.0, 0.0)]
assert cs['parent'] == 'd0'
assert cs['nodes'] == []
cs = set_district_metering_area(p, ChangeSet({'id': 'dma', 'boundary': [(0.0, 0.0), (1.0, 0.0), (1.0, 1.0), (0.0, 0.0)], 'nodes': ['1']})).operations
assert len(cs) == 0
add_junction(p, ChangeSet({'id': 'j0', 'x': 0.0, 'y': 10.0, 'elevation': 20.0}))
cs = set_district_metering_area(p, ChangeSet({'id': 'dma', 'boundary': [(0.0, 0.0), (1.0, 0.0), (1.0, 1.0), (0.0, 0.0)], 'nodes': ['j0']})).operations[0]
assert cs['operation'] == API_UPDATE
assert cs['type'] == 'district_metering_area'
assert cs['id'] == 'dma'
assert cs['boundary'] == [(0.0, 0.0), (1.0, 0.0), (1.0, 1.0), (0.0, 0.0)]
assert cs['parent'] == 'd0'
assert cs['nodes'] == ['j0']
cs = execute_undo(p).operations[0]
assert cs['operation'] == API_UPDATE
assert cs['type'] == 'district_metering_area'
assert cs['id'] == 'dma'
assert cs['boundary'] == [(0.0, 0.0), (1.0, 0.0), (1.0, 1.0), (0.0, 0.0)]
assert cs['parent'] == 'd0'
assert cs['nodes'] == []
cs = execute_redo(p).operations[0]
assert cs['operation'] == API_UPDATE
assert cs['type'] == 'district_metering_area'
assert cs['id'] == 'dma'
assert cs['boundary'] == [(0.0, 0.0), (1.0, 0.0), (1.0, 1.0), (0.0, 0.0)]
assert cs['parent'] == 'd0'
assert cs['nodes'] == ['j0']
cs = delete_district_metering_area(p, ChangeSet({'id': 'xxx'})).operations
assert len(cs) == 0
cs = delete_district_metering_area(p, ChangeSet({'id': 'dma'})).operations[0]
assert cs['operation'] == API_DELETE
assert cs['type'] == 'district_metering_area'
assert cs['id'] == 'dma'
cs = execute_undo(p).operations[0]
assert cs['operation'] == API_ADD
assert cs['type'] == 'district_metering_area'
assert cs['id'] == 'dma'
assert cs['boundary'] == [(0.0, 0.0), (1.0, 0.0), (1.0, 1.0), (0.0, 0.0)]
assert cs['parent'] == 'd0'
assert cs['nodes'] == ['j0']
cs = execute_redo(p).operations[0]
assert cs['operation'] == API_DELETE
assert cs['type'] == 'district_metering_area'
assert cs['id'] == 'dma'
cs = add_district_metering_area(p, ChangeSet({'id': 'dma', 'boundary': [(0.0, 0.0), (1.0, 0.0), (1.0, 1.0), (0.0, 0.0)], 'parent': 'd0', 'nodes': ['j0']})).operations[0]
assert cs['operation'] == API_ADD
assert cs['type'] == 'district_metering_area'
assert cs['id'] == 'dma'
assert cs['boundary'] == [(0.0, 0.0), (1.0, 0.0), (1.0, 1.0), (0.0, 0.0)]
assert cs['parent'] == 'd0'
assert cs['nodes'] == ['j0']
cs = delete_district_metering_area(p, ChangeSet({'id': 'd0'})).operations
assert len(cs) == 0
cs = delete_district_metering_area(p, ChangeSet({'id': 'dma'})).operations[0]
assert cs['operation'] == API_DELETE
assert cs['type'] == 'district_metering_area'
assert cs['id'] == 'dma'
cs = delete_district_metering_area(p, ChangeSet({'id': 'dma'})).operations
assert len(cs) == 0
cs = delete_district_metering_area(p, ChangeSet({'id': 'd0'})).operations[0]
assert cs['operation'] == API_DELETE
assert cs['type'] == 'district_metering_area'
assert cs['id'] == 'd0'
self.leave(p)
def test_district_metering_area_gen(self):
p = 'test_district_metering_area_gen'
read_inp(p, f'./inp/net3.inp', '3')
open_project(p)
cs = generate_district_metering_area(p, 3).operations
assert len(cs) == 3
dmas = get_all_district_metering_area_ids(p)
assert len(dmas) == 3
cs = generate_district_metering_area(p, 3).operations
assert len(cs) == 6
dmas = get_all_district_metering_area_ids(p)
assert len(dmas) == 3
cs = generate_sub_district_metering_area(p, 'DMA_1_1', 2).operations
assert len(cs) == 2
dmas = get_all_district_metering_area_ids(p)
assert len(dmas) == 5
cs = generate_district_metering_area(p, 3).operations
assert len(cs) == 8
dmas = get_all_district_metering_area_ids(p)
assert len(dmas) == 3
self.leave(p)
# 34 service_area

View File

@@ -1022,9 +1022,6 @@ def add_region(name: str, cs: ChangeSet) -> ChangeSet:
def delete_region(name: str, cs: ChangeSet) -> ChangeSet:
return api.delete_region(name, cs)
def get_all_regions(name: str) -> list[dict[str, Any]]:
return []
############################################################
# district_metering_area 33
@@ -1040,29 +1037,31 @@ def calculate_district_metering_area_for_network(name: str, part_count: int = 1,
return api.calculate_district_metering_area_for_network(name, part_count, part_type)
def get_district_metering_area_schema(name: str) -> dict[str, dict[str, Any]]:
return {}
return api.get_district_metering_area_schema(name)
def get_district_metering_area(name: str, id: str) -> dict[str, Any]:
return {}
return api.get_district_metering_area(name, id)
def set_district_metering_area(name: str, cs: ChangeSet) -> ChangeSet:
return ChangeSet()
return api.set_district_metering_area(name, cs)
# example: add_district_metering_area(p, ChangeSet({'id': 'r', 'boundary': [(0.0, 0.0), (1.0, 0.0), (1.0, 1.0), (0.0, 0.0)]}))
def add_district_metering_area(name: str, cs: ChangeSet) -> ChangeSet:
return ChangeSet()
return api.add_district_metering_area(name, cs)
def delete_district_metering_area(name: str, cs: ChangeSet) -> ChangeSet:
return ChangeSet()
return api.delete_district_metering_area(name, cs)
def get_all_district_metering_area_ids(name: str) -> list[str]:
return api.get_all_district_metering_area_ids(name)
def get_all_district_metering_areas(name: str) -> list[dict[str, Any]]:
return []
return api.get_all_district_metering_areas(name)
def generate_district_metering_area(name: str, part_count: int = 1, part_type: int = PARTITION_TYPE_RB) -> ChangeSet:
return ChangeSet()
return api.generate_district_metering_area(name, part_count, part_type)
def generate_sub_district_metering_area(name: str, dma: str, part_count: int = 1, part_type: int = PARTITION_TYPE_RB) -> ChangeSet:
return ChangeSet()
return api.generate_sub_district_metering_area(name, dma, part_count, part_type)
############################################################