From 36d121a425937727ed6815862afe31e4143dbc91 Mon Sep 17 00:00:00 2001 From: "WQY\\qiong" Date: Sat, 19 Nov 2022 17:08:12 +0800 Subject: [PATCH] Read data from output --- epanet/epanet.py | 93 ++++++++++++++++++++++++++++++++++++++---------- 1 file changed, 75 insertions(+), 18 deletions(-) diff --git a/epanet/epanet.py b/epanet/epanet.py index 1ad3169..4a3b670 100644 --- a/epanet/epanet.py +++ b/epanet/epanet.py @@ -13,10 +13,81 @@ def _verify_platform(): raise Exception(f'Platform {_platform} unsupported (not yet)') +class Output: + def __init__(self, name: str) -> None: + self._name = name + + self._lib = ctypes.CDLL(os.path.join(os.getcwd(), 'epanet-output.dll')) + + self._handle = ctypes.c_void_p() + self._check(self._lib.ENR_init(ctypes.byref(self._handle))) + + dir = os.path.dirname(os.getcwd()) + opt = os.path.join(os.path.join(dir, 'temp'), self._name + '.db.opt') + self._check(self._lib.ENR_open(self._handle, ctypes.c_char_p(opt.encode()))) + + + def _check(self, result): + if result != 0: + msg = ctypes.c_char_p() + code = self._lib.ENR_checkError(self._handle, ctypes.byref(msg)) + assert code == result + + error = f'Failed to read project [{self._name}] output, message [{msg.value.decode()}]' + + self._lib.ENR_free(ctypes.byref(msg)) + + raise Exception(error) + + + def version(self) -> int: + v = ctypes.c_int() + self._check(self._lib.ENR_getVersion(self._handle, ctypes.byref(v))) + return v.value + + + def net_size(self) -> dict[str, int]: + element_count = ctypes.POINTER(ctypes.c_int)() + length = ctypes.c_int() + self._check(self._lib.ENR_getNetSize(self._handle, ctypes.byref(element_count), ctypes.byref(length))) + assert length.value == 5 + category = ['node', 'tank', 'link', 'pump', 'valve'] + counts = {} + for i in range(length.value): + counts[category[i]] = element_count[i] + self._lib.ENR_free(ctypes.byref(element_count)) + return counts + + + def units(self) -> dict[str, str]: + f_us = ['CFS', 'GPM', 'MGD', 'IMGD', 'AFD', 'LPS', 'LPM', 'MLD', 'CMH', 'CMD'] + p_us = ['PSI', 'MTR', 'KPA'] + q_us = ['NONE', 'MGL', 'UGL', 'HOURS', 'PRCNT'] + f, p, q = ctypes.c_int(1), ctypes.c_int(2), ctypes.c_int(3) + f_u, p_u, q_u = ctypes.c_int(), ctypes.c_int(), ctypes.c_int() + self._check(self._lib.ENR_getUnits(self._handle, f, ctypes.byref(f_u))) + self._check(self._lib.ENR_getUnits(self._handle, p, ctypes.byref(p_u))) + self._check(self._lib.ENR_getUnits(self._handle, q, ctypes.byref(q_u))) + return { 'flow': f_us[f_u.value], 'pressure': p_us[p_u.value], 'quality': q_us[q_u.value] } + + + def __del__(self): + # throw exception in destructor ? :) + self._check(self._lib.ENR_close(ctypes.byref(self._handle))) + + +def _read_output(name: str) -> str: + opt = Output(name) + print(opt.version()) + print(opt.net_size()) + print(opt.units()) + return '' + + # readable True => json, False => binary output def run_project(name: str, readable: bool = True) -> str: if not project.have_project(name): - raise Exception(f'Not found project {name}') + raise Exception(f'Not found project [{name}]') dir = os.path.dirname(os.getcwd()) @@ -32,25 +103,11 @@ def run_project(name: str, readable: bool = True) -> str: result = os.system(command) if result != 0: - raise Exception("Failed to run simulation for project {name}") + raise Exception("Failed to run simulation for project [{name}]") - _lib_output = ctypes.CDLL(os.path.join(os.getcwd(), 'epanet-output.dll')) - - handle = ctypes.c_void_p() - - def _check(result): - if result != 0: - raise Exception(f'Failed to read output {name}') - - _check(_lib_output.ENR_init(ctypes.byref(handle))) - - _check(_lib_output.ENR_open(handle, ctypes.c_char_p(opt.encode()))) - - _check(_lib_output.ENR_close(ctypes.byref(handle))) - - return '' + return _read_output(name) if __name__ == '__main__': _verify_platform() - run_project('net3') + _read_output('net3')