Files
TJWaterServer/api/inp_in.py
2023-03-09 09:04:34 +08:00

282 lines
7.5 KiB
Python

from .project import *
from .database import get_current_operation, set_restore_operation
from .sections import section_name
from .batch_cmds import *
from .s1_title import inp_in_title
from .s2_junctions import inp_in_junction
from .s3_reservoirs import inp_in_reservoir
from .s4_tanks import inp_in_tank
from .s5_pipes import inp_in_pipe
from .s6_pumps import inp_in_pump
from .s7_valves import inp_in_valve
from .s8_tags import inp_in_tag
from .s9_demands import inp_in_demand
from .s10_status import inp_in_status
from .s11_patterns import inp_in_pattern
from .s12_curves import inp_in_curve
from .s13_controls import inp_in_control
from .s14_rules import inp_in_rule
from .s15_energy import inp_in_energy
from .s16_emitters import inp_in_emitter
from .s17_quality import inp_in_quality
from .s18_sources import inp_in_source
from .s19_reactions import inp_in_reaction
from .s20_mixing import inp_in_mixing
from .s21_times import inp_in_time
from .s22_report import inp_in_report
from .s23_options import inp_in_option
from .s24_coordinates import inp_in_coord
from .s25_vertices import inp_in_vertex
from .s26_labels import inp_in_label
from .s27_backdrop import inp_in_backdrop
#from .s28_end import *
def _parse_inp(inp: str) -> dict[str, list[str]]:
    """Split the INP file at path *inp* into per-section lists of lines.

    The result has one key for every entry of ``section_name``; each value
    holds the stripped, non-empty lines found under that section's
    ``[NAME`` header, in file order.

    Parsing rules:

    * A blank line ends the current section, except inside ``CONTROLS``
      and ``RULES`` where blank lines are skipped and the section stays
      open.
    * A line starting with ``[`` followed by a known section name switches
      to that section; the header line itself is not stored.
    * A ``[``-line that matches no known section is treated like any other
      line: it is stored if a section is open and dropped otherwise.
    * Lines seen while no section is open are dropped.
    """
    file: dict[str, list[str]] = {s: [] for s in section_name}
    section = ''

    # The context manager closes the handle as soon as parsing finishes or
    # raises, instead of leaving it open until garbage collection.
    with open(inp) as stream:
        for raw in stream:
            line = raw.strip()

            if line == '':
                # skip empty line for control and rule
                if section != 'CONTROLS' and section != 'RULES':
                    section = ''
                continue

            if line.startswith('['):
                # First known name that prefixes the header wins.
                header = next(
                    (s for s in section_name if line.startswith(f'[{s}')),
                    None,
                )
                if header is not None:
                    section = header
                    continue

            if section != '':
                file[section].append(line)

    return file
def _parse_cs(cs: ChangeSet) -> dict[str, list[str]]:
    """Split INP text carried by a change set into per-section line lists.

    The text is read from the ``'inp'`` entry of the first operation in
    ``cs.operations`` and split on ``'\\n'``. The returned dict has one key
    per entry of ``section_name``; each value lists the stripped, non-empty
    lines that appeared under that section header.

    A blank line closes the current section unless it is ``CONTROLS`` or
    ``RULES``. Lines outside any open section are discarded. A ``[``-line
    that matches no known section is handled like ordinary content.
    """
    sections: dict[str, list[str]] = {}
    for known in section_name:
        sections[known] = []

    current = ''
    text = str(cs.operations[0]['inp'])

    for raw in text.split('\n'):
        entry = raw.strip()

        if not entry:
            # Blank lines are tolerated inside controls and rules; anywhere
            # else they terminate the section.
            if current not in ('CONTROLS', 'RULES'):
                current = ''
            continue

        if entry.startswith('['):
            matches = [
                known for known in section_name
                if entry.startswith(f'[{known}')
            ]
            if matches:
                # Same choice as stopping at the first match.
                current = matches[0]
                continue

        if current:
            sections[current].append(entry)

    return sections
def _read_inp(file: dict[str, list[str]]) -> ChangeSet:
    """Convert parsed INP sections into one ordered ChangeSet.

    *file* maps section names to their raw lines. Each section is handed to
    its ``inp_in_*`` reader and the result is merged into a per-section
    ChangeSet. ``REPORT`` and ``END`` are ignored. ``COORDINATES`` produces
    no operations of its own: it adds ``x``/``y`` to the node operations
    already collected for ``JUNCTIONS``, ``RESERVOIRS`` and ``TANKS``, so
    those sections must come earlier in *file* than ``COORDINATES``.

    The per-section change sets are finally merged in a fixed order and the
    combined ChangeSet is returned.
    """
    # Sections whose reader output is merged as-is.
    readers = {
        'TITLE': inp_in_title,
        'JUNCTIONS': inp_in_junction,    # + coords
        'RESERVOIRS': inp_in_reservoir,  # + coords
        'TANKS': inp_in_tank,            # + coords
        'PIPES': inp_in_pipe,
        'PUMPS': inp_in_pump,
        'VALVES': inp_in_valve,
        'TAGS': inp_in_tag,
        'DEMANDS': inp_in_demand,
        'STATUS': inp_in_status,
        'PATTERNS': inp_in_pattern,
        'CURVES': inp_in_curve,
        'CONTROLS': inp_in_control,
        'RULES': inp_in_rule,
        'ENERGY': inp_in_energy,
        'EMITTERS': inp_in_emitter,
        'QUALITY': inp_in_quality,
        'SOURCES': inp_in_source,
        'REACTIONS': inp_in_reaction,
        'MIXING': inp_in_mixing,
        'TIMES': inp_in_time,
        # 'REPORT' is ignored for now: for simulation, always report all.
        'OPTIONS': inp_in_option,
        'VERTICES': inp_in_vertex,
        'LABELS': inp_in_label,
        'BACKDROP': inp_in_backdrop,
        # 'END' carries nothing to read.
    }

    file_cs: dict[str, ChangeSet] = {s: ChangeSet() for s in section_name}

    for name, section in file.items():
        reader = readers.get(name)
        if reader is not None:
            file_cs[name].merge(reader(section))
            continue

        if name != 'COORDINATES':
            continue

        # Attach a position to every node operation gathered so far.
        coords = inp_in_coord(section)
        for s in ('JUNCTIONS', 'RESERVOIRS', 'TANKS'):
            for node in file_cs[s].operations:
                if node['type'] == 'demand':
                    continue
                if node['id'] in coords:
                    coord = coords[node['id']]
                    position = {'x': coord['x'], 'y': coord['y']}
                else:
                    print(f"WARNING: [{s}] {node['id']} has no coordinate, set it at origin!")
                    position = {'x': 0.0, 'y': 0.0}
                node |= position

    # Drop this function's reference to the parsed lines.
    file = {}

    # Fixed merge order -- presumably so that referenced objects (patterns,
    # curves, nodes) are created before whatever refers to them; TODO confirm.
    merge_order = (
        'PATTERNS',
        'CURVES',
        'JUNCTIONS',
        'RESERVOIRS',
        'TANKS',
        'COORDINATES',
        'PIPES',
        'PUMPS',
        'VALVES',
        'DEMANDS',
        'STATUS',
        'OPTIONS',
        'TIMES',
        'EMITTERS',
        'QUALITY',
        'SOURCES',
        'REACTIONS',
        'MIXING',
        'ENERGY',
        'REPORT',
        'VERTICES',
        'CONTROLS',
        'RULES',
        'TITLE',
        'TAGS',
        'LABELS',
        'BACKDROP',
        'END',
    )

    cs = ChangeSet()
    for s in merge_order:
        cs.merge(file_cs[s])
    return cs
def read_inp(project: str, inp: str):
    """Rebuild *project* from scratch using the INP file at path *inp*.

    An open project of that name is closed and an existing one is deleted
    before a new project is created and opened. The file is parsed,
    converted to a ChangeSet and executed as batch commands. The operation
    current after the import is then recorded as the restore operation and
    the project is closed again. Returns nothing.
    """
    # Start from a clean slate: no open handle, no leftover project.
    if is_project_open(project):
        close_project(project)
    if have_project(project):
        delete_project(project)

    create_project(project)
    open_project(project)

    execute_batch_commands(project, _read_inp(_parse_inp(inp)))

    # Mark the freshly imported state as the restore point.
    set_restore_operation(project, get_current_operation(project))

    close_project(project)
def import_inp(project: str, cs: ChangeSet) -> ChangeSet:
    """Rebuild *project* from INP text carried inside the change set *cs*.

    An open project of that name is closed and an existing one is deleted
    before a new project is created and opened. The INP text in *cs* is
    parsed, converted to a new ChangeSet and executed as batch commands.
    The operation current after the import is recorded as the restore
    operation and the project is closed.

    Returns whatever ``execute_batch_commands`` returned for the import.
    """
    # Start from a clean slate: no open handle, no leftover project.
    if is_project_open(project):
        close_project(project)
    if have_project(project):
        delete_project(project)

    create_project(project)
    open_project(project)

    success_cs = execute_batch_commands(project, _read_inp(_parse_cs(cs)))

    # Mark the freshly imported state as the restore point.
    set_restore_operation(project, get_current_operation(project))

    close_project(project)

    return success_cs