import ctypes import platform import os import sys import json import base64 from datetime import datetime import subprocess import logging from typing import Any sys.path.append("..") from api import project from api import inp_out def _verify_platform(): _platform = platform.system() if _platform not in ["Windows", "Linux"]: raise Exception(f"Platform {_platform} unsupported (not yet)") if __name__ == "__main__": _verify_platform() class Output: def __init__(self, path: str) -> None: self._path = path if platform.system() == "Windows": self._lib = ctypes.CDLL( os.path.join(os.path.dirname(__file__), "windows", "epanet-output.dll") ) else: self._lib = ctypes.CDLL( os.path.join(os.path.dirname(__file__), "linux", "libepanet-output.so") ) self._handle = ctypes.c_void_p() self._check(self._lib.ENR_init(ctypes.byref(self._handle))) self._check( self._lib.ENR_open(self._handle, ctypes.c_char_p(self._path.encode())) ) def __del__(self): # throw exception in destructor ? :) self._check(self._lib.ENR_close(ctypes.byref(self._handle))) def _check(self, result): if result != 0 and result != 10: msg = ctypes.c_char_p() code = self._lib.ENR_checkError(self._handle, ctypes.byref(msg)) assert code == result error = f"Failed to read project [{self._path}] output, message [{msg.value.decode()}]" self._lib.ENR_free(ctypes.byref(msg)) raise Exception(error) def version(self) -> int: v = ctypes.c_int() self._check(self._lib.ENR_getVersion(self._handle, ctypes.byref(v))) return v.value def net_size(self) -> dict[str, int]: element_count = ctypes.POINTER(ctypes.c_int)() length = ctypes.c_int() self._check( self._lib.ENR_getNetSize( self._handle, ctypes.byref(element_count), ctypes.byref(length) ) ) assert length.value == 5 category = ["node", "tank", "link", "pump", "valve"] sizes = {} for i in range(length.value): sizes[category[i]] = element_count[i] self._lib.ENR_free(ctypes.byref(element_count)) return sizes def units(self) -> dict[str, str]: f_us = ["CFS", "GPM", "MGD", "IMGD", "AFD", "LPS", "LPM", "MLD", "CMH", "CMD"] p_us = ["PSI", "MTR", "KPA"] q_us = ["NONE", "MGL", "UGL", "HOURS", "PRCNT"] f, p, q = ctypes.c_int(1), ctypes.c_int(2), ctypes.c_int(3) f_u, p_u, q_u = ctypes.c_int(), ctypes.c_int(), ctypes.c_int() self._check(self._lib.ENR_getUnits(self._handle, f, ctypes.byref(f_u))) self._check(self._lib.ENR_getUnits(self._handle, p, ctypes.byref(p_u))) self._check(self._lib.ENR_getUnits(self._handle, q, ctypes.byref(q_u))) return { "flow": f_us[f_u.value], "pressure": p_us[p_u.value], "quality": q_us[q_u.value], } def times(self) -> dict[str, int]: ts = [] for i in range(1, 5): t = ctypes.c_int(1) self._check( self._lib.ENR_getTimes(self._handle, ctypes.c_int(i), ctypes.byref(t)) ) ts.append(t.value) d = {} category = ["report_start", "report_step", "sim_duration", "num_periods"] for i in range(4): d[category[i]] = ts[i] return d def element_name(self) -> dict[str, list[str]]: sizes = self.net_size() node_type = ctypes.c_int(1) nodes = [] for i in range(1, sizes["node"] + 1): name = ctypes.c_char_p() name_len = ctypes.c_int() self._check( self._lib.ENR_getElementName( self._handle, node_type, ctypes.c_int(i), ctypes.byref(name), ctypes.byref(name_len), ) ) nodes.append(name.value.decode()) self._lib.ENR_free(ctypes.byref(name)) link_type = ctypes.c_int(2) links = [] for i in range(1, sizes["link"] + 1): name = ctypes.c_char_p() name_len = ctypes.c_int() self._check( self._lib.ENR_getElementName( self._handle, link_type, ctypes.c_int(i), ctypes.byref(name), ctypes.byref(name_len), ) ) links.append(name.value.decode()) self._lib.ENR_free(ctypes.byref(name)) return {"nodes": nodes, "links": links} def energy_usage(self) -> list[dict[str, Any]]: size = self.net_size()["pump"] usages = [] category = [ "utilization", "avg.efficiency", "avg.kW/flow", "avg.kwatts", "max.kwatts", "cost/day", ] links = self.element_name()["links"] for i in range(1, size + 1): index = ctypes.c_int() values = ctypes.POINTER(ctypes.c_float)() length = ctypes.c_int() self._check( self._lib.ENR_getEnergyUsage( self._handle, ctypes.c_int(i), ctypes.byref(index), ctypes.byref(values), ctypes.byref(length), ) ) assert length.value == 6 d = {"pump": links[index.value - 1]} for j in range(length.value): d |= {category[j]: values[j]} usages.append(d) self._lib.ENR_free(ctypes.byref(values)) return usages def reactions(self) -> dict[str, float]: values = ctypes.POINTER(ctypes.c_float)() length = ctypes.c_int() self._check( self._lib.ENR_getNetReacts( self._handle, ctypes.byref(values), ctypes.byref(length) ) ) assert length.value == 4 category = ["bulk", "wall", "tank", "source"] d = {} for i in range(4): d[category[i]] = values[i] self._lib.ENR_free(ctypes.byref(values)) return d def node_results(self) -> list[dict[str, Any]]: size = self.net_size()["node"] num_periods = self.times()["num_periods"] nodes = self.element_name()["nodes"] category = ["demand", "head", "pressure", "quality"] ds = [] for i in range(1, size + 1): d = {"node": nodes[i - 1], "result": []} for j in range(num_periods): values = ctypes.POINTER(ctypes.c_float)() length = ctypes.c_int() self._check( self._lib.ENR_getNodeResult( self._handle, j, i, ctypes.byref(values), ctypes.byref(length) ) ) assert length.value == len(category) attributes = {} for k in range(length.value): attributes[category[k]] = values[k] d["result"].append(attributes) self._lib.ENR_free(ctypes.byref(values)) ds.append(d) return ds def link_results(self) -> list[dict[str, Any]]: size = self.net_size()["link"] num_periods = self.times()["num_periods"] links = self.element_name()["links"] category = [ "flow", "velocity", "headloss", "quality", "status", "setting", "reaction", "friction", ] ds = [] for i in range(1, size + 1): d = {"link": links[i - 1], "result": []} for j in range(num_periods): values = ctypes.POINTER(ctypes.c_float)() length = ctypes.c_int() self._check( self._lib.ENR_getLinkResult( self._handle, j, i, ctypes.byref(values), ctypes.byref(length) ) ) assert length.value == len(category) attributes = {} for k in range(length.value): if category[k] == "status": if values[k] == 2.0: attributes[category[k]] = "CLOSED" else: attributes[category[k]] = "OPEN" continue attributes[category[k]] = values[k] d["result"].append(attributes) self._lib.ENR_free(ctypes.byref(values)) ds.append(d) return ds def dump(self) -> dict[str, Any]: data = {} data |= {"version": self.version()} data |= {"net_size": self.net_size()} data |= {"units": self.units()} data |= {"times": self.times()} data |= {"element_name": self.element_name()} data |= {"energy_usage": self.energy_usage()} data |= {"reactions": self.reactions()} data |= {"node_results": self.node_results()} data |= {"link_results": self.link_results()} return data def _dump_output(path: str) -> dict[str, Any]: opt = Output(path) data = opt.dump() with open(path + ".json", "w") as f: json.dump(data, f) return data def dump_output(path: str) -> str: data = _dump_output(path) return json.dumps(data) def dump_report(path: str) -> str: return open(path, "r").read() def dump_output_binary(path: str) -> str: with open(path, "rb") as f: data = f.read() bast64_data = base64.b64encode(data) return str(bast64_data, "utf-8") # DingZQ, 2025-02-04, 返回dict[str, Any] def run_project_return_dict(name: str, readable_output: bool = False) -> dict[str, Any]: if not project.have_project(name): raise Exception(f"Not found project [{name}]") dir = os.path.abspath(os.getcwd()) db_inp = os.path.join(os.path.join(dir, "db_inp"), name + ".db.inp") inp_out.dump_inp(name, db_inp, "2") input = name + ".db" if platform.system() == "Windows": exe = os.path.join(os.path.dirname(__file__), "windows", "runepanet.exe") else: exe = os.path.join(os.path.dirname(__file__), "linux", "runepanet") inp = os.path.join(os.path.join(dir, "db_inp"), input + ".inp") rpt = os.path.join(os.path.join(dir, "temp"), input + ".rpt") opt = os.path.join(os.path.join(dir, "temp"), input + ".opt") command = f"{exe} {inp} {rpt} {opt}" if platform.system() != "Windows": if not os.access(exe, os.X_OK): os.chmod(exe, 0o755) data = {} # 设置环境变量以包含库文件路径 env = os.environ.copy() if platform.system() == "Linux": lib_dir = os.path.dirname(exe) env["LD_LIBRARY_PATH"] = f"{lib_dir}:{env.get('LD_LIBRARY_PATH', '')}" # 使用 subprocess 替代 os.system 以传递 env process = subprocess.run(command, shell=True, env=env) result = process.returncode if result != 0: data["simulation_result"] = "failed" else: data["simulation_result"] = "successful" if readable_output: data["output"] = _dump_output(opt) else: data["output"] = dump_output_binary(opt) data["report"] = dump_report(rpt) return data # original code def run_project(name: str, readable_output: bool = False) -> str: if not project.have_project(name): raise Exception(f"Not found project [{name}]") dir = os.path.abspath(os.getcwd()) db_inp = os.path.join(os.path.join(dir, "db_inp"), name + ".db.inp") inp_out.dump_inp(name, db_inp, "2") input = name + ".db" if platform.system() == "Windows": exe = os.path.join(os.path.dirname(__file__), "windows", "runepanet.exe") else: exe = os.path.join(os.path.dirname(__file__), "linux", "runepanet") inp = os.path.join(os.path.join(dir, "db_inp"), input + ".inp") rpt = os.path.join(os.path.join(dir, "temp"), input + ".rpt") opt = os.path.join(os.path.join(dir, "temp"), input + ".opt") command = f"{exe} {inp} {rpt} {opt}" logging.info(f"Run simulation at {datetime.now()}") logging.info(command) if platform.system() != "Windows": if not os.access(exe, os.X_OK): os.chmod(exe, 0o755) data = {} # 设置环境变量以包含库文件路径 env = os.environ.copy() if platform.system() == "Linux": lib_dir = os.path.dirname(exe) env["LD_LIBRARY_PATH"] = f"{lib_dir}:{env.get('LD_LIBRARY_PATH', '')}" # DingZQ, 2025-06-02, 使用subprocess替代os.system process = subprocess.run(command, shell=True, env=env) result = process.returncode # logging.info(f"Simulation result: {result}") if result != 0: data["simulation_result"] = "failed" logging.error("simulation failed") else: data["simulation_result"] = "successful" logging.info("simulation successful") if readable_output: data |= _dump_output(opt) else: data["output"] = dump_output_binary(opt) data["report"] = dump_report(rpt) # logging.info(f"Report: {data['report']}") return json.dumps(data) def run_inp(name: str) -> str: dir = os.path.abspath(os.getcwd()) if platform.system() == "Windows": exe = os.path.join(os.path.dirname(__file__), "windows", "runepanet.exe") else: exe = os.path.join(os.path.dirname(__file__), "linux", "runepanet") inp = os.path.join(os.path.join(dir, "inp"), name + ".inp") rpt = os.path.join(os.path.join(dir, "temp"), name + ".rpt") opt = os.path.join(os.path.join(dir, "temp"), name + ".opt") command = f"{exe} {inp} {rpt} {opt}" if platform.system() != "Windows": if not os.access(exe, os.X_OK): os.chmod(exe, 0o755) data = {} # 设置环境变量以包含库文件路径 env = os.environ.copy() if platform.system() == "Linux": lib_dir = os.path.dirname(exe) env["LD_LIBRARY_PATH"] = f"{lib_dir}:{env.get('LD_LIBRARY_PATH', '')}" process = subprocess.run(command, shell=True, env=env) result = process.returncode if result != 0: data["simulation_result"] = "failed" else: data["simulation_result"] = "successful" # data |= _dump_output(opt) data["output"] = dump_output_binary(opt) data["report"] = dump_report(rpt) return json.dumps(data)