226 lines
10 KiB
Python
226 lines
10 KiB
Python
from .database import *
|
|
from .s0_base import *
|
|
from .s24_coordinates import *
|
|
|
|
|
|
# Presumably the value of a tank's 'overflow' field when overflow is enabled
# (not referenced in the visible part of this file) -- TODO confirm.
OVERFLOW_YES = 'YES'
|
|
# Presumably the value of a tank's 'overflow' field when overflow is disabled
# (not referenced in the visible part of this file) -- TODO confirm.
OVERFLOW_NO = 'NO'
|
|
|
|
|
|
def get_tank_schema(name: str) -> dict[str, dict[str, Any]]:
    """Return the property schema of a tank.

    The result maps each tank property name to a descriptor dict with the
    keys 'type', 'optional' and 'readonly'. `name` is accepted for signature
    parity with the other accessors but is not used here.
    """
    # One row per property: (property, type, optional, readonly).
    # Row order is the key order of the returned dict.
    layout = (
        ('id', 'str', False, True),
        ('x', 'float', False, False),
        ('y', 'float', False, False),
        ('elevation', 'float', False, False),
        ('init_level', 'float', False, False),
        ('min_level', 'float', False, False),
        ('max_level', 'float', False, False),
        ('diameter', 'float', False, False),
        ('min_vol', 'float', False, False),
        ('vol_curve', 'str', True, False),
        ('overflow', 'str', True, False),
        ('links', 'str_list', False, True),
    )
    return {
        prop: {'type': kind, 'optional': optional, 'readonly': readonly}
        for prop, kind, optional, readonly in layout
    }
|
|
|
|
|
|
def get_tank(name: str, id: str) -> dict[str, Any]:
    """Read one tank and return it as a plain dict.

    Args:
        name: Identifier forwarded unchanged to the database helpers.
        id: Tank id to look up.

    Returns:
        A dict with the keys 'id', 'x', 'y', 'elevation', 'init_level',
        'min_level', 'max_level', 'diameter', 'min_vol', 'vol_curve',
        'overflow' and 'links', or an empty dict when the tanks query
        yields no row.
    """
    # The id is spliced into a quoted SQL literal, so embedded single quotes
    # must be doubled; otherwise an id such as "O'Hara" ends the literal early
    # and produces malformed (or injectable) SQL.
    safe_id = str(id).replace("'", "''")
    row = try_read(name, f"select * from tanks where id = '{safe_id}'")
    if row is None:
        return {}

    xy = get_node_coord(name, id)

    def _optional_str(value: Any) -> Any:
        # Nullable text columns: keep None as None rather than the string 'None'.
        return str(value) if value is not None else None

    return {
        'id': str(row['id']),
        'x': float(xy['x']),
        'y': float(xy['y']),
        'elevation': float(row['elevation']),
        'init_level': float(row['init_level']),
        'min_level': float(row['min_level']),
        'max_level': float(row['max_level']),
        'diameter': float(row['diameter']),
        'min_vol': float(row['min_vol']),
        'vol_curve': _optional_str(row['vol_curve']),
        'overflow': _optional_str(row['overflow']),
        'links': get_node_links(name, id),
    }
|
|
|
|
|
|
class Tank(object):
    """Typed view of a tank dict plus SQL-ready renderings of its fields.

    The plain attributes (`id`, `x`, `elevation`, ...) hold the converted
    values. The `f_*` attributes hold the same values formatted for direct
    interpolation into SQL text: text values are single-quoted with embedded
    quotes doubled, missing optional values become the bare word ``null``,
    and numeric values are passed through unchanged.
    """

    def __init__(self, input: dict[str, Any]) -> None:
        """Build a tank from a dict.

        Args:
            input: Must contain 'id', 'x', 'y', 'elevation', 'init_level',
                'min_level', 'max_level', 'diameter' and 'min_vol'.
                'vol_curve' and 'overflow' are optional and may be None.

        Raises:
            KeyError: If a required key is missing.
            ValueError: If a numeric field cannot be converted to float.
        """
        self.type = 'tank'
        self.id = str(input['id'])
        self.x = float(input['x'])
        self.y = float(input['y'])
        self.elevation = float(input['elevation'])
        self.init_level = float(input['init_level'])
        self.min_level = float(input['min_level'])
        self.max_level = float(input['max_level'])
        self.diameter = float(input['diameter'])
        self.min_vol = float(input['min_vol'])

        # Optional fields: an absent key and an explicit None are equivalent.
        vol_curve = input.get('vol_curve')
        self.vol_curve = str(vol_curve) if vol_curve is not None else None
        overflow = input.get('overflow')
        self.overflow = str(overflow) if overflow is not None else None

        # SQL-ready forms used by the *_cmd builders.
        self.f_type = f"'{self.type}'"
        self.f_id = self._sql_text(self.id)
        self.f_coord = f"'({self.x}, {self.y})'"
        self.f_elevation = self.elevation
        self.f_init_level = self.init_level
        self.f_min_level = self.min_level
        self.f_max_level = self.max_level
        self.f_diameter = self.diameter
        self.f_min_vol = self.min_vol
        self.f_vol_curve = self._sql_text(self.vol_curve)
        self.f_overflow = self._sql_text(self.overflow)

    @staticmethod
    def _sql_text(value: Any) -> str:
        """Render an optional string as a SQL literal ('...' or null).

        Embedded single quotes are doubled so that a value such as ``O'Hara``
        stays inside the literal instead of terminating it early.
        """
        if value is None:
            return 'null'
        return "'" + str(value).replace("'", "''") + "'"

    def as_dict(self) -> dict[str, Any]:
        """Return every stored property (unescaped) as a plain dict."""
        return {
            'type': self.type,
            'id': self.id,
            'x': self.x,
            'y': self.y,
            'elevation': self.elevation,
            'init_level': self.init_level,
            'min_level': self.min_level,
            'max_level': self.max_level,
            'diameter': self.diameter,
            'min_vol': self.min_vol,
            'vol_curve': self.vol_curve,
            'overflow': self.overflow,
        }

    def as_id_dict(self) -> dict[str, Any]:
        """Return only the identifying pair: 'type' and 'id'."""
        return {'type': self.type, 'id': self.id}
|
|
|
|
|
|
def set_tank_cmd(name: str, cs: ChangeSet) -> DbChangeSet:
    """Build the redo/undo command for updating an existing tank.

    The first operation in `cs` names the tank ('id') and carries the new
    values. Only keys that the tank schema marks as writable are taken from
    it; every other property keeps its stored value.
    """
    request = cs.operations[0]
    tank_id = request['id']

    # Two separate reads: one frozen as the "before" state, one to be patched.
    old = Tank(get_tank(name, tank_id))
    patched = get_tank(name, tank_id)

    for key, spec in get_tank_schema(name).items():
        if spec['readonly'] or key not in request:
            continue
        patched[key] = request[key]
    new = Tank(patched)

    def tank_update(t: Tank) -> str:
        # Statement that writes all non-coordinate columns of `t`.
        return (
            f"update tanks set elevation = {t.f_elevation}, init_level = {t.f_init_level}, "
            f"min_level = {t.f_min_level}, max_level = {t.f_max_level}, "
            f"diameter = {t.f_diameter}, min_vol = {t.f_min_vol}, "
            f"vol_curve = {t.f_vol_curve}, overflow = {t.f_overflow} where id = {t.f_id};"
        )

    def coord_update(t: Tank) -> str:
        # Statement that writes the coordinate of `t`.
        return f"update coordinates set coord = {t.f_coord} where node = {t.f_id};"

    # Redo touches tanks first, undo touches coordinates first.
    redo_sql = "\n".join((tank_update(new), coord_update(new)))
    undo_sql = "\n".join((coord_update(old), tank_update(old)))

    redo_cs = g_update_prefix | new.as_dict()
    undo_cs = g_update_prefix | old.as_dict()

    return DbChangeSet(redo_sql, undo_sql, [redo_cs], [undo_cs])
|
|
|
|
|
|
def set_tank(name: str, cs: ChangeSet) -> ChangeSet:
    """Update an existing tank from the first operation in `cs`.

    Returns an empty ChangeSet when the operation has no 'id' or when no tank
    with that id exists; otherwise returns the result of executing the update
    command.
    """
    op = cs.operations[0]
    if 'id' in op and get_tank(name, op['id']) != {}:
        return execute_command(name, set_tank_cmd(name, cs))
    return ChangeSet()
|
|
|
|
|
|
def add_tank_cmd(name: str, cs: ChangeSet) -> DbChangeSet:
    """Build the redo/undo command for inserting a new tank.

    The tank is constructed from the first operation in `cs`. Redo inserts
    into _node, tanks and coordinates (in that order); undo deletes from the
    same tables in reverse order.
    """
    new = Tank(cs.operations[0])

    # Column values for the tanks row, in column order.
    tank_values = ", ".join(
        f"{value}"
        for value in (
            new.f_id,
            new.f_elevation,
            new.f_init_level,
            new.f_min_level,
            new.f_max_level,
            new.f_diameter,
            new.f_min_vol,
            new.f_vol_curve,
            new.f_overflow,
        )
    )

    redo_sql = "\n".join((
        f"insert into _node (id, type) values ({new.f_id}, {new.f_type});",
        "insert into tanks (id, elevation, init_level, min_level, max_level, "
        "diameter, min_vol, vol_curve, overflow) "
        f"values ({tank_values});",
        f"insert into coordinates (node, coord) values ({new.f_id}, {new.f_coord});",
    ))

    undo_sql = "\n".join((
        f"delete from coordinates where node = {new.f_id};",
        f"delete from tanks where id = {new.f_id};",
        f"delete from _node where id = {new.f_id};",
    ))

    redo_cs = g_add_prefix | new.as_dict()
    undo_cs = g_delete_prefix | new.as_id_dict()

    return DbChangeSet(redo_sql, undo_sql, [redo_cs], [undo_cs])
|
|
|
|
|
|
def add_tank(name: str, cs: ChangeSet) -> ChangeSet:
    """Insert the tank described by the first operation in `cs`.

    Returns an empty ChangeSet when the operation has no 'id' or when a tank
    with that id already exists; otherwise returns the result of executing
    the insert command.
    """
    op = cs.operations[0]
    id_missing = 'id' not in op
    # Short-circuit keeps the lookup from running when there is no id.
    if id_missing or get_tank(name, op['id']) != {}:
        return ChangeSet()
    return execute_command(name, add_tank_cmd(name, cs))
|
|
|
|
|
|
def delete_tank_cmd(name: str, cs: ChangeSet) -> DbChangeSet:
    """Build the redo/undo command for deleting an existing tank.

    The tank named by the first operation's 'id' is read back so that undo can
    re-insert it. Redo deletes from coordinates, tanks and _node (in that
    order); undo inserts into the same tables in reverse order.
    """
    old = Tank(get_tank(name, cs.operations[0]['id']))
    ident = old.f_id

    redo_statements = [
        f"delete from coordinates where node = {ident};",
        f"delete from tanks where id = {ident};",
        f"delete from _node where id = {ident};",
    ]

    undo_statements = [
        f"insert into _node (id, type) values ({ident}, {old.f_type});",
        (
            "insert into tanks (id, elevation, init_level, min_level, max_level, "
            "diameter, min_vol, vol_curve, overflow) "
            f"values ({ident}, {old.f_elevation}, {old.f_init_level}, {old.f_min_level}, "
            f"{old.f_max_level}, {old.f_diameter}, {old.f_min_vol}, "
            f"{old.f_vol_curve}, {old.f_overflow});"
        ),
        f"insert into coordinates (node, coord) values ({ident}, {old.f_coord});",
    ]

    redo_cs = g_delete_prefix | old.as_id_dict()
    undo_cs = g_add_prefix | old.as_dict()

    return DbChangeSet(
        "\n".join(redo_statements),
        "\n".join(undo_statements),
        [redo_cs],
        [undo_cs],
    )
|
|
|
|
|
|
def delete_tank(name: str, cs: ChangeSet) -> ChangeSet:
    """Delete the tank named by the first operation in `cs`.

    Returns an empty ChangeSet when the operation has no 'id' or when no tank
    with that id exists; otherwise returns the result of executing the delete
    command.
    """
    target = cs.operations[0]
    if 'id' not in target:
        return ChangeSet()

    tank_exists = get_tank(name, target['id']) != {}
    if not tank_exists:
        return ChangeSet()

    return execute_command(name, delete_tank_cmd(name, cs))
|
|
|
|
|
|
#--------------------------------------------------------------
|
|
# [EPA2]
|
|
# [IN]
|
|
# id elev initlevel minlevel maxlevel diam (minvol vcurve overflow) ;desc
|
|
# xxx
|
|
# * YES
|
|
# [OUT]
|
|
# id elev initlevel minlevel maxlevel diam minvol (vcurve overflow) ;desc
|
|
#--------------------------------------------------------------
|
|
# [EPA3]
|
|
# id elev initlevel minlevel maxlevel diam minvol (vcurve)
|
|
#--------------------------------------------------------------
|
|
|
|
|
|
def inp_in_tank(line: str) -> str:
    """Translate one tank line of an INP file into SQL insert statements.

    Expected layout (parenthesised fields are optional)::

        id elev initlevel minlevel maxlevel diam (minvol vcurve overflow) ;desc

    Everything from the first ';' onward is a free-text description and is
    ignored. It may contain spaces and may be glued to the preceding value.
    A vcurve of '*' means "no curve". overflow is upper-cased.

    Args:
        line: A single data line.

    Returns:
        Two ';'-terminated statements in one string: an insert into _node
        followed by an insert into tanks.

    Raises:
        IndexError: If fewer than the six mandatory fields are present.
        ValueError: If a numeric field is not a valid float.
    """
    # Cut the description off before tokenising. Checking only whether the
    # last whitespace-separated token starts with ';' mistakes a multi-word
    # description (";main tank") or a glued one ("5;note") for data.
    data, _, _ = line.partition(';')
    tokens = data.split()
    count = len(tokens)

    def sql_text(value):
        # Quote a text value, doubling embedded quotes; None becomes null.
        if value is None:
            return 'null'
        return "'" + value.replace("'", "''") + "'"

    tank_id = sql_text(tokens[0])
    elevation = float(tokens[1])
    init_level = float(tokens[2])
    min_level = float(tokens[3])
    max_level = float(tokens[4])
    diameter = float(tokens[5])
    min_vol = float(tokens[6]) if count >= 7 else 0.0
    vol_curve = sql_text(tokens[7] if count >= 8 and tokens[7] != '*' else None)
    overflow = sql_text(tokens[8].upper() if count >= 9 else None)

    return (
        f"insert into _node (id, type) values ({tank_id}, 'tank');"
        "insert into tanks (id, elevation, init_level, min_level, max_level, "
        "diameter, min_vol, vol_curve, overflow) "
        f"values ({tank_id}, {elevation}, {init_level}, {min_level}, {max_level}, "
        f"{diameter}, {min_vol}, {vol_curve}, {overflow});"
    )
|
|
|
|
|
|
def inp_out_tank(name: str) -> list[str]:
    """Render every row of the tanks table as one INP text line.

    Each line is the space-separated sequence
    ``id elev initlevel minlevel maxlevel diam minvol vcurve overflow ;``.
    A missing vcurve or overflow is written as an empty field, except that a
    missing vcurve is written as '*' when an overflow value follows it.
    """
    lines = []
    for row in read_all(name, 'select * from tanks'):
        curve = row['vol_curve']
        if curve is None:
            curve = ''
        spill = row['overflow']
        if spill is None:
            spill = ''

        # '*' keeps the overflow value in its own column when no curve is set.
        if curve == '' and spill != '':
            curve = '*'

        fields = (
            row['id'],
            row['elevation'],
            row['init_level'],
            row['min_level'],
            row['max_level'],
            row['diameter'],
            row['min_vol'],
            curve,
            spill,
            ';',
        )
        lines.append(' '.join(f'{field}' for field in fields))
    return lines
|
|
|
|
|
|
def unset_tank_by_curve(name: str, curve: str) -> ChangeSet:
    """Build a change set that clears `vol_curve` on tanks using `curve`.

    Appends one update operation (vol_curve set to None) for every tank whose
    vol_curve equals `curve`. The change set is only built and returned here,
    not executed.

    Args:
        name: Identifier forwarded unchanged to the database helper.
        curve: Curve id to look for.

    Returns:
        The populated ChangeSet; it has no operations added when no tank
        references the curve.
    """
    cs = ChangeSet()

    # The curve id goes into a quoted SQL literal, so embedded single quotes
    # must be doubled to keep the statement well-formed.
    safe_curve = str(curve).replace("'", "''")
    rows = read_all(name, f"select id from tanks where vol_curve = '{safe_curve}'")
    for row in rows:
        cs.append(g_update_prefix | {'type': 'tank', 'id': row['id'], 'vol_curve': None})

    return cs
|