Support dump output to json

This commit is contained in:
WQY\qiong
2022-11-19 18:09:48 +08:00
parent 0d946af50b
commit dbc8b04114

View File

@@ -1,7 +1,9 @@
import ctypes import ctypes
import platform import platform
import os import os
import sys import sys
import json
from typing import Any
sys.path.append("..") sys.path.append("..")
from api import project from api import project
from api import parser from api import parser
@@ -23,8 +25,8 @@ class Output:
self._check(self._lib.ENR_init(ctypes.byref(self._handle))) self._check(self._lib.ENR_init(ctypes.byref(self._handle)))
dir = os.path.dirname(os.getcwd()) dir = os.path.dirname(os.getcwd())
opt = os.path.join(os.path.join(dir, 'temp'), self._name + '.db.opt') self.opt = os.path.join(os.path.join(dir, 'temp'), self._name + '.db.opt')
self._check(self._lib.ENR_open(self._handle, ctypes.c_char_p(opt.encode()))) self._check(self._lib.ENR_open(self._handle, ctypes.c_char_p(self.opt.encode())))
def _check(self, result): def _check(self, result):
@@ -52,11 +54,11 @@ class Output:
self._check(self._lib.ENR_getNetSize(self._handle, ctypes.byref(element_count), ctypes.byref(length))) self._check(self._lib.ENR_getNetSize(self._handle, ctypes.byref(element_count), ctypes.byref(length)))
assert length.value == 5 assert length.value == 5
category = ['node', 'tank', 'link', 'pump', 'valve'] category = ['node', 'tank', 'link', 'pump', 'valve']
counts = {} sizes = {}
for i in range(length.value): for i in range(length.value):
counts[category[i]] = element_count[i] sizes[category[i]] = element_count[i]
self._lib.ENR_free(ctypes.byref(element_count)) self._lib.ENR_free(ctypes.byref(element_count))
return counts return sizes
def units(self) -> dict[str, str]: def units(self) -> dict[str, str]:
@@ -71,6 +73,61 @@ class Output:
return { 'flow': f_us[f_u.value], 'pressure': p_us[p_u.value], 'quality': q_us[q_u.value] } return { 'flow': f_us[f_u.value], 'pressure': p_us[p_u.value], 'quality': q_us[q_u.value] }
def times(self) -> dict[str, int]:
ts = []
for i in range(1, 5):
t = ctypes.c_int(1)
self._check(self._lib.ENR_getTimes(self._handle, ctypes.c_int(i), ctypes.byref(t)))
ts.append(t.value)
d = {}
category = ['report_start', 'report_step', 'sim_duration', 'num_periods']
for i in range(4):
d[category[i]] = ts[i]
return d
def element_name(self) -> dict[str, list[str]]:
sizes = self.net_size()
node_type = ctypes.c_int(1)
nodes = []
for i in range(1, sizes['node'] + 1):
name = ctypes.c_char_p()
name_len = ctypes.c_int()
self._check(self._lib.ENR_getElementName(self._handle, node_type, ctypes.c_int(i), ctypes.byref(name), ctypes.byref(name_len)))
nodes.append(name.value.decode())
self._lib.ENR_free(ctypes.byref(name))
link_type = ctypes.c_int(2)
links = []
for i in range(1, sizes['link'] + 1):
name = ctypes.c_char_p()
name_len = ctypes.c_int()
self._check(self._lib.ENR_getElementName(self._handle, link_type, ctypes.c_int(i), ctypes.byref(name), ctypes.byref(name_len)))
links.append(name.value.decode())
self._lib.ENR_free(ctypes.byref(name))
return { 'nodes' : nodes, 'links': links }
# { pump_index, link_index, utilization, avg.efficiency, avg.kW/flow, avg.kwatts, max.kwatts, cost/day }
def energy_usage(self) -> list[dict[str, Any]]:
size = self.net_size()['pump']
usages = []
category = ['utilization', 'avg.efficiency', 'avg.kW/flow', 'avg.kwatts', 'max.kwatts', 'cost/day' ]
for i in range(1, size + 1):
index = ctypes.c_int()
values = ctypes.POINTER(ctypes.c_float)()
length = ctypes.c_int()
self._check(self._lib.ENR_getEnergyUsage(self._handle, ctypes.c_int(i), ctypes.byref(index), ctypes.byref(values), ctypes.byref(length)))
assert length.value == 6
d = { 'pump_index' : i - 1, 'link_index' : index.value - 1 }
for j in range(length.value):
d |= { category[j] : values[j] }
usages.append(d)
return usages
def __del__(self): def __del__(self):
# throw exception in destructor ? :) # throw exception in destructor ? :)
self._check(self._lib.ENR_close(ctypes.byref(self._handle))) self._check(self._lib.ENR_close(ctypes.byref(self._handle)))
@@ -78,14 +135,19 @@ class Output:
def _read_output(name: str) -> str: def _read_output(name: str) -> str:
opt = Output(name) opt = Output(name)
print(opt.version()) data = {}
print(opt.net_size()) data |= { 'version' : opt.version() }
print(opt.units()) data |= { 'net_size' : opt.net_size() }
return '' data |= { 'units' : opt.units() }
data |= { 'times' : opt.times() }
data |= { 'element_name' : opt.element_name() }
data |= { 'energy_usage' : opt.energy_usage() }
with open(opt.opt + '.json', 'w') as f:
json.dump(data, f)
return json.dumps(data)
# readable True => json, False => binary output def run_project(name: str) -> str:
def run_project(name: str, readable: bool = True) -> str:
if not project.have_project(name): if not project.have_project(name):
raise Exception(f'Not found project [{name}]') raise Exception(f'Not found project [{name}]')
@@ -110,4 +172,4 @@ def run_project(name: str, readable: bool = True) -> str:
if __name__ == '__main__': if __name__ == '__main__':
_verify_platform() _verify_platform()
_read_output('net3') print(_read_output('net3'))