Files
TJWaterServer/api/s31_scada_element.py
2025-01-31 13:00:13 +08:00

198 lines
8.0 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
from .database import *
from .s0_base import *
SCADA_TYPE_PRESSURE = 'PRESSURE'
SCADA_TYPE_DEMAND = 'DEMAND'
SCADA_TYPE_QUALITY = 'QUALITY'
SCADA_TYPE_LEVEL = 'LEVEL'
SCADA_TYPE_FLOW = 'FLOW'
SCADA_MODEL_TYPE_JUNCTION = 'JUNCTION'
SCADA_MODEL_TYPE_RESERVOIR = 'RESERVOIR'
SCADA_MODEL_TYPE_TANK = 'TANK'
SCADA_MODEL_TYPE_PIPE = 'PIPE'
SCADA_MODEL_TYPE_PUMP = 'PUMP'
SCADA_MODEL_TYPE_VALVE = 'VALVE'
SCADA_ELEMENT_STATUS_OFFLINE = 'OFF'
SCADA_ELEMENT_STATUS_ONLINE = 'ON'
_scada_model_types = [SCADA_MODEL_TYPE_JUNCTION, SCADA_MODEL_TYPE_RESERVOIR, SCADA_MODEL_TYPE_TANK, SCADA_MODEL_TYPE_PIPE, SCADA_MODEL_TYPE_PUMP, SCADA_MODEL_TYPE_VALVE]
def _check_model(name: str, cs: ChangeSet) -> bool:
has_model_id = 'model_id' in cs.operations[0]
has_model_type = 'model_type' in cs.operations[0]
if has_model_id and has_model_type:
pass
elif has_model_id and not has_model_type:
return False
elif not has_model_id and has_model_type:
return False
elif not has_model_id and not has_model_type:
return True
_model_id = cs.operations[0]['model_id']
_model_type = cs.operations[0]['model_type']
if _model_type == SCADA_MODEL_TYPE_JUNCTION:
return is_junction(name, _model_id)
elif _model_type == SCADA_MODEL_TYPE_RESERVOIR:
return is_reservoir(name, _model_id)
elif _model_type == SCADA_MODEL_TYPE_TANK:
return is_tank(name, _model_id)
elif _model_type == SCADA_MODEL_TYPE_PIPE:
return is_pipe(name, _model_id)
elif _model_type == SCADA_MODEL_TYPE_PUMP:
return is_pump(name, _model_id)
elif _model_type == SCADA_MODEL_TYPE_VALVE:
return is_valve(name, _model_id)
return False
def get_scada_element_schema(name: str) -> dict[str, dict[str, Any]]:
return { 'id' : {'type': 'str' , 'optional': False , 'readonly': True },
'x' : {'type': 'float' , 'optional': False , 'readonly': False},
'y' : {'type': 'float' , 'optional': False , 'readonly': False},
'device_id' : {'type': 'str' , 'optional': True , 'readonly': False},
'model_id' : {'type': 'str' , 'optional': True , 'readonly': False},
'model_type' : {'type': 'str' , 'optional': True , 'readonly': False},
'status' : {'type': 'str' , 'optional': True , 'readonly': False} }
def get_scada_element(name: str, id: str) -> dict[str, Any]:
sm = try_read(name, f"select * from scada_element where id = '{id}'")
if sm == None:
return {}
d = {}
d['id'] = str(sm['id'])
d['x'] = float(sm['x'])
d['y'] = float(sm['y'])
d['device_id'] = str(sm['device_id']) if sm['device_id'] != None else None
d['model_id'] = str(sm['model_id']) if sm['model_id'] != None else None
d['model_type'] = str(sm['model_type']) if sm['model_type'] != None else None
d['status'] = str(sm['status'])
return d
class ScadaModel(object):
def __init__(self, input: dict[str, Any]) -> None:
self.type = 'scada_element'
self.id = str(input['id'])
self.x = float(input['x'])
self.y = float(input['y'])
self.device_id = str(input['device_id']) if 'device_id' in input and input['device_id'] != None else None
self.model_id = str(input['model_id']) if 'model_id' in input and input['model_id'] != None else None
self.model_type = str(input['model_type']) if 'model_type' in input and input['model_type'] != None else None
self.status = str(input['status']) if 'status' in input and input['status'] != None else SCADA_ELEMENT_STATUS_OFFLINE
self.f_type = f"'{self.type}'"
self.f_id = f"'{self.id}'"
self.f_x = self.x
self.f_y = self.y
self.f_device_id = f"'{self.device_id}'" if self.device_id != None else 'null'
self.f_model_id = f"'{self.model_id}'" if self.model_id != None else 'null'
self.f_model_type = f"'{self.model_type}'" if self.model_type != None else 'null'
self.f_status = f"'{self.status}'"
def as_dict(self) -> dict[str, Any]:
return { 'type': self.type, 'id': self.id, 'x': self.x, 'y': self.y, 'device_id': self.device_id, 'model_id': self.model_id, 'model_type': self.model_type, 'status': self.status }
def as_id_dict(self) -> dict[str, Any]:
return { 'type': self.type, 'id': self.id }
def _set_scada_element(name: str, cs: ChangeSet) -> DbChangeSet:
old = ScadaModel(get_scada_element(name, cs.operations[0]['id']))
raw_new = get_scada_element(name, cs.operations[0]['id'])
new_dict = cs.operations[0]
schema = get_scada_element_schema(name)
for key, value in schema.items():
if key in new_dict and not value['readonly']:
raw_new[key] = new_dict[key]
new = ScadaModel(raw_new)
redo_sql = f"update scada_element set x = {new.f_x}, y = {new.f_y}, device_id = {new.f_device_id}, model_id = {new.f_model_id}, model_type = {new.f_model_type}, status = {new.f_status} where id = {new.f_id};"
undo_sql = f"update scada_element set x = {old.f_x}, y = {old.f_y}, device_id = {old.f_device_id}, model_id = {old.f_model_id}, model_type = {old.f_model_type}, status = {old.f_status} where id = {old.f_id};"
redo_cs = g_update_prefix | new.as_dict()
undo_cs = g_update_prefix | old.as_dict()
return DbChangeSet(redo_sql, undo_sql, [redo_cs], [undo_cs])
def set_scada_element(name: str, cs: ChangeSet) -> ChangeSet:
if get_scada_element(name, cs.operations[0]['id']) == {}:
return ChangeSet()
if _check_model(name, cs) == False:
return ChangeSet()
return execute_command(name, _set_scada_element(name, cs))
def _add_scada_element(name: str, cs: ChangeSet) -> DbChangeSet:
new = ScadaModel(cs.operations[0])
redo_sql = f"insert into scada_element (id, x, y, device_id, model_id, model_type, status) values ({new.f_id}, {new.f_x}, {new.f_y}, {new.f_device_id}, {new.f_model_id}, {new.f_model_type}, {new.f_status});"
undo_sql = f"delete from scada_element where id = {new.f_id};"
redo_cs = g_add_prefix | new.as_dict()
undo_cs = g_delete_prefix | new.as_id_dict()
return DbChangeSet(redo_sql, undo_sql, [redo_cs], [undo_cs])
def add_scada_element(name: str, cs: ChangeSet) -> ChangeSet:
if get_scada_element(name, cs.operations[0]['id']) != {}:
return ChangeSet()
if _check_model(name, cs) == False:
return ChangeSet()
return execute_command(name, _add_scada_element(name, cs))
def _delete_scada_element(name: str, cs: ChangeSet) -> DbChangeSet:
old = ScadaModel(get_scada_element(name, cs.operations[0]['id']))
redo_sql = f"delete from scada_element where id = {old.f_id};"
undo_sql = f"insert into scada_element (id, x, y, device_id, model_id, model_type, status) values ({old.f_id}, {old.f_x}, {old.f_y}, {old.f_device_id}, {old.f_model_id}, {old.f_model_type}, {old.f_status});"
redo_cs = g_delete_prefix | old.as_id_dict()
undo_cs = g_add_prefix | old.as_dict()
return DbChangeSet(redo_sql, undo_sql, [redo_cs], [undo_cs])
def delete_scada_element(name: str, cs: ChangeSet) -> ChangeSet:
if get_scada_element(name, cs.operations[0]['id']) == {}:
return ChangeSet()
return execute_command(name, _delete_scada_element(name, cs))
def get_all_scada_element_ids(name: str) -> list[str]:
result : list[str] = []
rows = read_all(name, 'select id from scada_element order by id')
for row in rows:
result.append(str(row['id']))
return result
#
# create table scada_element
# (
# id text primary key
# , x float8 not null
# , y float8 not null
# , device_id text references scada_device(id)
# , model_id varchar(32) -- add constraint in API
# , model_type scada_model_type
# , status scada_element_status not null default 'OFF'
# );
#
# 返回listlist里每个item是dict内容是 'id':'abc' 这样
# scada_model type 是类似pressureflow之类的是由Device 决定 的
def get_all_scada_elements(name: str) -> list[dict[str, Any]]:
return read_all(name, 'select * from scada_element order by id')