Files
TJWaterServer/api/s29_scada_model.py
2023-02-16 21:21:21 +08:00

147 lines
6.8 KiB
Python

from .database import *
from .s0_base import is_node, is_link
# Names of the kinds of measurement a SCADA point can carry.
# NOTE(review): presumably these are the accepted values of the `sm_type`
# field; nothing visible in this module validates against them - confirm
# against callers.
SCADA_TYPE_PRESSURE = 'PRESSURE'
SCADA_TYPE_DEMAND = 'DEMAND'
SCADA_TYPE_QUALITY = 'QUALITY'
SCADA_TYPE_LEVEL = 'LEVEL'
SCADA_TYPE_FLOW = 'FLOW'
# Values for the `status` field. SCADA_STATUS_OFFLINE is the default that
# ScadaModel applies when its input carries no status (or a None status).
SCADA_STATUS_OFFLINE = 'OFF'
SCADA_STATUS_ONLINE = 'ON'
def _check_model_id(name: str, cs: ChangeSet) -> bool:
    """Check that the optional ``model_id`` of the first operation is usable.

    Args:
        name: Identifier forwarded unchanged to ``is_node`` / ``is_link``
            (presumably the project/database name - confirm against those
            helpers).
        cs: Change set; only ``cs.operations[0]`` is inspected.

    Returns:
        True when the operation has no ``model_id`` key or its value is None.
        Otherwise the result of ``is_node(name, model_id) or
        is_link(name, model_id)``.
    """
    operation = cs.operations[0]
    # A missing key and an explicit None are treated the same way: there is
    # nothing to validate, so the check passes.
    model_id = operation.get('model_id')
    if model_id is None:
        return True
    return is_node(name, model_id) or is_link(name, model_id)
def get_scada_model_schema(name: str) -> dict[str, dict[str, Any]]:
    """Describe the fields of a scada_model record.

    Args:
        name: Not used by this function; kept for the call signature.

    Returns:
        A newly built mapping from field name to a descriptor dict with the
        keys ``'type'`` (type name as a string), ``'optional'`` and
        ``'readonly'`` (both bool). Field order follows the table below.
    """
    # One row per field: (field name, type name, optional, readonly).
    layout = (
        ('id', 'str', False, True),
        ('x', 'float', False, False),
        ('y', 'float', False, False),
        ('device_id', 'str', False, False),
        ('device_name', 'str', True, False),
        ('address', 'str', True, False),
        ('sm_type', 'str', True, False),
        ('model_id', 'str', True, False),
        ('status', 'str', True, False),
    )
    return {
        field: {'type': type_name, 'optional': optional, 'readonly': readonly}
        for field, type_name, optional, readonly in layout
    }
def get_scada_model(name: str, id: str) -> dict[str, Any]:
    """Read one scada_model row and return it as a plain dict.

    Args:
        name: Identifier forwarded unchanged to ``try_read`` (presumably the
            project/database name - confirm against that helper).
        id: Value matched against the ``id`` column.

    Returns:
        ``{}`` when ``try_read`` returns None. Otherwise a dict with the keys
        ``id``, ``x``, ``y``, ``device_id``, ``device_name``, ``address``,
        ``sm_type``, ``model_id`` and ``status``. The database column ``type``
        is exposed as ``sm_type``; ``device_name``, ``address``, ``sm_type``
        and ``model_id`` are None when the column value is None.
    """
    # The id is embedded in a quoted SQL string literal. Doubling embedded
    # single quotes (standard SQL escaping) keeps a value such as "a'b" inside
    # the literal instead of terminating it early.
    quoted_id = str(id).replace("'", "''")
    row = try_read(name, f"select * from scada_model where id = '{quoted_id}'")
    if row is None:
        return {}

    def text_or_none(column: str):
        # Nullable text columns stay None rather than becoming the string 'None'.
        value = row[column]
        return None if value is None else str(value)

    return {
        'id': str(row['id']),
        'x': float(row['x']),
        'y': float(row['y']),
        'device_id': str(row['device_id']),
        'device_name': text_or_none('device_name'),
        'address': text_or_none('address'),
        'sm_type': text_or_none('type'),
        'model_id': text_or_none('model_id'),
        'status': str(row['status']),
    }
class ScadaModel(object):
    """One scada_model record, with SQL-ready renderings of its fields.

    Plain attributes (``id``, ``x``, ``y``, ``device_id``, ``device_name``,
    ``address``, ``sm_type``, ``model_id``, ``status``) hold the Python
    values. Each ``f_*`` attribute holds the same value formatted for direct
    interpolation into an SQL statement: text is wrapped in single quotes with
    embedded single quotes doubled, None becomes ``null``, and ``f_x`` /
    ``f_y`` are the floats themselves.
    """

    def __init__(self, input: dict[str, Any]) -> None:
        """Build the record from a field dict.

        Args:
            input: Must contain ``id``, ``x``, ``y`` and ``device_id``.
                ``device_name``, ``address``, ``sm_type`` and ``model_id``
                are optional and default to None; ``status`` is optional and
                defaults to ``SCADA_STATUS_OFFLINE``. A key that is present
                with a None value is treated as absent.

        Raises:
            KeyError: A required key is missing.
            ValueError: ``x`` or ``y`` cannot be converted to float.
        """
        self.type = 'scada_model'
        self.id = str(input['id'])
        self.x = float(input['x'])
        self.y = float(input['y'])
        self.device_id = str(input['device_id'])
        self.device_name = self._optional_str(input, 'device_name')
        self.address = self._optional_str(input, 'address')
        self.sm_type = self._optional_str(input, 'sm_type')
        self.model_id = self._optional_str(input, 'model_id')
        status = self._optional_str(input, 'status')
        # Only a missing/None status falls back to the default; an empty
        # string is kept as given.
        self.status = status if status is not None else SCADA_STATUS_OFFLINE

        self.f_type = self._sql_text(self.type)
        self.f_id = self._sql_text(self.id)
        self.f_x = self.x
        self.f_y = self.y
        self.f_device_id = self._sql_text(self.device_id)
        self.f_device_name = self._sql_text(self.device_name)
        self.f_address = self._sql_text(self.address)
        self.f_sm_type = self._sql_text(self.sm_type)
        self.f_model_id = self._sql_text(self.model_id)
        self.f_status = self._sql_text(self.status)

    @staticmethod
    def _optional_str(source: dict[str, Any], key: str):
        """Return ``str(source[key])``, or None when the key is missing or None."""
        value = source.get(key)
        return None if value is None else str(value)

    @staticmethod
    def _sql_text(value) -> str:
        """Render a str (or None) as an SQL string literal (or ``null``)."""
        if value is None:
            return 'null'
        # Doubling embedded single quotes is the standard SQL escape; without
        # it a value such as O'Brien would end the literal early and corrupt
        # the statement it is interpolated into.
        return "'" + value.replace("'", "''") + "'"

    def as_dict(self) -> dict[str, Any]:
        """Return every field, including ``type``, as a plain dict."""
        return {
            'type': self.type,
            'id': self.id,
            'x': self.x,
            'y': self.y,
            'device_id': self.device_id,
            'device_name': self.device_name,
            'address': self.address,
            'sm_type': self.sm_type,
            'model_id': self.model_id,
            'status': self.status,
        }

    def as_id_dict(self) -> dict[str, Any]:
        """Return only the ``type`` and ``id`` of the record."""
        return {'type': self.type, 'id': self.id}
def set_scada_model_cmd(name: str, cs: ChangeSet) -> DbChangeSet:
    """Build the redo/undo command that updates an existing scada_model row.

    The stored record is overlaid with every non-readonly schema field that
    is present in ``cs.operations[0]``; fields absent from the operation keep
    their stored values.

    Args:
        name: Identifier forwarded to ``get_scada_model`` and
            ``get_scada_model_schema``.
        cs: Change set; ``cs.operations[0]`` must contain ``id`` and may
            contain any of the schema fields.

    Returns:
        A ``DbChangeSet`` built from the update statement for the new values
        (redo), the update statement restoring the stored values (undo), and
        the matching update change-set entries.

    Raises:
        KeyError: No row with that id exists (the stored record is empty).
            ``set_scada_model`` checks for this before calling.
    """
    operation = cs.operations[0]

    # Read the stored row once and derive both sides from that snapshot, so
    # the undo state and the base of the redo state cannot disagree and the
    # database is not queried twice for the same row.
    stored = get_scada_model(name, operation['id'])
    old = ScadaModel(stored)

    merged = dict(stored)
    for field, spec in get_scada_model_schema(name).items():
        if field in operation and not spec['readonly']:
            merged[field] = operation[field]
    new = ScadaModel(merged)

    def update_sql(sm: ScadaModel) -> str:
        # Same statement shape for redo and undo; only the values differ.
        return f"update scada_model set x = {sm.f_x}, y = {sm.f_y}, device_id = {sm.f_device_id}, device_name = {sm.f_device_name}, address = {sm.f_address}, type = {sm.f_sm_type}, model_id = {sm.f_model_id}, status = {sm.f_status} where id = {sm.f_id};"

    redo_cs = g_update_prefix | new.as_dict()
    undo_cs = g_update_prefix | old.as_dict()
    return DbChangeSet(update_sql(new), update_sql(old), [redo_cs], [undo_cs])
def set_scada_model(name: str, cs: ChangeSet) -> ChangeSet:
    """Update an existing scada_model record described by ``cs.operations[0]``.

    Returns an empty ``ChangeSet`` without touching anything when no record
    with the given id exists, or when ``_check_model_id`` rejects the
    operation. Otherwise returns whatever ``execute_command`` returns for the
    command built by ``set_scada_model_cmd``.
    """
    target_id = cs.operations[0]['id']
    record_exists = bool(get_scada_model(name, target_id))
    # The model_id check only runs once the record is known to exist.
    if record_exists and _check_model_id(name, cs):
        return execute_command(name, set_scada_model_cmd(name, cs))
    return ChangeSet()
def add_scada_model_cmd(name: str, cs: ChangeSet) -> DbChangeSet:
    """Build the redo/undo command that inserts a new scada_model row.

    Args:
        name: Not used by this function; kept for the call signature.
        cs: Change set; ``cs.operations[0]`` supplies the record fields and is
            passed to ``ScadaModel`` as-is.

    Returns:
        A ``DbChangeSet`` whose redo side inserts the row and whose undo side
        deletes it by id, together with the matching add/delete change-set
        entries.
    """
    record = ScadaModel(cs.operations[0])

    insert_sql = (
        "insert into scada_model (id, x, y, device_id, device_name, address, type, model_id, status) "
        f"values ({record.f_id}, {record.f_x}, {record.f_y}, {record.f_device_id}, "
        f"{record.f_device_name}, {record.f_address}, {record.f_sm_type}, "
        f"{record.f_model_id}, {record.f_status});"
    )
    delete_sql = f"delete from scada_model where id = {record.f_id};"

    added_entry = g_add_prefix | record.as_dict()
    removed_entry = g_delete_prefix | record.as_id_dict()
    return DbChangeSet(insert_sql, delete_sql, [added_entry], [removed_entry])
def add_scada_model(name: str, cs: ChangeSet) -> ChangeSet:
    """Insert the scada_model record described by ``cs.operations[0]``.

    Returns an empty ``ChangeSet`` without inserting when a record with the
    same id already exists, or when ``_check_model_id`` rejects the operation.
    Otherwise returns whatever ``execute_command`` returns for the command
    built by ``add_scada_model_cmd``.
    """
    new_id = cs.operations[0]['id']
    id_taken = bool(get_scada_model(name, new_id))
    # The model_id check is skipped when the id is already in use.
    if id_taken or not _check_model_id(name, cs):
        return ChangeSet()
    return execute_command(name, add_scada_model_cmd(name, cs))
def delete_scada_model_cmd(name: str, cs: ChangeSet) -> DbChangeSet:
    """Build the redo/undo command that deletes a scada_model row.

    The stored record is read first so the undo side can re-insert it with
    all of its current values.

    Args:
        name: Identifier forwarded to ``get_scada_model``.
        cs: Change set; ``cs.operations[0]['id']`` selects the row.

    Returns:
        A ``DbChangeSet`` whose redo side deletes the row by id and whose undo
        side re-inserts the stored record, together with the matching
        delete/add change-set entries.
    """
    stored = ScadaModel(get_scada_model(name, cs.operations[0]['id']))

    delete_sql = f"delete from scada_model where id = {stored.f_id};"
    restore_sql = (
        "insert into scada_model (id, x, y, device_id, device_name, address, type, model_id, status) "
        f"values ({stored.f_id}, {stored.f_x}, {stored.f_y}, {stored.f_device_id}, "
        f"{stored.f_device_name}, {stored.f_address}, {stored.f_sm_type}, "
        f"{stored.f_model_id}, {stored.f_status});"
    )

    removed_entry = g_delete_prefix | stored.as_id_dict()
    restored_entry = g_add_prefix | stored.as_dict()
    return DbChangeSet(delete_sql, restore_sql, [removed_entry], [restored_entry])
def delete_scada_model(name: str, cs: ChangeSet) -> ChangeSet:
    """Delete the scada_model record whose id is ``cs.operations[0]['id']``.

    Returns an empty ``ChangeSet`` when no such record exists. Otherwise
    returns whatever ``execute_command`` returns for the command built by
    ``delete_scada_model_cmd``.
    """
    existing = get_scada_model(name, cs.operations[0]['id'])
    if existing:
        return execute_command(name, delete_scada_model_cmd(name, cs))
    # Nothing stored under this id, so there is nothing to delete.
    return ChangeSet()