import ctypes
import platform
import os
import subprocess
import sys
sys.path.append("..")
from api import project
from api import parser


def _verify_platform():
    """Raise if the current operating system is not Windows.

    Raises:
        Exception: when ``platform.system()`` is anything other than
            ``"Windows"``.
    """
    _platform = platform.system()
    if _platform != "Windows":
        raise Exception(f'Platform {_platform} unsupported (not yet)')


class Output:
    """Reader for a project's simulation output via ``epanet-output.dll``.

    The DLL is loaded from the current working directory and the output file
    is expected at ``<parent of cwd>/temp/<name>.db.opt``.

    Instances can be used as context managers; ``close()`` releases the
    native handle and is safe to call more than once.
    """

    def __init__(self, name: str) -> None:
        """Load the library, create a handle and open the output of *name*.

        Raises:
            OSError: if the DLL cannot be loaded (raised by ``ctypes.CDLL``).
            Exception: if ``ENR_init`` or ``ENR_open`` reports an error.
        """
        self._name = name
        self._closed = False
        self._lib = ctypes.CDLL(os.path.join(os.getcwd(), 'epanet-output.dll'))

        self._handle = ctypes.c_void_p()
        self._check(self._lib.ENR_init(ctypes.byref(self._handle)))

        root = os.path.dirname(os.getcwd())
        opt = os.path.join(root, 'temp', self._name + '.db.opt')
        self._check(self._lib.ENR_open(self._handle, ctypes.c_char_p(opt.encode())))

    def __enter__(self) -> 'Output':
        return self

    def __exit__(self, exc_type, exc_value, traceback) -> None:
        self.close()

    def _check(self, result):
        """Raise an ``Exception`` carrying the library message if *result* != 0."""
        if result == 0:
            return

        msg = ctypes.c_char_p()
        code = self._lib.ENR_checkError(self._handle, ctypes.byref(msg))
        try:
            # The library may leave the buffer empty; do not crash on None.
            text = msg.value.decode(errors='replace') if msg.value else ''
            error = f'Failed to read project [{self._name}] output, message [{text}]'
            if code != result:
                # Previously an ``assert``; keep the information without
                # depending on assertions being enabled.
                error += f' (call returned {result}, checkError returned {code})'
        finally:
            # Always hand the message buffer back to the library.
            self._lib.ENR_free(ctypes.byref(msg))
        raise Exception(error)

    def version(self) -> int:
        """Return the integer reported by ``ENR_getVersion``."""
        v = ctypes.c_int()
        self._check(self._lib.ENR_getVersion(self._handle, ctypes.byref(v)))
        return v.value

    def net_size(self) -> dict[str, int]:
        """Return element counts keyed by 'node', 'tank', 'link', 'pump', 'valve'.

        Raises:
            Exception: if the library call fails or does not return exactly
                five counts.
        """
        category = ['node', 'tank', 'link', 'pump', 'valve']
        element_count = ctypes.POINTER(ctypes.c_int)()
        length = ctypes.c_int()
        self._check(self._lib.ENR_getNetSize(
            self._handle, ctypes.byref(element_count), ctypes.byref(length)))
        try:
            if length.value != len(category):
                raise Exception(
                    f'Failed to read project [{self._name}] output, '
                    f'unexpected net size length [{length.value}]')
            return {key: element_count[i] for i, key in enumerate(category)}
        finally:
            # Release the array allocated by the library on every path.
            self._lib.ENR_free(ctypes.byref(element_count))

    def _unit_flag(self, code: int) -> int:
        """Return the unit flag ``ENR_getUnits`` reports for *code*."""
        flag = ctypes.c_int()
        self._check(self._lib.ENR_getUnits(
            self._handle, ctypes.c_int(code), ctypes.byref(flag)))
        return flag.value

    def units(self) -> dict[str, str]:
        """Return the unit names for 'flow', 'pressure' and 'quality'."""
        f_us = ['CFS', 'GPM', 'MGD', 'IMGD', 'AFD', 'LPS', 'LPM', 'MLD', 'CMH', 'CMD']
        p_us = ['PSI', 'MTR', 'KPA']
        q_us = ['NONE', 'MGL', 'UGL', 'HOURS', 'PRCNT']

        # Codes 1, 2 and 3 select flow, pressure and quality respectively.
        return {
            'flow': f_us[self._unit_flag(1)],
            'pressure': p_us[self._unit_flag(2)],
            'quality': q_us[self._unit_flag(3)],
        }

    def close(self) -> None:
        """Close the native handle; does nothing if already closed.

        Raises:
            Exception: if ``ENR_close`` reports an error.
        """
        if getattr(self, '_closed', True):
            return
        lib = getattr(self, '_lib', None)
        handle = getattr(self, '_handle', None)
        # Mark closed first so a failing close is never retried by __del__.
        self._closed = True
        if lib is None or handle is None:
            # __init__ failed before the handle existed; nothing to release.
            return
        self._check(lib.ENR_close(ctypes.byref(handle)))

    def __del__(self):
        # A finalizer must not raise: the interpreter cannot propagate the
        # error, and the object may be only partially constructed.
        try:
            self.close()
        except Exception:
            pass


def _read_output(name: str) -> str:
    """Print version, net size and units of project *name*; return ''."""
    with Output(name) as opt:
        print(opt.version())
        print(opt.net_size())
        print(opt.units())
    return ''


# readable True => json, False => binary output
def run_project(name: str, readable: bool = True) -> str:
    """Dump project *name* to an .inp file, run the simulator and read the output.

    Args:
        name: project name; must be known to ``project.have_project``.
        readable: currently unused by this function.

    Returns:
        Whatever ``_read_output(name)`` returns.

    Raises:
        Exception: if the project does not exist or the simulation fails.
    """
    if not project.have_project(name):
        raise Exception(f'Not found project [{name}]')

    root = os.path.dirname(os.getcwd())
    base = name + '.db'

    inp = os.path.join(root, 'db_inp', base + '.inp')
    parser.dump_inp(name, inp)

    exe = os.path.join(root, 'epanet', 'runepanet.exe')
    rpt = os.path.join(root, 'temp', base + '.rpt')
    opt = os.path.join(root, 'temp', base + '.opt')

    # Pass an argument list rather than a command string so that paths
    # containing spaces or shell metacharacters are handled correctly.
    try:
        result = subprocess.run([exe, inp, rpt, opt]).returncode
    except OSError as err:
        raise Exception(f"Failed to run simulation for project [{name}]") from err
    if result != 0:
        raise Exception(f"Failed to run simulation for project [{name}]")

    return _read_output(name)


if __name__ == '__main__':
    _verify_platform()
    _read_output('net3')