新增 epanet Linux版本;为 epanet.py 新增 Linux 的环境运行代码

This commit is contained in:
JIANG
2025-12-30 18:21:22 +08:00
parent e8a883bcb7
commit 79c2bf811e
8 changed files with 3624 additions and 113 deletions

View File

@@ -8,18 +8,19 @@ from datetime import datetime
import subprocess
import logging
from typing import Any
sys.path.append("..")
sys.path.append("..")
from api import project
from api import inp_out
def _verify_platform():
_platform = platform.system()
if _platform != "Windows":
raise Exception(f'Platform {_platform} unsupported (not yet)')
if _platform not in ["Windows", "Linux"]:
raise Exception(f"Platform {_platform} unsupported (not yet)")
if __name__ == '__main__':
if __name__ == "__main__":
_verify_platform()
@@ -27,201 +28,260 @@ class Output:
def __init__(self, path: str) -> None:
self._path = path
self._lib = ctypes.CDLL(os.path.join(os.getcwd(), 'epanet', 'epanet-output.dll'))
if platform.system() == "Windows":
self._lib = ctypes.CDLL(
os.path.join(os.getcwd(), "epanet", "epanet-output.dll")
)
else:
self._lib = ctypes.CDLL(
os.path.join(os.getcwd(), "epanet", "linux", "libepanet-output.so")
)
self._handle = ctypes.c_void_p()
self._check(self._lib.ENR_init(ctypes.byref(self._handle)))
self._check(self._lib.ENR_open(self._handle, ctypes.c_char_p(self._path.encode())))
self._check(
self._lib.ENR_open(self._handle, ctypes.c_char_p(self._path.encode()))
)
def __del__(self):
# throw exception in destructor ? :)
self._check(self._lib.ENR_close(ctypes.byref(self._handle)))
def _check(self, result):
if result != 0 and result != 10:
msg = ctypes.c_char_p()
code = self._lib.ENR_checkError(self._handle, ctypes.byref(msg))
assert code == result
error = f'Failed to read project [{self._path}] output, message [{msg.value.decode()}]'
error = f"Failed to read project [{self._path}] output, message [{msg.value.decode()}]"
self._lib.ENR_free(ctypes.byref(msg))
raise Exception(error)
def version(self) -> int:
v = ctypes.c_int()
self._check(self._lib.ENR_getVersion(self._handle, ctypes.byref(v)))
return v.value
def net_size(self) -> dict[str, int]:
element_count = ctypes.POINTER(ctypes.c_int)()
length = ctypes.c_int()
self._check(self._lib.ENR_getNetSize(self._handle, ctypes.byref(element_count), ctypes.byref(length)))
self._check(
self._lib.ENR_getNetSize(
self._handle, ctypes.byref(element_count), ctypes.byref(length)
)
)
assert length.value == 5
category = ['node', 'tank', 'link', 'pump', 'valve']
category = ["node", "tank", "link", "pump", "valve"]
sizes = {}
for i in range(length.value):
sizes[category[i]] = element_count[i]
self._lib.ENR_free(ctypes.byref(element_count))
return sizes
def units(self) -> dict[str, str]:
f_us = ['CFS', 'GPM', 'MGD', 'IMGD', 'AFD', 'LPS', 'LPM', 'MLD', 'CMH', 'CMD']
p_us = ['PSI', 'MTR', 'KPA']
q_us = ['NONE', 'MGL', 'UGL', 'HOURS', 'PRCNT']
f_us = ["CFS", "GPM", "MGD", "IMGD", "AFD", "LPS", "LPM", "MLD", "CMH", "CMD"]
p_us = ["PSI", "MTR", "KPA"]
q_us = ["NONE", "MGL", "UGL", "HOURS", "PRCNT"]
f, p, q = ctypes.c_int(1), ctypes.c_int(2), ctypes.c_int(3)
f_u, p_u, q_u = ctypes.c_int(), ctypes.c_int(), ctypes.c_int()
self._check(self._lib.ENR_getUnits(self._handle, f, ctypes.byref(f_u)))
self._check(self._lib.ENR_getUnits(self._handle, p, ctypes.byref(p_u)))
self._check(self._lib.ENR_getUnits(self._handle, q, ctypes.byref(q_u)))
return { 'flow': f_us[f_u.value], 'pressure': p_us[p_u.value], 'quality': q_us[q_u.value] }
return {
"flow": f_us[f_u.value],
"pressure": p_us[p_u.value],
"quality": q_us[q_u.value],
}
def times(self) -> dict[str, int]:
ts = []
for i in range(1, 5):
t = ctypes.c_int(1)
self._check(self._lib.ENR_getTimes(self._handle, ctypes.c_int(i), ctypes.byref(t)))
self._check(
self._lib.ENR_getTimes(self._handle, ctypes.c_int(i), ctypes.byref(t))
)
ts.append(t.value)
d = {}
category = ['report_start', 'report_step', 'sim_duration', 'num_periods']
category = ["report_start", "report_step", "sim_duration", "num_periods"]
for i in range(4):
d[category[i]] = ts[i]
return d
def element_name(self) -> dict[str, list[str]]:
sizes = self.net_size()
node_type = ctypes.c_int(1)
nodes = []
for i in range(1, sizes['node'] + 1):
for i in range(1, sizes["node"] + 1):
name = ctypes.c_char_p()
name_len = ctypes.c_int()
self._check(self._lib.ENR_getElementName(self._handle, node_type, ctypes.c_int(i), ctypes.byref(name), ctypes.byref(name_len)))
self._check(
self._lib.ENR_getElementName(
self._handle,
node_type,
ctypes.c_int(i),
ctypes.byref(name),
ctypes.byref(name_len),
)
)
nodes.append(name.value.decode())
self._lib.ENR_free(ctypes.byref(name))
link_type = ctypes.c_int(2)
links = []
for i in range(1, sizes['link'] + 1):
for i in range(1, sizes["link"] + 1):
name = ctypes.c_char_p()
name_len = ctypes.c_int()
self._check(self._lib.ENR_getElementName(self._handle, link_type, ctypes.c_int(i), ctypes.byref(name), ctypes.byref(name_len)))
self._check(
self._lib.ENR_getElementName(
self._handle,
link_type,
ctypes.c_int(i),
ctypes.byref(name),
ctypes.byref(name_len),
)
)
links.append(name.value.decode())
self._lib.ENR_free(ctypes.byref(name))
return { 'nodes' : nodes, 'links': links }
return {"nodes": nodes, "links": links}
def energy_usage(self) -> list[dict[str, Any]]:
size = self.net_size()['pump']
size = self.net_size()["pump"]
usages = []
category = ['utilization', 'avg.efficiency', 'avg.kW/flow', 'avg.kwatts', 'max.kwatts', 'cost/day' ]
links = self.element_name()['links']
category = [
"utilization",
"avg.efficiency",
"avg.kW/flow",
"avg.kwatts",
"max.kwatts",
"cost/day",
]
links = self.element_name()["links"]
for i in range(1, size + 1):
index = ctypes.c_int()
values = ctypes.POINTER(ctypes.c_float)()
length = ctypes.c_int()
self._check(self._lib.ENR_getEnergyUsage(self._handle, ctypes.c_int(i), ctypes.byref(index), ctypes.byref(values), ctypes.byref(length)))
self._check(
self._lib.ENR_getEnergyUsage(
self._handle,
ctypes.c_int(i),
ctypes.byref(index),
ctypes.byref(values),
ctypes.byref(length),
)
)
assert length.value == 6
d = { 'pump' : links[index.value - 1] }
d = {"pump": links[index.value - 1]}
for j in range(length.value):
d |= { category[j] : values[j] }
d |= {category[j]: values[j]}
usages.append(d)
self._lib.ENR_free(ctypes.byref(values))
return usages
def reactions(self) -> dict[str, float]:
values = ctypes.POINTER(ctypes.c_float)()
length = ctypes.c_int()
self._check(self._lib.ENR_getNetReacts(self._handle, ctypes.byref(values), ctypes.byref(length)))
self._check(
self._lib.ENR_getNetReacts(
self._handle, ctypes.byref(values), ctypes.byref(length)
)
)
assert length.value == 4
category = ['bulk', 'wall', 'tank', 'source']
category = ["bulk", "wall", "tank", "source"]
d = {}
for i in range(4):
d[category[i]] = values[i]
self._lib.ENR_free(ctypes.byref(values))
return d
def node_results(self) -> list[dict[str, Any]]:
size = self.net_size()['node']
num_periods = self.times()['num_periods']
nodes = self.element_name()['nodes']
category = ['demand', 'head', 'pressure', 'quality']
size = self.net_size()["node"]
num_periods = self.times()["num_periods"]
nodes = self.element_name()["nodes"]
category = ["demand", "head", "pressure", "quality"]
ds = []
for i in range(1, size + 1):
d = { 'node' : nodes[i - 1], 'result' : [] }
d = {"node": nodes[i - 1], "result": []}
for j in range(num_periods):
values = ctypes.POINTER(ctypes.c_float)()
length = ctypes.c_int()
self._check(self._lib.ENR_getNodeResult(self._handle, j, i, ctypes.byref(values), ctypes.byref(length)))
self._check(
self._lib.ENR_getNodeResult(
self._handle, j, i, ctypes.byref(values), ctypes.byref(length)
)
)
assert length.value == len(category)
attributes = {}
for k in range(length.value):
attributes[category[k]] = values[k]
d['result'].append(attributes)
d["result"].append(attributes)
self._lib.ENR_free(ctypes.byref(values))
ds.append(d)
return ds
def link_results(self) -> list[dict[str, Any]]:
size = self.net_size()['link']
num_periods = self.times()['num_periods']
links = self.element_name()['links']
category = ['flow', 'velocity', 'headloss', 'quality', 'status', 'setting', 'reaction', 'friction']
size = self.net_size()["link"]
num_periods = self.times()["num_periods"]
links = self.element_name()["links"]
category = [
"flow",
"velocity",
"headloss",
"quality",
"status",
"setting",
"reaction",
"friction",
]
ds = []
for i in range(1, size + 1):
d = { 'link' : links[i - 1], 'result' : [] }
d = {"link": links[i - 1], "result": []}
for j in range(num_periods):
values = ctypes.POINTER(ctypes.c_float)()
length = ctypes.c_int()
self._check(self._lib.ENR_getLinkResult(self._handle, j, i, ctypes.byref(values), ctypes.byref(length)))
self._check(
self._lib.ENR_getLinkResult(
self._handle, j, i, ctypes.byref(values), ctypes.byref(length)
)
)
assert length.value == len(category)
attributes = {}
for k in range(length.value):
if category[k] == 'status':
if category[k] == "status":
if values[k] == 2.0:
attributes[category[k]] = 'CLOSED'
attributes[category[k]] = "CLOSED"
else:
attributes[category[k]] = 'OPEN'
attributes[category[k]] = "OPEN"
continue
attributes[category[k]] = values[k]
d['result'].append(attributes)
d["result"].append(attributes)
self._lib.ENR_free(ctypes.byref(values))
ds.append(d)
return ds
def dump(self) -> dict[str, Any]:
data = {}
data |= { 'version' : self.version() }
data |= { 'net_size' : self.net_size() }
data |= { 'units' : self.units() }
data |= { 'times' : self.times() }
data |= { 'element_name' : self.element_name() }
data |= { 'energy_usage' : self.energy_usage() }
data |= { 'reactions' : self.reactions() }
data |= { 'node_results' : self.node_results() }
data |= { 'link_results' : self.link_results() }
data |= {"version": self.version()}
data |= {"net_size": self.net_size()}
data |= {"units": self.units()}
data |= {"times": self.times()}
data |= {"element_name": self.element_name()}
data |= {"energy_usage": self.energy_usage()}
data |= {"reactions": self.reactions()}
data |= {"node_results": self.node_results()}
data |= {"link_results": self.link_results()}
return data
def _dump_output(path: str) -> dict[str, Any]:
opt = Output(path)
data = opt.dump()
with open(path + '.json', 'w') as f:
with open(path + ".json", "w") as f:
json.dump(data, f)
return data
@@ -232,90 +292,106 @@ def dump_output(path: str) -> str:
def dump_report(path: str) -> str:
return open(path, 'r').read()
return open(path, "r").read()
def dump_output_binary(path: str) -> str:
with open(path, 'rb') as f:
with open(path, "rb") as f:
data = f.read()
bast64_data = base64.b64encode(data)
return str(bast64_data, 'utf-8')
return str(bast64_data, "utf-8")
#DingZQ, 2025-02-04, 返回dict[str, Any]
# DingZQ, 2025-02-04, 返回dict[str, Any]
def run_project_return_dict(name: str, readable_output: bool = False) -> dict[str, Any]:
if not project.have_project(name):
raise Exception(f'Not found project [{name}]')
raise Exception(f"Not found project [{name}]")
dir = os.path.abspath(os.getcwd())
db_inp = os.path.join(os.path.join(dir, 'db_inp'), name + '.db.inp')
inp_out.dump_inp(name, db_inp, '2')
db_inp = os.path.join(os.path.join(dir, "db_inp"), name + ".db.inp")
inp_out.dump_inp(name, db_inp, "2")
input = name + '.db'
exe = os.path.join(os.path.join(dir, 'epanet'), 'runepanet.exe')
inp = os.path.join(os.path.join(dir, 'db_inp'), input + '.inp')
rpt = os.path.join(os.path.join(dir, 'temp'), input + '.rpt')
opt = os.path.join(os.path.join(dir, 'temp'), input + '.opt')
command = f'{exe} {inp} {rpt} {opt}'
input = name + ".db"
if platform.system() == "Windows":
exe = os.path.join(os.path.join(dir, "epanet"), "runepanet.exe")
else:
exe = os.path.join(os.path.join(dir, "epanet"), "linux", "runepanet")
inp = os.path.join(os.path.join(dir, "db_inp"), input + ".inp")
rpt = os.path.join(os.path.join(dir, "temp"), input + ".rpt")
opt = os.path.join(os.path.join(dir, "temp"), input + ".opt")
command = f"{exe} {inp} {rpt} {opt}"
if platform.system() != "Windows":
if not os.access(exe, os.X_OK):
os.chmod(exe, 0o755)
data = {}
result = os.system(command)
if result != 0:
data['simulation_result'] = 'failed'
data["simulation_result"] = "failed"
else:
data['simulation_result'] = 'successful'
data["simulation_result"] = "successful"
if readable_output:
data ['output'] = _dump_output(opt)
data["output"] = _dump_output(opt)
else:
data['output'] = dump_output_binary(opt)
data["output"] = dump_output_binary(opt)
data['report'] = dump_report(rpt)
data["report"] = dump_report(rpt)
return data
# original code
def run_project(name: str, readable_output: bool = False) -> str:
if not project.have_project(name):
raise Exception(f'Not found project [{name}]')
raise Exception(f"Not found project [{name}]")
dir = os.path.abspath(os.getcwd())
db_inp = os.path.join(os.path.join(dir, 'db_inp'), name + '.db.inp')
inp_out.dump_inp(name, db_inp, '2')
db_inp = os.path.join(os.path.join(dir, "db_inp"), name + ".db.inp")
inp_out.dump_inp(name, db_inp, "2")
input = name + '.db'
exe = os.path.join(os.path.join(dir, 'epanet'), 'runepanet.exe')
inp = os.path.join(os.path.join(dir, 'db_inp'), input + '.inp')
rpt = os.path.join(os.path.join(dir, 'temp'), input + '.rpt')
opt = os.path.join(os.path.join(dir, 'temp'), input + '.opt')
input = name + ".db"
if platform.system() == "Windows":
exe = os.path.join(os.path.join(dir, "epanet"), "runepanet.exe")
else:
exe = os.path.join(os.path.join(dir, "epanet"), "linux", "runepanet")
inp = os.path.join(os.path.join(dir, "db_inp"), input + ".inp")
rpt = os.path.join(os.path.join(dir, "temp"), input + ".rpt")
opt = os.path.join(os.path.join(dir, "temp"), input + ".opt")
command = f'{exe} {inp} {rpt} {opt}'
command = f"{exe} {inp} {rpt} {opt}"
logging.info(f"Run simulation at {datetime.now()}")
logging.info(command)
if platform.system() != "Windows":
if not os.access(exe, os.X_OK):
os.chmod(exe, 0o755)
data = {}
# DingZQ, 2025-06-02, 使用subprocess.Popen捕获输出到全局日志, 原来的代码是这么写的
result = os.system(command)
#logging.info(f"Simulation result: {result}")
# logging.info(f"Simulation result: {result}")
if result != 0:
data['simulation_result'] = 'failed'
data["simulation_result"] = "failed"
logging.error('simulation failed')
logging.error("simulation failed")
else:
data['simulation_result'] = 'successful'
logging.info('simulation successful')
data["simulation_result"] = "successful"
logging.info("simulation successful")
if readable_output:
data |= _dump_output(opt)
else:
data['output'] = dump_output_binary(opt)
data["output"] = dump_output_binary(opt)
data['report'] = dump_report(rpt)
#logging.info(f"Report: {data['report']}")
data["report"] = dump_report(rpt)
# logging.info(f"Report: {data['report']}")
return json.dumps(data)
@@ -323,22 +399,29 @@ def run_project(name: str, readable_output: bool = False) -> str:
def run_inp(name: str) -> str:
dir = os.path.abspath(os.getcwd())
exe = os.path.join(os.path.join(dir, 'epanet'), 'runepanet.exe')
inp = os.path.join(os.path.join(dir, 'inp'), name + '.inp')
rpt = os.path.join(os.path.join(dir, 'temp'), name + '.rpt')
opt = os.path.join(os.path.join(dir, 'temp'), name + '.opt')
command = f'{exe} {inp} {rpt} {opt}'
if platform.system() == "Windows":
exe = os.path.join(os.path.join(dir, "epanet"), "runepanet.exe")
else:
exe = os.path.join(os.path.join(dir, "epanet"), "linux", "runepanet")
inp = os.path.join(os.path.join(dir, "inp"), name + ".inp")
rpt = os.path.join(os.path.join(dir, "temp"), name + ".rpt")
opt = os.path.join(os.path.join(dir, "temp"), name + ".opt")
command = f"{exe} {inp} {rpt} {opt}"
if platform.system() != "Windows":
if not os.access(exe, os.X_OK):
os.chmod(exe, 0o755)
data = {}
result = os.system(command)
if result != 0:
data['simulation_result'] = 'failed'
data["simulation_result"] = "failed"
else:
data['simulation_result'] = 'successful'
data["simulation_result"] = "successful"
# data |= _dump_output(opt)
data['output'] = dump_output_binary(opt)
data["output"] = dump_output_binary(opt)
data['report'] = dump_report(rpt)
data["report"] = dump_report(rpt)
return json.dumps(data)