182 lines
5.0 KiB
Python
182 lines
5.0 KiB
Python
from psycopg.rows import dict_row, Row
|
|
from .connection import g_conn_dict as conn
|
|
from .operation import *
|
|
from .change_set import ChangeSet
|
|
|
|
|
|
# Names of the base tables that hold the ids of every element of a kind.
_NODE: str = "_node"
_LINK: str = "_link"
_CURVE: str = "_curve"
_PATTERN: str = "_pattern"

# Values compared against the ``type`` column of a ``_node`` row.
JUNCTION: str = "junction"
RESERVOIR: str = "reservoir"
TANK: str = "tank"

# Values compared against the ``type`` column of a ``_link`` row.
PIPE: str = "pipe"
PUMP: str = "pump"
VALVE: str = "valve"
|
|
|
|
|
|
def _get_from(name: str, id: str, base_type: str) -> Row | None:
    """Fetch the row with the given id from one of the base tables.

    Args:
        name: Key used to look up the connection in ``conn``.
        id: Value matched against the table's ``id`` column.
        base_type: Table to query. It is interpolated into the SQL text
            verbatim, so it must be a trusted identifier such as ``_NODE``,
            ``_LINK``, ``_CURVE`` or ``_PATTERN``.

    Returns:
        The first matching row as built by ``dict_row``, or ``None`` when
        no row matches.
    """
    # A table name cannot be sent as a bound parameter, but the id can:
    # binding it keeps ids that contain quotes from breaking (or injecting
    # into) the statement.
    query = f"select * from {base_type} where id = %s"
    with conn[name].cursor(row_factory=dict_row) as cur:
        cur.execute(query, (id,))
        return cur.fetchone()
|
|
|
|
|
|
def is_node(name: str, id: str) -> bool:
    """Return True when a row with this id exists in the ``_node`` table.

    Args:
        name: Key used to look up the connection in ``conn``.
        id: Id to look for.
    """
    # Identity check: ``None`` is a singleton, so ``is not`` is the right test.
    return _get_from(name, id, _NODE) is not None
|
|
|
|
|
|
def is_junction(name: str, id: str) -> bool:
    """Return True when ``id`` is in ``_node`` and its ``type`` is ``JUNCTION``.

    Args:
        name: Key used to look up the connection in ``conn``.
        id: Id to look for.
    """
    row = _get_from(name, id, _NODE)
    # A missing row short-circuits to False before the column is read.
    return row is not None and row['type'] == JUNCTION
|
|
|
|
|
|
def is_reservoir(name: str, id: str) -> bool:
    """Return True when ``id`` is in ``_node`` and its ``type`` is ``RESERVOIR``.

    Args:
        name: Key used to look up the connection in ``conn``.
        id: Id to look for.
    """
    row = _get_from(name, id, _NODE)
    # A missing row short-circuits to False before the column is read.
    return row is not None and row['type'] == RESERVOIR
|
|
|
|
|
|
def is_tank(name: str, id: str) -> bool:
    """Return True when ``id`` is in ``_node`` and its ``type`` is ``TANK``.

    Args:
        name: Key used to look up the connection in ``conn``.
        id: Id to look for.
    """
    row = _get_from(name, id, _NODE)
    # A missing row short-circuits to False before the column is read.
    return row is not None and row['type'] == TANK
|
|
|
|
|
|
def is_link(name: str, id: str) -> bool:
    """Return True when a row with this id exists in the ``_link`` table.

    Args:
        name: Key used to look up the connection in ``conn``.
        id: Id to look for.
    """
    # Identity check: ``None`` is a singleton, so ``is not`` is the right test.
    return _get_from(name, id, _LINK) is not None
|
|
|
|
|
|
def is_pipe(name: str, id: str) -> bool:
    """Return True when ``id`` is in ``_link`` and its ``type`` is ``PIPE``.

    Args:
        name: Key used to look up the connection in ``conn``.
        id: Id to look for.
    """
    row = _get_from(name, id, _LINK)
    # A missing row short-circuits to False before the column is read.
    return row is not None and row['type'] == PIPE
|
|
|
|
|
|
def is_pump(name: str, id: str) -> bool:
    """Return True when ``id`` is in ``_link`` and its ``type`` is ``PUMP``.

    Args:
        name: Key used to look up the connection in ``conn``.
        id: Id to look for.
    """
    row = _get_from(name, id, _LINK)
    # A missing row short-circuits to False before the column is read.
    return row is not None and row['type'] == PUMP
|
|
|
|
|
|
def is_valve(name: str, id: str) -> bool:
    """Return True when ``id`` is in ``_link`` and its ``type`` is ``VALVE``.

    Args:
        name: Key used to look up the connection in ``conn``.
        id: Id to look for.
    """
    row = _get_from(name, id, _LINK)
    # A missing row short-circuits to False before the column is read.
    return row is not None and row['type'] == VALVE
|
|
|
|
|
|
def is_curve(name: str, id: str) -> bool:
    """Return True when a row with this id exists in the ``_curve`` table.

    Args:
        name: Key used to look up the connection in ``conn``.
        id: Id to look for.
    """
    # Identity check: ``None`` is a singleton, so ``is not`` is the right test.
    return _get_from(name, id, _CURVE) is not None
|
|
|
|
|
|
def is_pattern(name: str, id: str) -> bool:
    """Return True when a row with this id exists in the ``_pattern`` table.

    Args:
        name: Key used to look up the connection in ``conn``.
        id: Id to look for.
    """
    # Identity check: ``None`` is a singleton, so ``is not`` is the right test.
    return _get_from(name, id, _PATTERN) is not None
|
|
|
|
|
|
def _get_all(name: str, base_type: str) -> list[str]:
    """Collect every id stored in ``base_type``, ordered by id.

    Args:
        name: Key used to look up the connection in ``conn``.
        base_type: Table to read; interpolated into the SQL text verbatim.

    Returns:
        The ``id`` value of each row, in the order the query returns them.
    """
    with conn[name].cursor(row_factory=dict_row) as cur:
        cur.execute(f"select id from {base_type} order by id")
        # Rows are dicts because of ``dict_row``; keep only the id column.
        return [entry['id'] for entry in cur]
|
|
|
|
|
|
def get_nodes(name: str) -> list[str]:
    """Return the ids of all rows in the ``_node`` table, ordered by id."""
    node_ids = _get_all(name, _NODE)
    return node_ids
|
|
|
|
|
|
def get_links(name: str) -> list[str]:
    """Return the ids of all rows in the ``_link`` table, ordered by id."""
    link_ids = _get_all(name, _LINK)
    return link_ids
|
|
|
|
|
|
def get_curves(name: str) -> list[str]:
    """Return the ids of all rows in the ``_curve`` table, ordered by id."""
    curve_ids = _get_all(name, _CURVE)
    return curve_ids
|
|
|
|
|
|
def get_patterns(name: str) -> list[str]:
    """Return the ids of all rows in the ``_pattern`` table, ordered by id."""
    pattern_ids = _get_all(name, _PATTERN)
    return pattern_ids
|
|
|
|
|
|
def add_node(name: str, node_type: str, id: str, x: float, y: float, table_sql: str, table_undo_sql: str) -> ChangeSet:
    """Insert a node with its coordinates and record the operation.

    Args:
        name: Key used to look up the connection in ``conn``.
        node_type: Value stored in the ``type`` column of ``_node``.
        id: Id of the new node.
        x: First component of the stored coordinate.
        y: Second component of the stored coordinate.
        table_sql: Caller-supplied SQL executed between the ``_node`` insert
            and the ``coordinates`` insert.
        table_undo_sql: Caller-supplied SQL placed between the two deletes
            of the undo script.

    Returns:
        A ``ChangeSet`` with the node registered via ``add``, or an empty
        ``ChangeSet`` when the id already exists in ``_node``.
    """
    # An existing id means nothing is inserted and nothing is recorded.
    if is_node(name, id):
        return ChangeSet()

    with conn[name].cursor() as cur:
        forward_sql = (
            f"insert into _node (id, type) values ('{id}', '{node_type}'); "
            + table_sql
            + f" insert into coordinates (node, coord) values ('{id}', '({x}, {y})');"
        )
        cur.execute(forward_sql)

        # The undo script removes rows in the reverse order of insertion.
        backward_sql = (
            f'delete from coordinates where node = "{id}"; '
            + table_undo_sql
            + f' delete from _node where id = "{id}";'
        )
        # The recorded redo text is the executed SQL with single quotes
        # swapped for double quotes, matching the quoting of the undo text.
        add_operation(name, forward_sql.replace("'", '"'), backward_sql)

    result = ChangeSet()
    result.add(node_type, id)
    return result
|
|
|
|
|
|
def delete_node(name: str, node_type: str, id: str, table_sql: str, table_undo_sql: str) -> ChangeSet:
    """Delete a node and its coordinates and record the operation.

    Args:
        name: Key used to look up the connection in ``conn``.
        node_type: Type written back into ``_node`` by the undo script and
            reported in the returned ``ChangeSet``.
        id: Id of the node to delete.
        table_sql: Caller-supplied SQL executed between the ``coordinates``
            delete and the ``_node`` delete.
        table_undo_sql: Caller-supplied SQL placed between the two inserts
            of the undo script.

    Returns:
        A ``ChangeSet`` with the node registered via ``delete``, or an empty
        ``ChangeSet`` when the id is not in ``_node`` or has no row in
        ``coordinates``.
    """
    if not is_node(name, id):
        return ChangeSet()

    with conn[name].cursor(row_factory=dict_row) as cur:
        # Read the stored coordinate first so the undo script can restore it.
        cur.execute(f"select * from coordinates where node = '{id}'")
        row = cur.fetchone()
        if row is None:
            return ChangeSet()

        coord = row['coord']

        sql = f"delete from coordinates where node = '{id}'; "
        sql += table_sql
        sql += f" delete from _node where id = '{id}';"
        cur.execute(sql)

        # The recorded redo text is the executed SQL with single quotes
        # swapped for double quotes, matching the quoting of the undo text.
        redo = sql.replace("'", '"')
        # The undo script re-inserts rows in the reverse order of deletion.
        undo = f'insert into _node (id, type) values ("{id}", "{node_type}"); '
        undo += table_undo_sql
        undo += f' insert into coordinates (node, coord) values ("{id}", "{coord}");'
        add_operation(name, redo, undo)

    change = ChangeSet()
    change.delete(node_type, id)
    return change
|
|
|
|
|
|
def add_link(name: str, link_type: str, id: str, table_sql: str, table_undo_sql: str) -> ChangeSet:
    """Insert a link and record the operation.

    Args:
        name: Key used to look up the connection in ``conn``.
        link_type: Value stored in the ``type`` column of ``_link``.
        id: Id of the new link.
        table_sql: Caller-supplied SQL executed after the ``_link`` insert.
        table_undo_sql: Caller-supplied SQL placed before the ``_link``
            delete of the undo script.

    Returns:
        A ``ChangeSet`` with the link registered via ``add``, or an empty
        ``ChangeSet`` when the id already exists in ``_link``.
    """
    # An existing id means nothing is inserted and nothing is recorded.
    if is_link(name, id):
        return ChangeSet()

    with conn[name].cursor() as cur:
        forward_sql = (
            f"insert into _link (id, type) values ('{id}', '{link_type}'); "
            + table_sql
        )
        cur.execute(forward_sql)

        backward_sql = table_undo_sql + f' delete from _link where id = "{id}";'
        # The recorded redo text is the executed SQL with single quotes
        # swapped for double quotes, matching the quoting of the undo text.
        add_operation(name, forward_sql.replace("'", '"'), backward_sql)

    result = ChangeSet()
    result.add(link_type, id)
    return result
|
|
|
|
|
|
def delete_link(name: str, link_type: str, id: str, table_sql: str, table_undo_sql: str) -> ChangeSet:
    """Delete a link and record the operation.

    Args:
        name: Key used to look up the connection in ``conn``.
        link_type: Type written back into ``_link`` by the undo script and
            reported in the returned ``ChangeSet``.
        id: Id of the link to delete.
        table_sql: Caller-supplied SQL executed before the ``_link`` delete.
        table_undo_sql: Caller-supplied SQL placed after the ``_link``
            insert of the undo script.

    Returns:
        A ``ChangeSet`` with the link registered via ``delete``, or an empty
        ``ChangeSet`` when the id is not in ``_link``.
    """
    # A missing id means nothing is deleted and nothing is recorded.
    if not is_link(name, id):
        return ChangeSet()

    with conn[name].cursor(row_factory=dict_row) as cur:
        forward_sql = table_sql + f" delete from _link where id = '{id}';"
        cur.execute(forward_sql)

        backward_sql = (
            f'insert into _link (id, type) values ("{id}", "{link_type}"); '
            + table_undo_sql
        )
        # The recorded redo text is the executed SQL with single quotes
        # swapped for double quotes, matching the quoting of the undo text.
        add_operation(name, forward_sql.replace("'", '"'), backward_sql)

    result = ChangeSet()
    result.delete(link_type, id)
    return result
|