Add emitter api and test

This commit is contained in:
WQY\qiong
2022-10-22 14:00:22 +08:00
parent 40e97fd166
commit eca95309ed
5 changed files with 132 additions and 1 deletions

View File

@@ -47,4 +47,6 @@ from .s11_patterns import get_pattern_schema, get_pattern, set_pattern
from .s12_curves import get_curve_schema, get_curve, set_curve
from .s16_emitters import get_emitter_schema, get_emitter, set_emitter
from .s24_coordinates import get_node_coord

View File

@@ -9,6 +9,7 @@ from .s9_demands import *
from .s10_status import *
from .s11_patterns import *
from .s12_curves import *
from .s16_emitters import *
def execute_add_command(name: str, cs: ChangeSet) -> ChangeSet:
@@ -55,6 +56,8 @@ def execute_update_command(name: str, cs: ChangeSet) -> ChangeSet:
return set_pattern(name, cs)
elif type == CURVE:
return set_curve(name, cs)
elif type == 'emitter':
return set_emitter(name, cs)
return ChangeSet()

56
api/s16_emitters.py Normal file
View File

@@ -0,0 +1,56 @@
from .operation import *
from .s0_base import *
def get_emitter_schema(name: str) -> dict[str, dict[str, Any]]:
return { 'junction' : {'type': 'str' , 'optional': False , 'readonly': True },
'coefficient' : {'type': 'float' , 'optional': True , 'readonly': False} }
def get_emitter(name: str, junction: str) -> dict[str, Any]:
e = try_read(name, f"select * from emitters where junction = '{junction}'")
if e == None:
return { 'junction': junction, 'coefficient': None }
d = {}
d['junction'] = str(e['junction'])
d['coefficient'] = float(e['coefficient']) if e['coefficient'] != None else None
return d
class Emitter(object):
def __init__(self, input: dict[str, Any]) -> None:
self.type = 'emitter'
self.junction = str(input['junction'])
self.coefficient = float(input['coefficient']) if 'coefficient' in input and input['coefficient'] != None else None
self.f_type = f"'{self.type}'"
self.f_junction = f"'{self.junction}'"
self.f_coefficient = self.coefficient if self.coefficient != None else 'null'
def as_dict(self) -> dict[str, Any]:
return { 'type': self.type, 'junction': self.junction, 'coefficient': self.coefficient }
def set_emitter(name: str, cs: ChangeSet) -> ChangeSet:
old = Emitter(get_emitter(name, cs.operations[0]['junction']))
raw_new = get_emitter(name, cs.operations[0]['junction'])
new_dict = cs.operations[0]
schema = get_emitter_schema(name)
for key, value in schema.items():
if key in new_dict and not value['readonly']:
raw_new[key] = new_dict[key]
new = Emitter(raw_new)
redo_sql = f"delete from emitters where junction = {new.f_junction};"
if new.coefficient != None:
redo_sql += f"\ninsert into emitters (junction, coefficient) values ({new.f_junction}, {new.f_coefficient});"
undo_sql = f"delete from emitters where junction = {old.f_junction};"
if old.coefficient != None:
undo_sql += f"\ninsert into emitters (junction, coefficient) values ({old.f_junction}, {old.f_coefficient});"
redo_cs = g_update_prefix | new.as_dict()
undo_cs = g_update_prefix | old.as_dict()
return execute_command(name, redo_sql, undo_sql, redo_cs, undo_cs)

View File

@@ -1187,6 +1187,7 @@ class TestApi:
set_demand(p, ChangeSet({'junction': 'j1', 'demands': [{'demand': 10.0, 'pattern': None, 'category': 'x'},
{'demand': 20.0, 'pattern': None, 'category': None}]}))
d = get_demand(p, 'j1')
assert d['junction'] == 'j1'
ds = d['demands']
@@ -1199,6 +1200,7 @@ class TestApi:
assert ds[1]['category'] == None
set_demand(p, ChangeSet({'junction': 'j1', 'demands': []}))
d = get_demand(p, 'j1')
assert d['junction'] == 'j1'
assert d['demands'] == []
@@ -1444,6 +1446,60 @@ class TestApi:
self.leave(p)
def test_emitter(self):
p = 'test_emitter'
self.enter(p)
add_junction(p, ChangeSet({'id': 'j1', 'x': 0.0, 'y': 10.0, 'elevation': 20.0}))
assert is_junction(p, 'j1')
e = get_emitter(p, 'j1')
assert e['junction'] == 'j1'
assert e['coefficient'] == None
set_emitter(p, ChangeSet({'junction': 'j1', 'coefficient': 10.0}))
e = get_emitter(p, 'j1')
assert e['junction'] == 'j1'
assert e['coefficient'] == 10.0
set_emitter(p, ChangeSet({'junction': 'j1', 'coefficient': None}))
e = get_emitter(p, 'j1')
assert e['junction'] == 'j1'
assert e['coefficient'] == None
self.leave(p)
def test_emitter_op(self):
p = 'test_emitter_op'
self.enter(p)
add_junction(p, ChangeSet({'id': 'j1', 'x': 0.0, 'y': 10.0, 'elevation': 20.0}))
assert is_junction(p, 'j1')
cs = set_emitter(p, ChangeSet({'junction': 'j1', 'coefficient': 10.0})).operations[0]
assert cs['operation'] == API_UPDATE
assert cs['type'] == 'emitter'
assert cs['junction'] == 'j1'
assert cs['coefficient'] == 10.0
cs = execute_undo(p).operations[0]
assert cs['operation'] == API_UPDATE
assert cs['type'] == 'emitter'
assert cs['junction'] == 'j1'
assert cs['coefficient'] == None
cs = execute_redo(p).operations[0]
assert cs['operation'] == API_UPDATE
assert cs['type'] == 'emitter'
assert cs['junction'] == 'j1'
assert cs['coefficient'] == 10.0
self.leave(p)
def test_snapshot(self):
p = "test_snapshot"
self.enter(p)

View File

@@ -340,7 +340,7 @@ def set_pattern(name: str, cs: ChangeSet) -> ChangeSet:
############################################################
# curve 11.[CURVES]
# curve 12.[CURVES]
############################################################
def get_curve_schema(name: str) -> dict[str, dict[str, Any]]:
@@ -353,6 +353,20 @@ def set_curve(name: str, cs: ChangeSet) -> ChangeSet:
return api.set_curve(name, cs)
############################################################
# emitter 16.[EMITTERS]
############################################################
def get_emitter_schema(name: str) -> dict[str, dict[str, Any]]:
return api.get_emitter_schema(name)
def get_emitter(name: str, id: str) -> dict[str, Any]:
return api.get_emitter(name, id)
def set_emitter(name: str, cs: ChangeSet) -> ChangeSet:
return api.set_emitter(name, cs)
############################################################
# coord 24.[COORDINATES]
############################################################