198 lines
8.0 KiB
Python
198 lines
8.0 KiB
Python
from .database import *
|
||
from .s0_base import *
|
||
|
||
|
||
# SCADA measurement type names (not referenced elsewhere in this section).
SCADA_TYPE_PRESSURE = 'PRESSURE'
SCADA_TYPE_DEMAND = 'DEMAND'
SCADA_TYPE_QUALITY = 'QUALITY'
SCADA_TYPE_LEVEL = 'LEVEL'
SCADA_TYPE_FLOW = 'FLOW'

# Values accepted for a scada element's 'model_type'; _check_model dispatches
# on these to pick the matching is_* lookup.
SCADA_MODEL_TYPE_JUNCTION = 'JUNCTION'
SCADA_MODEL_TYPE_RESERVOIR = 'RESERVOIR'
SCADA_MODEL_TYPE_TANK = 'TANK'
SCADA_MODEL_TYPE_PIPE = 'PIPE'
SCADA_MODEL_TYPE_PUMP = 'PUMP'
SCADA_MODEL_TYPE_VALVE = 'VALVE'

# Values of a scada element's 'status'; ScadaModel falls back to OFFLINE when
# no status is supplied.
SCADA_ELEMENT_STATUS_OFFLINE = 'OFF'
SCADA_ELEMENT_STATUS_ONLINE = 'ON'

# Every model type listed above, in declaration order.
_scada_model_types = [
    SCADA_MODEL_TYPE_JUNCTION,
    SCADA_MODEL_TYPE_RESERVOIR,
    SCADA_MODEL_TYPE_TANK,
    SCADA_MODEL_TYPE_PIPE,
    SCADA_MODEL_TYPE_PUMP,
    SCADA_MODEL_TYPE_VALVE,
]
|
||
|
||
|
||
def _check_model(name: str, cs: ChangeSet) -> bool:
    """Validate the optional model binding in the first operation of *cs*.

    A binding is the pair ``model_id`` / ``model_type``.  A value of ``None``
    is treated exactly like a missing key, which matches how ``ScadaModel``
    and ``get_scada_element`` represent "no binding".

    Args:
        name: Passed unchanged to the ``is_*`` lookups (presumably the
            project/database name -- confirm against those helpers).
        cs: Change set; only ``cs.operations[0]`` is inspected.

    Returns:
        ``True`` when no binding is given (both values missing or ``None``).
        ``False`` when only one of the two values is given, or when
        ``model_type`` is not one of the ``SCADA_MODEL_TYPE_*`` constants.
        Otherwise the result of the ``is_*`` lookup that corresponds to
        ``model_type``, called with ``model_id``.
    """
    operation = cs.operations[0]
    model_id = operation['model_id'] if 'model_id' in operation else None
    model_type = operation['model_type'] if 'model_type' in operation else None

    # No binding at all is valid; half a binding is not.
    if model_id is None and model_type is None:
        return True
    if model_id is None or model_type is None:
        return False

    # Built only after the guards so the cheap paths never touch the lookups.
    lookups = (
        (SCADA_MODEL_TYPE_JUNCTION, is_junction),
        (SCADA_MODEL_TYPE_RESERVOIR, is_reservoir),
        (SCADA_MODEL_TYPE_TANK, is_tank),
        (SCADA_MODEL_TYPE_PIPE, is_pipe),
        (SCADA_MODEL_TYPE_PUMP, is_pump),
        (SCADA_MODEL_TYPE_VALVE, is_valve),
    )
    for known_type, lookup in lookups:
        if model_type == known_type:
            return lookup(name, model_id)

    # Unknown model_type.
    return False
|
||
|
||
|
||
def get_scada_element_schema(name: str) -> dict[str, dict[str, Any]]:
    """Return the field schema of a scada element.

    The result maps each field name to a dict with the keys ``'type'``,
    ``'optional'`` and ``'readonly'``.  Field order is id, x, y, device_id,
    model_id, model_type, status.  *name* is accepted but not used.
    """
    # (field, type name, optional, readonly)
    columns = (
        ('id', 'str', False, True),
        ('x', 'float', False, False),
        ('y', 'float', False, False),
        ('device_id', 'str', True, False),
        ('model_id', 'str', True, False),
        ('model_type', 'str', True, False),
        ('status', 'str', True, False),
    )
    return {
        field: {'type': type_name, 'optional': optional, 'readonly': readonly}
        for field, type_name, optional, readonly in columns
    }
|
||
|
||
|
||
def get_scada_element(name: str, id: str) -> dict[str, Any]:
    """Read one row of ``scada_element`` and return it as a plain dict.

    Args:
        name: Passed unchanged to ``try_read`` (presumably the
            project/database name -- confirm against ``try_read``).
        id: Primary key of the element to read.

    Returns:
        ``{}`` when ``try_read`` yields ``None``.  Otherwise a dict with the
        keys ``id``, ``x``, ``y``, ``device_id``, ``model_id``,
        ``model_type`` and ``status``; ``x``/``y`` are floats, the rest are
        strings, and the three ``device_id``/``model_*`` values are ``None``
        when the column is null.
    """
    # Double embedded single quotes so the id cannot terminate the SQL string
    # literal (prevents injection and syntax errors on ids containing "'").
    quoted_id = str(id).replace("'", "''")
    row = try_read(name, f"select * from scada_element where id = '{quoted_id}'")
    if row is None:
        return {}

    def text_or_none(column: str):
        # Nullable text column: keep null as None instead of the string 'None'.
        value = row[column]
        return str(value) if value is not None else None

    return {
        'id': str(row['id']),
        'x': float(row['x']),
        'y': float(row['y']),
        'device_id': text_or_none('device_id'),
        'model_id': text_or_none('model_id'),
        'model_type': text_or_none('model_type'),
        'status': str(row['status']),
    }
|
||
|
||
|
||
class ScadaModel(object):
    """One scada element, held both as Python values and as SQL literals.

    The plain attributes (``id``, ``x``, ``y``, ``device_id``, ``model_id``,
    ``model_type``, ``status``) hold normalized Python values.  The matching
    ``f_*`` attributes hold the same values formatted for direct insertion
    into an SQL statement: text is single-quoted with embedded single quotes
    doubled, ``None`` becomes ``null``, and numbers are left as floats.
    """

    def __init__(self, input: dict[str, Any]) -> None:
        """Build the model from *input*.

        ``id``, ``x`` and ``y`` are required (a missing key raises
        ``KeyError``).  ``device_id``, ``model_id`` and ``model_type`` become
        ``None`` when missing or ``None``.  ``status`` falls back to
        ``SCADA_ELEMENT_STATUS_OFFLINE`` when missing or ``None``.
        """
        self.type = 'scada_element'
        self.id = str(input['id'])
        self.x = float(input['x'])
        self.y = float(input['y'])
        self.device_id = self._optional_str(input, 'device_id')
        self.model_id = self._optional_str(input, 'model_id')
        self.model_type = self._optional_str(input, 'model_type')
        status = self._optional_str(input, 'status')
        self.status = status if status is not None else SCADA_ELEMENT_STATUS_OFFLINE

        self.f_type = self._sql_text(self.type)
        self.f_id = self._sql_text(self.id)
        self.f_x = self.x
        self.f_y = self.y
        self.f_device_id = self._sql_text(self.device_id)
        self.f_model_id = self._sql_text(self.model_id)
        self.f_model_type = self._sql_text(self.model_type)
        self.f_status = self._sql_text(self.status)

    @staticmethod
    def _optional_str(source: dict[str, Any], key: str):
        """Return ``str(source[key])``, or ``None`` if missing or ``None``."""
        if key in source and source[key] is not None:
            return str(source[key])
        return None

    @staticmethod
    def _sql_text(value) -> str:
        """Format *value* as an SQL string literal, or ``null`` for ``None``.

        Embedded single quotes are doubled so the value cannot end the
        literal early.
        """
        if value is None:
            return 'null'
        return "'" + value.replace("'", "''") + "'"

    def as_dict(self) -> dict[str, Any]:
        """Return every field, including ``'type'``, as a plain dict."""
        return {
            'type': self.type,
            'id': self.id,
            'x': self.x,
            'y': self.y,
            'device_id': self.device_id,
            'model_id': self.model_id,
            'model_type': self.model_type,
            'status': self.status,
        }

    def as_id_dict(self) -> dict[str, Any]:
        """Return only ``'type'`` and ``'id'``."""
        return {'type': self.type, 'id': self.id}
|
||
|
||
|
||
def _set_scada_element(name: str, cs: ChangeSet) -> DbChangeSet:
    """Build the redo/undo command that updates an existing scada element.

    The stored element is read once.  Every non-readonly schema field present
    in ``cs.operations[0]`` overrides the stored value; all other fields keep
    their stored value.  The caller must ensure the element exists:
    ``ScadaModel`` raises ``KeyError`` on the empty dict returned for a
    missing element.

    Returns:
        A ``DbChangeSet`` made of the redo SQL (write the merged values), the
        undo SQL (restore the stored values) and the matching change-set
        entries, each prefixed with ``g_update_prefix``.
    """
    operation = cs.operations[0]

    # Read once and derive both snapshots from the same row, so old and new
    # cannot diverge and the second query is avoided.
    stored = get_scada_element(name, operation['id'])
    old = ScadaModel(stored)

    merged = dict(stored)
    for key, spec in get_scada_element_schema(name).items():
        if key in operation and not spec['readonly']:
            merged[key] = operation[key]
    new = ScadaModel(merged)

    redo_sql = f"update scada_element set x = {new.f_x}, y = {new.f_y}, device_id = {new.f_device_id}, model_id = {new.f_model_id}, model_type = {new.f_model_type}, status = {new.f_status} where id = {new.f_id};"
    undo_sql = f"update scada_element set x = {old.f_x}, y = {old.f_y}, device_id = {old.f_device_id}, model_id = {old.f_model_id}, model_type = {old.f_model_type}, status = {old.f_status} where id = {old.f_id};"

    redo_cs = g_update_prefix | new.as_dict()
    undo_cs = g_update_prefix | old.as_dict()

    return DbChangeSet(redo_sql, undo_sql, [redo_cs], [undo_cs])
|
||
|
||
|
||
def set_scada_element(name: str, cs: ChangeSet) -> ChangeSet:
    """Update the scada element named by ``cs.operations[0]['id']``.

    Returns an empty ``ChangeSet`` without touching the database when the
    element does not exist or when ``_check_model`` rejects the operation.
    Otherwise returns the result of ``execute_command`` on the update
    command.
    """
    element_id = cs.operations[0]['id']
    existing = get_scada_element(name, element_id)
    if not existing:
        # Nothing to update.
        return ChangeSet()

    if not _check_model(name, cs):
        return ChangeSet()

    command = _set_scada_element(name, cs)
    return execute_command(name, command)
|
||
|
||
|
||
def _add_scada_element(name: str, cs: ChangeSet) -> DbChangeSet:
    """Build the redo/undo command that inserts a new scada element.

    The element is taken from ``cs.operations[0]``.  Redo inserts the row and
    undo deletes it by id.  *name* is accepted but not used here.
    """
    element = ScadaModel(cs.operations[0])

    # Change-set entries: full record for the add, id only for the delete.
    redo_entry = g_add_prefix | element.as_dict()
    undo_entry = g_delete_prefix | element.as_id_dict()

    insert_sql = f"insert into scada_element (id, x, y, device_id, model_id, model_type, status) values ({element.f_id}, {element.f_x}, {element.f_y}, {element.f_device_id}, {element.f_model_id}, {element.f_model_type}, {element.f_status});"
    delete_sql = f"delete from scada_element where id = {element.f_id};"

    return DbChangeSet(insert_sql, delete_sql, [redo_entry], [undo_entry])
|
||
|
||
|
||
def add_scada_element(name: str, cs: ChangeSet) -> ChangeSet:
    """Insert the scada element described by ``cs.operations[0]``.

    Returns an empty ``ChangeSet`` without touching the database when an
    element with the same id already exists or when ``_check_model`` rejects
    the operation.  Otherwise returns the result of ``execute_command`` on
    the insert command.
    """
    element_id = cs.operations[0]['id']
    already_there = get_scada_element(name, element_id)
    if already_there:
        # Refuse to overwrite an existing element.
        return ChangeSet()

    if not _check_model(name, cs):
        return ChangeSet()

    command = _add_scada_element(name, cs)
    return execute_command(name, command)
|
||
|
||
|
||
def _delete_scada_element(name: str, cs: ChangeSet) -> DbChangeSet:
    """Build the redo/undo command that deletes an existing scada element.

    The stored element is read first so that undo can re-insert it with its
    stored values.  The caller must ensure the element exists: ``ScadaModel``
    raises ``KeyError`` on the empty dict returned for a missing element.
    """
    element_id = cs.operations[0]['id']
    stored = ScadaModel(get_scada_element(name, element_id))

    # Change-set entries: id only for the delete, full record for the re-add.
    redo_entry = g_delete_prefix | stored.as_id_dict()
    undo_entry = g_add_prefix | stored.as_dict()

    delete_sql = f"delete from scada_element where id = {stored.f_id};"
    restore_sql = f"insert into scada_element (id, x, y, device_id, model_id, model_type, status) values ({stored.f_id}, {stored.f_x}, {stored.f_y}, {stored.f_device_id}, {stored.f_model_id}, {stored.f_model_type}, {stored.f_status});"

    return DbChangeSet(delete_sql, restore_sql, [redo_entry], [undo_entry])
|
||
|
||
|
||
def delete_scada_element(name: str, cs: ChangeSet) -> ChangeSet:
    """Delete the scada element named by ``cs.operations[0]['id']``.

    Returns an empty ``ChangeSet`` without touching the database when the
    element does not exist.  Otherwise returns the result of
    ``execute_command`` on the delete command.
    """
    element_id = cs.operations[0]['id']
    if not get_scada_element(name, element_id):
        # Nothing to delete.
        return ChangeSet()

    command = _delete_scada_element(name, cs)
    return execute_command(name, command)
|
||
|
||
|
||
def get_all_scada_element_ids(name: str) -> list[str]:
    """Return the ids of all rows in ``scada_element``, ordered by id.

    Each id is converted with ``str``.  *name* is passed unchanged to
    ``read_all``.
    """
    query = 'select id from scada_element order by id'
    return [str(record['id']) for record in read_all(name, query)]
|
||
|
||
#
|
||
# create table scada_element
|
||
# (
|
||
# id text primary key
|
||
# , x float8 not null
|
||
# , y float8 not null
|
||
# , device_id text references scada_device(id)
|
||
# , model_id varchar(32) -- add constraint in API
|
||
# , model_type scada_model_type
|
||
# , status scada_element_status not null default 'OFF'
|
||
# );
|
||
#
|
||
# Returns a list; each item in the list is a dict with entries such as 'id': 'abc'.
# The scada_model type (e.g. pressure, flow) is determined by the Device.
|
||
def get_all_scada_elements(name: str) -> list[dict[str, Any]]:
    """Return all rows of ``scada_element``, ordered by id.

    The rows are returned exactly as ``read_all`` produces them; no
    per-column conversion is applied here.
    """
    query = 'select * from scada_element order by id'
    rows = read_all(name, query)
    return rows
|