243 lines
9.5 KiB
Python
243 lines
9.5 KiB
Python
from typing import Any
|
|
from psycopg.rows import Row
|
|
from .s0_base import *
|
|
from .change_set import ChangeSet
|
|
from .s24_coordinates import *
|
|
from .utility import *
|
|
from .schema import *
|
|
|
|
|
|
OVERFLOW_YES = 'yes'
|
|
OVERFLOW_NO = 'no'
|
|
|
|
|
|
schema: dict[str, dict[str, Any]] = { \
|
|
'id' : define_property(str_type, False, True), \
|
|
'elevation' : define_property(float_type), \
|
|
'init_level': define_property(float_type), \
|
|
'min_level' : define_property(float_type), \
|
|
'max_level' : define_property(float_type), \
|
|
'diameter' : define_property(float_type), \
|
|
'min_vol' : define_property(float_type), \
|
|
'vol_curve' : define_property(str_type, True), \
|
|
'overflow' : define_property(str_type, True), \
|
|
'coord' : define_property(client_point_type), \
|
|
'links' : define_property(str_list_type, False, True)}
|
|
|
|
|
|
def get_tank_schema(name: str) -> dict[str, dict[str, Any]]:
|
|
return schema
|
|
|
|
|
|
def _query_tank(name: str, id: str) -> Row | None:
|
|
return read(name, f"select * from tanks where id = '{id}'")
|
|
|
|
|
|
def add_tank(name: str, id: str, x: float, y: float, elevation: float, init_level: float = 0, min_level: float = 0, max_level: float = 0, diameter: float = 0, min_vol: float = 0) -> ChangeSet:
|
|
if is_tank(name, id):
|
|
return ChangeSet()
|
|
|
|
sql = f"insert into _node (id, type) values ('{id}', '{TANK}');"
|
|
sql += f"\ninsert into tanks (id, elevation, init_level, min_level, max_level, diameter, min_vol) values ('{id}', {elevation}, {init_level}, {min_level}, {max_level}, {diameter}, {min_vol});"
|
|
sql += f"\ninsert into coordinates (node, coord) values ('{id}', '({x}, {y})');"
|
|
|
|
undo = f"delete from coordinates where node = ''{id}'';"
|
|
undo += f"\ndelete from tanks where id = ''{id}'';"
|
|
undo += f"\ndelete from _node where id = ''{id}'';"
|
|
|
|
write(name, sql)
|
|
add_operation(name, sql.replace("'", "''"), undo, 'add_tank', API_ADD, TANK, id)
|
|
return get_current_change_set(name)
|
|
|
|
|
|
def get_tank(name: str, id: str) -> dict[str, Any] | None:
|
|
row = _query_tank(name, id)
|
|
if row == None:
|
|
return None
|
|
|
|
ps: dict[str, str] = {}
|
|
ps['id'] = id
|
|
ps['elevation'] = float(row['elevation'])
|
|
ps['init_level'] = float(row['init_level'])
|
|
ps['min_level'] = float(row['min_level'])
|
|
ps['max_level'] = float(row['max_level'])
|
|
ps['diameter'] = float(row['diameter'])
|
|
ps['min_vol'] = float(row['min_vol'])
|
|
ps['vol_curve'] = row['vol_curve']
|
|
ps['overflow'] = row['overflow']
|
|
ps['coord'] = get_node_coord(name, id)
|
|
ps['links'] = get_node_links(name, id)
|
|
return ps
|
|
|
|
|
|
def set_tank(name: str, id: str, properties: dict[str, Any]) -> ChangeSet:
|
|
if not is_tank(name, id):
|
|
return ChangeSet()
|
|
if 'vol_curve' in properties:
|
|
if not is_curve(properties['vol_curve']):
|
|
return ChangeSet()
|
|
if 'overflow' in properties:
|
|
if properties['overflow'] != OVERFLOW_YES and properties['overflow'] != OVERFLOW_NO:
|
|
return ChangeSet()
|
|
|
|
old = Serialize(get_tank(name, id), schema).to_storage()
|
|
|
|
new = get_tank(name, id)
|
|
ps: list[str] = []
|
|
for key in properties:
|
|
if key in schema and schema[key]['readonly'] == False:
|
|
new[key] = properties[key]
|
|
ps.append(key)
|
|
new = Serialize(new, schema).to_execution()
|
|
|
|
sql = f"update tanks set elevation = {new['elevation']}, \
|
|
init_level = {new['init_level']}, min_level = {new['min_level']}, max_level = {new['max_level']}, \
|
|
diameter = {new['diameter']}, min_vol = {new['min_vol']}, vol_curve = {new['vol_curve']}, overflow = {new['overflow']} where id = '{id}';"
|
|
undo = ""
|
|
if 'coord' in ps:
|
|
sql += f"\nupdate coordinates set coord = {new['coord']} where node = '{id}';"
|
|
undo = f"update coordinates set coord = {old['coord']} where node = ''{id}'';"
|
|
undo += f"\nupdate tanks set elevation = {old['elevation']}, \
|
|
init_level = {old['init_level']}, min_level = {old['min_level']}, max_level = {old['max_level']}, \
|
|
diameter = {old['diameter']}, min_vol = {old['min_vol']}, vol_curve = {old['vol_curve']}, overflow = {old['overflow']} where id = ''{id}'';"
|
|
|
|
write(name, sql)
|
|
add_operation(name, sql.replace("'", "''"), undo, 'set_tank', API_UPDATE, TANK, id, ps)
|
|
return get_current_change_set(name)
|
|
|
|
|
|
def delete_tank(name: str, id: str) -> ChangeSet:
|
|
row = get_tank(name, id)
|
|
if row == None:
|
|
return ChangeSet()
|
|
|
|
old = Serialize(get_tank(name, id), schema).to_storage()
|
|
|
|
sql = f"delete from coordinates where node = '{id}';"
|
|
sql += f"\ndelete from tanks where id = '{id}';"
|
|
sql += f"\ndelete from _node where id = '{id}';"
|
|
|
|
undo = f"insert into _node (id, type) values (''{id}'', ''{TANK}'');"
|
|
undo += f"\ninsert into tanks (id, elevation, init_level, min_level, max_level, diameter, min_vol, vol_curve, overflow) \
|
|
values (''{id}'', {old['elevation']}, {old['init_level']}, {old['min_level']}, {old['max_level']}, {old['diameter']}, {old['min_vol']}, {old['vol_curve']}, {old['overflow']});"
|
|
undo += f"\ninsert into coordinates (node, coord) values (''{id}'', {old['coord']});"
|
|
|
|
write(name, sql)
|
|
add_operation(name, sql.replace("'", "''"), undo, 'delete_tank', API_DELETE, TANK, id)
|
|
return get_current_change_set(name)
|
|
|
|
|
|
|
|
'''
|
|
def add_tank(name: str, id: str, x: float, y: float, elevation: float, init_level: float = 0, min_level: float = 0, max_level: float = 0, diameter: float = 0, min_vol: float = 0) -> ChangeSet:
|
|
sql = f"insert into tanks (id, elevation, init_level, min_level, max_level, diameter, min_vol) values ('{id}', {elevation}, {init_level}, {min_level}, {max_level}, {diameter}, {min_vol});"
|
|
undo_sql = f'delete from tanks where id = "{id}";'
|
|
return add_node(name, TANK, id, x, y, sql, undo_sql)
|
|
|
|
|
|
def _get_tank(name: str, id: str) -> Row | None:
|
|
return query(name, f"select elevation, init_level, min_level, max_level, diameter, min_vol, vol_curve, overflow from tanks where id = '{id}'")
|
|
|
|
|
|
def delete_tank(name: str, id: str) -> ChangeSet:
|
|
if not is_tank(name, id):
|
|
return ChangeSet()
|
|
|
|
row = _get_tank(name, id)
|
|
if row == None:
|
|
return ChangeSet()
|
|
|
|
elevation = row['elevation']
|
|
init_level = row['init_level']
|
|
min_level = row['min_level']
|
|
max_level = row['max_level']
|
|
diameter = row['diameter']
|
|
min_vol = row['min_vol']
|
|
vol_curve = decorate(row['vol_curve'], 'str', True)
|
|
overflow = decorate(row['overflow'], 'str', True)
|
|
|
|
sql = f"delete from tanks where id = '{id}';"
|
|
undo_sql = f'insert into tanks (id, elevation, init_level, min_level, max_level, diameter, min_vol, vol_curve, overflow) values ("{id}", {elevation}, {init_level}, {min_level}, {max_level}, {diameter}, {min_vol}, {vol_curve}, {overflow});'
|
|
|
|
return delete_node(name, TANK, id, sql, undo_sql)
|
|
|
|
|
|
def _set_tank(name: str, id: str, key: str, key_type: str, value: str, optional: bool = False) -> ChangeSet:
|
|
if not is_tank(name, id):
|
|
return ChangeSet()
|
|
|
|
row = _get_tank(name, id)
|
|
if row == None:
|
|
return ChangeSet()
|
|
|
|
return update(name, TANK, 'tanks', 'id', id, key, key_type, row[key], value, optional)
|
|
|
|
|
|
def set_tank_elevation(name: str, id: str, elevation: float) -> ChangeSet:
|
|
return _set_tank(name, id, 'elevation', 'float', elevation)
|
|
|
|
|
|
def set_tank_init_level(name: str, id: str, init_level: float) -> ChangeSet:
|
|
return _set_tank(name, id, 'init_level', 'float', init_level)
|
|
|
|
|
|
def set_tank_min_level(name: str, id: str, min_level: float) -> ChangeSet:
|
|
return _set_tank(name, id, 'min_level', 'float', min_level)
|
|
|
|
|
|
def set_tank_max_level(name: str, id: str, max_level: float) -> ChangeSet:
|
|
return _set_tank(name, id, 'max_level', 'float', max_level)
|
|
|
|
|
|
def set_tank_diameter(name: str, id: str, diameter: float) -> ChangeSet:
|
|
return _set_tank(name, id, 'diameter', 'float', diameter)
|
|
|
|
|
|
def set_tank_min_vol(name: str, id: str, min_vol: float) -> ChangeSet:
|
|
return _set_tank(name, id, 'min_vol', 'float', min_vol)
|
|
|
|
|
|
def set_tank_vol_curve(name: str, id: str, vol_curve: str) -> ChangeSet:
|
|
if not is_curve(name, vol_curve):
|
|
return ChangeSet()
|
|
|
|
return _set_tank(name, id, 'vol_curve', 'str', vol_curve, True)
|
|
|
|
|
|
def set_tank_overflow(name: str, id: str, overflow: str) -> ChangeSet:
|
|
if overflow != OVERFLOW_YES and overflow != OVERFLOW_NO:
|
|
return ChangeSet()
|
|
|
|
return _set_tank(name, id, 'overflow', 'str', overflow)
|
|
|
|
|
|
def set_tank_coord(name: str, id: str, x: float, y: float) -> ChangeSet:
|
|
if not is_tank(name, id):
|
|
return ChangeSet()
|
|
|
|
return set_node_coord(name, TANK, id, x, y)
|
|
|
|
|
|
def get_tank_property_names(name: str) -> list[str]:
|
|
return ['elevation', 'init_level', 'min_level', 'max_level', 'diameter', 'min_vol', 'vol_curve', 'overflow', 'coord', 'links']
|
|
|
|
|
|
def get_tank_properties(name: str, id: str) -> dict[str, Any] | None:
|
|
row = _get_tank(name, id)
|
|
if row == None:
|
|
return None
|
|
|
|
ps: dict[str, str] = {}
|
|
ps['elevation'] = float(row['elevation']) if row != None else None
|
|
ps['init_level'] = float(row['init_level']) if row != None else None
|
|
ps['min_level'] = float(row['min_level']) if row != None else None
|
|
ps['max_level'] = float(row['max_level']) if row != None else None
|
|
ps['diameter'] = float(row['diameter']) if row != None else None
|
|
ps['min_vol'] = float(row['min_vol']) if row != None else None
|
|
ps['vol_curve'] = row['vol_curve'] if row != None and row['vol_curve'] != None else None
|
|
ps['overflow'] = row['overflow'] if row != None and row['overflow'] != None else None
|
|
ps['coord'] = get_node_coord(name, id)
|
|
ps['links'] = get_node_links(name, id)
|
|
return ps
|
|
'''
|