From cef6ea2cf7ddbc4f4611e9640bb8f46fb989dfbc Mon Sep 17 00:00:00 2001 From: wqy Date: Fri, 16 Sep 2022 22:48:35 +0800 Subject: [PATCH] Add tank api --- api/s4_tanks.py | 333 ++++++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 333 insertions(+) create mode 100644 api/s4_tanks.py diff --git a/api/s4_tanks.py b/api/s4_tanks.py new file mode 100644 index 0000000..897e749 --- /dev/null +++ b/api/s4_tanks.py @@ -0,0 +1,333 @@ +from psycopg.rows import dict_row, Row +from .connection import g_conn_dict as conn +from .s0_base import * +from .operation import * +from .change_set import ChangeSet + +def add_tank(name: str, id: str, x: float, y: float, elevation: float, init_level: float = 0, min_level: float = 0, max_level: float = 0, diameter: float = 0, min_vol: float = 0) -> ChangeSet: + if is_node(name, id): + return + + with conn[name].cursor() as cur: + sql = f"insert into _node (id, type) values ('{id}', 'TANK');" + sql += f" insert into tanks (id, elevation, init_level, min_level, max_level, diameter, min_vol) values ('{id}', {elevation}, {init_level}, {min_level}, {max_level}, {diameter}, {min_vol});" + sql += f" insert into coordinates (node, coord) values ('{id}', '({x}, {y})');" + cur.execute(sql) + + redo = sql.replace("'", '"') + undo = f'delete from coordinates where node = "{id}";' + undo += f' delete from tanks where id = "{id}";' + undo += f' delete from _node where id = "{id}";' + add_operation(name, redo, undo) + + change = ChangeSet() + change.add('tank', id) + return change + + +def delete_tank(name: str, id: str) -> ChangeSet: + if not is_tank(name, id): + return + + with conn[name].cursor(row_factory=dict_row) as cur: + cur.execute(f"select * from tanks where id = '{id}'") + row = cur.fetchone() + if row == None: + return + + elevation = row['elevation'] + init_level = row['init_level'] + min_level = row['min_level'] + max_level = row['max_level'] + diameter = row['diameter'] + min_vol = row['min_vol'] + vol_curve = 'NULL' if row['vol_curve'] == None else row['vol_curve'] + vol_curve = f'"{vol_curve}"' if vol_curve != 'NULL' else vol_curve + overflow = 'NULL' if row['overflow'] == None else row['overflow'] + + cur.execute(f"select * from coordinates where node = '{id}'") + row = cur.fetchone() + if row == None: + return + + coord = row['coord'] + + sql = f"delete from coordinates where node = '{id}';" + sql += f" delete from tanks where id = '{id}';" + sql += f" delete from _node where id = '{id}';" + cur.execute(sql) + + redo = sql.replace("'", '"') + undo = f'insert into _node (id, type) values ("{id}", "JUNCTION");' + undo += f' insert into tanks (id, elevation, init_level, min_level, max_level, diameter, min_vol, vol_curve, overflow) values ("{id}", {elevation}, {init_level}, {min_level}, {max_level}, {diameter}, {min_vol}, {vol_curve}, {overflow});' + undo += f' insert into coordinates (node, coord) values ("{id}", "{coord}");' + add_operation(name, redo, undo) + + change = ChangeSet() + change.delete('tank', id) + return change + + +def _get_tank(name: str, id: str) -> Row | None: + with conn[name].cursor(row_factory=dict_row) as cur: + cur.execute(f"select elevation, init_level, min_level, max_level, diameter, min_vol, vol_curve, overflow from tanks where id = '{id}'") + return cur.fetchone() + + +def get_tank_elevation(name: str, id: str) -> float | None: + row = _get_tank(name, id) + return float(row['elevation']) if row != None else None + + +def get_tank_init_level(name: str, id: str) -> float | None: + row = _get_tank(name, id) + return float(row['init_level']) if row != None else None + + +def get_tank_min_level(name: str, id: str) -> float | None: + row = _get_tank(name, id) + return float(row['min_level']) if row != None else None + + +def get_tank_max_level(name: str, id: str) -> float | None: + row = _get_tank(name, id) + return float(row['max_level']) if row != None else None + + +def get_tank_diameter(name: str, id: str) -> float | None: + row = _get_tank(name, id) + return float(row['diameter']) if row != None else None + + +def get_tank_min_vol(name: str, id: str) -> float | None: + row = _get_tank(name, id) + return float(row['min_vol']) if row != None else None + + +def get_tank_vol_curve(name: str, id: str) -> str | None: + row = _get_tank(name, id) + if row != None: + return row['vol_curve'] if row['vol_curve'] != None else 'NULL' + else: + return None + + +def get_tank_overflow(name: str, id: str) -> str | None: + row = _get_tank(name, id) + if row != None: + return row['overflow'] if row['overflow'] != None else 'NULL' + else: + return None + + +def _to_point(coord: str) -> dict[str, float]: + coord = coord.removeprefix('(') + coord = coord.removesuffix(')') + coord = coord.split(',') + return { 'x': float(coord[0]), 'y': float(coord[1]) } + + +def get_tank_coord(name: str, id: str) -> dict[str, float] | None: + with conn[name].cursor(row_factory=dict_row) as cur: + cur.execute(f"select * from coordinates where node = '{id}'") + row = cur.fetchone() + if row == None: + return None + + coord = str(row['coord']) + return _to_point(coord) + + +def set_tank_elevation(name: str, id: str, elevation: float) -> ChangeSet: + if not is_tank(name, id): + return + + old = get_tank_elevation(name, id) + if old == None: + return + + with conn[name].cursor() as cur: + sql = f"update tanks set elevation = {elevation} where id = '{id}'" + cur.execute(sql) + redo = sql.replace("'", '"') + undo = f'update tanks set elevation = {old} where id = "{id}"' + add_operation(name, redo, undo) + + change = ChangeSet() + change.update('tank', id, 'elevation', 'float', str(elevation)) + return change + + +def set_tank_init_level(name: str, id: str, init_level: float) -> ChangeSet: + if not is_tank(name, id): + return + + old = get_tank_init_level(name, id) + if old == None: + return + + with conn[name].cursor() as cur: + sql = f"update tanks set init_level = {init_level} where id = '{id}'" + cur.execute(sql) + redo = sql.replace("'", '"') + undo = f'update tanks set init_level = {old} where id = "{id}"' + add_operation(name, redo, undo) + + change = ChangeSet() + change.update('tank', id, 'init_level', 'float', str(init_level)) + return change + + +def set_tank_min_level(name: str, id: str, min_level: float) -> ChangeSet: + if not is_tank(name, id): + return + + old = get_tank_min_level(name, id) + if old == None: + return + + with conn[name].cursor() as cur: + sql = f"update tanks set min_level = {min_level} where id = '{id}'" + cur.execute(sql) + redo = sql.replace("'", '"') + undo = f'update tanks set min_level = {old} where id = "{id}"' + add_operation(name, redo, undo) + + change = ChangeSet() + change.update('tank', id, 'min_level', 'float', str(min_level)) + return change + + +def set_tank_max_level(name: str, id: str, max_level: float) -> ChangeSet: + if not is_tank(name, id): + return + + old = get_tank_max_level(name, id) + if old == None: + return + + with conn[name].cursor() as cur: + sql = f"update tanks set max_level = {max_level} where id = '{id}'" + cur.execute(sql) + redo = sql.replace("'", '"') + undo = f'update tanks set max_level = {old} where id = "{id}"' + add_operation(name, redo, undo) + + change = ChangeSet() + change.update('tank', id, 'max_level', 'float', str(max_level)) + return change + + +def set_tank_diameter(name: str, id: str, diameter: float) -> ChangeSet: + if not is_tank(name, id): + return + + old = get_tank_diameter(name, id) + if old == None: + return + + with conn[name].cursor() as cur: + sql = f"update tanks set diameter = {diameter} where id = '{id}'" + cur.execute(sql) + redo = sql.replace("'", '"') + undo = f'update tanks set diameter = {old} where id = "{id}"' + add_operation(name, redo, undo) + + change = ChangeSet() + change.update('tank', id, 'diameter', 'float', str(diameter)) + return change + + +def set_tank_min_vol(name: str, id: str, min_vol: float) -> ChangeSet: + if not is_tank(name, id): + return + + old = get_tank_min_vol(name, id) + if old == None: + return + + with conn[name].cursor() as cur: + sql = f"update tanks set min_vol = {min_vol} where id = '{id}'" + cur.execute(sql) + redo = sql.replace("'", '"') + undo = f'update tanks set min_vol = {old} where id = "{id}"' + add_operation(name, redo, undo) + + change = ChangeSet() + change.update('tank', id, 'min_vol', 'float', str(min_vol)) + return change + + +def set_tank_vol_curve(name: str, id: str, vol_curve: str) -> ChangeSet: + if not is_tank(name, id): + return + if not is_curve(name, id): + return + + old = get_tank_vol_curve(name, id) + if old == None: + return + + old = f'"{old}"' if old != 'NULL' else old + + with conn[name].cursor() as cur: + sql = f"update tanks set vol_curve = '{vol_curve}' where id = '{id}'" + cur.execute(sql) + redo = sql.replace("'", '"') + undo = f'update tanks set vol_curve = {old} where id = "{id}"' + add_operation(name, redo, undo) + + change = ChangeSet() + change.update('tank', id, 'vol_curve', 'str', str(vol_curve)) + return change + +OVERFLOW_YES = 'YES' +OVERFLOW_NO = 'NO' + +def set_tank_overflow(name: str, id: str, overflow: str) -> ChangeSet: + if overflow is not OVERFLOW_YES or overflow is not OVERFLOW_NO: + return + + if not is_tank(name, id): + return + if not is_curve(name, id): + return + + old = get_tank_overflow(name, id) + if old == None: + return + + old = f'"{old}"' if old != 'NULL' else old + + with conn[name].cursor() as cur: + sql = f"update tanks set overflow = '{overflow}' where id = '{id}'" + cur.execute(sql) + redo = sql.replace("'", '"') + undo = f'update tanks set overflow = {old} where id = "{id}"' + add_operation(name, redo, undo) + + change = ChangeSet() + change.update('tank', id, 'overflow', 'str', str(overflow)) + return change + + +def set_tank_coord(name: str, id: str, x: float, y: float) -> ChangeSet: + if not is_tank(name, id): + return + + old = get_tank_coord(name, id) + if old == None: + return + old_x, old_y = old['x'], old['y'] + + with conn[name].cursor() as cur: + sql = f"update coordinates set coord = '({x},{y})' where node = '{id}'" + cur.execute(sql) + + redo = sql.replace("'", '"') + undo = f'update coordinates set coord = "({old_x},{old_y})" where node = "{id}"' + add_operation(name, redo, undo) + + change = ChangeSet() + change.update('tank', id, 'coord', 'point', str({'x': x, 'y': y})) + return change