390 lines
13 KiB
Python
390 lines
13 KiB
Python
import ctypes
|
||
import platform
|
||
import os
|
||
import sys
|
||
import json
|
||
import base64
|
||
import subprocess
|
||
import logging
|
||
from typing import Any
|
||
sys.path.append("..")
|
||
from api import project
|
||
from api import inp_out
|
||
|
||
|
||
def _verify_platform():
    """Raise if the host operating system is not Windows.

    Raises:
        Exception: when ``platform.system()`` reports anything other than
            ``"Windows"``; the message names the detected platform.
    """
    system_name = platform.system()
    if system_name == "Windows":
        return
    raise Exception(f'Platform {system_name} unsupported (not yet)')
|
||
|
||
|
||
# The platform requirement is enforced only when this file is executed as a
# script; importing the module performs no check.
if __name__ == "__main__":
    _verify_platform()
|
||
|
||
|
||
class Output:
    """Read an EPANET binary output file through ``epanet-output.dll``.

    The DLL is loaded from ``<cwd>/epanet/epanet-output.dll``. Each query
    method calls one ``ENR_*`` function, copies the result into plain Python
    values and releases any library-allocated buffer with ``ENR_free``.
    """

    # Return codes that _check accepts without raising.
    # NOTE(review): 10 is presumably the library's warning-level code - confirm.
    _OK_CODES = (0, 10)

    def __init__(self, path: str) -> None:
        """Load the DLL, create a reader handle and open ``path``.

        Raises:
            OSError: if the DLL cannot be loaded.
            Exception: if ``ENR_init`` or ``ENR_open`` reports an error.
        """
        self._path = path

        self._lib = ctypes.CDLL(os.path.join(os.getcwd(), 'epanet', 'epanet-output.dll'))

        self._handle = ctypes.c_void_p()
        self._check(self._lib.ENR_init(ctypes.byref(self._handle)))

        self._check(self._lib.ENR_open(self._handle, ctypes.c_char_p(self._path.encode())))

    def __del__(self):
        """Close the reader handle on a best-effort basis.

        A finalizer must never raise, and it may run on an instance whose
        ``__init__`` failed before ``_lib`` / ``_handle`` were assigned, so
        missing attributes are tolerated and failures are only logged.
        """
        lib = getattr(self, '_lib', None)
        handle = getattr(self, '_handle', None)
        if lib is None or handle is None:
            return
        try:
            code = lib.ENR_close(ctypes.byref(handle))
            if code not in self._OK_CODES:
                logging.warning(
                    'Failed to close project [%s] output, code [%s]',
                    getattr(self, '_path', None), code)
        except Exception:
            # Deliberately swallowed: this can run during interpreter
            # shutdown, when even logging may already be torn down.
            pass

    def _check(self, result):
        """Raise ``Exception`` when ``result`` is a failing ``ENR_*`` code.

        The message text is fetched with ``ENR_checkError``. If the library
        supplies no message, the numeric code is reported instead of failing
        with an unrelated ``AttributeError``.
        """
        if result in self._OK_CODES:
            return

        msg = ctypes.c_char_p()
        self._lib.ENR_checkError(self._handle, ctypes.byref(msg))

        if msg.value is None:
            text = f'error code {result}'
        else:
            try:
                text = msg.value.decode(errors='replace')
            finally:
                # The message buffer is owned by the library.
                self._lib.ENR_free(ctypes.byref(msg))

        raise Exception(f'Failed to read project [{self._path}] output, message [{text}]')

    def _take(self, array, length: int, expected: int) -> list:
        """Copy ``length`` items out of a library-allocated array and free it.

        The length is checked against ``expected`` before anything is read,
        and the buffer is released even when that check fails.
        """
        try:
            assert length == expected
            return [array[i] for i in range(length)]
        finally:
            self._lib.ENR_free(ctypes.byref(array))

    def version(self) -> int:
        """Return the value reported by ``ENR_getVersion``."""
        v = ctypes.c_int()
        self._check(self._lib.ENR_getVersion(self._handle, ctypes.byref(v)))
        return v.value

    def net_size(self) -> dict[str, int]:
        """Return element counts keyed by 'node', 'tank', 'link', 'pump', 'valve'."""
        category = ['node', 'tank', 'link', 'pump', 'valve']
        element_count = ctypes.POINTER(ctypes.c_int)()
        length = ctypes.c_int()
        self._check(self._lib.ENR_getNetSize(self._handle, ctypes.byref(element_count), ctypes.byref(length)))
        counts = self._take(element_count, length.value, len(category))
        return dict(zip(category, counts))

    def units(self) -> dict[str, str]:
        """Return the unit names for 'flow', 'pressure' and 'quality'.

        ``ENR_getUnits`` is queried with codes 1, 2 and 3; each returned
        integer indexes the matching name table below.
        """
        f_us = ['CFS', 'GPM', 'MGD', 'IMGD', 'AFD', 'LPS', 'LPM', 'MLD', 'CMH', 'CMD']
        p_us = ['PSI', 'MTR', 'KPA']
        q_us = ['NONE', 'MGL', 'UGL', 'HOURS', 'PRCNT']

        units = {}
        for code, (label, names) in enumerate(
                [('flow', f_us), ('pressure', p_us), ('quality', q_us)], start=1):
            unit = ctypes.c_int()
            self._check(self._lib.ENR_getUnits(self._handle, ctypes.c_int(code), ctypes.byref(unit)))
            units[label] = names[unit.value]
        return units

    def times(self) -> dict[str, int]:
        """Return 'report_start', 'report_step', 'sim_duration' and 'num_periods'.

        The values come from ``ENR_getTimes`` codes 1 to 4, in that order.
        """
        category = ['report_start', 'report_step', 'sim_duration', 'num_periods']
        d = {}
        for code, label in enumerate(category, start=1):
            t = ctypes.c_int(1)
            self._check(self._lib.ENR_getTimes(self._handle, ctypes.c_int(code), ctypes.byref(t)))
            d[label] = t.value
        return d

    def _element_names(self, element_type: int, count: int) -> list[str]:
        """Return the names of elements 1..count of the given ``ENR`` element type."""
        names = []
        for i in range(1, count + 1):
            name = ctypes.c_char_p()
            name_len = ctypes.c_int()
            self._check(self._lib.ENR_getElementName(
                self._handle, ctypes.c_int(element_type), ctypes.c_int(i),
                ctypes.byref(name), ctypes.byref(name_len)))
            try:
                names.append(name.value.decode())
            finally:
                # Each name is a separate library allocation.
                self._lib.ENR_free(ctypes.byref(name))
        return names

    def element_name(self) -> dict[str, list[str]]:
        """Return ``{'nodes': [...], 'links': [...]}`` with all element names.

        Nodes are requested with element type 1 and links with type 2.
        """
        sizes = self.net_size()
        nodes = self._element_names(1, sizes['node'])
        links = self._element_names(2, sizes['link'])
        return {'nodes': nodes, 'links': links}

    def energy_usage(self) -> list[dict[str, Any]]:
        """Return one dict per pump with its link name and six energy figures."""
        size = self.net_size()['pump']
        category = ['utilization', 'avg.efficiency', 'avg.kW/flow', 'avg.kwatts', 'max.kwatts', 'cost/day']
        links = self.element_name()['links']

        usages = []
        for i in range(1, size + 1):
            index = ctypes.c_int()
            values = ctypes.POINTER(ctypes.c_float)()
            length = ctypes.c_int()
            self._check(self._lib.ENR_getEnergyUsage(
                self._handle, ctypes.c_int(i), ctypes.byref(index),
                ctypes.byref(values), ctypes.byref(length)))
            figures = self._take(values, length.value, len(category))
            # index is the pump's 1-based position among the links.
            d = {'pump': links[index.value - 1]}
            d.update(zip(category, figures))
            usages.append(d)
        return usages

    def reactions(self) -> dict[str, float]:
        """Return the 'bulk', 'wall', 'tank' and 'source' values from ``ENR_getNetReacts``."""
        category = ['bulk', 'wall', 'tank', 'source']
        values = ctypes.POINTER(ctypes.c_float)()
        length = ctypes.c_int()
        self._check(self._lib.ENR_getNetReacts(self._handle, ctypes.byref(values), ctypes.byref(length)))
        return dict(zip(category, self._take(values, length.value, len(category))))

    def _element_results(self, getter, key: str, size: int, names: list[str],
                         num_periods: int, category: list[str]) -> list[dict[str, Any]]:
        """Collect per-period attribute dicts for elements 1..size via ``getter``.

        ``getter`` is called as ``getter(handle, period, element, &values,
        &length)`` with a 0-based period and a 1-based element index. Every
        returned buffer is copied and freed before the next call.
        """
        ds = []
        for i in range(1, size + 1):
            d = {key: names[i - 1], 'result': []}
            for j in range(num_periods):
                values = ctypes.POINTER(ctypes.c_float)()
                length = ctypes.c_int()
                self._check(getter(self._handle, j, i, ctypes.byref(values), ctypes.byref(length)))
                floats = self._take(values, length.value, len(category))
                d['result'].append(dict(zip(category, floats)))
            ds.append(d)
        return ds

    def node_results(self) -> list[dict[str, Any]]:
        """Return, per node, its name and a per-period list of result dicts.

        Each result dict has the keys 'demand', 'head', 'pressure', 'quality'.
        """
        size = self.net_size()['node']
        num_periods = self.times()['num_periods']
        nodes = self.element_name()['nodes']
        category = ['demand', 'head', 'pressure', 'quality']
        return self._element_results(
            self._lib.ENR_getNodeResult, 'node', size, nodes, num_periods, category)

    def link_results(self) -> list[dict[str, Any]]:
        """Return, per link, its name and a per-period list of result dicts.

        Each result dict has the keys 'flow', 'velocity', 'headloss',
        'quality', 'status', 'setting', 'reaction', 'friction'. 'status' is
        reported as the string 'CLOSED' when the raw value is 2.0 and 'OPEN'
        otherwise.
        """
        size = self.net_size()['link']
        num_periods = self.times()['num_periods']
        links = self.element_name()['links']
        category = ['flow', 'velocity', 'headloss', 'quality', 'status', 'setting', 'reaction', 'friction']

        ds = self._element_results(
            self._lib.ENR_getLinkResult, 'link', size, links, num_periods, category)

        # NOTE(review): only the raw value 2.0 maps to CLOSED - confirm no
        # other status code should also be treated as closed.
        for d in ds:
            for attributes in d['result']:
                attributes['status'] = 'CLOSED' if attributes['status'] == 2.0 else 'OPEN'
        return ds

    def dump(self) -> dict[str, Any]:
        """Return every query result in one dict, keyed by the query method's name."""
        return {
            'version': self.version(),
            'net_size': self.net_size(),
            'units': self.units(),
            'times': self.times(),
            'element_name': self.element_name(),
            'energy_usage': self.energy_usage(),
            'reactions': self.reactions(),
            'node_results': self.node_results(),
            'link_results': self.link_results(),
        }
|
||
|
||
|
||
def _dump_output(path: str) -> dict[str, Any]:
    """Dump the output file at ``path`` and mirror the dump to ``path + '.json'``.

    Returns:
        The dictionary produced by ``Output(path).dump()``.
    """
    reader = Output(path)
    dumped = reader.dump()
    json_path = path + '.json'
    with open(json_path, 'w') as sink:
        sink.write(json.dumps(dumped))
    return dumped
|
||
|
||
|
||
def dump_output(path: str) -> str:
    """Return the dump of the output file at ``path`` as a JSON string.

    Delegates to ``_dump_output``, so ``path + '.json'`` is written as well.
    """
    return json.dumps(_dump_output(path))
|
||
|
||
|
||
def dump_report(path: str) -> str:
    """Return the full text of the report file at ``path``.

    The file is opened in text mode with the default encoding and is closed
    before returning, rather than being left for the garbage collector.

    Raises:
        OSError: if the file cannot be opened or read.
    """
    with open(path, 'r') as f:
        return f.read()
|
||
|
||
|
||
def dump_output_binary(path: str) -> str:
    """Return the raw bytes of the file at ``path`` as Base64 text."""
    with open(path, 'rb') as stream:
        encoded = base64.b64encode(stream.read())
    return encoded.decode('utf-8')
|
||
|
||
# DingZQ, 2025-02-04: returns dict[str, Any]
|
||
def run_project_return_dict(name: str, readable_output: bool = False) -> dict[str, Any]:
    """Export project ``name`` to an INP file, simulate it and return the results.

    Steps: write ``<cwd>/db_inp/<name>.db.inp`` via ``inp_out.dump_inp``, run
    ``<cwd>/epanet/runepanet.exe`` on it, then collect the report and output
    files from ``<cwd>/temp``.

    Args:
        name: project name; must be known to ``project.have_project``.
        readable_output: when true, 'output' holds the decoded dump from
            ``_dump_output``; otherwise it holds the Base64 text of the raw
            output file.

    Returns:
        A dict with 'simulation_result' ('successful' or 'failed'), 'output'
        (only after a successful run) and 'report'.

    Raises:
        Exception: if the project does not exist.
        OSError: if the report file cannot be read.
    """
    if not project.have_project(name):
        raise Exception(f'Not found project [{name}]')

    base_dir = os.path.abspath(os.getcwd())
    stem = name + '.db'

    inp = os.path.join(base_dir, 'db_inp', stem + '.inp')
    inp_out.dump_inp(name, inp, '2')

    exe = os.path.join(base_dir, 'epanet', 'runepanet.exe')
    rpt = os.path.join(base_dir, 'temp', stem + '.rpt')
    opt = os.path.join(base_dir, 'temp', stem + '.opt')

    # An argument list keeps paths with spaces or shell metacharacters
    # intact; a space-joined command string handed to a shell does not.
    try:
        result = subprocess.run([exe, inp, rpt, opt]).returncode
    except OSError:
        # A shell would report a missing executable as a non-zero status;
        # keep that "failed" outcome instead of propagating the error.
        logging.exception('Failed to launch [%s]', exe)
        result = -1

    data = {}
    if result != 0:
        data['simulation_result'] = 'failed'
    else:
        data['simulation_result'] = 'successful'
        if readable_output:
            data['output'] = _dump_output(opt)
        else:
            data['output'] = dump_output_binary(opt)

    data['report'] = dump_report(rpt)

    return data
|
||
|
||
# original code
|
||
def _run_and_log_process(args: list[str]) -> int:
    """Run ``args`` as a child process and forward its output to ``logging``.

    stderr is merged into stdout, so a single reader drains everything and
    the child can never block on a full, unread pipe.

    Returns:
        The child's exit code, or -1 if it could not be started or its
        output could not be read.
    """
    try:
        with subprocess.Popen(
            args,
            stdout=subprocess.PIPE,     # must be PIPE for process.stdout to exist
            stderr=subprocess.STDOUT,   # merge error output into stdout
            text=True,
            bufsize=1,                  # line buffered
            encoding='utf-8',
            errors='replace',           # never fail on undecodable bytes
        ) as process:
            # Stream the output line by line until the child closes the pipe.
            for output_line in process.stdout:
                stripped_line = output_line.rstrip()
                logging.info(f"EXE_OUTPUT: {stripped_line}")
            returncode = process.wait()
    except Exception as e:
        logging.exception(f"执行过程中出错: {str(e)}")
        return -1  # custom error code

    # Record how the run ended.
    logging.info("-" * 60)
    if returncode == 0:
        logging.info(f"成功结束! 退出码: {returncode}")
    else:
        logging.error(f"异常结束! 退出码: {returncode}")
    return returncode


def run_project(name: str, readable_output: bool = False) -> str:
    """Export project ``name`` to an INP file, simulate it and return JSON.

    Steps: write ``<cwd>/db_inp/<name>.db.inp`` via ``inp_out.dump_inp``, run
    ``<cwd>/epanet/runepanet.exe`` on it with its console output forwarded to
    ``logging``, then collect the report and output files from ``<cwd>/temp``.

    Args:
        name: project name; must be known to ``project.have_project``.
        readable_output: when true, the keys of the decoded dump from
            ``_dump_output`` are merged into the top level of the result;
            otherwise 'output' holds the Base64 text of the raw output file.

    Returns:
        A JSON string with 'simulation_result' ('successful' or 'failed'),
        the output data (only after a successful run) and 'report'.

    Raises:
        Exception: if the project does not exist.
        OSError: if the report file cannot be read.
    """
    if not project.have_project(name):
        raise Exception(f'Not found project [{name}]')

    base_dir = os.path.abspath(os.getcwd())
    stem = name + '.db'

    inp = os.path.join(base_dir, 'db_inp', stem + '.inp')
    inp_out.dump_inp(name, inp, '2')

    exe = os.path.join(base_dir, 'epanet', 'runepanet.exe')
    rpt = os.path.join(base_dir, 'temp', stem + '.rpt')
    opt = os.path.join(base_dir, 'temp', stem + '.opt')

    # An argument list keeps paths containing spaces intact.
    args = [exe, inp, rpt, opt]
    print(' '.join(args))

    # DingZQ, 2025-06-02: capture the child's output into the global log
    # with subprocess.Popen; this step was previously os.system(command).
    result = _run_and_log_process(args)

    data = {}
    if result != 0:
        data['simulation_result'] = 'failed'
    else:
        data['simulation_result'] = 'successful'
        if readable_output:
            # Merged at top level, unlike run_project_return_dict, which
            # nests the dump under 'output'; kept so the JSON shape that
            # existing callers receive does not change.
            data |= _dump_output(opt)
        else:
            data['output'] = dump_output_binary(opt)

    data['report'] = dump_report(rpt)

    return json.dumps(data)
|
||
|
||
|
||
def run_inp(name: str) -> str:
    """Simulate ``<cwd>/inp/<name>.inp`` and return the results as JSON.

    Runs ``<cwd>/epanet/runepanet.exe`` on the INP file, writing the report
    and binary output to ``<cwd>/temp/<name>.rpt`` and ``.opt``.

    Returns:
        A JSON string with 'simulation_result' ('successful' or 'failed'),
        'output' (Base64 text of the binary output, only after a successful
        run) and 'report'.

    Raises:
        OSError: if the report file cannot be read.
    """
    base_dir = os.path.abspath(os.getcwd())

    exe = os.path.join(base_dir, 'epanet', 'runepanet.exe')
    inp = os.path.join(base_dir, 'inp', name + '.inp')
    rpt = os.path.join(base_dir, 'temp', name + '.rpt')
    opt = os.path.join(base_dir, 'temp', name + '.opt')

    # An argument list keeps paths with spaces or shell metacharacters
    # intact; a space-joined command string handed to a shell does not.
    try:
        result = subprocess.run([exe, inp, rpt, opt]).returncode
    except OSError:
        # A shell would report a missing executable as a non-zero status;
        # keep that "failed" outcome instead of propagating the error.
        logging.exception('Failed to launch [%s]', exe)
        result = -1

    data = {}
    if result != 0:
        data['simulation_result'] = 'failed'
    else:
        data['simulation_result'] = 'successful'
        data['output'] = dump_output_binary(opt)

    data['report'] = dump_report(rpt)

    return json.dumps(data)
|