Refactor tank
This commit is contained in:
235
api/s4_tanks.py
235
api/s4_tanks.py
@@ -1,79 +1,44 @@
|
||||
from psycopg.rows import dict_row, Row
|
||||
from psycopg.rows import Row
|
||||
from .connection import g_conn_dict as conn
|
||||
from .s0_base import *
|
||||
from .operation import *
|
||||
from .change_set import ChangeSet
|
||||
from .s24_coordinates import *
|
||||
import utility
|
||||
|
||||
|
||||
def add_tank(name: str, id: str, x: float, y: float, elevation: float, init_level: float = 0, min_level: float = 0, max_level: float = 0, diameter: float = 0, min_vol: float = 0) -> ChangeSet:
|
||||
if is_node(name, id):
|
||||
return
|
||||
sql = f"insert into tanks (id, elevation, init_level, min_level, max_level, diameter, min_vol) values ('{id}', {elevation}, {init_level}, {min_level}, {max_level}, {diameter}, {min_vol});"
|
||||
undo_sql = f'delete from tanks where id = "{id}";'
|
||||
return add_node(name, TANK, id, x, y, sql, undo_sql)
|
||||
|
||||
with conn[name].cursor() as cur:
|
||||
sql = f"insert into _node (id, type) values ('{id}', 'TANK');"
|
||||
sql += f" insert into tanks (id, elevation, init_level, min_level, max_level, diameter, min_vol) values ('{id}', {elevation}, {init_level}, {min_level}, {max_level}, {diameter}, {min_vol});"
|
||||
sql += f" insert into coordinates (node, coord) values ('{id}', '({x}, {y})');"
|
||||
cur.execute(sql)
|
||||
|
||||
redo = sql.replace("'", '"')
|
||||
undo = f'delete from coordinates where node = "{id}";'
|
||||
undo += f' delete from tanks where id = "{id}";'
|
||||
undo += f' delete from _node where id = "{id}";'
|
||||
add_operation(name, redo, undo)
|
||||
|
||||
change = ChangeSet()
|
||||
change.add('tank', id)
|
||||
return change
|
||||
def _get_tank(name: str, id: str) -> Row | None:
|
||||
return utility.query(name, f"select elevation, init_level, min_level, max_level, diameter, min_vol, vol_curve, overflow from tanks where id = '{id}'")
|
||||
|
||||
|
||||
def delete_tank(name: str, id: str) -> ChangeSet:
|
||||
if not is_tank(name, id):
|
||||
return
|
||||
|
||||
with conn[name].cursor(row_factory=dict_row) as cur:
|
||||
cur.execute(f"select * from tanks where id = '{id}'")
|
||||
row = cur.fetchone()
|
||||
if row == None:
|
||||
return
|
||||
row = _get_tank(name, id)
|
||||
if row == None:
|
||||
return
|
||||
|
||||
elevation = row['elevation']
|
||||
init_level = row['init_level']
|
||||
min_level = row['min_level']
|
||||
max_level = row['max_level']
|
||||
diameter = row['diameter']
|
||||
min_vol = row['min_vol']
|
||||
vol_curve = 'NULL' if row['vol_curve'] == None else row['vol_curve']
|
||||
vol_curve = f'"{vol_curve}"' if vol_curve != 'NULL' else vol_curve
|
||||
overflow = 'NULL' if row['overflow'] == None else row['overflow']
|
||||
elevation = row['elevation']
|
||||
init_level = row['init_level']
|
||||
min_level = row['min_level']
|
||||
max_level = row['max_level']
|
||||
diameter = row['diameter']
|
||||
min_vol = row['min_vol']
|
||||
vol_curve = 'NULL' if row['vol_curve'] == None else row['vol_curve']
|
||||
vol_curve = f'"{vol_curve}"' if vol_curve != 'NULL' else vol_curve
|
||||
overflow = 'NULL' if row['overflow'] == None else row['overflow']
|
||||
|
||||
cur.execute(f"select * from coordinates where node = '{id}'")
|
||||
row = cur.fetchone()
|
||||
if row == None:
|
||||
return
|
||||
sql += f"delete from tanks where id = '{id}';"
|
||||
undo_sql = f'insert into tanks (id, elevation, init_level, min_level, max_level, diameter, min_vol, vol_curve, overflow) values ("{id}", {elevation}, {init_level}, {min_level}, {max_level}, {diameter}, {min_vol}, {vol_curve}, {overflow});'
|
||||
|
||||
coord = row['coord']
|
||||
|
||||
sql = f"delete from coordinates where node = '{id}';"
|
||||
sql += f" delete from tanks where id = '{id}';"
|
||||
sql += f" delete from _node where id = '{id}';"
|
||||
cur.execute(sql)
|
||||
|
||||
redo = sql.replace("'", '"')
|
||||
undo = f'insert into _node (id, type) values ("{id}", "JUNCTION");'
|
||||
undo += f' insert into tanks (id, elevation, init_level, min_level, max_level, diameter, min_vol, vol_curve, overflow) values ("{id}", {elevation}, {init_level}, {min_level}, {max_level}, {diameter}, {min_vol}, {vol_curve}, {overflow});'
|
||||
undo += f' insert into coordinates (node, coord) values ("{id}", "{coord}");'
|
||||
add_operation(name, redo, undo)
|
||||
|
||||
change = ChangeSet()
|
||||
change.delete('tank', id)
|
||||
return change
|
||||
|
||||
|
||||
def _get_tank(name: str, id: str) -> Row | None:
|
||||
with conn[name].cursor(row_factory=dict_row) as cur:
|
||||
cur.execute(f"select elevation, init_level, min_level, max_level, diameter, min_vol, vol_curve, overflow from tanks where id = '{id}'")
|
||||
return cur.fetchone()
|
||||
return add_node(name, TANK, id, x, y, sql, undo_sql)
|
||||
|
||||
|
||||
def get_tank_elevation(name: str, id: str) -> float | None:
|
||||
@@ -126,177 +91,55 @@ def get_tank_coord(name: str, id: str) -> dict[str, float] | None:
|
||||
return get_node_coord(name, id)
|
||||
|
||||
|
||||
def set_tank_elevation(name: str, id: str, elevation: float) -> ChangeSet:
|
||||
def _set_tank(name: str, id: str, key: str, key_type: str, value: str, optional: bool = False) -> ChangeSet:
|
||||
if not is_tank(name, id):
|
||||
return
|
||||
|
||||
old = get_tank_elevation(name, id)
|
||||
if old == None:
|
||||
row = _get_tank(name, id)
|
||||
if row == None:
|
||||
return
|
||||
|
||||
with conn[name].cursor() as cur:
|
||||
sql = f"update tanks set elevation = {elevation} where id = '{id}'"
|
||||
cur.execute(sql)
|
||||
redo = sql.replace("'", '"')
|
||||
undo = f'update tanks set elevation = {old} where id = "{id}"'
|
||||
add_operation(name, redo, undo)
|
||||
return utility.update(name, TANK, 'tanks', 'id', id, key, key_type, row[key], value, optional)
|
||||
|
||||
change = ChangeSet()
|
||||
change.update('tank', id, 'elevation', 'float', str(elevation))
|
||||
return change
|
||||
|
||||
def set_tank_elevation(name: str, id: str, elevation: float) -> ChangeSet:
|
||||
return _set_tank(name, id, 'elevation', 'float', elevation)
|
||||
|
||||
|
||||
def set_tank_init_level(name: str, id: str, init_level: float) -> ChangeSet:
|
||||
if not is_tank(name, id):
|
||||
return
|
||||
|
||||
old = get_tank_init_level(name, id)
|
||||
if old == None:
|
||||
return
|
||||
|
||||
with conn[name].cursor() as cur:
|
||||
sql = f"update tanks set init_level = {init_level} where id = '{id}'"
|
||||
cur.execute(sql)
|
||||
redo = sql.replace("'", '"')
|
||||
undo = f'update tanks set init_level = {old} where id = "{id}"'
|
||||
add_operation(name, redo, undo)
|
||||
|
||||
change = ChangeSet()
|
||||
change.update('tank', id, 'init_level', 'float', str(init_level))
|
||||
return change
|
||||
return _set_tank(name, id, 'init_level', 'float', init_level)
|
||||
|
||||
|
||||
def set_tank_min_level(name: str, id: str, min_level: float) -> ChangeSet:
|
||||
if not is_tank(name, id):
|
||||
return
|
||||
|
||||
old = get_tank_min_level(name, id)
|
||||
if old == None:
|
||||
return
|
||||
|
||||
with conn[name].cursor() as cur:
|
||||
sql = f"update tanks set min_level = {min_level} where id = '{id}'"
|
||||
cur.execute(sql)
|
||||
redo = sql.replace("'", '"')
|
||||
undo = f'update tanks set min_level = {old} where id = "{id}"'
|
||||
add_operation(name, redo, undo)
|
||||
|
||||
change = ChangeSet()
|
||||
change.update('tank', id, 'min_level', 'float', str(min_level))
|
||||
return change
|
||||
return _set_tank(name, id, 'min_level', 'float', min_level)
|
||||
|
||||
|
||||
def set_tank_max_level(name: str, id: str, max_level: float) -> ChangeSet:
|
||||
if not is_tank(name, id):
|
||||
return
|
||||
|
||||
old = get_tank_max_level(name, id)
|
||||
if old == None:
|
||||
return
|
||||
|
||||
with conn[name].cursor() as cur:
|
||||
sql = f"update tanks set max_level = {max_level} where id = '{id}'"
|
||||
cur.execute(sql)
|
||||
redo = sql.replace("'", '"')
|
||||
undo = f'update tanks set max_level = {old} where id = "{id}"'
|
||||
add_operation(name, redo, undo)
|
||||
|
||||
change = ChangeSet()
|
||||
change.update('tank', id, 'max_level', 'float', str(max_level))
|
||||
return change
|
||||
return _set_tank(name, id, 'max_level', 'float', max_level)
|
||||
|
||||
|
||||
def set_tank_diameter(name: str, id: str, diameter: float) -> ChangeSet:
|
||||
if not is_tank(name, id):
|
||||
return
|
||||
|
||||
old = get_tank_diameter(name, id)
|
||||
if old == None:
|
||||
return
|
||||
|
||||
with conn[name].cursor() as cur:
|
||||
sql = f"update tanks set diameter = {diameter} where id = '{id}'"
|
||||
cur.execute(sql)
|
||||
redo = sql.replace("'", '"')
|
||||
undo = f'update tanks set diameter = {old} where id = "{id}"'
|
||||
add_operation(name, redo, undo)
|
||||
|
||||
change = ChangeSet()
|
||||
change.update('tank', id, 'diameter', 'float', str(diameter))
|
||||
return change
|
||||
return _set_tank(name, id, 'diameter', 'float', diameter)
|
||||
|
||||
|
||||
def set_tank_min_vol(name: str, id: str, min_vol: float) -> ChangeSet:
|
||||
if not is_tank(name, id):
|
||||
return
|
||||
|
||||
old = get_tank_min_vol(name, id)
|
||||
if old == None:
|
||||
return
|
||||
|
||||
with conn[name].cursor() as cur:
|
||||
sql = f"update tanks set min_vol = {min_vol} where id = '{id}'"
|
||||
cur.execute(sql)
|
||||
redo = sql.replace("'", '"')
|
||||
undo = f'update tanks set min_vol = {old} where id = "{id}"'
|
||||
add_operation(name, redo, undo)
|
||||
|
||||
change = ChangeSet()
|
||||
change.update('tank', id, 'min_vol', 'float', str(min_vol))
|
||||
return change
|
||||
return _set_tank(name, id, 'min_vol', 'float', min_vol)
|
||||
|
||||
|
||||
def set_tank_vol_curve(name: str, id: str, vol_curve: str) -> ChangeSet:
|
||||
if not is_tank(name, id):
|
||||
return
|
||||
if not is_curve(name, id):
|
||||
return
|
||||
if not is_curve(name, vol_curve):
|
||||
return ChangeSet
|
||||
|
||||
old = get_tank_vol_curve(name, id)
|
||||
if old == None:
|
||||
return
|
||||
|
||||
old = f'"{old}"' if old != 'NULL' else old
|
||||
|
||||
with conn[name].cursor() as cur:
|
||||
sql = f"update tanks set vol_curve = '{vol_curve}' where id = '{id}'"
|
||||
cur.execute(sql)
|
||||
redo = sql.replace("'", '"')
|
||||
undo = f'update tanks set vol_curve = {old} where id = "{id}"'
|
||||
add_operation(name, redo, undo)
|
||||
|
||||
change = ChangeSet()
|
||||
change.update('tank', id, 'vol_curve', 'str', str(vol_curve))
|
||||
return change
|
||||
return _set_tank(name, id, 'vol_curve', 'str', vol_curve, True)
|
||||
|
||||
OVERFLOW_YES = 'YES'
|
||||
OVERFLOW_NO = 'NO'
|
||||
|
||||
def set_tank_overflow(name: str, id: str, overflow: str) -> ChangeSet:
|
||||
if overflow is not OVERFLOW_YES or overflow is not OVERFLOW_NO:
|
||||
return
|
||||
return ChangeSet()
|
||||
|
||||
if not is_tank(name, id):
|
||||
return
|
||||
if not is_curve(name, id):
|
||||
return
|
||||
|
||||
old = get_tank_overflow(name, id)
|
||||
if old == None:
|
||||
return
|
||||
|
||||
old = f'"{old}"' if old != 'NULL' else old
|
||||
|
||||
with conn[name].cursor() as cur:
|
||||
sql = f"update tanks set overflow = '{overflow}' where id = '{id}'"
|
||||
cur.execute(sql)
|
||||
redo = sql.replace("'", '"')
|
||||
undo = f'update tanks set overflow = {old} where id = "{id}"'
|
||||
add_operation(name, redo, undo)
|
||||
|
||||
change = ChangeSet()
|
||||
change.update('tank', id, 'overflow', 'str', str(overflow))
|
||||
return change
|
||||
return _set_tank(name, id, 'overflow', 'str', overflow)
|
||||
|
||||
|
||||
def set_tank_coord(name: str, id: str, x: float, y: float) -> ChangeSet:
|
||||
|
||||
Reference in New Issue
Block a user