import ctypes
import platform
import os
import subprocess
import sys
import json
import base64
from typing import Any

sys.path.append("..")
from api import project
from api import inp_out


def _verify_platform():
    """Raise if the current operating system is not Windows."""
    _platform = platform.system()
    if _platform != "Windows":
        raise Exception(f'Platform {_platform} unsupported (not yet)')


if __name__ == '__main__':
    _verify_platform()


class Output:
    """Reader for an EPANET binary output file, backed by ``epanet-output.dll``.

    The DLL is loaded from ``<cwd>/epanet/epanet-output.dll``.  Instances can
    be used as context managers; ``close()`` releases the native handle and is
    safe to call more than once.
    """

    # Element type codes passed to ENR_getElementName.
    _NODE_TYPE = 1
    _LINK_TYPE = 2

    def __init__(self, path: str) -> None:
        """Load the output library and open the output file at *path*.

        Raises:
            OSError: if the DLL cannot be loaded.
            Exception: if the library reports an error while opening the file.
        """
        self._path = path
        # Set the attributes that close()/__del__ read before anything can
        # fail, so a partially constructed instance can be finalized safely.
        self._lib = None
        self._handle = ctypes.c_void_p()
        self._closed = True

        self._lib = ctypes.CDLL(os.path.join(os.getcwd(), 'epanet', 'epanet-output.dll'))
        self._check(self._lib.ENR_init(ctypes.byref(self._handle)))
        self._closed = False
        self._check(self._lib.ENR_open(self._handle, ctypes.c_char_p(self._path.encode())))

    def __enter__(self) -> 'Output':
        return self

    def __exit__(self, exc_type, exc_value, traceback) -> None:
        self.close()

    def __del__(self):
        # A destructor must never raise: Python would only print a warning and
        # the error could not be handled by anyone.  Call close() explicitly
        # (or use ``with``) to observe close failures.
        try:
            self.close()
        except Exception:
            pass

    def close(self) -> None:
        """Release the native handle; subsequent calls are no-ops.

        Raises:
            Exception: if ENR_close returns a code other than 0 or 10.
        """
        if self._closed:
            return
        self._closed = True
        result = self._lib.ENR_close(ctypes.byref(self._handle))
        if result != 0 and result != 10:
            # NOTE(review): the handle may no longer be usable after a failed
            # close, so the library is not queried for a message here.
            raise Exception(f'Failed to close project [{self._path}] output, code [{result}]')

    def _check(self, result):
        """Raise an ``Exception`` when *result* is neither 0 nor 10.

        Code 10 is tolerated exactly as 0 is (presumably a warning code --
        TODO confirm against the library documentation).
        """
        if result == 0 or result == 10:
            return

        msg = ctypes.c_char_p()
        try:
            code = self._lib.ENR_checkError(self._handle, ctypes.byref(msg))
            assert code == result
            # The library may hand back no message at all; do not crash on it.
            text = msg.value.decode() if msg.value is not None else f'unknown error (code {result})'
            error = f'Failed to read project [{self._path}] output, message [{text}]'
        finally:
            self._lib.ENR_free(ctypes.byref(msg))
        raise Exception(error)

    def _float_array(self, getter, *args) -> list[float]:
        """Call *getter*(handle, *args, &values, &length) and copy the floats out.

        The native buffer is released with ENR_free before returning, also
        when copying fails.
        """
        values = ctypes.POINTER(ctypes.c_float)()
        length = ctypes.c_int()
        self._check(getter(self._handle, *args, ctypes.byref(values), ctypes.byref(length)))
        try:
            return [values[i] for i in range(length.value)]
        finally:
            self._lib.ENR_free(ctypes.byref(values))

    def _element_names(self, element_type: int, count: int) -> list[str]:
        """Return the names of elements 1..*count* of the given type code."""
        type_code = ctypes.c_int(element_type)
        names = []
        for index in range(1, count + 1):
            name = ctypes.c_char_p()
            name_len = ctypes.c_int()
            self._check(self._lib.ENR_getElementName(
                self._handle, type_code, ctypes.c_int(index),
                ctypes.byref(name), ctypes.byref(name_len)))
            try:
                names.append(name.value.decode())
            finally:
                self._lib.ENR_free(ctypes.byref(name))
        return names

    def version(self) -> int:
        """Return the version number reported by ENR_getVersion."""
        v = ctypes.c_int()
        self._check(self._lib.ENR_getVersion(self._handle, ctypes.byref(v)))
        return v.value

    def net_size(self) -> dict[str, int]:
        """Return element counts keyed by 'node', 'tank', 'link', 'pump', 'valve'."""
        element_count = ctypes.POINTER(ctypes.c_int)()
        length = ctypes.c_int()
        self._check(self._lib.ENR_getNetSize(
            self._handle, ctypes.byref(element_count), ctypes.byref(length)))
        try:
            assert length.value == 5
            category = ['node', 'tank', 'link', 'pump', 'valve']
            return {key: element_count[i] for i, key in enumerate(category)}
        finally:
            self._lib.ENR_free(ctypes.byref(element_count))

    def units(self) -> dict[str, str]:
        """Return the unit names for 'flow', 'pressure' and 'quality'.

        ENR_getUnits is queried with codes 1, 2 and 3; the integer it returns
        is used as an index into the matching name table below.
        """
        f_us = ['CFS', 'GPM', 'MGD', 'IMGD', 'AFD', 'LPS', 'LPM', 'MLD', 'CMH', 'CMD']
        p_us = ['PSI', 'MTR', 'KPA']
        q_us = ['NONE', 'MGL', 'UGL', 'HOURS', 'PRCNT']

        tables = {'flow': (1, f_us), 'pressure': (2, p_us), 'quality': (3, q_us)}
        result = {}
        for key, (code, names) in tables.items():
            unit = ctypes.c_int()
            self._check(self._lib.ENR_getUnits(
                self._handle, ctypes.c_int(code), ctypes.byref(unit)))
            result[key] = names[unit.value]
        return result

    def times(self) -> dict[str, int]:
        """Return 'report_start', 'report_step', 'sim_duration' and 'num_periods'.

        The four values are read with ENR_getTimes codes 1 through 4.
        """
        category = ['report_start', 'report_step', 'sim_duration', 'num_periods']
        d = {}
        for code, key in enumerate(category, start=1):
            t = ctypes.c_int(1)
            self._check(self._lib.ENR_getTimes(
                self._handle, ctypes.c_int(code), ctypes.byref(t)))
            d[key] = t.value
        return d

    def element_name(self) -> dict[str, list[str]]:
        """Return ``{'nodes': [...], 'links': [...]}`` with names in index order."""
        sizes = self.net_size()
        return {
            'nodes': self._element_names(self._NODE_TYPE, sizes['node']),
            'links': self._element_names(self._LINK_TYPE, sizes['link']),
        }

    def energy_usage(self) -> list[dict[str, Any]]:
        """Return one dict per pump with its link name and six energy figures."""
        size = self.net_size()['pump']
        category = ['utilization', 'avg.efficiency', 'avg.kW/flow',
                    'avg.kwatts', 'max.kwatts', 'cost/day']
        links = self.element_name()['links']

        usages = []
        for i in range(1, size + 1):
            index = ctypes.c_int()
            values = self._float_array(
                self._lib.ENR_getEnergyUsage, ctypes.c_int(i), ctypes.byref(index))
            assert len(values) == 6
            # `index` is the 1-based position of the pump in the link list.
            d = {'pump': links[index.value - 1]}
            d.update(zip(category, values))
            usages.append(d)
        return usages

    def reactions(self) -> dict[str, float]:
        """Return the network reaction values keyed 'bulk', 'wall', 'tank', 'source'."""
        values = self._float_array(self._lib.ENR_getNetReacts)
        assert len(values) == 4
        category = ['bulk', 'wall', 'tank', 'source']
        return dict(zip(category, values))

    def node_results(self) -> list[dict[str, Any]]:
        """Return per-node results for every reporting period.

        Each entry is ``{'node': name, 'result': [period_dict, ...]}`` where a
        period dict has the keys 'demand', 'head', 'pressure', 'quality'.
        """
        size = self.net_size()['node']
        num_periods = self.times()['num_periods']
        nodes = self.element_name()['nodes']
        category = ['demand', 'head', 'pressure', 'quality']

        ds = []
        for i in range(1, size + 1):
            d = {'node': nodes[i - 1], 'result': []}
            for j in range(num_periods):
                values = self._float_array(self._lib.ENR_getNodeResult, j, i)
                assert len(values) == len(category)
                d['result'].append(dict(zip(category, values)))
            ds.append(d)
        return ds

    def link_results(self) -> list[dict[str, Any]]:
        """Return per-link results for every reporting period.

        Each entry is ``{'link': name, 'result': [period_dict, ...]}``.  The
        'status' value is reported as 'CLOSED' when the raw value equals 2.0
        and as 'OPEN' otherwise; all other attributes are raw floats.
        """
        size = self.net_size()['link']
        num_periods = self.times()['num_periods']
        links = self.element_name()['links']
        category = ['flow', 'velocity', 'headloss', 'quality',
                    'status', 'setting', 'reaction', 'friction']

        ds = []
        for i in range(1, size + 1):
            d = {'link': links[i - 1], 'result': []}
            for j in range(num_periods):
                values = self._float_array(self._lib.ENR_getLinkResult, j, i)
                assert len(values) == len(category)
                attributes = {}
                for key, value in zip(category, values):
                    if key == 'status':
                        attributes[key] = 'CLOSED' if value == 2.0 else 'OPEN'
                    else:
                        attributes[key] = value
                d['result'].append(attributes)
            ds.append(d)
        return ds

    def dump(self) -> dict[str, Any]:
        """Collect every section of the output file into one dictionary."""
        return {
            'version': self.version(),
            'net_size': self.net_size(),
            'units': self.units(),
            'times': self.times(),
            'element_name': self.element_name(),
            'energy_usage': self.energy_usage(),
            'reactions': self.reactions(),
            'node_results': self.node_results(),
            'link_results': self.link_results(),
        }


def _dump_output(path: str) -> dict[str, Any]:
    """Read the binary output at *path*, write it to ``path + '.json'`` and return it."""
    # Close the reader deterministically so the native handle is released
    # before the function returns.
    with Output(path) as opt:
        data = opt.dump()
    with open(path + '.json', 'w') as f:
        json.dump(data, f)
    return data


def dump_output(path: str) -> str:
    """Return the output file at *path* as a JSON string (also writes ``path + '.json'``)."""
    return json.dumps(_dump_output(path))


def dump_report(path: str) -> str:
    """Return the full text of the report file at *path*."""
    with open(path, 'r') as f:
        return f.read()


def dump_output_binary(path: str) -> str:
    """Return the raw bytes of the file at *path*, base64-encoded as text."""
    with open(path, 'rb') as f:
        data = f.read()
    return base64.b64encode(data).decode('utf-8')


def _simulate(base_dir: str, inp: str, rpt: str, opt: str) -> bool:
    """Run ``<base_dir>/epanet/runepanet.exe inp rpt opt``; return True on exit code 0.

    The arguments are passed as a list (no shell), so paths containing spaces
    or shell metacharacters are handled verbatim.  Failure to launch the
    executable is reported as an unsuccessful run.
    """
    exe = os.path.join(base_dir, 'epanet', 'runepanet.exe')
    try:
        completed = subprocess.run([exe, inp, rpt, opt], check=False)
    except OSError:
        return False
    return completed.returncode == 0


def _simulate_project(name: str) -> tuple[bool, str, str]:
    """Export project *name* to an .inp file and simulate it.

    Returns:
        ``(succeeded, rpt_path, opt_path)``.

    Raises:
        Exception: if the project does not exist.
    """
    if not project.have_project(name):
        raise Exception(f'Not found project [{name}]')

    base_dir = os.path.abspath(os.getcwd())
    stem = name + '.db'
    inp = os.path.join(base_dir, 'db_inp', stem + '.inp')
    inp_out.dump_inp(name, inp, '2')

    rpt = os.path.join(base_dir, 'temp', stem + '.rpt')
    opt = os.path.join(base_dir, 'temp', stem + '.opt')
    return _simulate(base_dir, inp, rpt, opt), rpt, opt


# DingZQ, 2025-02-04: returns dict[str, Any]
def run_project_return_dict(name: str, readable_output: bool = False) -> dict[str, Any]:
    """Simulate project *name* and return the result as a dictionary.

    The result holds 'simulation_result' ('successful' or 'failed') and
    'report'.  On success it also holds 'output': the decoded output dict when
    *readable_output* is true, otherwise the base64-encoded binary file.
    """
    succeeded, rpt, opt = _simulate_project(name)

    data = {}
    if not succeeded:
        data['simulation_result'] = 'failed'
    else:
        data['simulation_result'] = 'successful'
        # The output file is only read after a successful run.
        if readable_output:
            data['output'] = _dump_output(opt)
        else:
            data['output'] = dump_output_binary(opt)

    data['report'] = dump_report(rpt)
    return data


# original code
def run_project(name: str, readable_output: bool = False) -> str:
    """Simulate project *name* and return the result as a JSON string.

    Same as ``run_project_return_dict`` except that, with *readable_output*,
    the decoded output sections are merged into the top level of the result
    instead of being nested under 'output'.
    """
    succeeded, rpt, opt = _simulate_project(name)

    data = {}
    if not succeeded:
        data['simulation_result'] = 'failed'
    else:
        data['simulation_result'] = 'successful'
        if readable_output:
            data |= _dump_output(opt)
        else:
            data['output'] = dump_output_binary(opt)

    data['report'] = dump_report(rpt)
    return json.dumps(data)


def run_inp(name: str) -> str:
    """Simulate ``<cwd>/inp/<name>.inp`` and return the result as a JSON string.

    The result holds 'simulation_result', 'report' and, on success, the
    base64-encoded binary output under 'output'.
    """
    base_dir = os.path.abspath(os.getcwd())
    inp = os.path.join(base_dir, 'inp', name + '.inp')
    rpt = os.path.join(base_dir, 'temp', name + '.rpt')
    opt = os.path.join(base_dir, 'temp', name + '.opt')

    data = {}
    if not _simulate(base_dir, inp, rpt, opt):
        data['simulation_result'] = 'failed'
    else:
        data['simulation_result'] = 'successful'
        data['output'] = dump_output_binary(opt)

    data['report'] = dump_report(rpt)
    return json.dumps(data)