调整 epanet 从 services 迁到 infra

This commit is contained in:
2026-03-09 16:11:29 +08:00
parent 61d540356d
commit 6eec6c04de
14 changed files with 6 additions and 6 deletions
+1
View File
@@ -0,0 +1 @@
from .epanet import run_project, run_project_return_dict, run_inp, dump_output
+451
View File
@@ -0,0 +1,451 @@
import ctypes
import platform
import os
import sys
import json
import base64
from datetime import datetime
import subprocess
import logging
from typing import Any
sys.path.append("..")
from app.native.wndb import project
from app.native.wndb import inp_out
def _verify_platform():
_platform = platform.system()
if _platform not in ["Windows", "Linux"]:
raise Exception(f"Platform {_platform} unsupported (not yet)")
if __name__ == "__main__":
_verify_platform()
class Output:
def __init__(self, path: str) -> None:
self._path = path
if platform.system() == "Windows":
self._lib = ctypes.CDLL(
os.path.join(os.path.dirname(__file__), "windows", "epanet-output.dll")
)
else:
self._lib = ctypes.CDLL(
os.path.join(os.path.dirname(__file__), "linux", "libepanet-output.so")
)
self._handle = ctypes.c_void_p()
self._check(self._lib.ENR_init(ctypes.byref(self._handle)))
self._check(
self._lib.ENR_open(self._handle, ctypes.c_char_p(self._path.encode()))
)
def __del__(self):
# throw exception in destructor ? :)
self._check(self._lib.ENR_close(ctypes.byref(self._handle)))
def _check(self, result):
if result != 0 and result != 10:
msg = ctypes.c_char_p()
code = self._lib.ENR_checkError(self._handle, ctypes.byref(msg))
assert code == result
error = f"Failed to read project [{self._path}] output, message [{msg.value.decode()}]"
self._lib.ENR_free(ctypes.byref(msg))
raise Exception(error)
def version(self) -> int:
v = ctypes.c_int()
self._check(self._lib.ENR_getVersion(self._handle, ctypes.byref(v)))
return v.value
def net_size(self) -> dict[str, int]:
element_count = ctypes.POINTER(ctypes.c_int)()
length = ctypes.c_int()
self._check(
self._lib.ENR_getNetSize(
self._handle, ctypes.byref(element_count), ctypes.byref(length)
)
)
assert length.value == 5
category = ["node", "tank", "link", "pump", "valve"]
sizes = {}
for i in range(length.value):
sizes[category[i]] = element_count[i]
self._lib.ENR_free(ctypes.byref(element_count))
return sizes
def units(self) -> dict[str, str]:
f_us = ["CFS", "GPM", "MGD", "IMGD", "AFD", "LPS", "LPM", "MLD", "CMH", "CMD"]
p_us = ["PSI", "MTR", "KPA"]
q_us = ["NONE", "MGL", "UGL", "HOURS", "PRCNT"]
f, p, q = ctypes.c_int(1), ctypes.c_int(2), ctypes.c_int(3)
f_u, p_u, q_u = ctypes.c_int(), ctypes.c_int(), ctypes.c_int()
self._check(self._lib.ENR_getUnits(self._handle, f, ctypes.byref(f_u)))
self._check(self._lib.ENR_getUnits(self._handle, p, ctypes.byref(p_u)))
self._check(self._lib.ENR_getUnits(self._handle, q, ctypes.byref(q_u)))
return {
"flow": f_us[f_u.value],
"pressure": p_us[p_u.value],
"quality": q_us[q_u.value],
}
def times(self) -> dict[str, int]:
ts = []
for i in range(1, 5):
t = ctypes.c_int(1)
self._check(
self._lib.ENR_getTimes(self._handle, ctypes.c_int(i), ctypes.byref(t))
)
ts.append(t.value)
d = {}
category = ["report_start", "report_step", "sim_duration", "num_periods"]
for i in range(4):
d[category[i]] = ts[i]
return d
def element_name(self) -> dict[str, list[str]]:
sizes = self.net_size()
node_type = ctypes.c_int(1)
nodes = []
for i in range(1, sizes["node"] + 1):
name = ctypes.c_char_p()
name_len = ctypes.c_int()
self._check(
self._lib.ENR_getElementName(
self._handle,
node_type,
ctypes.c_int(i),
ctypes.byref(name),
ctypes.byref(name_len),
)
)
nodes.append(name.value.decode())
self._lib.ENR_free(ctypes.byref(name))
link_type = ctypes.c_int(2)
links = []
for i in range(1, sizes["link"] + 1):
name = ctypes.c_char_p()
name_len = ctypes.c_int()
self._check(
self._lib.ENR_getElementName(
self._handle,
link_type,
ctypes.c_int(i),
ctypes.byref(name),
ctypes.byref(name_len),
)
)
links.append(name.value.decode())
self._lib.ENR_free(ctypes.byref(name))
return {"nodes": nodes, "links": links}
def energy_usage(self) -> list[dict[str, Any]]:
size = self.net_size()["pump"]
usages = []
category = [
"utilization",
"avg.efficiency",
"avg.kW/flow",
"avg.kwatts",
"max.kwatts",
"cost/day",
]
links = self.element_name()["links"]
for i in range(1, size + 1):
index = ctypes.c_int()
values = ctypes.POINTER(ctypes.c_float)()
length = ctypes.c_int()
self._check(
self._lib.ENR_getEnergyUsage(
self._handle,
ctypes.c_int(i),
ctypes.byref(index),
ctypes.byref(values),
ctypes.byref(length),
)
)
assert length.value == 6
d = {"pump": links[index.value - 1]}
for j in range(length.value):
d |= {category[j]: values[j]}
usages.append(d)
self._lib.ENR_free(ctypes.byref(values))
return usages
def reactions(self) -> dict[str, float]:
values = ctypes.POINTER(ctypes.c_float)()
length = ctypes.c_int()
self._check(
self._lib.ENR_getNetReacts(
self._handle, ctypes.byref(values), ctypes.byref(length)
)
)
assert length.value == 4
category = ["bulk", "wall", "tank", "source"]
d = {}
for i in range(4):
d[category[i]] = values[i]
self._lib.ENR_free(ctypes.byref(values))
return d
def node_results(self) -> list[dict[str, Any]]:
size = self.net_size()["node"]
num_periods = self.times()["num_periods"]
nodes = self.element_name()["nodes"]
category = ["demand", "head", "pressure", "quality"]
ds = []
for i in range(1, size + 1):
d = {"node": nodes[i - 1], "result": []}
for j in range(num_periods):
values = ctypes.POINTER(ctypes.c_float)()
length = ctypes.c_int()
self._check(
self._lib.ENR_getNodeResult(
self._handle, j, i, ctypes.byref(values), ctypes.byref(length)
)
)
assert length.value == len(category)
attributes = {}
for k in range(length.value):
attributes[category[k]] = values[k]
d["result"].append(attributes)
self._lib.ENR_free(ctypes.byref(values))
ds.append(d)
return ds
def link_results(self) -> list[dict[str, Any]]:
size = self.net_size()["link"]
num_periods = self.times()["num_periods"]
links = self.element_name()["links"]
category = [
"flow",
"velocity",
"headloss",
"quality",
"status",
"setting",
"reaction",
"friction",
]
ds = []
for i in range(1, size + 1):
d = {"link": links[i - 1], "result": []}
for j in range(num_periods):
values = ctypes.POINTER(ctypes.c_float)()
length = ctypes.c_int()
self._check(
self._lib.ENR_getLinkResult(
self._handle, j, i, ctypes.byref(values), ctypes.byref(length)
)
)
assert length.value == len(category)
attributes = {}
for k in range(length.value):
if category[k] == "status":
if values[k] == 2.0:
attributes[category[k]] = "CLOSED"
else:
attributes[category[k]] = "OPEN"
continue
attributes[category[k]] = values[k]
d["result"].append(attributes)
self._lib.ENR_free(ctypes.byref(values))
ds.append(d)
return ds
def dump(self) -> dict[str, Any]:
data = {}
data |= {"version": self.version()}
data |= {"net_size": self.net_size()}
data |= {"units": self.units()}
data |= {"times": self.times()}
data |= {"element_name": self.element_name()}
data |= {"energy_usage": self.energy_usage()}
data |= {"reactions": self.reactions()}
data |= {"node_results": self.node_results()}
data |= {"link_results": self.link_results()}
return data
def _dump_output(path: str) -> dict[str, Any]:
opt = Output(path)
data = opt.dump()
with open(path + ".json", "w") as f:
json.dump(data, f)
return data
def dump_output(path: str) -> str:
data = _dump_output(path)
return json.dumps(data)
def dump_report(path: str) -> str:
return open(path, "r").read()
def dump_output_binary(path: str) -> str:
with open(path, "rb") as f:
data = f.read()
bast64_data = base64.b64encode(data)
return str(bast64_data, "utf-8")
# DingZQ, 2025-02-04, 返回dict[str, Any]
def run_project_return_dict(name: str, readable_output: bool = False) -> dict[str, Any]:
if not project.have_project(name):
raise Exception(f"Not found project [{name}]")
dir = os.path.abspath(os.getcwd())
db_inp = os.path.join(os.path.join(dir, "db_inp"), name + ".db.inp")
inp_out.dump_inp(name, db_inp, "2")
input = name + ".db"
if platform.system() == "Windows":
exe = os.path.join(os.path.dirname(__file__), "windows", "runepanet.exe")
else:
exe = os.path.join(os.path.dirname(__file__), "linux", "runepanet")
inp = os.path.join(os.path.join(dir, "db_inp"), input + ".inp")
rpt = os.path.join(os.path.join(dir, "temp"), input + ".rpt")
opt = os.path.join(os.path.join(dir, "temp"), input + ".opt")
command = f"{exe} {inp} {rpt} {opt}"
if platform.system() != "Windows":
if not os.access(exe, os.X_OK):
os.chmod(exe, 0o755)
data = {}
# 设置环境变量以包含库文件路径
env = os.environ.copy()
if platform.system() == "Linux":
lib_dir = os.path.dirname(exe)
env["LD_LIBRARY_PATH"] = f"{lib_dir}:{env.get('LD_LIBRARY_PATH', '')}"
# 使用 subprocess 替代 os.system 以传递 env
process = subprocess.run(command, shell=True, env=env)
result = process.returncode
if result != 0:
data["simulation_result"] = "failed"
else:
data["simulation_result"] = "successful"
if readable_output:
data["output"] = _dump_output(opt)
else:
data["output"] = dump_output_binary(opt)
data["report"] = dump_report(rpt)
return data
# original code
def run_project(name: str, readable_output: bool = False) -> str:
if not project.have_project(name):
raise Exception(f"Not found project [{name}]")
dir = os.path.abspath(os.getcwd())
db_inp = os.path.join(os.path.join(dir, "db_inp"), name + ".db.inp")
inp_out.dump_inp(name, db_inp, "2")
input = name + ".db"
if platform.system() == "Windows":
exe = os.path.join(os.path.dirname(__file__), "windows", "runepanet.exe")
else:
exe = os.path.join(os.path.dirname(__file__), "linux", "runepanet")
inp = os.path.join(os.path.join(dir, "db_inp"), input + ".inp")
rpt = os.path.join(os.path.join(dir, "temp"), input + ".rpt")
opt = os.path.join(os.path.join(dir, "temp"), input + ".opt")
command = f"{exe} {inp} {rpt} {opt}"
logging.info(f"Run simulation at {datetime.now()}")
logging.info(command)
if platform.system() != "Windows":
if not os.access(exe, os.X_OK):
os.chmod(exe, 0o755)
data = {}
# 设置环境变量以包含库文件路径
env = os.environ.copy()
if platform.system() == "Linux":
lib_dir = os.path.dirname(exe)
env["LD_LIBRARY_PATH"] = f"{lib_dir}:{env.get('LD_LIBRARY_PATH', '')}"
# DingZQ, 2025-06-02, 使用subprocess替代os.system
process = subprocess.run(command, shell=True, env=env)
result = process.returncode
# logging.info(f"Simulation result: {result}")
if result != 0:
data["simulation_result"] = "failed"
logging.error("simulation failed")
else:
data["simulation_result"] = "successful"
logging.info("simulation successful")
if readable_output:
data |= _dump_output(opt)
else:
data["output"] = dump_output_binary(opt)
data["report"] = dump_report(rpt)
# logging.info(f"Report: {data['report']}")
return json.dumps(data)
def run_inp(name: str) -> str:
dir = os.path.abspath(os.getcwd())
if platform.system() == "Windows":
exe = os.path.join(os.path.dirname(__file__), "windows", "runepanet.exe")
else:
exe = os.path.join(os.path.dirname(__file__), "linux", "runepanet")
inp = os.path.join(os.path.join(dir, "inp"), name + ".inp")
rpt = os.path.join(os.path.join(dir, "temp"), name + ".rpt")
opt = os.path.join(os.path.join(dir, "temp"), name + ".opt")
command = f"{exe} {inp} {rpt} {opt}"
if platform.system() != "Windows":
if not os.access(exe, os.X_OK):
os.chmod(exe, 0o755)
data = {}
# 设置环境变量以包含库文件路径
env = os.environ.copy()
if platform.system() == "Linux":
lib_dir = os.path.dirname(exe)
env["LD_LIBRARY_PATH"] = f"{lib_dir}:{env.get('LD_LIBRARY_PATH', '')}"
process = subprocess.run(command, shell=True, env=env)
result = process.returncode
if result != 0:
data["simulation_result"] = "failed"
else:
data["simulation_result"] = "successful"
# data |= _dump_output(opt)
data["output"] = dump_output_binary(opt)
data["report"] = dump_report(rpt)
return json.dumps(data)
Binary file not shown.
Binary file not shown.
BIN
View File
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.