Add tag api and test

This commit is contained in:
WQY\qiong
2022-10-29 18:40:55 +08:00
parent e0e209ddd1
commit a04dcdd1ee
4 changed files with 185 additions and 0 deletions

View File

@@ -38,6 +38,9 @@ from .s6_pumps import get_pump_schema, add_pump, get_pump, set_pump, delete_pump
from .s7_valves import VALVES_TYPE_PRV, VALVES_TYPE_PSV, VALVES_TYPE_PBV, VALVES_TYPE_FCV, VALVES_TYPE_TCV, VALVES_TYPE_GPV
from .s7_valves import get_valve_schema, add_valve, get_valve, set_valve, delete_valve
from .s8_tags import TAG_TYPE_NODE, TAG_TYPE_LINK
from .s8_tags import get_tag_schema, get_tag, set_tag
from .s9_demands import get_demand_schema, get_demand, set_demand
from .s10_status import LINK_STATUS_OPEN, LINK_STATUS_CLOSED, LINK_STATUS_ACTIVE

79
api/s8_tags.py Normal file
View File

@@ -0,0 +1,79 @@
from .operation import *
TAG_TYPE_NODE = 'NODE'
TAG_TYPE_LINK = 'LINK'
def get_tag_schema(name: str) -> dict[str, dict[str, Any]]:
return { 't_type' : {'type': 'str' , 'optional': False , 'readonly': False},
'id' : {'type': 'str' , 'optional': False , 'readonly': False},
'tag' : {'type': 'str' , 'optional': True , 'readonly': False},}
def get_tag(name: str, t_type: str, id: str) -> dict[str, Any]:
t = None
if t_type == TAG_TYPE_NODE:
t = try_read(name, f"select * from tags_node where id = '{id}'")
elif t_type == TAG_TYPE_LINK:
t = try_read(name, f"select * from tags_link where id = '{id}'")
else:
raise Exception('Only support NODE and Link')
if t == None:
return { 't_type': t_type, 'id': id, 'tag': None }
d = {}
d['t_type'] = t_type
d['id'] = str(t['id'])
d['tag'] = str(t['tag']) if t['tag'] != None else None
return d
class Tag(object):
def __init__(self, input: dict[str, Any]) -> None:
self.type = 'tag'
self.t_type = str(input['t_type'])
self.id = str(input['id'])
self.tag = str(input['tag']) if 'tag' in input and input['tag'] != None else None
self.f_type = f"'{self.type}'"
self.f_t_type = f"'{self.t_type}'"
self.f_id = f"'{self.id}'"
self.f_tag = f"'{self.tag}'" if self.tag != None else 'null'
def as_dict(self) -> dict[str, Any]:
return { 'type': self.type, 't_type': self.t_type, 'id': self.id, 'tag': self.tag }
def set_tag_cache(name: str, cs: ChangeSet) -> SqlChangeSet:
old = Tag(get_tag(name, cs.operations[0]['t_type'], cs.operations[0]['id']))
raw_new = get_tag(name, cs.operations[0]['t_type'], cs.operations[0]['id'])
new_dict = cs.operations[0]
schema = get_tag_schema(name)
for key, value in schema.items():
if key in new_dict and not value['readonly']:
raw_new[key] = new_dict[key]
new = Tag(raw_new)
table = ''
if cs.operations[0]['t_type'] == TAG_TYPE_NODE:
table = 'tags_node'
elif cs.operations[0]['t_type'] == TAG_TYPE_LINK:
table = 'tags_link'
else:
raise Exception('Only support NODE and Link')
redo_sql = f"delete from {table} where id = {new.f_id};"
if new.tag != None:
redo_sql += f"\ninsert into {table} (id, tag) values ({new.f_id}, {new.f_tag});"
undo_sql = f"delete from {table} where id = {old.f_id};"
if old.tag != None:
undo_sql += f"\ninsert into {table} (id, tag) values ({old.f_id}, {old.f_tag});"
redo_cs = g_update_prefix | new.as_dict()
undo_cs = g_update_prefix | old.as_dict()
return SqlChangeSet(redo_sql, undo_sql, redo_cs, undo_cs)
def set_tag(name: str, cs: ChangeSet) -> ChangeSet:
return execute_command(name, set_tag_cache(name, cs))