174 lines
7.0 KiB
Python
174 lines
7.0 KiB
Python
from .database import *
|
|
from .s0_base import *
|
|
from .s24_coordinates import *
|
|
|
|
|
|
def get_junction_schema(name: str) -> dict[str, dict[str, Any]]:
|
|
return { 'id' : {'type': 'str' , 'optional': False , 'readonly': True },
|
|
'x' : {'type': 'float' , 'optional': False , 'readonly': False},
|
|
'y' : {'type': 'float' , 'optional': False , 'readonly': False},
|
|
'elevation' : {'type': 'float' , 'optional': False , 'readonly': False},
|
|
'demand' : {'type': 'float' , 'optional': True , 'readonly': False},
|
|
'pattern' : {'type': 'str' , 'optional': True , 'readonly': False},
|
|
'links' : {'type': 'str_list' , 'optional': False , 'readonly': True } }
|
|
|
|
|
|
def get_junction(name: str, id: str) -> dict[str, Any]:
|
|
j = read(name, f"select * from junctions where id = '{id}'")
|
|
xy = get_node_coord(name, id)
|
|
d = {}
|
|
d['id'] = str(j['id'])
|
|
d['x'] = float(xy['x'])
|
|
d['y'] = float(xy['y'])
|
|
d['elevation'] = float(j['elevation'])
|
|
d['demand'] = float(j['demand']) if j['demand'] != None else None
|
|
d['pattern'] = str(j['pattern']) if j['pattern'] != None else None
|
|
d['links'] = get_node_links(name, id)
|
|
return d
|
|
|
|
|
|
class Junction(object):
|
|
def __init__(self, input: dict[str, Any]) -> None:
|
|
self.type = 'junction'
|
|
self.id = str(input['id'])
|
|
self.x = float(input['x'])
|
|
self.y = float(input['y'])
|
|
self.elevation = float(input['elevation'])
|
|
self.demand = float(input['demand']) if 'demand' in input and input['demand'] != None else None
|
|
self.pattern = str(input['pattern']) if 'pattern' in input and input['pattern'] != None else None
|
|
|
|
self.f_type = f"'{self.type}'"
|
|
self.f_id = f"'{self.id}'"
|
|
self.f_coord = f"'({self.x}, {self.y})'"
|
|
self.f_elevation = self.elevation
|
|
self.f_demand = self.demand if self.demand != None else 'null'
|
|
self.f_pattern = f"'{self.pattern}'" if self.pattern != None else 'null'
|
|
|
|
def as_dict(self) -> dict[str, Any]:
|
|
return { 'type': self.type, 'id': self.id, 'x': self.x, 'y': self.y, 'elevation': self.elevation, 'demand': self.demand, 'pattern': self.pattern }
|
|
|
|
def as_id_dict(self) -> dict[str, Any]:
|
|
return { 'type': self.type, 'id': self.id }
|
|
|
|
|
|
def set_junction_cmd(name: str, cs: ChangeSet) -> DbChangeSet:
|
|
old = Junction(get_junction(name, cs.operations[0]['id']))
|
|
raw_new = get_junction(name, cs.operations[0]['id'])
|
|
|
|
new_dict = cs.operations[0]
|
|
schema = get_junction_schema(name)
|
|
for key, value in schema.items():
|
|
if key in new_dict and not value['readonly']:
|
|
raw_new[key] = new_dict[key]
|
|
new = Junction(raw_new)
|
|
|
|
redo_sql = f"update junctions set elevation = {new.f_elevation}, demand = {new.f_demand}, pattern = {new.f_pattern} where id = {new.f_id};"
|
|
redo_sql += f"\nupdate coordinates set coord = {new.f_coord} where node = {new.f_id};"
|
|
|
|
undo_sql = f"update coordinates set coord = {old.f_coord} where node = {old.f_id};"
|
|
undo_sql += f"\nupdate junctions set elevation = {old.f_elevation}, demand = {old.f_demand}, pattern = {old.f_pattern} where id = {old.f_id};"
|
|
|
|
redo_cs = g_update_prefix | new.as_dict()
|
|
undo_cs = g_update_prefix | old.as_dict()
|
|
|
|
return DbChangeSet(redo_sql, undo_sql, [redo_cs], [undo_cs])
|
|
|
|
|
|
def set_junction(name: str, cs: ChangeSet) -> ChangeSet:
|
|
return execute_command(name, set_junction_cmd(name, cs))
|
|
|
|
|
|
def add_junction_cmd(name: str, cs: ChangeSet) -> DbChangeSet:
|
|
new = Junction(cs.operations[0])
|
|
|
|
redo_sql = f"insert into _node (id, type) values ({new.f_id}, {new.f_type});"
|
|
redo_sql += f"\ninsert into junctions (id, elevation, demand, pattern) values ({new.f_id}, {new.f_elevation}, {new.f_demand}, {new.f_pattern});"
|
|
redo_sql += f"\ninsert into coordinates (node, coord) values ({new.f_id}, {new.f_coord});"
|
|
|
|
undo_sql = f"delete from coordinates where node = {new.f_id};"
|
|
undo_sql += f"\ndelete from junctions where id = {new.f_id};"
|
|
undo_sql += f"\ndelete from _node where id = {new.f_id};"
|
|
|
|
redo_cs = g_add_prefix | new.as_dict()
|
|
undo_cs = g_delete_prefix | new.as_id_dict()
|
|
|
|
return DbChangeSet(redo_sql, undo_sql, [redo_cs], [undo_cs])
|
|
|
|
|
|
def add_junction(name: str, cs: ChangeSet) -> ChangeSet:
|
|
return execute_command(name, add_junction_cmd(name, cs))
|
|
|
|
|
|
def delete_junction_cmd(name: str, cs: ChangeSet) -> DbChangeSet:
|
|
old = Junction(get_junction(name, cs.operations[0]['id']))
|
|
|
|
redo_sql = f"delete from coordinates where node = {old.f_id};"
|
|
redo_sql += f"\ndelete from junctions where id = {old.f_id};"
|
|
redo_sql += f"\ndelete from _node where id = {old.f_id};"
|
|
|
|
undo_sql = f"insert into _node (id, type) values ({old.f_id}, {old.f_type});"
|
|
undo_sql += f"\ninsert into junctions (id, elevation, demand, pattern) values ({old.f_id}, {old.f_elevation}, {old.f_demand}, {old.f_pattern});"
|
|
undo_sql += f"\ninsert into coordinates (node, coord) values ({old.f_id}, {old.f_coord});"
|
|
|
|
redo_cs = g_delete_prefix | old.as_id_dict()
|
|
undo_cs = g_add_prefix | old.as_dict()
|
|
|
|
return DbChangeSet(redo_sql, undo_sql, [redo_cs], [undo_cs])
|
|
|
|
|
|
def delete_junction(name: str, cs: ChangeSet) -> ChangeSet:
|
|
return execute_command(name, delete_junction_cmd(name, cs))
|
|
|
|
|
|
#--------------------------------------------------------------
|
|
# [EPANET2]
|
|
# id elev. (demand) (demand pattern)
|
|
#--------------------------------------------------------------
|
|
class InpJunction:
|
|
def __init__(self, line: str) -> None:
|
|
tokens = line.split()
|
|
|
|
num = len(tokens)
|
|
has_desc = tokens[-1].startswith(';')
|
|
num_without_desc = (num - 1) if has_desc else num
|
|
|
|
self.id = str(tokens[0])
|
|
self.elevation = float(tokens[1])
|
|
self.demand = float(tokens[2]) if num_without_desc >= 3 else None
|
|
self.pattern = str(tokens[3]) if num_without_desc >= 4 else None
|
|
self.desc = str(tokens[-1]) if has_desc else None
|
|
|
|
|
|
def inp_in_junction(section: list[str]) -> ChangeSet:
|
|
cs = ChangeSet()
|
|
for s in section:
|
|
# skip comment
|
|
if s.startswith(';'):
|
|
continue
|
|
obj = InpJunction(s)
|
|
cs.append(g_add_prefix | {'type': 'junction', 'id': obj.id, 'elevation': obj.elevation, 'demand': obj.demand, 'pattern': obj.pattern})
|
|
return cs
|
|
|
|
|
|
def inp_out_junction(name: str) -> list[str]:
|
|
lines = []
|
|
objs = read_all(name, 'select * from junctions')
|
|
for obj in objs:
|
|
id = obj['id']
|
|
elev = obj['elevation']
|
|
demand = obj['demand'] if obj['demand'] != None else ''
|
|
pattern = obj['pattern'] if obj['pattern'] != None else ''
|
|
desc = ';'
|
|
lines.append(f'{id} {elev} {demand} {pattern} {desc}')
|
|
return lines
|
|
|
|
|
|
def unset_junction_by_pattern(name: str, pattern: str) -> ChangeSet:
|
|
cs = ChangeSet()
|
|
|
|
rows = read_all(name, f"select id from junctions where pattern = '{pattern}'")
|
|
for row in rows:
|
|
cs.append(g_update_prefix | {'type': 'junction', 'id': row['id'], 'pattern': None})
|
|
|
|
return cs
|