Add status api and test

This commit is contained in:
WQY\qiong
2022-10-22 11:51:48 +08:00
parent 6fa2f77a60
commit c1b99bc7eb
7 changed files with 171 additions and 5 deletions

View File

@@ -40,4 +40,7 @@ from .s7_valves import get_valve_schema, add_valve, get_valve, set_valve, delete
from .s9_demands import get_demand_schema, get_demand, set_demand
from .s10_status import LINK_STATUS_OPEN, LINK_STATUS_CLOSED, LINK_STATUS_ACTIVE
from .s10_status import get_status_schema, get_status, set_status
from .s24_coordinates import get_node_coord

View File

@@ -6,6 +6,7 @@ from .s5_pipes import *
from .s6_pumps import *
from .s7_valves import *
from .s9_demands import *
from .s10_status import *
def execute_add_command(name: str, cs: ChangeSet) -> ChangeSet:
@@ -46,6 +47,8 @@ def execute_update_command(name: str, cs: ChangeSet) -> ChangeSet:
return set_valve(name, cs)
elif type == 'demand':
return set_demand(name, cs)
elif type == 'status':
return set_status(name, cs)
return ChangeSet()

View File

@@ -57,6 +57,12 @@ def read_all(name: str, sql: str) -> list[Row]:
return cur.fetchall()
def try_read(name: str, sql: str) -> Row | None:
with conn[name].cursor(row_factory=dict_row) as cur:
cur.execute(sql)
return cur.fetchone()
def write(name: str, sql: str) -> None:
with conn[name].cursor() as cur:
cur.execute(sql)

64
api/s10_status.py Normal file
View File

@@ -0,0 +1,64 @@
from .operation import *
from .s0_base import *
LINK_STATUS_OPEN = 'OPEN'
LINK_STATUS_CLOSED = 'CLOSED'
LINK_STATUS_ACTIVE = 'ACTIVE'
def get_status_schema(name: str) -> dict[str, dict[str, Any]]:
return { 'link' : {'type': 'str' , 'optional': False , 'readonly': True },
'status' : {'type': 'str' , 'optional': True , 'readonly': False},
'setting' : {'type': 'float' , 'optional': True , 'readonly': False} }
def get_status(name: str, link: str) -> dict[str, Any]:
s = try_read(name, f"select * from status where link = '{link}'")
if s == None:
return { 'link': link, 'status': None, 'setting': None }
d = {}
d['link'] = str(s['link'])
d['status'] = str(s['status']) if s['status'] != None else None
d['setting'] = float(s['setting']) if s['setting'] != None else None
return d
class Status(object):
def __init__(self, input: dict[str, Any]) -> None:
self.type = 'status'
self.link = str(input['link'])
self.status = str(input['status']) if 'status' in input and input['status'] != None else None
self.setting = float(input['setting']) if 'setting' in input and input['setting'] != None else None
self.f_type = f"'{self.type}'"
self.f_link = f"'{self.link}'"
self.f_status = f"'{self.status}'" if self.status != None else 'null'
self.f_setting = self.setting if self.setting != None else 'null'
def as_dict(self) -> dict[str, Any]:
return { 'type': self.type, 'link': self.link, 'status': self.status, 'setting': self.setting }
def set_status(name: str, cs: ChangeSet) -> ChangeSet:
old = Status(get_status(name, cs.operations[0]['link']))
raw_new = get_status(name, cs.operations[0]['link'])
new_dict = cs.operations[0]
schema = get_status_schema(name)
for key, value in schema.items():
if key in new_dict and not value['readonly']:
raw_new[key] = new_dict[key]
new = Status(raw_new)
redo_sql = f"delete from status where link = {new.f_link};"
redo_sql += f"\ninsert into status (link, status, setting) values ({new.f_link}, {new.f_status}, {new.f_setting});"
undo_sql = f"delete from status where link = {old.f_link};"
if old.status != None or old.setting != None:
undo_sql += f"\ninsert into status (link, status, setting) values ({old.f_link}, {old.f_status}, {old.f_setting});"
redo_cs = g_update_prefix | new.as_dict()
undo_cs = g_update_prefix | old.as_dict()
return execute_command(name, redo_sql, undo_sql, redo_cs, undo_cs)