实现多进程 epanet 模拟,不保留临时文件

This commit is contained in:
2026-03-18 16:56:44 +08:00
parent c5d3075ae2
commit 7c44654195
6 changed files with 114 additions and 127 deletions
+6 -7
View File
@@ -9,7 +9,6 @@ from app.algorithms.simulation.runner import (
run_simulation_ex, run_simulation_ex,
from_clock_to_seconds_2, from_clock_to_seconds_2,
) )
from app.infra.epanet.epanet import Output
from app.services.scheme_management import store_scheme_info from app.services.scheme_management import store_scheme_info
from app.services.tjnetwork import ( from app.services.tjnetwork import (
ChangeSet, ChangeSet,
@@ -666,21 +665,21 @@ def age_analysis(
modify_total_duration, modify_total_duration,
downloading_prohibition=True, downloading_prohibition=True,
) )
simulation_result = json.loads(result)
output_data = simulation_result.get("output")
if not isinstance(output_data, dict):
raise RuntimeError("run_simulation_ex did not return JSON output content")
# step 2. restore the base model status # step 2. restore the base model status
# execute_undo(name) #有疑惑 # execute_undo(name) #有疑惑
if is_project_open(new_name): if is_project_open(new_name):
close_project(new_name) close_project(new_name)
delete_project(new_name) delete_project(new_name)
output = Output("./temp/{}.db.out".format(new_name))
# element_name = output.element_name()
# node_name = element_name['nodes']
# link_name = element_name['links']
nodes_age = [] nodes_age = []
node_result = output.node_results() node_result = output_data.get("node_results") or []
for node in node_result: for node in node_result:
nodes_age.append(node["result"][-1]["quality"]) nodes_age.append(node["result"][-1]["quality"])
links_age = [] links_age = []
link_result = output.link_results() link_result = output_data.get("link_results") or []
for link in link_result: for link in link_result:
links_age.append(link["result"][-1]["quality"]) links_age.append(link["result"][-1]["quality"])
age_result = {"nodes": nodes_age, "links": links_age} age_result = {"nodes": nodes_age, "links": links_age}
+6 -33
View File
@@ -9,7 +9,6 @@ from fastapi.responses import PlainTextResponse
import app.infra.db.influxdb.api as influxdb_api import app.infra.db.influxdb.api as influxdb_api
import app.services.simulation as simulation import app.services.simulation as simulation
import app.services.globals as globals import app.services.globals as globals
from app.infra.cache.redis_client import redis_client
from app.services.tjnetwork import ( from app.services.tjnetwork import (
run_project, run_project,
run_project_return_dict, run_project_return_dict,
@@ -139,36 +138,23 @@ def run_simulation_manually_by_date(
# 必须用这个PlainTextResponse,不然每个key都有引号 # 必须用这个PlainTextResponse,不然每个key都有引号
@router.get("/runproject/", response_class=PlainTextResponse, summary="运行项目模拟", description="基于指定的管网项目运行标准水力模拟,返回纯文本格式的模拟报告。使用分布式锁机制确保同时只有一个模拟任务运行。") @router.get("/runproject/", response_class=PlainTextResponse, summary="运行项目模拟", description="基于指定的管网项目运行标准水力模拟,返回纯文本格式的模拟报告。")
async def run_project_endpoint(network: str = Query(..., description="管网名称(或数据库名称)")) -> str: async def run_project_endpoint(network: str = Query(..., description="管网名称(或数据库名称)")) -> str:
""" """
运行项目模拟 运行项目模拟
- **network**: 管网名称(或数据库名称) - **network**: 管网名称(或数据库名称)
使用分布式锁机制确保同一时间只有一个模拟任务在运行。如果已有任务在运行,返回409状态码 运行指定管网项目的标准水力模拟并返回文本报告
""" """
lock_key = "exclusive_api_lock" return run_project(network)
timeout = 120 # 锁自动过期时间(秒)
# 尝试获取锁(NX=True: 不存在时设置,EX=timeout: 过期时间)
acquired = redis_client.set(lock_key, "locked", nx=True, ex=timeout)
if not acquired:
raise HTTPException(status_code=409, detail="is in simulation")
else:
try:
return run_project(network)
finally:
# 手动释放锁(可选,依赖过期时间自动释放更安全)
redis_client.delete(lock_key)
# DingZQ, 2025-02-04, 返回dict[str, Any] # DingZQ, 2025-02-04, 返回dict[str, Any]
# output 和 report # output 和 report
# output 是 json # output 是 json
# report 是 text # report 是 text
@router.get("/runprojectreturndict/", summary="运行项目模拟(返回字典)", description="基于指定的管网项目运行标准水力模拟,返回JSON格式的字典,包含输出数据和报告文本。使用分布式锁机制确保同时只有一个模拟任务运行。") @router.get("/runprojectreturndict/", summary="运行项目模拟(返回字典)", description="基于指定的管网项目运行标准水力模拟,返回JSON格式的字典,包含输出数据和报告文本。")
async def run_project_return_dict_endpoint(network: str = Query(..., description="管网名称(或数据库名称)")) -> dict[str, Any]: async def run_project_return_dict_endpoint(network: str = Query(..., description="管网名称(或数据库名称)")) -> dict[str, Any]:
""" """
运行项目模拟(返回字典) 运行项目模拟(返回字典)
@@ -179,22 +165,9 @@ async def run_project_return_dict_endpoint(network: str = Query(..., description
- output: JSON格式的模拟输出数据 - output: JSON格式的模拟输出数据
- report: 文本格式的模拟报告 - report: 文本格式的模拟报告
使用分布式锁机制确保同一时间只有一个模拟任务在运行 运行指定管网项目的标准水力模拟并返回字典结果
""" """
lock_key = "exclusive_api_lock" return run_project_return_dict(network)
timeout = 120 # 锁自动过期时间(秒)
# 尝试获取锁(NX=True: 不存在时设置,EX=timeout: 过期时间)
acquired = redis_client.set(lock_key, "locked", nx=True, ex=timeout)
if not acquired:
raise HTTPException(status_code=409, detail="is in simulation")
else:
try:
return run_project_return_dict(network)
finally:
# 手动释放锁(可选,依赖过期时间自动释放更安全)
redis_client.delete(lock_key)
# put in inp folder, name without extension # put in inp folder, name without extension
+65 -34
View File
@@ -8,6 +8,7 @@ from datetime import datetime
import subprocess import subprocess
import logging import logging
from typing import Any from typing import Any
import uuid
sys.path.append("..") sys.path.append("..")
from app.native.wndb import project from app.native.wndb import project
@@ -281,8 +282,6 @@ class Output:
def _dump_output(path: str) -> dict[str, Any]: def _dump_output(path: str) -> dict[str, Any]:
opt = Output(path) opt = Output(path)
data = opt.dump() data = opt.dump()
with open(path + ".json", "w") as f:
json.dump(data, f)
return data return data
@@ -302,25 +301,37 @@ def dump_output_binary(path: str) -> str:
return str(bast64_data, "utf-8") return str(bast64_data, "utf-8")
def _safe_remove(path: str) -> None:
try:
if os.path.exists(path):
os.remove(path)
except Exception:
logging.warning("failed to remove temp file: %s", path)
def _make_isolated_run_paths(base_name: str, cwd: str) -> tuple[str, str, str]:
# 进程号 + UUID 生成唯一后缀,避免并发进程互相覆盖临时文件。
token = f"{os.getpid()}_{uuid.uuid4().hex}"
inp = os.path.join(cwd, "db_inp", f"{base_name}.db.{token}.inp")
rpt = os.path.join(cwd, "temp", f"{base_name}.db.{token}.rpt")
opt = os.path.join(cwd, "temp", f"{base_name}.db.{token}.opt")
return inp, rpt, opt
# DingZQ, 2025-02-04, 返回dict[str, Any] # DingZQ, 2025-02-04, 返回dict[str, Any]
def run_project_return_dict(name: str, readable_output: bool = False) -> dict[str, Any]: def run_project_return_dict(name: str, readable_output: bool = True) -> dict[str, Any]:
if not project.have_project(name): if not project.have_project(name):
raise Exception(f"Not found project [{name}]") raise Exception(f"Not found project [{name}]")
dir = os.path.abspath(os.getcwd()) cwd = os.path.abspath(os.getcwd())
db_inp = os.path.join(os.path.join(dir, "db_inp"), name + ".db.inp") inp, rpt, opt = _make_isolated_run_paths(name, cwd)
inp_out.dump_inp(name, db_inp, "2") inp_out.dump_inp(name, inp, "2")
input = name + ".db"
if platform.system() == "Windows": if platform.system() == "Windows":
exe = os.path.join(os.path.dirname(__file__), "windows", "runepanet.exe") exe = os.path.join(os.path.dirname(__file__), "windows", "runepanet.exe")
else: else:
exe = os.path.join(os.path.dirname(__file__), "linux", "runepanet") exe = os.path.join(os.path.dirname(__file__), "linux", "runepanet")
inp = os.path.join(os.path.join(dir, "db_inp"), input + ".inp")
rpt = os.path.join(os.path.join(dir, "temp"), input + ".rpt")
opt = os.path.join(os.path.join(dir, "temp"), input + ".opt")
command = f"{exe} {inp} {rpt} {opt}"
if platform.system() != "Windows": if platform.system() != "Windows":
if not os.access(exe, os.X_OK): if not os.access(exe, os.X_OK):
@@ -334,8 +345,7 @@ def run_project_return_dict(name: str, readable_output: bool = False) -> dict[st
lib_dir = os.path.dirname(exe) lib_dir = os.path.dirname(exe)
env["LD_LIBRARY_PATH"] = f"{lib_dir}:{env.get('LD_LIBRARY_PATH', '')}" env["LD_LIBRARY_PATH"] = f"{lib_dir}:{env.get('LD_LIBRARY_PATH', '')}"
# 使用 subprocess 替代 os.system 以传递 env process = subprocess.run([exe, inp, rpt, opt], env=env)
process = subprocess.run(command, shell=True, env=env)
result = process.returncode result = process.returncode
if result != 0: if result != 0:
@@ -347,33 +357,36 @@ def run_project_return_dict(name: str, readable_output: bool = False) -> dict[st
else: else:
data["output"] = dump_output_binary(opt) data["output"] = dump_output_binary(opt)
data["input_file"] = inp
data["report_file"] = rpt
data["output_file"] = opt
data["report"] = dump_report(rpt) data["report"] = dump_report(rpt)
# 返回内容后删除仿真临时文件,避免临时文件堆积。
_safe_remove(inp)
_safe_remove(rpt)
_safe_remove(opt)
return data return data
# original code # original code
def run_project(name: str, readable_output: bool = False) -> str: def run_project(name: str, readable_output: bool = True) -> str:
if not project.have_project(name): if not project.have_project(name):
raise Exception(f"Not found project [{name}]") raise Exception(f"Not found project [{name}]")
dir = os.path.abspath(os.getcwd()) cwd = os.path.abspath(os.getcwd())
db_inp = os.path.join(os.path.join(dir, "db_inp"), name + ".db.inp") inp, rpt, opt = _make_isolated_run_paths(name, cwd)
inp_out.dump_inp(name, db_inp, "2") inp_out.dump_inp(name, inp, "2")
input = name + ".db"
if platform.system() == "Windows": if platform.system() == "Windows":
exe = os.path.join(os.path.dirname(__file__), "windows", "runepanet.exe") exe = os.path.join(os.path.dirname(__file__), "windows", "runepanet.exe")
else: else:
exe = os.path.join(os.path.dirname(__file__), "linux", "runepanet") exe = os.path.join(os.path.dirname(__file__), "linux", "runepanet")
inp = os.path.join(os.path.join(dir, "db_inp"), input + ".inp")
rpt = os.path.join(os.path.join(dir, "temp"), input + ".rpt")
opt = os.path.join(os.path.join(dir, "temp"), input + ".opt")
command = f"{exe} {inp} {rpt} {opt}"
logging.info(f"Run simulation at {datetime.now()}") logging.info(f"Run simulation at {datetime.now()}")
logging.info(command) logging.info("%s %s %s %s", exe, inp, rpt, opt)
if platform.system() != "Windows": if platform.system() != "Windows":
if not os.access(exe, os.X_OK): if not os.access(exe, os.X_OK):
@@ -388,7 +401,7 @@ def run_project(name: str, readable_output: bool = False) -> str:
env["LD_LIBRARY_PATH"] = f"{lib_dir}:{env.get('LD_LIBRARY_PATH', '')}" env["LD_LIBRARY_PATH"] = f"{lib_dir}:{env.get('LD_LIBRARY_PATH', '')}"
# DingZQ, 2025-06-02, 使用subprocess替代os.system # DingZQ, 2025-06-02, 使用subprocess替代os.system
process = subprocess.run(command, shell=True, env=env) process = subprocess.run([exe, inp, rpt, opt], env=env)
result = process.returncode result = process.returncode
# logging.info(f"Simulation result: {result}") # logging.info(f"Simulation result: {result}")
@@ -402,27 +415,35 @@ def run_project(name: str, readable_output: bool = False) -> str:
logging.info("simulation successful") logging.info("simulation successful")
if readable_output: if readable_output:
data |= _dump_output(opt) data["output"] = _dump_output(opt)
else: else:
data["output"] = dump_output_binary(opt) data["output"] = dump_output_binary(opt)
data["input_file"] = inp
data["report_file"] = rpt
data["output_file"] = opt
data["report"] = dump_report(rpt) data["report"] = dump_report(rpt)
# 返回内容后删除仿真临时文件,避免临时文件堆积。
_safe_remove(inp)
_safe_remove(rpt)
_safe_remove(opt)
# logging.info(f"Report: {data['report']}") # logging.info(f"Report: {data['report']}")
return json.dumps(data) return json.dumps(data)
def run_inp(name: str) -> str: def run_inp(name: str, readable_output: bool = True) -> str:
dir = os.path.abspath(os.getcwd()) cwd = os.path.abspath(os.getcwd())
if platform.system() == "Windows": if platform.system() == "Windows":
exe = os.path.join(os.path.dirname(__file__), "windows", "runepanet.exe") exe = os.path.join(os.path.dirname(__file__), "windows", "runepanet.exe")
else: else:
exe = os.path.join(os.path.dirname(__file__), "linux", "runepanet") exe = os.path.join(os.path.dirname(__file__), "linux", "runepanet")
inp = os.path.join(os.path.join(dir, "inp"), name + ".inp") source_inp = os.path.join(cwd, "inp", name + ".inp")
rpt = os.path.join(os.path.join(dir, "temp"), name + ".rpt") token = f"{os.getpid()}_{uuid.uuid4().hex}"
opt = os.path.join(os.path.join(dir, "temp"), name + ".opt") rpt = os.path.join(cwd, "temp", f"{name}.{token}.rpt")
command = f"{exe} {inp} {rpt} {opt}" opt = os.path.join(cwd, "temp", f"{name}.{token}.opt")
if platform.system() != "Windows": if platform.system() != "Windows":
if not os.access(exe, os.X_OK): if not os.access(exe, os.X_OK):
@@ -436,16 +457,26 @@ def run_inp(name: str) -> str:
lib_dir = os.path.dirname(exe) lib_dir = os.path.dirname(exe)
env["LD_LIBRARY_PATH"] = f"{lib_dir}:{env.get('LD_LIBRARY_PATH', '')}" env["LD_LIBRARY_PATH"] = f"{lib_dir}:{env.get('LD_LIBRARY_PATH', '')}"
process = subprocess.run(command, shell=True, env=env) process = subprocess.run([exe, source_inp, rpt, opt], env=env)
result = process.returncode result = process.returncode
if result != 0: if result != 0:
data["simulation_result"] = "failed" data["simulation_result"] = "failed"
else: else:
data["simulation_result"] = "successful" data["simulation_result"] = "successful"
# data |= _dump_output(opt) if readable_output:
data["output"] = dump_output_binary(opt) data["output"] = _dump_output(opt)
else:
data["output"] = dump_output_binary(opt)
data["input_file"] = source_inp
data["report_file"] = rpt
data["output_file"] = opt
data["report"] = dump_report(rpt) data["report"] = dump_report(rpt)
# 返回内容后删除仿真临时文件,避免临时文件堆积。
_safe_remove(source_inp)
_safe_remove(rpt)
_safe_remove(opt)
return json.dumps(data) return json.dumps(data)
+13 -36
View File
@@ -27,15 +27,12 @@ import json
import pytz import pytz
import requests import requests
import time import time
import shutil
from app.infra.epanet.epanet import Output
from typing import Optional, Tuple from typing import Optional, Tuple
import app.infra.db.influxdb.api as influxdb_api import app.infra.db.influxdb.api as influxdb_api
import typing import typing
import psycopg import psycopg
import logging import logging
import app.services.globals as globals import app.services.globals as globals
import uuid
import app.services.project_info as project_info import app.services.project_info as project_info
from app.core.config import get_pgconn_string from app.core.config import get_pgconn_string
from app.infra.db.timescaledb.internal_queries import ( from app.infra.db.timescaledb.internal_queries import (
@@ -1219,7 +1216,7 @@ def run_simulation(
cs.append(valve_status) cs.append(valve_status)
set_status(name_c, cs) set_status(name_c, cs)
# 运行并返回结果 # 运行并返回结果
run_project(name_c) result_data = json.loads(run_project(name_c))
time_cost_end = time.perf_counter() time_cost_end = time.perf_counter()
print( print(
"{} -- Hydraulic simulation finished, cost time: {:.2f} s.".format( "{} -- Hydraulic simulation finished, cost time: {:.2f} s.".format(
@@ -1227,23 +1224,22 @@ def run_simulation(
time_cost_end - time_cost_start, time_cost_end - time_cost_start,
) )
) )
# DingZQ 下面这几句一定要这样,不然读取不了 output_data = result_data.get("output")
# time.sleep(5) # wait 5 seconds if not isinstance(output_data, dict):
raise RuntimeError("run_project did not return JSON output content")
# TODO: 2025/03/24 node_result = output_data.get("node_results")
# DingZQ 这个名字要用随机数来处理 link_result = output_data.get("link_results")
tmp_file = f"./temp/simulation_{uuid.uuid4()}.result.out" if node_result is None or link_result is None:
shutil.copy(f"./temp/{name_c}.db.opt", tmp_file) raise RuntimeError("run_project output missing node_results or link_results")
output = Output(tmp_file)
node_result = output.node_results()
link_result = output.link_results()
# link_flow = [] # link_flow = []
# for link in link_result: # for link in link_result:
# link_flow.append(link['result'][-1]['flow']) # link_flow.append(link['result'][-1]['flow'])
# print(link_flow) # print(link_flow)
num_periods_result = output.times()["num_periods"] times_info = output_data.get("times") or {}
num_periods_result = times_info.get("num_periods")
if num_periods_result is None:
raise RuntimeError("run_project output missing times.num_periods")
print("simulation_type", simulation_type) print("simulation_type", simulation_type)
print("before store result") print("before store result")
# print(num_periods_result) # print(num_periods_result)
@@ -1274,27 +1270,8 @@ def run_simulation(
# 暂不需要再次存储 SCADA 模拟信息 # 暂不需要再次存储 SCADA 模拟信息
# TimescaleInternalQueries.fill_scheme_simulation_result_to_SCADA(scheme_type=scheme_type, scheme_name=scheme_name) # TimescaleInternalQueries.fill_scheme_simulation_result_to_SCADA(scheme_type=scheme_type, scheme_name=scheme_name)
# if simulation_type.upper() == "REALTIME":
# influxdb_api.store_realtime_simulation_result_to_influxdb(
# node_result, link_result, modify_pattern_start_time
# )
# elif simulation_type.upper() == "EXTENDED":
# influxdb_api.store_scheme_simulation_result_to_influxdb(
# node_result,
# link_result,
# modify_pattern_start_time,
# num_periods_result,
# scheme_type,
# scheme_name,
# )
# 暂不需要再次存储 SCADA 模拟信息
# influxdb_api.fill_scheme_simulation_result_to_SCADA(scheme_type=scheme_type, scheme_name=scheme_name)
print("after store result") print("after store result")
del output
os.remove(tmp_file)
if __name__ == "__main__": if __name__ == "__main__":
# 计算前,获取scada_info中的信息,按照设定的方法修改pg数据库 # 计算前,获取scada_info中的信息,按照设定的方法修改pg数据库
+23 -16
View File
@@ -5,7 +5,6 @@ from math import pi
import pytz import pytz
from app.algorithms.simulation.runner import run_simulation_ex from app.algorithms.simulation.runner import run_simulation_ex
from app.infra.epanet.epanet import Output
from app.services.tjnetwork import ( from app.services.tjnetwork import (
close_project, close_project,
copy_project, copy_project,
@@ -110,10 +109,16 @@ def scheduling_simulation(
+ " -- Database Loading OK." + " -- Database Loading OK."
) )
run_simulation_ex( simulation_result = json.loads(
new_name, "realtime", start_time, duration=0, pump_control=pump_control run_simulation_ex(
new_name, "realtime", start_time, duration=0, pump_control=pump_control
)
) )
output_data = simulation_result.get("output")
if not isinstance(output_data, dict):
raise RuntimeError("run_simulation_ex did not return JSON output content")
if not is_project_open(new_name): if not is_project_open(new_name):
open_project(new_name) open_project(new_name)
@@ -131,18 +136,14 @@ def scheduling_simulation(
else: else:
tank_pipe_flow_direction[pipe_id] = -1 tank_pipe_flow_direction[pipe_id] = -1
output = Output("./temp/{}.db.out".format(new_name)) node_results = output_data.get("node_results") or [] # [{'node': str, 'result': [{'pressure': float}]}]
node_results = (
output.node_results()
) # [{'node': str, 'result': [{'pressure': float}]}]
water_plant_output_pressure = 0 water_plant_output_pressure = 0
for node_result in node_results: for node_result in node_results:
if node_result["node"] == water_plant_output_id: # 水厂出水压力(m) if node_result["node"] == water_plant_output_id: # 水厂出水压力(m)
water_plant_output_pressure = node_result["result"][-1]["pressure"] water_plant_output_pressure = node_result["result"][-1]["pressure"]
water_plant_output_pressure /= 100 # 预计水厂出水压力(Mpa) water_plant_output_pressure /= 100 # 预计水厂出水压力(Mpa)
pipe_results = output.link_results() # [{'link': str, 'result': [{'flow': float}]}] pipe_results = output_data.get("link_results") or [] # [{'link': str, 'result': [{'flow': float}]}]
tank_inflow = 0 tank_inflow = 0
for pipe_result in pipe_results: for pipe_result in pipe_results:
for pipe_id in tank_pipes_id: # 遍历与水塔相连的管道 for pipe_id in tank_pipes_id: # 遍历与水塔相连的管道
@@ -200,18 +201,24 @@ def daily_scheduling_simulation(
+ " -- Database Loading OK." + " -- Database Loading OK."
) )
run_simulation_ex( simulation_result = json.loads(
new_name, "realtime", start_time, duration=86400, pump_control=pump_control run_simulation_ex(
new_name,
"realtime",
start_time,
duration=86400,
pump_control=pump_control,
)
) )
output_data = simulation_result.get("output")
if not isinstance(output_data, dict):
raise RuntimeError("run_simulation_ex did not return JSON output content")
if not is_project_open(new_name): if not is_project_open(new_name):
open_project(new_name) open_project(new_name)
output = Output("./temp/{}.db.out".format(new_name)) node_results = output_data.get("node_results") or [] # [{'node': str, 'result': [{'pressure': float, 'head': float}]}]
node_results = (
output.node_results()
) # [{'node': str, 'result': [{'pressure': float, 'head': float}]}]
water_plant_output_pressure = [] water_plant_output_pressure = []
reservoir_level = [] reservoir_level = []
tank_level = [] tank_level = []
+1 -1
View File
@@ -16,6 +16,6 @@ if __name__ == "__main__":
"app.main:app", "app.main:app",
host="0.0.0.0", host="0.0.0.0",
port=8000, port=8000,
# workers=4, # 这里可以设置多进程 # workers=2, # 这里可以设置多进程
loop="asyncio", loop="asyncio",
) )