diff --git a/app/algorithms/simulation/scenarios.py b/app/algorithms/simulation/scenarios.py index 9db4ea7..239dab6 100644 --- a/app/algorithms/simulation/scenarios.py +++ b/app/algorithms/simulation/scenarios.py @@ -9,7 +9,6 @@ from app.algorithms.simulation.runner import ( run_simulation_ex, from_clock_to_seconds_2, ) -from app.infra.epanet.epanet import Output from app.services.scheme_management import store_scheme_info from app.services.tjnetwork import ( ChangeSet, @@ -666,21 +665,21 @@ def age_analysis( modify_total_duration, downloading_prohibition=True, ) + simulation_result = json.loads(result) + output_data = simulation_result.get("output") + if not isinstance(output_data, dict): + raise RuntimeError("run_simulation_ex did not return JSON output content") # step 2. restore the base model status # execute_undo(name) #有疑惑 if is_project_open(new_name): close_project(new_name) delete_project(new_name) - output = Output("./temp/{}.db.out".format(new_name)) - # element_name = output.element_name() - # node_name = element_name['nodes'] - # link_name = element_name['links'] nodes_age = [] - node_result = output.node_results() + node_result = output_data.get("node_results") or [] for node in node_result: nodes_age.append(node["result"][-1]["quality"]) links_age = [] - link_result = output.link_results() + link_result = output_data.get("link_results") or [] for link in link_result: links_age.append(link["result"][-1]["quality"]) age_result = {"nodes": nodes_age, "links": links_age} diff --git a/app/api/v1/endpoints/simulation.py b/app/api/v1/endpoints/simulation.py index 2f77051..34b0e3a 100644 --- a/app/api/v1/endpoints/simulation.py +++ b/app/api/v1/endpoints/simulation.py @@ -9,7 +9,6 @@ from fastapi.responses import PlainTextResponse import app.infra.db.influxdb.api as influxdb_api import app.services.simulation as simulation import app.services.globals as globals -from app.infra.cache.redis_client import redis_client from app.services.tjnetwork import ( run_project, run_project_return_dict, @@ -139,36 +138,23 @@ def run_simulation_manually_by_date( # 必须用这个PlainTextResponse,不然每个key都有引号 -@router.get("/runproject/", response_class=PlainTextResponse, summary="运行项目模拟", description="基于指定的管网项目运行标准水力模拟,返回纯文本格式的模拟报告。使用分布式锁机制确保同时只有一个模拟任务运行。") +@router.get("/runproject/", response_class=PlainTextResponse, summary="运行项目模拟", description="基于指定的管网项目运行标准水力模拟,返回纯文本格式的模拟报告。") async def run_project_endpoint(network: str = Query(..., description="管网名称(或数据库名称)")) -> str: """ 运行项目模拟 - **network**: 管网名称(或数据库名称) - 使用分布式锁机制确保同一时间只有一个模拟任务在运行。如果已有任务在运行,返回409状态码。 + 运行指定管网项目的标准水力模拟并返回文本报告。 """ - lock_key = "exclusive_api_lock" - timeout = 120 # 锁自动过期时间(秒) - - # 尝试获取锁(NX=True: 不存在时设置,EX=timeout: 过期时间) - acquired = redis_client.set(lock_key, "locked", nx=True, ex=timeout) - - if not acquired: - raise HTTPException(status_code=409, detail="is in simulation") - else: - try: - return run_project(network) - finally: - # 手动释放锁(可选,依赖过期时间自动释放更安全) - redis_client.delete(lock_key) + return run_project(network) # DingZQ, 2025-02-04, 返回dict[str, Any] # output 和 report # output 是 json # report 是 text -@router.get("/runprojectreturndict/", summary="运行项目模拟(返回字典)", description="基于指定的管网项目运行标准水力模拟,返回JSON格式的字典,包含输出数据和报告文本。使用分布式锁机制确保同时只有一个模拟任务运行。") +@router.get("/runprojectreturndict/", summary="运行项目模拟(返回字典)", description="基于指定的管网项目运行标准水力模拟,返回JSON格式的字典,包含输出数据和报告文本。") async def run_project_return_dict_endpoint(network: str = Query(..., description="管网名称(或数据库名称)")) -> dict[str, Any]: """ 运行项目模拟(返回字典) @@ -179,22 +165,9 @@ async def run_project_return_dict_endpoint(network: str = Query(..., description - output: JSON格式的模拟输出数据 - report: 文本格式的模拟报告 - 使用分布式锁机制确保同一时间只有一个模拟任务在运行。 + 运行指定管网项目的标准水力模拟并返回字典结果。 """ - lock_key = "exclusive_api_lock" - timeout = 120 # 锁自动过期时间(秒) - - # 尝试获取锁(NX=True: 不存在时设置,EX=timeout: 过期时间) - acquired = redis_client.set(lock_key, "locked", nx=True, ex=timeout) - - if not acquired: - raise HTTPException(status_code=409, detail="is in simulation") - else: - try: - return run_project_return_dict(network) - finally: - # 手动释放锁(可选,依赖过期时间自动释放更安全) - redis_client.delete(lock_key) + return run_project_return_dict(network) # put in inp folder, name without extension diff --git a/app/infra/epanet/epanet.py b/app/infra/epanet/epanet.py index 8283190..2bf327b 100644 --- a/app/infra/epanet/epanet.py +++ b/app/infra/epanet/epanet.py @@ -8,6 +8,7 @@ from datetime import datetime import subprocess import logging from typing import Any +import uuid sys.path.append("..") from app.native.wndb import project @@ -281,8 +282,6 @@ class Output: def _dump_output(path: str) -> dict[str, Any]: opt = Output(path) data = opt.dump() - with open(path + ".json", "w") as f: - json.dump(data, f) return data @@ -302,25 +301,37 @@ def dump_output_binary(path: str) -> str: return str(bast64_data, "utf-8") +def _safe_remove(path: str) -> None: + try: + if os.path.exists(path): + os.remove(path) + except Exception: + logging.warning("failed to remove temp file: %s", path) + + +def _make_isolated_run_paths(base_name: str, cwd: str) -> tuple[str, str, str]: + # 进程号 + UUID 生成唯一后缀,避免并发进程互相覆盖临时文件。 + token = f"{os.getpid()}_{uuid.uuid4().hex}" + inp = os.path.join(cwd, "db_inp", f"{base_name}.db.{token}.inp") + rpt = os.path.join(cwd, "temp", f"{base_name}.db.{token}.rpt") + opt = os.path.join(cwd, "temp", f"{base_name}.db.{token}.opt") + return inp, rpt, opt + + # DingZQ, 2025-02-04, 返回dict[str, Any] -def run_project_return_dict(name: str, readable_output: bool = False) -> dict[str, Any]: +def run_project_return_dict(name: str, readable_output: bool = True) -> dict[str, Any]: if not project.have_project(name): raise Exception(f"Not found project [{name}]") - dir = os.path.abspath(os.getcwd()) + cwd = os.path.abspath(os.getcwd()) - db_inp = os.path.join(os.path.join(dir, "db_inp"), name + ".db.inp") - inp_out.dump_inp(name, db_inp, "2") + inp, rpt, opt = _make_isolated_run_paths(name, cwd) + inp_out.dump_inp(name, inp, "2") - input = name + ".db" if platform.system() == "Windows": exe = os.path.join(os.path.dirname(__file__), "windows", "runepanet.exe") else: exe = os.path.join(os.path.dirname(__file__), "linux", "runepanet") - inp = os.path.join(os.path.join(dir, "db_inp"), input + ".inp") - rpt = os.path.join(os.path.join(dir, "temp"), input + ".rpt") - opt = os.path.join(os.path.join(dir, "temp"), input + ".opt") - command = f"{exe} {inp} {rpt} {opt}" if platform.system() != "Windows": if not os.access(exe, os.X_OK): @@ -334,8 +345,7 @@ def run_project_return_dict(name: str, readable_output: bool = False) -> dict[st lib_dir = os.path.dirname(exe) env["LD_LIBRARY_PATH"] = f"{lib_dir}:{env.get('LD_LIBRARY_PATH', '')}" - # 使用 subprocess 替代 os.system 以传递 env - process = subprocess.run(command, shell=True, env=env) + process = subprocess.run([exe, inp, rpt, opt], env=env) result = process.returncode if result != 0: @@ -347,33 +357,36 @@ def run_project_return_dict(name: str, readable_output: bool = False) -> dict[st else: data["output"] = dump_output_binary(opt) + data["input_file"] = inp + data["report_file"] = rpt + data["output_file"] = opt data["report"] = dump_report(rpt) + # 返回内容后删除仿真临时文件,避免临时文件堆积。 + _safe_remove(inp) + _safe_remove(rpt) + _safe_remove(opt) + return data # original code -def run_project(name: str, readable_output: bool = False) -> str: +def run_project(name: str, readable_output: bool = True) -> str: if not project.have_project(name): raise Exception(f"Not found project [{name}]") - dir = os.path.abspath(os.getcwd()) + cwd = os.path.abspath(os.getcwd()) - db_inp = os.path.join(os.path.join(dir, "db_inp"), name + ".db.inp") - inp_out.dump_inp(name, db_inp, "2") + inp, rpt, opt = _make_isolated_run_paths(name, cwd) + inp_out.dump_inp(name, inp, "2") - input = name + ".db" if platform.system() == "Windows": exe = os.path.join(os.path.dirname(__file__), "windows", "runepanet.exe") else: exe = os.path.join(os.path.dirname(__file__), "linux", "runepanet") - inp = os.path.join(os.path.join(dir, "db_inp"), input + ".inp") - rpt = os.path.join(os.path.join(dir, "temp"), input + ".rpt") - opt = os.path.join(os.path.join(dir, "temp"), input + ".opt") - command = f"{exe} {inp} {rpt} {opt}" logging.info(f"Run simulation at {datetime.now()}") - logging.info(command) + logging.info("%s %s %s %s", exe, inp, rpt, opt) if platform.system() != "Windows": if not os.access(exe, os.X_OK): @@ -388,7 +401,7 @@ def run_project(name: str, readable_output: bool = False) -> str: env["LD_LIBRARY_PATH"] = f"{lib_dir}:{env.get('LD_LIBRARY_PATH', '')}" # DingZQ, 2025-06-02, 使用subprocess替代os.system - process = subprocess.run(command, shell=True, env=env) + process = subprocess.run([exe, inp, rpt, opt], env=env) result = process.returncode # logging.info(f"Simulation result: {result}") @@ -402,27 +415,35 @@ def run_project(name: str, readable_output: bool = False) -> str: logging.info("simulation successful") if readable_output: - data |= _dump_output(opt) + data["output"] = _dump_output(opt) else: data["output"] = dump_output_binary(opt) + data["input_file"] = inp + data["report_file"] = rpt + data["output_file"] = opt data["report"] = dump_report(rpt) + + # 返回内容后删除仿真临时文件,避免临时文件堆积。 + _safe_remove(inp) + _safe_remove(rpt) + _safe_remove(opt) # logging.info(f"Report: {data['report']}") return json.dumps(data) -def run_inp(name: str) -> str: - dir = os.path.abspath(os.getcwd()) +def run_inp(name: str, readable_output: bool = True) -> str: + cwd = os.path.abspath(os.getcwd()) if platform.system() == "Windows": exe = os.path.join(os.path.dirname(__file__), "windows", "runepanet.exe") else: exe = os.path.join(os.path.dirname(__file__), "linux", "runepanet") - inp = os.path.join(os.path.join(dir, "inp"), name + ".inp") - rpt = os.path.join(os.path.join(dir, "temp"), name + ".rpt") - opt = os.path.join(os.path.join(dir, "temp"), name + ".opt") - command = f"{exe} {inp} {rpt} {opt}" + source_inp = os.path.join(cwd, "inp", name + ".inp") + token = f"{os.getpid()}_{uuid.uuid4().hex}" + rpt = os.path.join(cwd, "temp", f"{name}.{token}.rpt") + opt = os.path.join(cwd, "temp", f"{name}.{token}.opt") if platform.system() != "Windows": if not os.access(exe, os.X_OK): @@ -436,16 +457,26 @@ def run_inp(name: str) -> str: lib_dir = os.path.dirname(exe) env["LD_LIBRARY_PATH"] = f"{lib_dir}:{env.get('LD_LIBRARY_PATH', '')}" - process = subprocess.run(command, shell=True, env=env) + process = subprocess.run([exe, source_inp, rpt, opt], env=env) result = process.returncode if result != 0: data["simulation_result"] = "failed" else: data["simulation_result"] = "successful" - # data |= _dump_output(opt) - data["output"] = dump_output_binary(opt) + if readable_output: + data["output"] = _dump_output(opt) + else: + data["output"] = dump_output_binary(opt) + data["input_file"] = source_inp + data["report_file"] = rpt + data["output_file"] = opt data["report"] = dump_report(rpt) + # 返回内容后删除仿真临时文件,避免临时文件堆积。 + _safe_remove(source_inp) + _safe_remove(rpt) + _safe_remove(opt) + return json.dumps(data) diff --git a/app/services/simulation.py b/app/services/simulation.py index 4ac2779..204c572 100644 --- a/app/services/simulation.py +++ b/app/services/simulation.py @@ -27,15 +27,12 @@ import json import pytz import requests import time -import shutil -from app.infra.epanet.epanet import Output from typing import Optional, Tuple import app.infra.db.influxdb.api as influxdb_api import typing import psycopg import logging import app.services.globals as globals -import uuid import app.services.project_info as project_info from app.core.config import get_pgconn_string from app.infra.db.timescaledb.internal_queries import ( @@ -1219,7 +1216,7 @@ def run_simulation( cs.append(valve_status) set_status(name_c, cs) # 运行并返回结果 - run_project(name_c) + result_data = json.loads(run_project(name_c)) time_cost_end = time.perf_counter() print( "{} -- Hydraulic simulation finished, cost time: {:.2f} s.".format( @@ -1227,23 +1224,22 @@ def run_simulation( time_cost_end - time_cost_start, ) ) - # DingZQ 下面这几句一定要这样,不然读取不了 - # time.sleep(5) # wait 5 seconds - - # TODO: 2025/03/24 - # DingZQ 这个名字要用随机数来处理 - tmp_file = f"./temp/simulation_{uuid.uuid4()}.result.out" - shutil.copy(f"./temp/{name_c}.db.opt", tmp_file) - - output = Output(tmp_file) - node_result = output.node_results() - link_result = output.link_results() - + output_data = result_data.get("output") + if not isinstance(output_data, dict): + raise RuntimeError("run_project did not return JSON output content") + node_result = output_data.get("node_results") + link_result = output_data.get("link_results") + if node_result is None or link_result is None: + raise RuntimeError("run_project output missing node_results or link_results") + # link_flow = [] # for link in link_result: # link_flow.append(link['result'][-1]['flow']) # print(link_flow) - num_periods_result = output.times()["num_periods"] + times_info = output_data.get("times") or {} + num_periods_result = times_info.get("num_periods") + if num_periods_result is None: + raise RuntimeError("run_project output missing times.num_periods") print("simulation_type", simulation_type) print("before store result") # print(num_periods_result) @@ -1274,27 +1270,8 @@ def run_simulation( # 暂不需要再次存储 SCADA 模拟信息 # TimescaleInternalQueries.fill_scheme_simulation_result_to_SCADA(scheme_type=scheme_type, scheme_name=scheme_name) - # if simulation_type.upper() == "REALTIME": - # influxdb_api.store_realtime_simulation_result_to_influxdb( - # node_result, link_result, modify_pattern_start_time - # ) - # elif simulation_type.upper() == "EXTENDED": - # influxdb_api.store_scheme_simulation_result_to_influxdb( - # node_result, - # link_result, - # modify_pattern_start_time, - # num_periods_result, - # scheme_type, - # scheme_name, - # ) - # 暂不需要再次存储 SCADA 模拟信息 - # influxdb_api.fill_scheme_simulation_result_to_SCADA(scheme_type=scheme_type, scheme_name=scheme_name) - print("after store result") - del output - os.remove(tmp_file) - if __name__ == "__main__": # 计算前,获取scada_info中的信息,按照设定的方法修改pg数据库 diff --git a/app/services/simulation_ops.py b/app/services/simulation_ops.py index 2110deb..08b8020 100644 --- a/app/services/simulation_ops.py +++ b/app/services/simulation_ops.py @@ -5,7 +5,6 @@ from math import pi import pytz from app.algorithms.simulation.runner import run_simulation_ex -from app.infra.epanet.epanet import Output from app.services.tjnetwork import ( close_project, copy_project, @@ -110,10 +109,16 @@ def scheduling_simulation( + " -- Database Loading OK." ) - run_simulation_ex( - new_name, "realtime", start_time, duration=0, pump_control=pump_control + simulation_result = json.loads( + run_simulation_ex( + new_name, "realtime", start_time, duration=0, pump_control=pump_control + ) ) + output_data = simulation_result.get("output") + if not isinstance(output_data, dict): + raise RuntimeError("run_simulation_ex did not return JSON output content") + if not is_project_open(new_name): open_project(new_name) @@ -131,18 +136,14 @@ def scheduling_simulation( else: tank_pipe_flow_direction[pipe_id] = -1 - output = Output("./temp/{}.db.out".format(new_name)) - - node_results = ( - output.node_results() - ) # [{'node': str, 'result': [{'pressure': float}]}] + node_results = output_data.get("node_results") or [] # [{'node': str, 'result': [{'pressure': float}]}] water_plant_output_pressure = 0 for node_result in node_results: if node_result["node"] == water_plant_output_id: # 水厂出水压力(m) water_plant_output_pressure = node_result["result"][-1]["pressure"] water_plant_output_pressure /= 100 # 预计水厂出水压力(Mpa) - pipe_results = output.link_results() # [{'link': str, 'result': [{'flow': float}]}] + pipe_results = output_data.get("link_results") or [] # [{'link': str, 'result': [{'flow': float}]}] tank_inflow = 0 for pipe_result in pipe_results: for pipe_id in tank_pipes_id: # 遍历与水塔相连的管道 @@ -200,18 +201,24 @@ def daily_scheduling_simulation( + " -- Database Loading OK." ) - run_simulation_ex( - new_name, "realtime", start_time, duration=86400, pump_control=pump_control + simulation_result = json.loads( + run_simulation_ex( + new_name, + "realtime", + start_time, + duration=86400, + pump_control=pump_control, + ) ) + output_data = simulation_result.get("output") + if not isinstance(output_data, dict): + raise RuntimeError("run_simulation_ex did not return JSON output content") + if not is_project_open(new_name): open_project(new_name) - output = Output("./temp/{}.db.out".format(new_name)) - - node_results = ( - output.node_results() - ) # [{'node': str, 'result': [{'pressure': float, 'head': float}]}] + node_results = output_data.get("node_results") or [] # [{'node': str, 'result': [{'pressure': float, 'head': float}]}] water_plant_output_pressure = [] reservoir_level = [] tank_level = [] diff --git a/scripts/run_server.py b/scripts/run_server.py index 70416aa..57f2c67 100644 --- a/scripts/run_server.py +++ b/scripts/run_server.py @@ -16,6 +16,6 @@ if __name__ == "__main__": "app.main:app", host="0.0.0.0", port=8000, - # workers=4, # 这里可以设置多进程 + # workers=2, # 这里可以设置多进程 loop="asyncio", )