Read data from output
This commit is contained in:
@@ -13,10 +13,81 @@ def _verify_platform():
|
|||||||
raise Exception(f'Platform {_platform} unsupported (not yet)')
|
raise Exception(f'Platform {_platform} unsupported (not yet)')
|
||||||
|
|
||||||
|
|
||||||
|
class Output:
|
||||||
|
def __init__(self, name: str) -> None:
|
||||||
|
self._name = name
|
||||||
|
|
||||||
|
self._lib = ctypes.CDLL(os.path.join(os.getcwd(), 'epanet-output.dll'))
|
||||||
|
|
||||||
|
self._handle = ctypes.c_void_p()
|
||||||
|
self._check(self._lib.ENR_init(ctypes.byref(self._handle)))
|
||||||
|
|
||||||
|
dir = os.path.dirname(os.getcwd())
|
||||||
|
opt = os.path.join(os.path.join(dir, 'temp'), self._name + '.db.opt')
|
||||||
|
self._check(self._lib.ENR_open(self._handle, ctypes.c_char_p(opt.encode())))
|
||||||
|
|
||||||
|
|
||||||
|
def _check(self, result):
|
||||||
|
if result != 0:
|
||||||
|
msg = ctypes.c_char_p()
|
||||||
|
code = self._lib.ENR_checkError(self._handle, ctypes.byref(msg))
|
||||||
|
assert code == result
|
||||||
|
|
||||||
|
error = f'Failed to read project [{self._name}] output, message [{msg.value.decode()}]'
|
||||||
|
|
||||||
|
self._lib.ENR_free(ctypes.byref(msg))
|
||||||
|
|
||||||
|
raise Exception(error)
|
||||||
|
|
||||||
|
|
||||||
|
def version(self) -> int:
|
||||||
|
v = ctypes.c_int()
|
||||||
|
self._check(self._lib.ENR_getVersion(self._handle, ctypes.byref(v)))
|
||||||
|
return v.value
|
||||||
|
|
||||||
|
|
||||||
|
def net_size(self) -> dict[str, int]:
|
||||||
|
element_count = ctypes.POINTER(ctypes.c_int)()
|
||||||
|
length = ctypes.c_int()
|
||||||
|
self._check(self._lib.ENR_getNetSize(self._handle, ctypes.byref(element_count), ctypes.byref(length)))
|
||||||
|
assert length.value == 5
|
||||||
|
category = ['node', 'tank', 'link', 'pump', 'valve']
|
||||||
|
counts = {}
|
||||||
|
for i in range(length.value):
|
||||||
|
counts[category[i]] = element_count[i]
|
||||||
|
self._lib.ENR_free(ctypes.byref(element_count))
|
||||||
|
return counts
|
||||||
|
|
||||||
|
|
||||||
|
def units(self) -> dict[str, str]:
|
||||||
|
f_us = ['CFS', 'GPM', 'MGD', 'IMGD', 'AFD', 'LPS', 'LPM', 'MLD', 'CMH', 'CMD']
|
||||||
|
p_us = ['PSI', 'MTR', 'KPA']
|
||||||
|
q_us = ['NONE', 'MGL', 'UGL', 'HOURS', 'PRCNT']
|
||||||
|
f, p, q = ctypes.c_int(1), ctypes.c_int(2), ctypes.c_int(3)
|
||||||
|
f_u, p_u, q_u = ctypes.c_int(), ctypes.c_int(), ctypes.c_int()
|
||||||
|
self._check(self._lib.ENR_getUnits(self._handle, f, ctypes.byref(f_u)))
|
||||||
|
self._check(self._lib.ENR_getUnits(self._handle, p, ctypes.byref(p_u)))
|
||||||
|
self._check(self._lib.ENR_getUnits(self._handle, q, ctypes.byref(q_u)))
|
||||||
|
return { 'flow': f_us[f_u.value], 'pressure': p_us[p_u.value], 'quality': q_us[q_u.value] }
|
||||||
|
|
||||||
|
|
||||||
|
def __del__(self):
|
||||||
|
# throw exception in destructor ? :)
|
||||||
|
self._check(self._lib.ENR_close(ctypes.byref(self._handle)))
|
||||||
|
|
||||||
|
|
||||||
|
def _read_output(name: str) -> str:
|
||||||
|
opt = Output(name)
|
||||||
|
print(opt.version())
|
||||||
|
print(opt.net_size())
|
||||||
|
print(opt.units())
|
||||||
|
return ''
|
||||||
|
|
||||||
|
|
||||||
# readable True => json, False => binary output
|
# readable True => json, False => binary output
|
||||||
def run_project(name: str, readable: bool = True) -> str:
|
def run_project(name: str, readable: bool = True) -> str:
|
||||||
if not project.have_project(name):
|
if not project.have_project(name):
|
||||||
raise Exception(f'Not found project {name}')
|
raise Exception(f'Not found project [{name}]')
|
||||||
|
|
||||||
dir = os.path.dirname(os.getcwd())
|
dir = os.path.dirname(os.getcwd())
|
||||||
|
|
||||||
@@ -32,25 +103,11 @@ def run_project(name: str, readable: bool = True) -> str:
|
|||||||
|
|
||||||
result = os.system(command)
|
result = os.system(command)
|
||||||
if result != 0:
|
if result != 0:
|
||||||
raise Exception("Failed to run simulation for project {name}")
|
raise Exception("Failed to run simulation for project [{name}]")
|
||||||
|
|
||||||
_lib_output = ctypes.CDLL(os.path.join(os.getcwd(), 'epanet-output.dll'))
|
return _read_output(name)
|
||||||
|
|
||||||
handle = ctypes.c_void_p()
|
|
||||||
|
|
||||||
def _check(result):
|
|
||||||
if result != 0:
|
|
||||||
raise Exception(f'Failed to read output {name}')
|
|
||||||
|
|
||||||
_check(_lib_output.ENR_init(ctypes.byref(handle)))
|
|
||||||
|
|
||||||
_check(_lib_output.ENR_open(handle, ctypes.c_char_p(opt.encode())))
|
|
||||||
|
|
||||||
_check(_lib_output.ENR_close(ctypes.byref(handle)))
|
|
||||||
|
|
||||||
return ''
|
|
||||||
|
|
||||||
|
|
||||||
if __name__ == '__main__':
|
if __name__ == '__main__':
|
||||||
_verify_platform()
|
_verify_platform()
|
||||||
run_project('net3')
|
_read_output('net3')
|
||||||
|
|||||||
Reference in New Issue
Block a user