Add source api and test
This commit is contained in:
@@ -62,6 +62,9 @@ from .s16_emitters import get_emitter_schema, get_emitter, set_emitter
|
||||
|
||||
from .s17_quality import get_quality_schema, get_quality, set_quality
|
||||
|
||||
from .s18_sources import SOURCE_TYPE_CONCEN, SOURCE_TYPE_MASS, SOURCE_TYPE_FLOWPACED, SOURCE_TYPE_SETPOINT
|
||||
from .s18_sources import get_source_schema, get_source, set_source, add_source, delete_source
|
||||
|
||||
from .s21_times import TIME_STATISTIC_NONE, TIME_STATISTIC_AVERAGED, TIME_STATISTIC_MINIMUM, TIME_STATISTIC_MAXIMUM, TIME_STATISTIC_RANGE
|
||||
from .s21_times import get_time_schema, get_time, set_time
|
||||
|
||||
|
||||
100
api/s18_sources.py
Normal file
100
api/s18_sources.py
Normal file
@@ -0,0 +1,100 @@
|
||||
from .operation import *
|
||||
from .s0_base import *
|
||||
|
||||
# Allowed values for a source's 's_type' field.
# NOTE(review): names match EPANET water-quality source-type keywords
# (concentration / mass / flow-paced / setpoint boosters) — semantics
# assumed from the names; confirm against the simulator docs.
SOURCE_TYPE_CONCEN = 'CONCEN'
SOURCE_TYPE_MASS = 'MASS'
SOURCE_TYPE_FLOWPACED = 'FLOWPACED'
SOURCE_TYPE_SETPOINT = 'SETPOINT'
|
||||
|
||||
def get_source_schema(name: str) -> dict[str, dict[str, Any]]:
    """Return the field schema for a source record.

    Maps each field name to its declared type, whether it may be
    omitted, and whether it is read-only for updates.
    """
    schema: dict[str, dict[str, Any]] = {}
    schema['node'] = {'type': 'str', 'optional': False, 'readonly': True}
    schema['s_type'] = {'type': 'str', 'optional': False, 'readonly': False}
    schema['strength'] = {'type': 'float', 'optional': False, 'readonly': False}
    schema['pattern'] = {'type': 'str', 'optional': True, 'readonly': False}
    return schema
|
||||
|
||||
|
||||
def get_source(name: str, node: str) -> dict[str, Any]:
    """Fetch the source attached to *node* and return it as a plain dict.

    The DB column ``type`` is exposed under the API field name ``s_type``.
    ``pattern`` stays ``None`` when the row has no pattern.
    """
    # NOTE(review): node is interpolated directly into the SQL text — if it
    # can come from untrusted input this is an injection risk; prefer a
    # parameterized query if the read() helper supports one.
    s = read(name, f"select * from sources where node = '{node}'")
    d: dict[str, Any] = {}
    d['node'] = str(s['node'])
    d['s_type'] = str(s['type'])  # column 'type' -> API field 's_type'
    d['strength'] = float(s['strength'])
    # Optional field: keep None instead of producing the string 'None'.
    d['pattern'] = str(s['pattern']) if s['pattern'] is not None else None
    return d
|
||||
|
||||
|
||||
class Source(object):
    """In-memory representation of a water-quality source row.

    Normalizes the raw dict form and pre-computes SQL-literal fragments
    (the ``f_*`` attributes) used when building insert/update/delete
    statements; a missing pattern renders as the SQL keyword ``null``.
    """

    def __init__(self, input: dict[str, Any]) -> None:
        self.type = 'source'
        self.node = str(input['node'])
        self.s_type = str(input['s_type'])
        self.strength = float(input['strength'])
        # 'pattern' is optional: absent and None both mean "no pattern".
        self.pattern = str(input['pattern']) if input.get('pattern') is not None else None

        # Pre-quoted SQL fragments for statement construction.
        self.f_type = f"'{self.type}'"
        self.f_node = f"'{self.node}'"
        self.f_s_type = f"'{self.s_type}'"
        self.f_strength = self.strength
        self.f_pattern = f"'{self.pattern}'" if self.pattern is not None else 'null'

    def as_dict(self) -> dict[str, Any]:
        """Full dict form, suitable for change-set payloads."""
        return { 'type': self.type, 'node': self.node, 's_type': self.s_type, 'strength': self.strength, 'pattern': self.pattern }

    def as_id_dict(self) -> dict[str, Any]:
        """Minimal identifying dict (type + node only)."""
        return { 'type': self.type, 'node': self.node }
|
||||
|
||||
|
||||
def set_source_cache(name: str, cs: ChangeSet) -> SqlChangeSet:
    """Build the redo/undo SQL and change-set pair for updating a source.

    Only writable fields present in the request are applied; read-only
    fields keep their stored values.
    """
    op = cs.operations[0]
    # Read the row once and reuse it for both the "old" snapshot and as
    # the base of the "new" one (previously this hit the DB twice).
    raw_old = get_source(name, op['node'])
    old = Source(raw_old)

    raw_new = dict(raw_old)
    schema = get_source_schema(name)
    for key, spec in schema.items():
        if key in op and not spec['readonly']:
            raw_new[key] = op[key]
    new = Source(raw_new)

    redo_sql = f"update sources set type = {new.f_s_type}, strength = {new.f_strength}, pattern = {new.f_pattern} where node = {new.f_node};"
    undo_sql = f"update sources set type = {old.f_s_type}, strength = {old.f_strength}, pattern = {old.f_pattern} where node = {old.f_node};"

    redo_cs = g_update_prefix | new.as_dict()
    undo_cs = g_update_prefix | old.as_dict()

    return SqlChangeSet(redo_sql, undo_sql, redo_cs, undo_cs)
|
||||
|
||||
|
||||
def set_source(name: str, cs: ChangeSet) -> ChangeSet:
    """Apply a source-update change-set and return the executed result."""
    sql_cs = set_source_cache(name, cs)
    return execute_command(name, sql_cs)
|
||||
|
||||
|
||||
def add_source_cache(name: str, cs: ChangeSet) -> SqlChangeSet:
    """Build the redo/undo SQL and change-set pair for inserting a source."""
    src = Source(cs.operations[0])

    # Redo inserts the full row; undo deletes it by node id.
    redo_sql = f"insert into sources (node, type, strength, pattern) values ({src.f_node}, {src.f_s_type}, {src.f_strength}, {src.f_pattern});"
    undo_sql = f"delete from sources where node = {src.f_node};"

    return SqlChangeSet(
        redo_sql,
        undo_sql,
        g_add_prefix | src.as_dict(),
        g_delete_prefix | src.as_id_dict(),
    )
|
||||
|
||||
|
||||
def add_source(name: str, cs: ChangeSet) -> ChangeSet:
    """Apply a source-insert change-set and return the executed result."""
    sql_cs = add_source_cache(name, cs)
    return execute_command(name, sql_cs)
|
||||
|
||||
|
||||
def delete_source_cache(name: str, cs: ChangeSet) -> SqlChangeSet:
    """Build the redo/undo SQL and change-set pair for removing a source."""
    existing = Source(get_source(name, cs.operations[0]['node']))

    # Redo deletes by node id; undo re-inserts the captured row.
    redo_sql = f"delete from sources where node = {existing.f_node};"
    undo_sql = f"insert into sources (node, type, strength, pattern) values ({existing.f_node}, {existing.f_s_type}, {existing.f_strength}, {existing.f_pattern});"

    return SqlChangeSet(
        redo_sql,
        undo_sql,
        g_delete_prefix | existing.as_id_dict(),
        g_add_prefix | existing.as_dict(),
    )
|
||||
|
||||
|
||||
def delete_source(name: str, cs: ChangeSet) -> ChangeSet:
    """Apply a source-delete change-set and return the executed result."""
    sql_cs = delete_source_cache(name, cs)
    return execute_command(name, sql_cs)
|
||||
Reference in New Issue
Block a user