282 lines
7.5 KiB
Python
282 lines
7.5 KiB
Python
from .project import *
|
|
from .database import ChangeSet, get_current_operation, set_restore_operation
|
|
from .sections import section_name
|
|
from .batch_cmds import execute_batch_commands
|
|
from .s1_title import inp_in_title
|
|
from .s2_junctions import inp_in_junction
|
|
from .s3_reservoirs import inp_in_reservoir
|
|
from .s4_tanks import inp_in_tank
|
|
from .s5_pipes import inp_in_pipe
|
|
from .s6_pumps import inp_in_pump
|
|
from .s7_valves import inp_in_valve
|
|
from .s8_tags import inp_in_tag
|
|
from .s9_demands import inp_in_demand
|
|
from .s10_status import inp_in_status
|
|
from .s11_patterns import inp_in_pattern
|
|
from .s12_curves import inp_in_curve
|
|
from .s13_controls import inp_in_control
|
|
from .s14_rules import inp_in_rule
|
|
from .s15_energy import inp_in_energy
|
|
from .s16_emitters import inp_in_emitter
|
|
from .s17_quality import inp_in_quality
|
|
from .s18_sources import inp_in_source
|
|
from .s19_reactions import inp_in_reaction
|
|
from .s20_mixing import inp_in_mixing
|
|
from .s21_times import inp_in_time
|
|
from .s22_report import inp_in_report
|
|
from .s23_options import inp_in_option
|
|
from .s24_coordinates import inp_in_coord
|
|
from .s25_vertices import inp_in_vertex
|
|
from .s26_labels import inp_in_label
|
|
from .s27_backdrop import inp_in_backdrop
|
|
#from .s28_end import *
|
|
|
|
|
|
def _split_sections(lines) -> dict[str, list[str]]:
    """Group the lines of an INP document by the section they belong to.

    ``lines`` is any iterable of strings (an open text file, a list of
    lines, ...). Each line is stripped of surrounding whitespace before it
    is examined.

    * A line starting with ``[`` followed by one of the names in
      ``section_name`` switches the current section and is not stored.
      The match is a case-sensitive prefix test, tried in ``section_name``
      order; the first hit wins.
    * A line starting with ``[`` that matches no known name is treated
      like ordinary content.
    * A blank line ends the current section, except inside ``CONTROLS``
      and ``RULES`` where blank lines are dropped and the section stays
      active.
    * Any other line is appended to the current section; content seen
      while no section is active is discarded.

    Returns a dict holding one (possibly empty) list of stripped lines for
    every name in ``section_name``.
    """
    sections: dict[str, list[str]] = {name: [] for name in section_name}
    current = ''

    for raw in lines:
        text = raw.strip()

        if text == '':
            # Blank lines are never stored. They close every section except
            # CONTROLS and RULES, which stay active across blank lines.
            if current not in ('CONTROLS', 'RULES'):
                current = ''
            continue

        if text.startswith('['):
            header = next(
                (name for name in section_name if text.startswith(f'[{name}')),
                None,
            )
            if header is not None:
                current = header
                continue
            # Unknown bracketed line: fall through and keep it as content.

        if current != '':
            sections[current].append(text)

    return sections


def _parse_inp(inp: str) -> dict[str, list[str]]:
    """Read the INP file at path ``inp`` and split it into sections.

    The file is opened in text mode with the platform default encoding and
    is closed before returning, also when parsing raises.

    Returns the mapping described in ``_split_sections``.
    """
    with open(inp) as stream:
        return _split_sections(stream)


def _parse_cs(cs: ChangeSet) -> dict[str, list[str]]:
    """Split INP text carried inside a change set into sections.

    The text is taken from ``cs.operations[0]['inp']``, converted with
    ``str`` and split on ``'\\n'``.

    Returns the mapping described in ``_split_sections``.
    """
    text = str(cs.operations[0]['inp'])
    return _split_sections(text.split('\n'))
|
|
|
|
|
|
def _read_inp(file: dict[str, list[str]]) -> ChangeSet:
    """Convert sectioned INP lines into one ordered ``ChangeSet``.

    ``file`` maps section names to their raw lines (the output of
    ``_parse_inp`` / ``_parse_cs``). Each section is handed to its
    ``inp_in_*`` parser and the result is merged into a per-section change
    set. ``COORDINATES`` is special: it produces no operations of its own;
    instead the ``x`` / ``y`` values are written into the node operations
    of ``JUNCTIONS``, ``RESERVOIRS`` and ``TANKS``. A node without a
    coordinate entry is placed at the origin and a warning is printed.
    ``END`` and unknown section names contribute nothing.

    Returns a single change set built by merging the per-section change
    sets in a fixed priority order.
    """
    # Section name -> parser for every section that yields operations.
    readers = {
        'TITLE': inp_in_title,
        'JUNCTIONS': inp_in_junction,    # + coords
        'RESERVOIRS': inp_in_reservoir,  # + coords
        'TANKS': inp_in_tank,            # + coords
        'PIPES': inp_in_pipe,
        'PUMPS': inp_in_pump,
        'VALVES': inp_in_valve,
        'TAGS': inp_in_tag,
        'DEMANDS': inp_in_demand,
        'STATUS': inp_in_status,
        'PATTERNS': inp_in_pattern,
        'CURVES': inp_in_curve,
        'CONTROLS': inp_in_control,
        'RULES': inp_in_rule,
        'ENERGY': inp_in_energy,
        'EMITTERS': inp_in_emitter,
        'QUALITY': inp_in_quality,
        'SOURCES': inp_in_source,
        'REACTIONS': inp_in_reaction,
        'MIXING': inp_in_mixing,
        'TIMES': inp_in_time,
        'REPORT': inp_in_report,
        'OPTIONS': inp_in_option,
        'VERTICES': inp_in_vertex,
        'LABELS': inp_in_label,
        'BACKDROP': inp_in_backdrop,
    }

    file_cs: dict[str, ChangeSet] = {s: ChangeSet() for s in section_name}

    for name, section in file.items():
        reader = readers.get(name)
        if reader is not None:
            file_cs[name].merge(reader(section))

    # Attach coordinates only after every section has been parsed, so the
    # result does not depend on where COORDINATES sits in the iteration
    # order of ``file`` relative to the node sections.
    if 'COORDINATES' in file:
        coords = inp_in_coord(file['COORDINATES'])
        for s in ('JUNCTIONS', 'RESERVOIRS', 'TANKS'):
            for node in file_cs[s].operations:
                # Operations of type 'demand' also live in these change
                # sets; they are not nodes and get no position.
                if node['type'] == 'demand':
                    continue
                if node['id'] in coords:
                    coord = coords[node['id']]
                    node |= {'x': coord['x'], 'y': coord['y']}
                else:
                    print(f"WARNING: [{s}] {node['id']} has no coordinate, set it at origin!")
                    node |= {'x': 0.0, 'y': 0.0}

    # Merge order of the final change set.
    # NOTE(review): presumably chosen so that referenced objects (patterns,
    # curves, nodes) are created before the objects that use them - confirm.
    priorities = [
        'PATTERNS',
        'CURVES',
        'JUNCTIONS',
        'RESERVOIRS',
        'TANKS',
        'COORDINATES',
        'PIPES',
        'PUMPS',
        'VALVES',
        'DEMANDS',
        'STATUS',
        'OPTIONS',
        'TIMES',
        'EMITTERS',
        'QUALITY',
        'SOURCES',
        'REACTIONS',
        'MIXING',
        'ENERGY',
        'REPORT',
        'VERTICES',
        'CONTROLS',
        'RULES',
        'TITLE',
        'TAGS',
        'LABELS',
        'BACKDROP',
        'END',
    ]

    cs = ChangeSet()
    for s in priorities:
        cs.merge(file_cs[s])

    return cs
|
|
|
|
|
|
def read_inp(project: str, inp: str) -> None:
    """Recreate ``project`` from the INP file at path ``inp``.

    An already open project of that name is closed and an existing one is
    deleted; a new project is then created and opened. The INP file is
    parsed, the resulting change set is run through
    ``execute_batch_commands``, and the project's current operation is
    passed to ``set_restore_operation``.

    The project is closed again before returning. This also happens when
    parsing or importing raises, so a failed import does not leave the
    project open; the exception still propagates to the caller.
    """
    if is_project_open(project):
        close_project(project)

    if have_project(project):
        delete_project(project)

    create_project(project)
    open_project(project)

    try:
        cs = _read_inp(_parse_inp(inp))
        execute_batch_commands(project, cs)

        # NOTE(review): presumably marks the freshly imported state as the
        # point a later restore returns to - confirm against the database
        # module.
        set_restore_operation(project, get_current_operation(project))
    finally:
        close_project(project)
|
|
|
|
|
|
def import_inp(project: str, cs: ChangeSet) -> ChangeSet:
    """Recreate ``project`` from INP text carried in a change set.

    The INP text is read from ``cs.operations[0]['inp']``. An already open
    project of that name is closed and an existing one is deleted; a new
    project is then created and opened. The text is parsed, the resulting
    change set is run through ``execute_batch_commands``, and the
    project's current operation is passed to ``set_restore_operation``.

    The project is closed again before returning. This also happens when
    parsing or importing raises, so a failed import does not leave the
    project open; the exception still propagates to the caller.

    Returns whatever ``execute_batch_commands`` returned for the import.
    """
    if is_project_open(project):
        close_project(project)

    if have_project(project):
        delete_project(project)

    create_project(project)
    open_project(project)

    try:
        new_cs = _read_inp(_parse_cs(cs))
        success_cs = execute_batch_commands(project, new_cs)

        # NOTE(review): presumably marks the freshly imported state as the
        # point a later restore returns to - confirm against the database
        # module.
        set_restore_operation(project, get_current_operation(project))
    finally:
        close_project(project)

    return success_cs
|