Files
TJWaterServer/epanet/epanet.py
2025-06-02 18:57:41 +08:00

390 lines
13 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
import ctypes
import platform
import os
import sys
import json
import base64
import subprocess
import logging
from typing import Any
sys.path.append("..")
from api import project
from api import inp_out
def _verify_platform():
_platform = platform.system()
if _platform != "Windows":
raise Exception(f'Platform {_platform} unsupported (not yet)')
if __name__ == '__main__':
_verify_platform()
class Output:
def __init__(self, path: str) -> None:
self._path = path
self._lib = ctypes.CDLL(os.path.join(os.getcwd(), 'epanet', 'epanet-output.dll'))
self._handle = ctypes.c_void_p()
self._check(self._lib.ENR_init(ctypes.byref(self._handle)))
self._check(self._lib.ENR_open(self._handle, ctypes.c_char_p(self._path.encode())))
def __del__(self):
# throw exception in destructor ? :)
self._check(self._lib.ENR_close(ctypes.byref(self._handle)))
def _check(self, result):
if result != 0 and result != 10:
msg = ctypes.c_char_p()
code = self._lib.ENR_checkError(self._handle, ctypes.byref(msg))
assert code == result
error = f'Failed to read project [{self._path}] output, message [{msg.value.decode()}]'
self._lib.ENR_free(ctypes.byref(msg))
raise Exception(error)
def version(self) -> int:
v = ctypes.c_int()
self._check(self._lib.ENR_getVersion(self._handle, ctypes.byref(v)))
return v.value
def net_size(self) -> dict[str, int]:
element_count = ctypes.POINTER(ctypes.c_int)()
length = ctypes.c_int()
self._check(self._lib.ENR_getNetSize(self._handle, ctypes.byref(element_count), ctypes.byref(length)))
assert length.value == 5
category = ['node', 'tank', 'link', 'pump', 'valve']
sizes = {}
for i in range(length.value):
sizes[category[i]] = element_count[i]
self._lib.ENR_free(ctypes.byref(element_count))
return sizes
def units(self) -> dict[str, str]:
f_us = ['CFS', 'GPM', 'MGD', 'IMGD', 'AFD', 'LPS', 'LPM', 'MLD', 'CMH', 'CMD']
p_us = ['PSI', 'MTR', 'KPA']
q_us = ['NONE', 'MGL', 'UGL', 'HOURS', 'PRCNT']
f, p, q = ctypes.c_int(1), ctypes.c_int(2), ctypes.c_int(3)
f_u, p_u, q_u = ctypes.c_int(), ctypes.c_int(), ctypes.c_int()
self._check(self._lib.ENR_getUnits(self._handle, f, ctypes.byref(f_u)))
self._check(self._lib.ENR_getUnits(self._handle, p, ctypes.byref(p_u)))
self._check(self._lib.ENR_getUnits(self._handle, q, ctypes.byref(q_u)))
return { 'flow': f_us[f_u.value], 'pressure': p_us[p_u.value], 'quality': q_us[q_u.value] }
def times(self) -> dict[str, int]:
ts = []
for i in range(1, 5):
t = ctypes.c_int(1)
self._check(self._lib.ENR_getTimes(self._handle, ctypes.c_int(i), ctypes.byref(t)))
ts.append(t.value)
d = {}
category = ['report_start', 'report_step', 'sim_duration', 'num_periods']
for i in range(4):
d[category[i]] = ts[i]
return d
def element_name(self) -> dict[str, list[str]]:
sizes = self.net_size()
node_type = ctypes.c_int(1)
nodes = []
for i in range(1, sizes['node'] + 1):
name = ctypes.c_char_p()
name_len = ctypes.c_int()
self._check(self._lib.ENR_getElementName(self._handle, node_type, ctypes.c_int(i), ctypes.byref(name), ctypes.byref(name_len)))
nodes.append(name.value.decode())
self._lib.ENR_free(ctypes.byref(name))
link_type = ctypes.c_int(2)
links = []
for i in range(1, sizes['link'] + 1):
name = ctypes.c_char_p()
name_len = ctypes.c_int()
self._check(self._lib.ENR_getElementName(self._handle, link_type, ctypes.c_int(i), ctypes.byref(name), ctypes.byref(name_len)))
links.append(name.value.decode())
self._lib.ENR_free(ctypes.byref(name))
return { 'nodes' : nodes, 'links': links }
def energy_usage(self) -> list[dict[str, Any]]:
size = self.net_size()['pump']
usages = []
category = ['utilization', 'avg.efficiency', 'avg.kW/flow', 'avg.kwatts', 'max.kwatts', 'cost/day' ]
links = self.element_name()['links']
for i in range(1, size + 1):
index = ctypes.c_int()
values = ctypes.POINTER(ctypes.c_float)()
length = ctypes.c_int()
self._check(self._lib.ENR_getEnergyUsage(self._handle, ctypes.c_int(i), ctypes.byref(index), ctypes.byref(values), ctypes.byref(length)))
assert length.value == 6
d = { 'pump' : links[index.value - 1] }
for j in range(length.value):
d |= { category[j] : values[j] }
usages.append(d)
self._lib.ENR_free(ctypes.byref(values))
return usages
def reactions(self) -> dict[str, float]:
values = ctypes.POINTER(ctypes.c_float)()
length = ctypes.c_int()
self._check(self._lib.ENR_getNetReacts(self._handle, ctypes.byref(values), ctypes.byref(length)))
assert length.value == 4
category = ['bulk', 'wall', 'tank', 'source']
d = {}
for i in range(4):
d[category[i]] = values[i]
self._lib.ENR_free(ctypes.byref(values))
return d
def node_results(self) -> list[dict[str, Any]]:
size = self.net_size()['node']
num_periods = self.times()['num_periods']
nodes = self.element_name()['nodes']
category = ['demand', 'head', 'pressure', 'quality']
ds = []
for i in range(1, size + 1):
d = { 'node' : nodes[i - 1], 'result' : [] }
for j in range(num_periods):
values = ctypes.POINTER(ctypes.c_float)()
length = ctypes.c_int()
self._check(self._lib.ENR_getNodeResult(self._handle, j, i, ctypes.byref(values), ctypes.byref(length)))
assert length.value == len(category)
attributes = {}
for k in range(length.value):
attributes[category[k]] = values[k]
d['result'].append(attributes)
self._lib.ENR_free(ctypes.byref(values))
ds.append(d)
return ds
def link_results(self) -> list[dict[str, Any]]:
size = self.net_size()['link']
num_periods = self.times()['num_periods']
links = self.element_name()['links']
category = ['flow', 'velocity', 'headloss', 'quality', 'status', 'setting', 'reaction', 'friction']
ds = []
for i in range(1, size + 1):
d = { 'link' : links[i - 1], 'result' : [] }
for j in range(num_periods):
values = ctypes.POINTER(ctypes.c_float)()
length = ctypes.c_int()
self._check(self._lib.ENR_getLinkResult(self._handle, j, i, ctypes.byref(values), ctypes.byref(length)))
assert length.value == len(category)
attributes = {}
for k in range(length.value):
if category[k] == 'status':
if values[k] == 2.0:
attributes[category[k]] = 'CLOSED'
else:
attributes[category[k]] = 'OPEN'
continue
attributes[category[k]] = values[k]
d['result'].append(attributes)
self._lib.ENR_free(ctypes.byref(values))
ds.append(d)
return ds
def dump(self) -> dict[str, Any]:
data = {}
data |= { 'version' : self.version() }
data |= { 'net_size' : self.net_size() }
data |= { 'units' : self.units() }
data |= { 'times' : self.times() }
data |= { 'element_name' : self.element_name() }
data |= { 'energy_usage' : self.energy_usage() }
data |= { 'reactions' : self.reactions() }
data |= { 'node_results' : self.node_results() }
data |= { 'link_results' : self.link_results() }
return data
def _dump_output(path: str) -> dict[str, Any]:
opt = Output(path)
data = opt.dump()
with open(path + '.json', 'w') as f:
json.dump(data, f)
return data
def dump_output(path: str) -> str:
data = _dump_output(path)
return json.dumps(data)
def dump_report(path: str) -> str:
return open(path, 'r').read()
def dump_output_binary(path: str) -> str:
with open(path, 'rb') as f:
data = f.read()
bast64_data = base64.b64encode(data)
return str(bast64_data, 'utf-8')
#DingZQ, 2025-02-04, 返回dict[str, Any]
def run_project_return_dict(name: str, readable_output: bool = False) -> dict[str, Any]:
if not project.have_project(name):
raise Exception(f'Not found project [{name}]')
dir = os.path.abspath(os.getcwd())
db_inp = os.path.join(os.path.join(dir, 'db_inp'), name + '.db.inp')
inp_out.dump_inp(name, db_inp, '2')
input = name + '.db'
exe = os.path.join(os.path.join(dir, 'epanet'), 'runepanet.exe')
inp = os.path.join(os.path.join(dir, 'db_inp'), input + '.inp')
rpt = os.path.join(os.path.join(dir, 'temp'), input + '.rpt')
opt = os.path.join(os.path.join(dir, 'temp'), input + '.opt')
command = f'{exe} {inp} {rpt} {opt}'
data = {}
result = os.system(command)
if result != 0:
data['simulation_result'] = 'failed'
else:
data['simulation_result'] = 'successful'
if readable_output:
data ['output'] = _dump_output(opt)
else:
data['output'] = dump_output_binary(opt)
data['report'] = dump_report(rpt)
return data
# original code
def run_project(name: str, readable_output: bool = False) -> str:
if not project.have_project(name):
raise Exception(f'Not found project [{name}]')
dir = os.path.abspath(os.getcwd())
db_inp = os.path.join(os.path.join(dir, 'db_inp'), name + '.db.inp')
inp_out.dump_inp(name, db_inp, '2')
input = name + '.db'
exe = os.path.join(os.path.join(dir, 'epanet'), 'runepanet.exe')
inp = os.path.join(os.path.join(dir, 'db_inp'), input + '.inp')
rpt = os.path.join(os.path.join(dir, 'temp'), input + '.rpt')
opt = os.path.join(os.path.join(dir, 'temp'), input + '.opt')
command = f'{exe} {inp} {rpt} {opt}'
print(command)
data = {}
# DingZQ, 2025-06-02, 使用subprocess.Popen捕获输出到全局日志, 原来的代码是这么写的
# result = os.system(command)
"""
执行带参数的外部exe并捕获输出到全局日志
:param exe_path: exe文件路径
:param arguments: 命令行参数列表
:param timeout: 执行超时时间(秒)
:return: 进程退出状态码
"""
result = -1
try:
# 启动子进程
with subprocess.Popen(
command,
stdout=subprocess.PIPE,
stderr=subprocess.STDOUT, # 合并错误输出和标准输出
text=True, # 文本模式Python 3.7+
bufsize=1, # 行缓冲
encoding='utf-8', # 编码设置
errors='replace' # 编码错误处理方式
) as process:
# 实时读取输出流
while True:
output_line = process.stdout.readline()
error_line = process.stderr.readline()
if output_line == '' and process.poll() is not None:
break
if error_line == '' and process.poll() is not None:
break
if output_line:
stripped_line = output_line.rstrip()
logging.info(f"EXE_OUTPUT: {stripped_line}")
if error_line:
stripped_line = error_line.rstrip()
logging.error(f"EXE_ERROR: {stripped_line}")
# 获取退出状态码
returncode = process.poll()
# 记录结束状态
logging.info("-" * 60)
if returncode == 0:
logging.info(f"成功结束! 退出码: {returncode}")
else:
logging.error(f"异常结束! 退出码: {returncode}")
result = returncode
except Exception as e:
logging.exception(f"执行过程中出错: {str(e)}")
result = -1 # 自定义错误码
if result != 0:
data['simulation_result'] = 'failed'
else:
data['simulation_result'] = 'successful'
if readable_output:
data |= _dump_output(opt)
else:
data['output'] = dump_output_binary(opt)
data['report'] = dump_report(rpt)
return json.dumps(data)
def run_inp(name: str) -> str:
dir = os.path.abspath(os.getcwd())
exe = os.path.join(os.path.join(dir, 'epanet'), 'runepanet.exe')
inp = os.path.join(os.path.join(dir, 'inp'), name + '.inp')
rpt = os.path.join(os.path.join(dir, 'temp'), name + '.rpt')
opt = os.path.join(os.path.join(dir, 'temp'), name + '.opt')
command = f'{exe} {inp} {rpt} {opt}'
data = {}
result = os.system(command)
if result != 0:
data['simulation_result'] = 'failed'
else:
data['simulation_result'] = 'successful'
# data |= _dump_output(opt)
data['output'] = dump_output_binary(opt)
data['report'] = dump_report(rpt)
return json.dumps(data)