Files
TJWaterServer/api/s29_scada_device.py
2023-10-14 15:35:16 +08:00

124 lines
4.9 KiB
Python

from .database import *
SCADA_DEVICE_TYPE_PRESSURE = 'PRESSURE'
SCADA_DEVICE_TYPE_DEMAND = 'DEMAND'
SCADA_DEVICE_TYPE_QUALITY = 'QUALITY'
SCADA_DEVICE_TYPE_LEVEL = 'LEVEL'
SCADA_DEVICE_TYPE_FLOW = 'FLOW'
SCADA_DEVICE_TYPE_UNKNOWN = 'UNKNOWN'
def get_scada_device_schema(name: str) -> dict[str, dict[str, Any]]:
return { 'id' : {'type': 'str', 'optional': False, 'readonly': True },
'name' : {'type': 'str', 'optional': True , 'readonly': False},
'address': {'type': 'str', 'optional': True , 'readonly': False},
'sd_type': {'type': 'str', 'optional': True , 'readonly': False}}
def get_scada_device(name: str, id: str) -> dict[str, Any]:
sm = try_read(name, f"select * from scada_device where id = '{id}'")
if sm == None:
return {}
d = {}
d['id'] = str(sm['id'])
d['name'] = str(sm['name']) if sm['name'] != None else None
d['address'] = str(sm['address']) if sm['address'] != None else None
d['sd_type'] = str(sm['sd_type']) if sm['sd_type'] != None else None
return d
class ScadaDevice(object):
def __init__(self, input: dict[str, Any]) -> None:
self.type = 'scada_device'
self.id = str(input['id'])
self.name = str(input['name']) if 'name' in input and input['name'] != None else None
self.address = str(input['address']) if 'address' in input and input['address'] != None else None
self.sd_type = str(input['sd_type']) if 'sd_type' in input and input['sd_type'] != None else None
self.f_type = f"'{self.type}'"
self.f_id = f"'{self.id}'"
self.f_name = f"'{self.name}'" if self.name != None else 'null'
self.f_address = f"'{self.address}'" if self.address != None else 'null'
self.f_sd_type = f"'{self.sd_type}'" if self.sd_type != None else 'null'
def as_dict(self) -> dict[str, Any]:
return { 'type': self.type, 'id': self.id, 'name': self.name, 'address': self.address, 'sd_type': self.sd_type }
def as_id_dict(self) -> dict[str, Any]:
return { 'type': self.type, 'id': self.id }
def _set_scada_device(name: str, cs: ChangeSet) -> DbChangeSet:
old = ScadaDevice(get_scada_device(name, cs.operations[0]['id']))
raw_new = get_scada_device(name, cs.operations[0]['id'])
new_dict = cs.operations[0]
schema = get_scada_device_schema(name)
for key, value in schema.items():
if key in new_dict and not value['readonly']:
raw_new[key] = new_dict[key]
new = ScadaDevice(raw_new)
redo_sql = f"update scada_device set name = {new.f_name}, address = {new.f_address}, sd_type = {new.f_sd_type} where id = {new.f_id};"
undo_sql = f"update scada_device set name = {old.f_name}, address = {old.f_address}, sd_type = {old.f_sd_type} where id = {old.f_id};"
redo_cs = g_update_prefix | new.as_dict()
undo_cs = g_update_prefix | old.as_dict()
return DbChangeSet(redo_sql, undo_sql, [redo_cs], [undo_cs])
def set_scada_device(name: str, cs: ChangeSet) -> ChangeSet:
if get_scada_device(name, cs.operations[0]['id']) == {}:
return ChangeSet()
return execute_command(name, _set_scada_device(name, cs), False)
def _add_scada_device(name: str, cs: ChangeSet) -> DbChangeSet:
new = ScadaDevice(cs.operations[0])
redo_sql = f"insert into scada_device (id, name, address, sd_type) values ({new.f_id}, {new.f_name}, {new.f_address}, {new.f_sd_type});"
undo_sql = f"delete from scada_device where id = {new.f_id};"
redo_cs = g_add_prefix | new.as_dict()
undo_cs = g_delete_prefix | new.as_id_dict()
return DbChangeSet(redo_sql, undo_sql, [redo_cs], [undo_cs])
def add_scada_device(name: str, cs: ChangeSet) -> ChangeSet:
if get_scada_device(name, cs.operations[0]['id']) != {}:
return ChangeSet()
return execute_command(name, _add_scada_device(name, cs), False)
def _delete_scada_device(name: str, cs: ChangeSet) -> DbChangeSet:
old = ScadaDevice(get_scada_device(name, cs.operations[0]['id']))
redo_sql = f"delete from scada_device where id = {old.f_id};"
undo_sql = f"insert into scada_device (id, name, address, sd_type) values ({old.f_id}, {old.f_name}, {old.f_address}, {old.f_sd_type});"
redo_cs = g_delete_prefix | old.as_id_dict()
undo_cs = g_add_prefix | old.as_dict()
return DbChangeSet(redo_sql, undo_sql, [redo_cs], [undo_cs])
def delete_scada_device(name: str, cs: ChangeSet) -> ChangeSet:
if get_scada_device(name, cs.operations[0]['id']) == {}:
return ChangeSet()
return execute_command(name, _delete_scada_device(name, cs), False)
def get_all_scada_device_ids(name: str) -> list[str]:
result : list[str] = []
rows = read_all(name, 'select id from scada_device order by id')
for row in rows:
result.append(str(row['id']))
return result
def get_all_scada_devices(name: str) -> list[dict[str, Any]]:
return read_all(name, 'select * from scada_device order by id')