"""Run EPANET simulations and read their binary output.

The output file is read through ``epanet/epanet-output.dll`` (loaded with
ctypes) and simulations are run with ``epanet/runepanet.exe``; both paths are
resolved relative to the current working directory.
"""

import ctypes
import json
import os
import platform
import subprocess
import sys
from typing import Any

sys.path.append("..")
from api import project
from api import parser


def _verify_platform():
    """Raise if the current platform is not Windows."""
    _platform = platform.system()
    if _platform != "Windows":
        raise Exception(f'Platform {_platform} unsupported (not yet)')


class Output:
    """Reader for one EPANET binary output file.

    The instance owns a native handle created by ``ENR_init``. Call
    :meth:`close` (or use the instance as a context manager) to release it
    deterministically; ``__del__`` only acts as a best-effort fallback.
    """

    # Labels for the positional arrays returned by the DLL, in array order.
    _NET_SIZE_KEYS = ('node', 'tank', 'link', 'pump', 'valve')
    _TIME_KEYS = ('report_start', 'report_step', 'sim_duration', 'num_periods')
    _ENERGY_KEYS = ('utilization', 'avg.efficiency', 'avg.kW/flow',
                    'avg.kwatts', 'max.kwatts', 'cost/day')
    _REACTION_KEYS = ('bulk', 'wall', 'tank', 'source')

    # Unit names indexed by the flag value returned from ENR_getUnits.
    _FLOW_UNITS = ('CFS', 'GPM', 'MGD', 'IMGD', 'AFD',
                   'LPS', 'LPM', 'MLD', 'CMH', 'CMD')
    _PRESSURE_UNITS = ('PSI', 'MTR', 'KPA')
    _QUALITY_UNITS = ('NONE', 'MGL', 'UGL', 'HOURS', 'PRCNT')

    def __init__(self, path: str) -> None:
        """Load the DLL and open the output file at *path*.

        Raises:
            OSError: if the DLL cannot be loaded.
            Exception: if ``ENR_init`` or ``ENR_open`` reports an error.
        """
        self._path = path
        # Define both attributes before anything can fail so that close()
        # and __del__ never hit a half-initialised instance.
        self._lib = None
        self._handle = ctypes.c_void_p()
        self._lib = ctypes.CDLL(
            os.path.join(os.getcwd(), 'epanet', 'epanet-output.dll'))
        self._check(self._lib.ENR_init(ctypes.byref(self._handle)))
        self._check(self._lib.ENR_open(
            self._handle, ctypes.c_char_p(self._path.encode())))

    def __enter__(self) -> 'Output':
        return self

    def __exit__(self, exc_type, exc, tb) -> None:
        self.close()

    def __del__(self):
        # Destructors must not raise: exceptions here cannot be handled by
        # the caller, and the instance may be only partially constructed.
        try:
            self.close()
        except Exception:
            pass

    def close(self) -> None:
        """Release the native handle; safe to call more than once.

        Raises:
            Exception: if ``ENR_close`` returns a non-zero code.
        """
        lib = getattr(self, '_lib', None)
        handle = getattr(self, '_handle', None)
        if lib is None or handle is None or not handle.value:
            return
        result = lib.ENR_close(ctypes.byref(handle))
        # Never reuse the handle after a close attempt, whatever the result.
        handle.value = None
        if result != 0:
            raise Exception(
                f'Failed to close project [{self._path}] output, code [{result}]')

    def _check(self, result):
        """Raise an Exception describing the failure if *result* is non-zero."""
        if result == 0:
            return
        msg = ctypes.c_char_p()
        self._lib.ENR_checkError(self._handle, ctypes.byref(msg))
        if msg.value:
            text = msg.value.decode(errors='replace')
            # The message buffer comes from the DLL and is released through it.
            self._lib.ENR_free(ctypes.byref(msg))
        else:
            # No message available: fall back to the raw code instead of
            # failing on a None buffer and hiding the real error.
            text = f'error code {result}'
        raise Exception(
            f'Failed to read project [{self._path}] output, message [{text}]')

    @staticmethod
    def _expect_length(actual: int, expected: int, what: str) -> None:
        """Raise if the DLL returned an array of unexpected length."""
        if actual != expected:
            raise Exception(
                f'Unexpected {what} length [{actual}], expected [{expected}]')

    def version(self) -> int:
        """Return the version value reported by ``ENR_getVersion``."""
        v = ctypes.c_int()
        self._check(self._lib.ENR_getVersion(self._handle, ctypes.byref(v)))
        return v.value

    def net_size(self) -> dict[str, int]:
        """Return element counts keyed by node, tank, link, pump and valve."""
        element_count = ctypes.POINTER(ctypes.c_int)()
        length = ctypes.c_int()
        self._check(self._lib.ENR_getNetSize(
            self._handle, ctypes.byref(element_count), ctypes.byref(length)))
        try:
            self._expect_length(
                length.value, len(self._NET_SIZE_KEYS), 'net size')
            # Indexing copies each value into a Python int, so the native
            # array can be freed right after.
            return {key: element_count[i]
                    for i, key in enumerate(self._NET_SIZE_KEYS)}
        finally:
            self._lib.ENR_free(ctypes.byref(element_count))

    def units(self) -> dict[str, str]:
        """Return the flow, pressure and quality unit names."""
        # (result key, code passed to ENR_getUnits, names indexed by flag)
        queries = (
            ('flow', 1, self._FLOW_UNITS),
            ('pressure', 2, self._PRESSURE_UNITS),
            ('quality', 3, self._QUALITY_UNITS),
        )
        result = {}
        for key, code, names in queries:
            flag = ctypes.c_int()
            self._check(self._lib.ENR_getUnits(
                self._handle, ctypes.c_int(code), ctypes.byref(flag)))
            result[key] = names[flag.value]
        return result

    def times(self) -> dict[str, int]:
        """Return the four time values read with ``ENR_getTimes`` codes 1-4."""
        result = {}
        for code, key in enumerate(self._TIME_KEYS, start=1):
            t = ctypes.c_int(1)
            self._check(self._lib.ENR_getTimes(
                self._handle, ctypes.c_int(code), ctypes.byref(t)))
            result[key] = t.value
        return result

    def _element_names(self, element_type: int, count: int) -> list[str]:
        """Return the names of elements 1..count of the given type code."""
        names = []
        type_code = ctypes.c_int(element_type)
        for i in range(1, count + 1):
            name = ctypes.c_char_p()
            name_len = ctypes.c_int()
            self._check(self._lib.ENR_getElementName(
                self._handle, type_code, ctypes.c_int(i),
                ctypes.byref(name), ctypes.byref(name_len)))
            try:
                names.append(name.value.decode())
            finally:
                self._lib.ENR_free(ctypes.byref(name))
        return names

    def element_name(self) -> dict[str, list[str]]:
        """Return the names of all nodes and links."""
        sizes = self.net_size()
        return {
            'nodes': self._element_names(1, sizes['node']),
            'links': self._element_names(2, sizes['link']),
        }

    def energy_usage(self) -> list[dict[str, Any]]:
        """Return one energy-usage record per pump.

        Each record holds zero-based ``pump_index`` and ``link_index`` plus
        the six values returned by ``ENR_getEnergyUsage``.
        """
        size = self.net_size()['pump']
        usages = []
        for i in range(1, size + 1):
            index = ctypes.c_int()
            values = ctypes.POINTER(ctypes.c_float)()
            length = ctypes.c_int()
            self._check(self._lib.ENR_getEnergyUsage(
                self._handle, ctypes.c_int(i), ctypes.byref(index),
                ctypes.byref(values), ctypes.byref(length)))
            try:
                self._expect_length(
                    length.value, len(self._ENERGY_KEYS), 'energy usage')
                # The DLL uses 1-based indices; expose 0-based ones.
                d = {'pump_index': i - 1, 'link_index': index.value - 1}
                for j, key in enumerate(self._ENERGY_KEYS):
                    d[key] = values[j]
            finally:
                # Release the native array, as net_size() does for its array.
                self._lib.ENR_free(ctypes.byref(values))
            usages.append(d)
        return usages

    def reactions(self) -> dict[str, float]:
        """Return the bulk, wall, tank and source reaction values."""
        values = ctypes.POINTER(ctypes.c_float)()
        length = ctypes.c_int()
        self._check(self._lib.ENR_getNetReacts(
            self._handle, ctypes.byref(values), ctypes.byref(length)))
        try:
            self._expect_length(
                length.value, len(self._REACTION_KEYS), 'reactions')
            return {key: values[i]
                    for i, key in enumerate(self._REACTION_KEYS)}
        finally:
            # Release the native array, as net_size() does for its array.
            self._lib.ENR_free(ctypes.byref(values))

    def dump(self, cache: bool = True) -> str:
        """Return all readable output data as a JSON string.

        When *cache* is true the same JSON is also written next to the
        output file as ``<path>.json``.
        """
        data = {
            'version': self.version(),
            'net_size': self.net_size(),
            'units': self.units(),
            'times': self.times(),
            'element_name': self.element_name(),
            'energy_usage': self.energy_usage(),
            'reactions': self.reactions(),
        }
        text = json.dumps(data)
        if cache:
            with open(self._path + '.json', 'w', encoding='utf-8') as f:
                f.write(text)
        return text


def dump_output(path: str) -> str:
    """Open the output file at *path* and return its contents as JSON."""
    opt = Output(path)
    try:
        return opt.dump()
    finally:
        # Close explicitly rather than relying on garbage collection.
        opt.close()


def _run_simulation(name: str, inp: str, base: str) -> str:
    """Run runepanet.exe on *inp* and return the dumped output as JSON.

    The report and output files are written to ``temp/<base>.rpt`` and
    ``temp/<base>.opt`` under the current working directory.

    Raises:
        Exception: if the executable cannot be started or exits non-zero.
    """
    root = os.path.abspath(os.getcwd())
    exe = os.path.join(root, 'epanet', 'runepanet.exe')
    temp = os.path.join(root, 'temp')
    # Create the output directory if it does not exist yet.
    os.makedirs(temp, exist_ok=True)
    rpt = os.path.join(temp, base + '.rpt')
    opt = os.path.join(temp, base + '.opt')

    msg = f'Failed to run simulation for project [{name}]'
    try:
        # An argument list (no shell) keeps paths with spaces intact and
        # prevents shell metacharacters in `name` from being interpreted.
        result = subprocess.run([exe, inp, rpt, opt]).returncode
    except OSError as err:
        raise Exception(msg) from err
    if result != 0:
        raise Exception(msg)

    return dump_output(opt)


def run_project(name: str) -> str:
    """Export project *name* to an .inp file, simulate it and return JSON.

    Raises:
        Exception: if the project does not exist or the simulation fails.
    """
    if not project.have_project(name):
        raise Exception(f'Not found project [{name}]')

    root = os.path.abspath(os.getcwd())
    base = name + '.db'
    inp = os.path.join(root, 'db_inp', base + '.inp')
    parser.dump_inp(name, inp)

    return _run_simulation(name, inp, base)


def run_inp(name: str) -> str:
    """Simulate ``inp/<name>.inp`` and return the dumped output as JSON.

    Raises:
        Exception: if the simulation fails.
    """
    root = os.path.abspath(os.getcwd())
    inp = os.path.join(root, 'inp', name + '.inp')
    return _run_simulation(name, inp, name)


if __name__ == '__main__':
    _verify_platform()
    print(run_inp('net3'))