Accept Merge Request #102: (api -> master)

Merge Request: Add api for simulation and dump output

Created By: @王琼钰
Accepted By: @王琼钰
URL: https://tjwater.coding.net/p/tjwatercloud/d/TJWaterServer/git/merge/102
This commit is contained in:
王琼钰
2022-11-19 18:46:20 +08:00
4 changed files with 190 additions and 22 deletions

12
.gitignore vendored
View File

@@ -7,9 +7,13 @@ __pycache__/
# pytest # pytest
.pytest_cache/ .pytest_cache/
# epanet # dev
dev_demo.py
# db inp
*.db.inp
# calculation
*.rpt *.rpt
*.opt *.opt
*.opt.json
# dev_demo
dev_demo.py

1
epanet/__init__.py Normal file
View File

@@ -0,0 +1 @@
from .epanet import run_project, run_inp, dump_output

View File

@@ -1,45 +1,196 @@
import ctypes import ctypes
import platform import platform
import os import os
import sys import sys
import json
from typing import Any
sys.path.append("..") sys.path.append("..")
from api import project from api import project
from api import parser from api import parser
_lib_core = None
_lib_output = None
def _load_epanet(): def _verify_platform():
_platform = platform.system() _platform = platform.system()
if _platform != "Windows": if _platform != "Windows":
raise Exception(f'Platform {_platform} unsupported (not yet)') raise Exception(f'Platform {_platform} unsupported (not yet)')
_lib_core = ctypes.CDLL("./epanet2.dll")
#version = ctypes.c_int()
#lib_core.EN_getversion(ctypes.byref(version))
#print(f'Load EPANET {version}')
_lib_output = ctypes.CDLL("./epanet-output.dll")
# readable True => json, False => binary output
def run_project(name: str, readable: bool = True) -> str: class Output:
def __init__(self, path: str) -> None:
self._path = path
self._lib = ctypes.CDLL(os.path.join(os.getcwd(), 'epanet-output.dll'))
self._handle = ctypes.c_void_p()
self._check(self._lib.ENR_init(ctypes.byref(self._handle)))
self._check(self._lib.ENR_open(self._handle, ctypes.c_char_p(self._path.encode())))
def _check(self, result):
if result != 0:
msg = ctypes.c_char_p()
code = self._lib.ENR_checkError(self._handle, ctypes.byref(msg))
assert code == result
error = f'Failed to read project [{self._path}] output, message [{msg.value.decode()}]'
self._lib.ENR_free(ctypes.byref(msg))
raise Exception(error)
def version(self) -> int:
v = ctypes.c_int()
self._check(self._lib.ENR_getVersion(self._handle, ctypes.byref(v)))
return v.value
def net_size(self) -> dict[str, int]:
element_count = ctypes.POINTER(ctypes.c_int)()
length = ctypes.c_int()
self._check(self._lib.ENR_getNetSize(self._handle, ctypes.byref(element_count), ctypes.byref(length)))
assert length.value == 5
category = ['node', 'tank', 'link', 'pump', 'valve']
sizes = {}
for i in range(length.value):
sizes[category[i]] = element_count[i]
self._lib.ENR_free(ctypes.byref(element_count))
return sizes
def units(self) -> dict[str, str]:
f_us = ['CFS', 'GPM', 'MGD', 'IMGD', 'AFD', 'LPS', 'LPM', 'MLD', 'CMH', 'CMD']
p_us = ['PSI', 'MTR', 'KPA']
q_us = ['NONE', 'MGL', 'UGL', 'HOURS', 'PRCNT']
f, p, q = ctypes.c_int(1), ctypes.c_int(2), ctypes.c_int(3)
f_u, p_u, q_u = ctypes.c_int(), ctypes.c_int(), ctypes.c_int()
self._check(self._lib.ENR_getUnits(self._handle, f, ctypes.byref(f_u)))
self._check(self._lib.ENR_getUnits(self._handle, p, ctypes.byref(p_u)))
self._check(self._lib.ENR_getUnits(self._handle, q, ctypes.byref(q_u)))
return { 'flow': f_us[f_u.value], 'pressure': p_us[p_u.value], 'quality': q_us[q_u.value] }
def times(self) -> dict[str, int]:
ts = []
for i in range(1, 5):
t = ctypes.c_int(1)
self._check(self._lib.ENR_getTimes(self._handle, ctypes.c_int(i), ctypes.byref(t)))
ts.append(t.value)
d = {}
category = ['report_start', 'report_step', 'sim_duration', 'num_periods']
for i in range(4):
d[category[i]] = ts[i]
return d
def element_name(self) -> dict[str, list[str]]:
sizes = self.net_size()
node_type = ctypes.c_int(1)
nodes = []
for i in range(1, sizes['node'] + 1):
name = ctypes.c_char_p()
name_len = ctypes.c_int()
self._check(self._lib.ENR_getElementName(self._handle, node_type, ctypes.c_int(i), ctypes.byref(name), ctypes.byref(name_len)))
nodes.append(name.value.decode())
self._lib.ENR_free(ctypes.byref(name))
link_type = ctypes.c_int(2)
links = []
for i in range(1, sizes['link'] + 1):
name = ctypes.c_char_p()
name_len = ctypes.c_int()
self._check(self._lib.ENR_getElementName(self._handle, link_type, ctypes.c_int(i), ctypes.byref(name), ctypes.byref(name_len)))
links.append(name.value.decode())
self._lib.ENR_free(ctypes.byref(name))
return { 'nodes' : nodes, 'links': links }
# { pump_index, link_index, utilization, avg.efficiency, avg.kW/flow, avg.kwatts, max.kwatts, cost/day }
def energy_usage(self) -> list[dict[str, Any]]:
size = self.net_size()['pump']
usages = []
category = ['utilization', 'avg.efficiency', 'avg.kW/flow', 'avg.kwatts', 'max.kwatts', 'cost/day' ]
for i in range(1, size + 1):
index = ctypes.c_int()
values = ctypes.POINTER(ctypes.c_float)()
length = ctypes.c_int()
self._check(self._lib.ENR_getEnergyUsage(self._handle, ctypes.c_int(i), ctypes.byref(index), ctypes.byref(values), ctypes.byref(length)))
assert length.value == 6
d = { 'pump_index' : i - 1, 'link_index' : index.value - 1 }
for j in range(length.value):
d |= { category[j] : values[j] }
usages.append(d)
return usages
def dump(self, cache: bool = True) -> str:
data = {}
data |= { 'version' : self.version() }
data |= { 'net_size' : self.net_size() }
data |= { 'units' : self.units() }
data |= { 'times' : self.times() }
data |= { 'element_name' : self.element_name() }
data |= { 'energy_usage' : self.energy_usage() }
if cache:
with open(self._path + '.json', 'w') as f:
json.dump(data, f)
return json.dumps(data)
def __del__(self):
# throw exception in destructor ? :)
self._check(self._lib.ENR_close(ctypes.byref(self._handle)))
def dump_output(path: str) -> str:
opt = Output(path)
return opt.dump()
def run_project(name: str) -> str:
if not project.have_project(name): if not project.have_project(name):
raise Exception("No such project!") raise Exception(f'Not found project [{name}]')
dir = os.path.dirname(os.getcwd()) dir = os.path.dirname(os.getcwd())
db_inp = os.path.join(os.path.join(dir, 'db_inp'), name + '.db.inp') db_inp = os.path.join(os.path.join(dir, 'db_inp'), name + '.db.inp')
parser.dump_inp(name, db_inp) parser.dump_inp(name, db_inp)
name += '.db' input = name + '.db'
exe = os.path.join(os.path.join(dir, 'epanet'), 'runepanet.exe')
inp = os.path.join(os.path.join(dir, 'db_inp'), input + '.inp')
rpt = os.path.join(os.path.join(dir, 'temp'), input + '.rpt')
opt = os.path.join(os.path.join(dir, 'temp'), input + '.opt')
command = f'{exe} {inp} {rpt} {opt}'
result = os.system(command)
if result != 0:
msg = f'Failed to run simulation for project [{name}]'
raise Exception(msg)
return dump_output(opt)
def run_inp(name: str) -> str:
dir = os.path.dirname(os.getcwd())
exe = os.path.join(os.path.join(dir, 'epanet'), 'runepanet.exe') exe = os.path.join(os.path.join(dir, 'epanet'), 'runepanet.exe')
inp = os.path.join(os.path.join(dir, 'db_inp'), name + '.inp') inp = os.path.join(os.path.join(dir, 'inp'), name + '.inp')
rpt = os.path.join(os.path.join(dir, 'temp'), name + '.rpt') rpt = os.path.join(os.path.join(dir, 'temp'), name + '.rpt')
opt = os.path.join(os.path.join(dir, 'temp'), name + '.opt') opt = os.path.join(os.path.join(dir, 'temp'), name + '.opt')
command = f'{exe} {inp} {rpt} {opt}' command = f'{exe} {inp} {rpt} {opt}'
#print(command)
result = os.system(command) result = os.system(command)
return '' if result != 0:
msg = f'Failed to run simulation for project [{name}]'
raise Exception(msg)
return dump_output(opt)
if __name__ == '__main__': if __name__ == '__main__':
_load_epanet() _verify_platform()
run_project('net3') print(run_inp('net3'))

View File

@@ -1,5 +1,6 @@
from typing import Any from typing import Any
import api import api
import epanet
############################################################ ############################################################
@@ -129,6 +130,17 @@ def read_inp(name: str, inp: str) -> None:
def dump_inp(name: str, inp: str) -> None: def dump_inp(name: str, inp: str) -> None:
return api.dump_inp(name, inp) return api.dump_inp(name, inp)
def run_project(name: str) -> str:
return epanet.run_project(name)
# put in inp folder, name without extension
def run_inp(name: str) -> str:
return epanet.run_inp(name)
# path is absolute path
def dump_output(path: str) -> str:
return epanet.dump_output(path)
############################################################ ############################################################
# operation # operation