from .project import * from .database import ChangeSet, get_current_operation, set_restore_operation from .sections import section_name from .batch_cmds import execute_batch_commands from .s1_title import inp_in_title from .s2_junctions import inp_in_junction from .s3_reservoirs import inp_in_reservoir from .s4_tanks import inp_in_tank from .s5_pipes import inp_in_pipe from .s6_pumps import inp_in_pump from .s7_valves import inp_in_valve from .s8_tags import inp_in_tag from .s9_demands import inp_in_demand from .s10_status import inp_in_status from .s11_patterns import inp_in_pattern from .s12_curves import inp_in_curve from .s13_controls import inp_in_control from .s14_rules import inp_in_rule from .s15_energy import inp_in_energy from .s16_emitters import inp_in_emitter from .s17_quality import inp_in_quality from .s18_sources import inp_in_source from .s19_reactions import inp_in_reaction from .s20_mixing import inp_in_mixing from .s21_times import inp_in_time from .s22_report import inp_in_report from .s23_options import inp_in_option from .s24_coordinates import inp_in_coord from .s25_vertices import inp_in_vertex from .s26_labels import inp_in_label from .s27_backdrop import inp_in_backdrop #from .s28_end import * def _parse_inp(inp: str) -> dict[str, list[str]]: file: dict[str, list[str]] = {} for s in section_name: file[s] = [] section = '' for line in open(inp): line = line.strip() if line == '': # skip empty line for control and rule if section == 'CONTROLS' or section == 'RULES': pass else: section = '' continue if line.startswith('['): is_section = False for s in section_name: if line.startswith(f'[{s}'): section = s is_section = True break if is_section: continue if section != '': file[section].append(line) return file def _parse_cs(cs: ChangeSet) -> dict[str, list[str]]: file: dict[str, list[str]] = {} for s in section_name: file[s] = [] section = '' for line in str(cs.operations[0]['inp']).split('\n'): line = line.strip() if line == '': # skip empty line for control and rule if section == 'CONTROLS' or section == 'RULES': pass else: section = '' continue if line.startswith('['): is_section = False for s in section_name: if line.startswith(f'[{s}'): section = s is_section = True break if is_section: continue if section != '': file[section].append(line) return file def _read_inp(file: dict[str, list[str]]) -> ChangeSet: file_cs: dict[str, ChangeSet] = {} for s in section_name: file_cs[s] = ChangeSet() for name, section in file.items(): if name == 'TITLE': file_cs[name].merge(inp_in_title(section)) elif name == 'JUNCTIONS': # + coords file_cs[name].merge(inp_in_junction(section)) elif name == 'RESERVOIRS': # + coords file_cs[name].merge(inp_in_reservoir(section)) elif name == 'TANKS': # + coords file_cs[name].merge(inp_in_tank(section)) elif name == 'PIPES': file_cs[name].merge(inp_in_pipe(section)) elif name == 'PUMPS': file_cs[name].merge(inp_in_pump(section)) elif name == 'VALVES': file_cs[name].merge(inp_in_valve(section)) elif name == 'TAGS': file_cs[name].merge(inp_in_tag(section)) elif name == 'DEMANDS': file_cs[name].merge(inp_in_demand(section)) elif name == 'STATUS': file_cs[name].merge(inp_in_status(section)) elif name == 'PATTERNS': file_cs[name].merge(inp_in_pattern(section)) elif name == 'CURVES': file_cs[name].merge(inp_in_curve(section)) elif name == 'CONTROLS': file_cs[name].merge(inp_in_control(section)) elif name == 'RULES': file_cs[name].merge(inp_in_rule(section)) elif name == 'ENERGY': file_cs[name].merge(inp_in_energy(section)) elif name == 'EMITTERS': file_cs[name].merge(inp_in_emitter(section)) elif name == 'QUALITY': file_cs[name].merge(inp_in_quality(section)) elif name == 'SOURCES': file_cs[name].merge(inp_in_source(section)) elif name == 'REACTIONS': file_cs[name].merge(inp_in_reaction(section)) elif name == 'MIXING': file_cs[name].merge(inp_in_mixing(section)) elif name == 'TIMES': file_cs[name].merge(inp_in_time(section)) elif name == 'REPORT': file_cs[name].merge(inp_in_report(section)) elif name == 'OPTIONS': file_cs[name].merge(inp_in_option(section)) elif name == 'COORDINATES': coords = inp_in_coord(section) for s in ['JUNCTIONS', 'RESERVOIRS', 'TANKS']: for node in file_cs[s].operations: if node['type'] == 'demand': continue if node['id'] in coords: coord = coords[node['id']] node |= { 'x' : coord['x'], 'y' : coord['y'] } else: print(f"WARNING: [{s}] {node['id']} has no coordinate, set it at origin!") node |= { 'x' : 0.0, 'y' : 0.0 } elif name == 'VERTICES': file_cs[name].merge(inp_in_vertex(section)) elif name == 'LABELS': file_cs[name].merge(inp_in_label(section)) elif name == 'BACKDROP': file_cs[name].merge(inp_in_backdrop(section)) elif name == 'END': pass # :) # release file file = {} cs = ChangeSet() priorities = [ 'PATTERNS', 'CURVES', 'JUNCTIONS', 'RESERVOIRS', 'TANKS', 'COORDINATES', 'PIPES', 'PUMPS', 'VALVES', 'DEMANDS', 'STATUS', 'OPTIONS', 'TIMES', 'EMITTERS', 'QUALITY', 'SOURCES', 'REACTIONS', 'MIXING', 'ENERGY', 'REPORT', 'VERTICES', 'CONTROLS', 'RULES', 'TITLE', 'TAGS', 'LABELS', 'BACKDROP', 'END', ] for s in priorities: cs.merge(file_cs[s]) return cs def read_inp(project: str, inp: str): if is_project_open(project): close_project(project) if have_project(project): delete_project(project) create_project(project) open_project(project) file = _parse_inp(inp) cs = _read_inp(file) execute_batch_commands(project, cs) op = get_current_operation(project) set_restore_operation(project, op) close_project(project) def import_inp(project: str, cs: ChangeSet) -> ChangeSet: if is_project_open(project): close_project(project) if have_project(project): delete_project(project) create_project(project) open_project(project) file = _parse_cs(cs) new_cs = _read_inp(file) success_cs = execute_batch_commands(project, new_cs) op = get_current_operation(project) set_restore_operation(project, op) close_project(project) # return ? return success_cs