From 7c9667822fcbed0d1ef7aeab31b8bb3ef44eac94 Mon Sep 17 00:00:00 2001 From: Jiang Date: Mon, 26 Jan 2026 17:22:06 +0800 Subject: [PATCH] =?UTF-8?q?=E6=8B=86=E5=88=86online=5FAnalysis.py=E6=96=87?= =?UTF-8?q?=E4=BB=B6?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- app/algorithms/__init__.py | 28 + app/algorithms/data_cleaning.py | 57 + app/algorithms/online_Analysis.py | 1568 ---------------------------- app/algorithms/sensors.py | 91 ++ app/algorithms/simulations.py | 688 ++++++++++++ app/api/v1/endpoints/simulation.py | 10 +- app/services/__init__.py | 32 + app/services/network_import.py | 197 ++++ app/services/scheme_management.py | 266 +++++ app/services/simulation_ops.py | 233 +++++ 10 files changed, 1597 insertions(+), 1573 deletions(-) create mode 100644 app/algorithms/data_cleaning.py delete mode 100644 app/algorithms/online_Analysis.py create mode 100644 app/algorithms/sensors.py create mode 100644 app/algorithms/simulations.py create mode 100644 app/services/network_import.py create mode 100644 app/services/scheme_management.py create mode 100644 app/services/simulation_ops.py diff --git a/app/algorithms/__init__.py b/app/algorithms/__init__.py index e69de29..c64213d 100644 --- a/app/algorithms/__init__.py +++ b/app/algorithms/__init__.py @@ -0,0 +1,28 @@ +from app.algorithms.data_cleaning import flow_data_clean, pressure_data_clean +from app.algorithms.sensors import ( + pressure_sensor_placement_sensitivity, + pressure_sensor_placement_kmeans, +) +from app.algorithms.simulations import ( + convert_to_local_unit, + burst_analysis, + valve_close_analysis, + flushing_analysis, + contaminant_simulation, + age_analysis, + pressure_regulation, +) + +__all__ = [ + "flow_data_clean", + "pressure_data_clean", + "pressure_sensor_placement_sensitivity", + "pressure_sensor_placement_kmeans", + "convert_to_local_unit", + "burst_analysis", + "valve_close_analysis", + "flushing_analysis", + "contaminant_simulation", + "age_analysis", + "pressure_regulation", +] diff --git a/app/algorithms/data_cleaning.py b/app/algorithms/data_cleaning.py new file mode 100644 index 0000000..fcad898 --- /dev/null +++ b/app/algorithms/data_cleaning.py @@ -0,0 +1,57 @@ +import os + +import app.algorithms.api_ex.Fdataclean as Fdataclean +import app.algorithms.api_ex.Pdataclean as Pdataclean + + +############################################################ +# 流量监测数据清洗 ***卡尔曼滤波法*** +############################################################ +# 2025/08/21 hxyan + + +def flow_data_clean(input_csv_file: str) -> str: + """ + 读取 input_csv_path 中的每列时间序列,使用一维 Kalman 滤波平滑并用预测值替换基于 3σ 检测出的异常点。 + 保存输出为:_cleaned.xlsx(与输入同目录),并返回输出文件的绝对路径。如有同名文件存在,则覆盖。 + :param: input_csv_file: 输入的 CSV 文件明或路径 + :return: 输出文件的绝对路径 + """ + + # 提供的 input_csv_path 绝对路径,以下为 默认脚本目录下同名 CSV 文件,构建绝对路径,可根据情况修改 + script_dir = os.path.dirname(os.path.abspath(__file__)) + input_csv_path = os.path.join(script_dir, input_csv_file) + + # 检查文件是否存在 + if not os.path.exists(input_csv_path): + raise FileNotFoundError(f"指定的文件不存在: {input_csv_path}") + # 调用 Fdataclean.clean_flow_data_kf 函数进行数据清洗 + out_xlsx_path = Fdataclean.clean_flow_data_kf(input_csv_path) + print("清洗后的数据已保存到:", out_xlsx_path) + + +############################################################ +# 压力监测数据清洗 ***kmean++法*** +############################################################ +# 2025/08/21 hxyan + + +def pressure_data_clean(input_csv_file: str) -> str: + """ + 读取 input_csv_path 中的每列时间序列,使用Kmean++清洗数据。 + 保存输出为:_cleaned.xlsx(与输入同目录),并返回输出文件的绝对路径。如有同名文件存在,则覆盖。 + 原始数据在 sheet 'raw_pressure_data',处理后数据在 sheet 'cleaned_pressusre_data'。 + :param input_csv_path: 输入的 CSV 文件路径 + :return: 输出文件的绝对路径 + """ + + # 提供的 input_csv_path 绝对路径,以下为 默认脚本目录下同名 CSV 文件,构建绝对路径,可根据情况修改 + script_dir = os.path.dirname(os.path.abspath(__file__)) + input_csv_path = os.path.join(script_dir, input_csv_file) + + # 检查文件是否存在 + if not os.path.exists(input_csv_path): + raise FileNotFoundError(f"指定的文件不存在: {input_csv_path}") + # 调用 Fdataclean.clean_flow_data_kf 函数进行数据清洗 + out_xlsx_path = Pdataclean.clean_pressure_data_km(input_csv_path) + print("清洗后的数据已保存到:", out_xlsx_path) diff --git a/app/algorithms/online_Analysis.py b/app/algorithms/online_Analysis.py deleted file mode 100644 index 011bd48..0000000 --- a/app/algorithms/online_Analysis.py +++ /dev/null @@ -1,1568 +0,0 @@ -import os -from app.services.tjnetwork import * -from app.native.api.project import copy_project -from app.algorithms.api_ex.run_simulation import run_simulation_ex, from_clock_to_seconds_2 -from math import sqrt, pi -from app.services.epanet.epanet import Output -import json -from datetime import datetime -import time -import pytz -import psycopg -from psycopg import sql -import pandas as pd -import csv -import chardet -import app.services.simulation as simulation -import geopandas as gpd -from sqlalchemy import create_engine -import ast -import app.services.project_info as project_info -import app.algorithms.api_ex.kmeans_sensor as kmeans_sensor -import app.algorithms.api_ex.Fdataclean as Fdataclean -import app.algorithms.api_ex.Pdataclean as Pdataclean -import app.algorithms.api_ex.sensitivity as sensitivity -from app.native.api.postgresql_info import get_pgconn_string - - -############################################################ -# burst analysis 01 -############################################################ -def convert_to_local_unit(proj: str, emitters: float) -> float: - open_project(proj) - proj_opt = get_option(proj) - str_unit = proj_opt.get("UNITS") - - if str_unit == "CMH": - return emitters * 3.6 - elif str_unit == "LPS": - return emitters - elif str_unit == "CMS": - return emitters / 1000.0 - elif str_unit == "MGD": - return emitters * 0.0438126 - - # Unknown unit: log and return original value - print(str_unit) - return emitters - - -def burst_analysis( - name: str, - modify_pattern_start_time: str, - burst_ID: list | str = None, - burst_size: list | float | int = None, - modify_total_duration: int = 900, - modify_fixed_pump_pattern: dict[str, list] = None, - modify_variable_pump_pattern: dict[str, list] = None, - modify_valve_opening: dict[str, float] = None, - scheme_Name: str = None, -) -> None: - """ - 爆管模拟 - :param name: 模型名称,数据库中对应的名字 - :param modify_pattern_start_time: 模拟开始时间,格式为'2024-11-25T09:00:00+08:00' - :param burst_ID: 爆管管道的ID,选取的是管道,单独传入一个爆管管道,可以是str或list,传入多个爆管管道是用list - :param burst_size: 爆管管道破裂的孔口面积,和burst_ID列表各位置的ID对应,以cm*cm计算 - :param modify_total_duration: 模拟总历时,秒 - :param modify_fixed_pump_pattern: dict中包含多个水泵模式,str为工频水泵的id,list为修改后的pattern - :param modify_variable_pump_pattern: dict中包含多个水泵模式,str为变频水泵的id,list为修改后的pattern - :param modify_valve_opening: dict中包含多个阀门开启度,str为阀门的id,float为修改后的阀门开启度 - :param scheme_Name: 方案名称 - :return: - """ - scheme_detail: dict = { - "burst_ID": burst_ID, - "burst_size": burst_size, - "modify_total_duration": modify_total_duration, - "modify_fixed_pump_pattern": modify_fixed_pump_pattern, - "modify_variable_pump_pattern": modify_variable_pump_pattern, - "modify_valve_opening": modify_valve_opening, - } - print( - datetime.now(pytz.timezone("Asia/Shanghai")).strftime("%Y-%m-%d %H:%M:%S") - + " -- Start Analysis." - ) - new_name = f"burst_Anal_{name}" - if have_project(new_name): - if is_project_open(new_name): - close_project(new_name) - delete_project(new_name) - print( - datetime.now(pytz.timezone("Asia/Shanghai")).strftime("%Y-%m-%d %H:%M:%S") - + " -- Start Copying Database." - ) - # CopyProjectEx()(name, new_name, - # ['operation', 'current_operation', 'restore_operation', 'batch_operation', 'operation_table']) - copy_project(name + "_template", new_name) - print( - datetime.now(pytz.timezone("Asia/Shanghai")).strftime("%Y-%m-%d %H:%M:%S") - + " -- Start Opening Database." - ) - open_project(new_name) - simulation.run_simulation( - name=new_name, - simulation_type="manually_temporary", - modify_pattern_start_time=modify_pattern_start_time, - ) - print( - datetime.now(pytz.timezone("Asia/Shanghai")).strftime("%Y-%m-%d %H:%M:%S") - + " -- Database Loading OK." - ) - ##step 1 set the emitter coefficient of end node of busrt pipe - if isinstance(burst_ID, list): - if (burst_size is not None) and (type(burst_size) is not list): - return json.dumps("Type mismatch.") - elif isinstance(burst_ID, str): - burst_ID = [burst_ID] - if burst_size is not None: - if isinstance(burst_size, float) or isinstance(burst_size, int): - burst_size = [burst_size] - else: - return json.dumps("Type mismatch.") - else: - return json.dumps("Type mismatch.") - if burst_size is None: - burst_size = [-1] * len(burst_ID) - elif len(burst_size) < len(burst_ID): - burst_size += [-1] * (len(burst_ID) - len(burst_size)) - elif len(burst_size) > len(burst_ID): - # burst_size = burst_size[:len(burst_ID)] - return json.dumps("Length mismatch.") - for burst_ID_, burst_size_ in zip(burst_ID, burst_size): - pipe = get_pipe(new_name, burst_ID_) - str_start_node = pipe["node1"] - str_end_node = pipe["node2"] - d_pipe = pipe["diameter"] / 1000.0 - if burst_size_ <= 0: - burst_size_ = 3.14 * d_pipe * d_pipe / 4 / 8 - else: - burst_size_ = burst_size_ / 10000 - emitter_coeff = ( - 0.65 * burst_size_ * sqrt(19.6) * 1000 - ) # 1/8开口面积作为coeff,单位 L/S - emitter_coeff = convert_to_local_unit(new_name, emitter_coeff) - emitter_node = "" - if is_junction(new_name, str_end_node): - emitter_node = str_end_node - elif is_junction(new_name, str_start_node): - emitter_node = str_start_node - old_emitter = get_emitter(new_name, emitter_node) - if old_emitter != None: - old_emitter["coefficient"] = emitter_coeff # 爆管的emitter coefficient设置 - else: - old_emitter = {"junction": emitter_node, "coefficient": emitter_coeff} - new_emitter = ChangeSet() - new_emitter.append(old_emitter) - set_emitter(new_name, new_emitter) - # step 2. run simulation - # 涉及关阀计算,可能导致关阀后仍有流量,改为压力驱动PDA - options = get_option(new_name) - options["DEMAND MODEL"] = OPTION_DEMAND_MODEL_PDA - options["REQUIRED PRESSURE"] = "10.0000" - cs_options = ChangeSet() - cs_options.append(options) - set_option(new_name, cs_options) - # valve_control = None - # if modify_valve_opening is not None: - # valve_control = {} - # for valve in modify_valve_opening: - # valve_control[valve] = {'status': 'CLOSED'} - # result = run_simulation_ex(new_name,'realtime', modify_pattern_start_time, - # end_datetime=modify_pattern_start_time, - # modify_total_duration=modify_total_duration, - # modify_pump_pattern=modify_pump_pattern, - # valve_control=valve_control, - # downloading_prohibition=True) - simulation.run_simulation( - name=new_name, - simulation_type="extended", - modify_pattern_start_time=modify_pattern_start_time, - modify_total_duration=modify_total_duration, - modify_fixed_pump_pattern=modify_fixed_pump_pattern, - modify_variable_pump_pattern=modify_variable_pump_pattern, - modify_valve_opening=modify_valve_opening, - scheme_Type="burst_Analysis", - scheme_Name=scheme_Name, - ) - # step 3. restore the base model status - # execute_undo(name) #有疑惑 - if is_project_open(new_name): - close_project(new_name) - delete_project(new_name) - # return result - store_scheme_info( - name=name, - scheme_name=scheme_Name, - scheme_type="burst_Analysis", - username="admin", - scheme_start_time=modify_pattern_start_time, - scheme_detail=scheme_detail, - ) - - -############################################################ -# valve closing analysis 02 -############################################################ -def valve_close_analysis( - name: str, - modify_pattern_start_time: str, - modify_total_duration: int = 900, - modify_valve_opening: dict[str, float] = None, - scheme_Name: str = None, -) -> None: - """ - 关阀模拟 - :param name: 模型名称,数据库中对应的名字 - :param modify_pattern_start_time: 模拟开始时间,格式为'2024-11-25T09:00:00+08:00' - :param modify_total_duration: 模拟总历时,秒 - :param modify_valve_opening: dict中包含多个阀门开启度,str为阀门的id,float为修改后的阀门开启度 - :param scheme_Name: 方案名称 - :return: - """ - print( - datetime.now(pytz.timezone("Asia/Shanghai")).strftime("%Y-%m-%d %H:%M:%S") - + " -- Start Analysis." - ) - new_name = f"valve_close_Anal_{name}" - if have_project(new_name): - if is_project_open(new_name): - close_project(new_name) - delete_project(new_name) - print( - datetime.now(pytz.timezone("Asia/Shanghai")).strftime("%Y-%m-%d %H:%M:%S") - + " -- Start Copying Database." - ) - # CopyProjectEx()(name, new_name, - # ['operation', 'current_operation', 'restore_operation', 'batch_operation', 'operation_table']) - copy_project(name + "_template", new_name) - print( - datetime.now(pytz.timezone("Asia/Shanghai")).strftime("%Y-%m-%d %H:%M:%S") - + " -- Start Opening Database." - ) - open_project(new_name) - print( - datetime.now(pytz.timezone("Asia/Shanghai")).strftime("%Y-%m-%d %H:%M:%S") - + " -- Database Loading OK." - ) - # step 1. change the valves status to 'closed' - # for valve in valves: - # if not is_valve(new_name,valve): - # result='ID:{}is not a valve type'.format(valve) - # return result - # cs=ChangeSet() - # status=get_status(new_name,valve) - # status['status']='CLOSED' - # cs.append(status) - # set_status(new_name,cs) - # step 2. run simulation - # 涉及关阀计算,可能导致关阀后仍有流量,改为压力驱动PDA - options = get_option(new_name) - options["DEMAND MODEL"] = OPTION_DEMAND_MODEL_PDA - options["REQUIRED PRESSURE"] = "20.0000" - cs_options = ChangeSet() - cs_options.append(options) - set_option(new_name, cs_options) - # result = run_simulation_ex(new_name,'realtime', modify_pattern_start_time, modify_pattern_start_time, modify_total_duration, - # downloading_prohibition=True) - simulation.run_simulation( - name=new_name, - simulation_type="extended", - modify_pattern_start_time=modify_pattern_start_time, - modify_total_duration=modify_total_duration, - modify_valve_opening=modify_valve_opening, - scheme_Type="valve_close_Analysis", - scheme_Name=scheme_Name, - ) - # step 3. restore the base model - # for valve in valves: - # execute_undo(name) - if is_project_open(new_name): - close_project(new_name) - delete_project(new_name) - # return result - - -############################################################ -# flushing analysis 03 -# Pipe_Flushing_Analysis(prj_name,date_time, Valve_id_list, Drainage_Node_Id, Flushing_flow[opt], Flushing_duration[opt])->out_file:string -############################################################ -def flushing_analysis( - name: str, - modify_pattern_start_time: str, - modify_total_duration: int = 900, - modify_valve_opening: dict[str, float] = None, - drainage_node_ID: str = None, - flushing_flow: float = 0, - scheme_Name: str = None, -) -> None: - """ - 管道冲洗模拟 - :param name: 模型名称,数据库中对应的名字 - :param modify_pattern_start_time: 模拟开始时间,格式为'2024-11-25T09:00:00+08:00' - :param modify_total_duration: 模拟总历时,秒 - :param modify_valve_opening: dict中包含多个阀门开启度,str为阀门的id,float为修改后的阀门开启度 - :param drainage_node_ID: 冲洗排放口所在节点ID - :param flushing_flow: 冲洗水量,传入参数单位为m3/h - :param scheme_Name: 方案名称 - :return: - """ - print( - datetime.now(pytz.timezone("Asia/Shanghai")).strftime("%Y-%m-%d %H:%M:%S") - + " -- Start Analysis." - ) - new_name = f"flushing_Anal_{name}" - if have_project(new_name): - if is_project_open(new_name): - close_project(new_name) - delete_project(new_name) - # if is_project_open(name): - # close_project(name) - print( - datetime.now(pytz.timezone("Asia/Shanghai")).strftime("%Y-%m-%d %H:%M:%S") - + " -- Start Copying Database." - ) - # CopyProjectEx()(name, new_name, - # ['operation', 'current_operation', 'restore_operation', 'batch_operation', 'operation_table']) - copy_project(name + "_template", new_name) - print( - datetime.now(pytz.timezone("Asia/Shanghai")).strftime("%Y-%m-%d %H:%M:%S") - + " -- Start Opening Database." - ) - open_project(new_name) - print( - datetime.now(pytz.timezone("Asia/Shanghai")).strftime("%Y-%m-%d %H:%M:%S") - + " -- Database Loading OK." - ) - if not is_junction(new_name, drainage_node_ID): - return "Wrong Drainage node type" - # step 1. change the valves status to 'closed' - # for valve, valve_k in zip(valves, valves_k): - # cs=ChangeSet() - # status=get_status(new_name,valve) - # # status['status']='CLOSED' - # if valve_k == 0: - # status['status'] = 'CLOSED' - # elif valve_k < 1: - # status['status'] = 'OPEN' - # status['setting'] = 0.1036 * pow(valve_k, -3.105) - # cs.append(status) - # set_status(new_name,cs) - units = get_option(new_name) - # step 2. set the emitter coefficient of drainage node or add flush flow to the drainage node - emitter_demand = get_demand(new_name, drainage_node_ID) - cs = ChangeSet() - if flushing_flow > 0: - for r in emitter_demand["demands"]: - if units == "LPS": - r["demand"] += flushing_flow / 3.6 - elif units == "CMH": - r["demand"] += flushing_flow - cs.append(emitter_demand) - set_demand(new_name, cs) - else: - pipes = get_node_links(new_name, drainage_node_ID) - flush_diameter = 50 - for pipe in pipes: - d = get_pipe(new_name, pipe)["diameter"] - if flush_diameter < d: - flush_diameter = d - flush_diameter /= 1000 - emitter_coeff = ( - 0.65 * 3.14 * (flush_diameter * flush_diameter / 4) * sqrt(19.6) * 1000 - ) # 全开口面积作为coeff - - old_emitter = get_emitter(new_name, drainage_node_ID) - if old_emitter != None: - old_emitter["coefficient"] = emitter_coeff # 爆管的emitter coefficient设置 - else: - old_emitter = {"junction": drainage_node_ID, "coefficient": emitter_coeff} - new_emitter = ChangeSet() - new_emitter.append(old_emitter) - set_emitter(new_name, new_emitter) - # step 3. run simulation - # 涉及关阀计算,可能导致关阀后仍有流量,改为压力驱动PDA - options = get_option(new_name) - options["DEMAND MODEL"] = OPTION_DEMAND_MODEL_PDA - options["REQUIRED PRESSURE"] = "20.0000" - cs_options = ChangeSet() - cs_options.append(options) - set_option(new_name, cs_options) - # result = run_simulation_ex(new_name,'realtime', modify_pattern_start_time, modify_pattern_start_time, modify_total_duration, - # downloading_prohibition=True) - simulation.run_simulation( - name=new_name, - simulation_type="extended", - modify_pattern_start_time=modify_pattern_start_time, - modify_total_duration=modify_total_duration, - modify_valve_opening=modify_valve_opening, - scheme_Type="flushing_Analysis", - scheme_Name=scheme_Name, - ) - # step 4. restore the base model - if is_project_open(new_name): - close_project(new_name) - delete_project(new_name) - # return result - - -############################################################ -# Contaminant simulation 04 -# -############################################################ -def contaminant_simulation( - name: str, - modify_pattern_start_time: str, # 模拟开始时间,格式为'2024-11-25T09:00:00+08:00' - modify_total_duration: int = 900, # 模拟总历时,秒 - source: str = None,# 污染源节点ID - concentration: float = None, # 污染源浓度,单位mg/L - source_pattern: str = None, # 污染源时间变化模式名称 - scheme_Name: str = None, -) -> None: - """ - 污染模拟 - :param name: 模型名称,数据库中对应的名字 - :param modify_pattern_start_time: 模拟开始时间,格式为'2024-11-25T09:00:00+08:00' - :param modify_total_duration: 模拟总历时,秒 - :param source: 污染源所在的节点ID - :param concentration: 污染源位置处的浓度,单位mg/L,即默认的污染模拟setting为concentration(应改为 Set point booster) - :param source_pattern: 污染源的时间变化模式,若不传入则默认以恒定浓度持续模拟,时间长度等于duration; - 若传入,则格式为{1.0,0.5,1.1}等系数列表pattern_step模拟等于模型的hydraulic time step - :param scheme_Name: 方案名称 - :return: - """ - print( - datetime.now(pytz.timezone("Asia/Shanghai")).strftime("%Y-%m-%d %H:%M:%S") - + " -- Start Analysis." - ) - new_name = f"contaminant_Sim_{name}" - if have_project(new_name): - if is_project_open(new_name): - close_project(new_name) - delete_project(new_name) - # if is_project_open(name): - # close_project(name) - print( - datetime.now(pytz.timezone("Asia/Shanghai")).strftime("%Y-%m-%d %H:%M:%S") - + " -- Start Copying Database." - ) - # CopyProjectEx()(name, new_name, - # ['operation', 'current_operation', 'restore_operation', 'batch_operation', 'operation_table']) - copy_project(name + "_template", new_name) - print( - datetime.now(pytz.timezone("Asia/Shanghai")).strftime("%Y-%m-%d %H:%M:%S") - + " -- Start Opening Database." - ) - open_project(new_name) - print( - datetime.now(pytz.timezone("Asia/Shanghai")).strftime("%Y-%m-%d %H:%M:%S") - + " -- Database Loading OK." - ) - dic_time = get_time(new_name) - dic_time["QUALITY TIMESTEP"] = "0:05:00" - cs = ChangeSet() - cs.operations.append(dic_time) - set_time(new_name, cs) # set QUALITY TIMESTEP - time_option = get_time(new_name) - hydraulic_step = time_option["HYDRAULIC TIMESTEP"] - secs = from_clock_to_seconds_2(hydraulic_step) - operation_step = 0 - # step 1. set duration - if modify_total_duration == None: - modify_total_duration = secs - # step 2. set pattern - if source_pattern != None: - pt = get_pattern(new_name, source_pattern) - if pt == None: - str_response = str("cant find source_pattern") - return str_response - else: - cs_pattern = ChangeSet() - pt = {} - factors = [] - tmp_duration = modify_total_duration - while tmp_duration > 0: - factors.append(1.0) - tmp_duration = tmp_duration - secs - pt["id"] = "contam_pt" - pt["factors"] = factors - cs_pattern.append(pt) - add_pattern(new_name, cs_pattern) - operation_step += 1 - # step 3. set source/initial quality - # source quality - cs_source = ChangeSet() - source_schema = { - "node": source, - "s_type": SOURCE_TYPE_CONCEN, - "strength": concentration, - "pattern": pt["id"], - } - cs_source.append(source_schema) - source_node = get_source(new_name, source) - if len(source_node) == 0: - add_source(new_name, cs_source) - else: - set_source(new_name, cs_source) - dict_demand = get_demand(new_name, source) - for demands in dict_demand["demands"]: - dict_demand["demands"][dict_demand["demands"].index(demands)]["demand"] = -1 - dict_demand["demands"][dict_demand["demands"].index(demands)]["pattern"] = None - cs = ChangeSet() - cs.append(dict_demand) - set_demand(new_name, cs) # set inflow node - # # initial quality - # dict_quality = get_quality(new_name, source) - # dict_quality['quality'] = concentration - # cs = ChangeSet() - # cs.append(dict_quality) - # set_quality(new_name, cs) - operation_step += 1 - # step 4 set option of quality to chemical - opt = get_option(new_name) - opt["QUALITY"] = OPTION_QUALITY_CHEMICAL - cs_option = ChangeSet() - cs_option.append(opt) - set_option(new_name, cs_option) - operation_step += 1 - # step 5. run simulation - # result = run_simulation_ex(new_name,'realtime', modify_pattern_start_time, modify_pattern_start_time, modify_total_duration, - # downloading_prohibition=True) - simulation.run_simulation( - name=new_name, - simulation_type="extended", - modify_pattern_start_time=modify_pattern_start_time, - modify_total_duration=modify_total_duration, - scheme_Type="contaminant_Analysis", - scheme_Name=scheme_Name, - ) - - # for i in range(1,operation_step): - # execute_undo(name) - if is_project_open(new_name): - close_project(new_name) - delete_project(new_name) - # return result - - -############################################################ -# age analysis 05 ***水龄模拟目前还没和实时模拟打通,不确定是否需要,先不要使用*** -############################################################ - - -def age_analysis( - name: str, modify_pattern_start_time: str, modify_total_duration: int = 900 -) -> None: - """ - 水龄模拟 - :param name: 模型名称,数据库中对应的名字 - :param modify_pattern_start_time: 模拟开始时间,格式为'2024-11-25T09:00:00+08:00' - :param modify_total_duration: 模拟总历时,秒 - :return: - """ - print( - datetime.now(pytz.timezone("Asia/Shanghai")).strftime("%Y-%m-%d %H:%M:%S") - + " -- Start Analysis." - ) - new_name = f"age_Anal_{name}" - if have_project(new_name): - if is_project_open(new_name): - close_project(new_name) - delete_project(new_name) - # if is_project_open(name): - # close_project(name) - print( - datetime.now(pytz.timezone("Asia/Shanghai")).strftime("%Y-%m-%d %H:%M:%S") - + " -- Start Copying Database." - ) - # CopyProjectEx()(name, new_name, - # ['operation', 'current_operation', 'restore_operation', 'batch_operation', 'operation_table']) - copy_project(name + "_template", new_name) - print( - datetime.now(pytz.timezone("Asia/Shanghai")).strftime("%Y-%m-%d %H:%M:%S") - + " -- Start Opening Database." - ) - open_project(new_name) - print( - datetime.now(pytz.timezone("Asia/Shanghai")).strftime("%Y-%m-%d %H:%M:%S") - + " -- Database Loading OK." - ) - # step 1. run simulation - - result = run_simulation_ex( - new_name, - "realtime", - modify_pattern_start_time, - modify_total_duration, - downloading_prohibition=True, - ) - # step 2. restore the base model status - # execute_undo(name) #有疑惑 - if is_project_open(new_name): - close_project(new_name) - delete_project(new_name) - output = Output("./temp/{}.db.out".format(new_name)) - # element_name = output.element_name() - # node_name = element_name['nodes'] - # link_name = element_name['links'] - nodes_age = [] - node_result = output.node_results() - for node in node_result: - nodes_age.append(node["result"][-1]["quality"]) - links_age = [] - link_result = output.link_results() - for link in link_result: - links_age.append(link["result"][-1]["quality"]) - age_result = {"nodes": nodes_age, "links": links_age} - # age_result = {'nodes': nodes_age, 'links': links_age, 'nodeIDs': node_name, 'linkIDs': link_name} - return json.dumps(age_result) - - -############################################################ -# pressure regulation 06 -############################################################ - - -def pressure_regulation( - name: str, - modify_pattern_start_time: str, - modify_total_duration: int = 900, - modify_tank_initial_level: dict[str, float] = None, - modify_fixed_pump_pattern: dict[str, list] = None, - modify_variable_pump_pattern: dict[str, list] = None, - scheme_Name: str = None, -) -> None: - """ - 区域调压模拟,用来模拟未来15分钟内,开关水泵对区域压力的影响 - :param name: 模型名称,数据库中对应的名字 - :param modify_pattern_start_time: 模拟开始时间,格式为'2024-11-25T09:00:00+08:00' - :param modify_total_duration: 模拟总历时,秒 - :param modify_tank_initial_level: dict中包含多个水塔,str为水塔的id,float为修改后的initial_level - :param modify_fixed_pump_pattern: dict中包含多个水泵模式,str为工频水泵的id,list为修改后的pattern - :param modify_variable_pump_pattern: dict中包含多个水泵模式,str为变频水泵的id,list为修改后的pattern - :param scheme_Name: 模拟方案名称 - :return: - """ - print( - datetime.now(pytz.timezone("Asia/Shanghai")).strftime("%Y-%m-%d %H:%M:%S") - + " -- Start Analysis." - ) - new_name = f"pressure_regulation_{name}" - if have_project(new_name): - if is_project_open(new_name): - close_project(new_name) - delete_project(new_name) - # if is_project_open(name): - # close_project(name) - print( - datetime.now(pytz.timezone("Asia/Shanghai")).strftime("%Y-%m-%d %H:%M:%S") - + " -- Start Copying Database." - ) - # CopyProjectEx()(name, new_name, - # ['operation', 'current_operation', 'restore_operation', 'batch_operation', 'operation_table']) - copy_project(name + "_template", new_name) - print( - datetime.now(pytz.timezone("Asia/Shanghai")).strftime("%Y-%m-%d %H:%M:%S") - + " -- Start Opening Database." - ) - open_project(new_name) - print( - datetime.now(pytz.timezone("Asia/Shanghai")).strftime("%Y-%m-%d %H:%M:%S") - + " -- Database Loading OK." - ) - # 全部关泵后,压力计算不合理,改为压力驱动PDA - options = get_option(new_name) - options["DEMAND MODEL"] = OPTION_DEMAND_MODEL_PDA - options["REQUIRED PRESSURE"] = "15.0000" - cs_options = ChangeSet() - cs_options.append(options) - set_option(new_name, cs_options) - # result = run_simulation_ex(name=new_name, - # simulation_type='realtime', - # start_datetime=start_datetime, - # duration=900, - # pump_control=pump_control, - # tank_initial_level_control=tank_initial_level_control, - # downloading_prohibition=True) - simulation.run_simulation( - name=new_name, - simulation_type="extended", - modify_pattern_start_time=modify_pattern_start_time, - modify_total_duration=modify_total_duration, - modify_tank_initial_level=modify_tank_initial_level, - modify_fixed_pump_pattern=modify_fixed_pump_pattern, - modify_variable_pump_pattern=modify_variable_pump_pattern, - scheme_Type="pressure_regulation", - scheme_Name=scheme_Name, - ) - if is_project_open(new_name): - close_project(new_name) - delete_project(new_name) - # return result - - -############################################################ -# project management 07 ***暂时不使用,与业务需求无关*** -############################################################ - - -def project_management( - prj_name, - start_datetime, - pump_control, - tank_initial_level_control=None, - region_demand_control=None, -) -> str: - print( - datetime.now(pytz.timezone("Asia/Shanghai")).strftime("%Y-%m-%d %H:%M:%S") - + " -- Start Analysis." - ) - new_name = f"project_management_{prj_name}" - if have_project(new_name): - if is_project_open(new_name): - close_project(new_name) - delete_project(new_name) - # if is_project_open(prj_name): - # close_project(prj_name) - print( - datetime.now(pytz.timezone("Asia/Shanghai")).strftime("%Y-%m-%d %H:%M:%S") - + " -- Start Copying Database." - ) - # CopyProjectEx()(prj_name, new_name, - # ['operation', 'current_operation', 'restore_operation', 'batch_operation', 'operation_table']) - copy_project(prj_name + "_template", new_name) - print( - datetime.now(pytz.timezone("Asia/Shanghai")).strftime("%Y-%m-%d %H:%M:%S") - + " -- Start Opening Database." - ) - open_project(new_name) - print( - datetime.now(pytz.timezone("Asia/Shanghai")).strftime("%Y-%m-%d %H:%M:%S") - + " -- Database Loading OK." - ) - result = run_simulation_ex( - name=new_name, - simulation_type="realtime", - start_datetime=start_datetime, - duration=86400, - pump_control=pump_control, - tank_initial_level_control=tank_initial_level_control, - region_demand_control=region_demand_control, - downloading_prohibition=True, - ) - if is_project_open(new_name): - close_project(new_name) - delete_project(new_name) - return result - - -############################################################ -# scheduling analysis 08 ***暂时不使用,与业务需求无关*** -############################################################ - - -def scheduling_simulation( - prj_name, start_time, pump_control, tank_id, water_plant_output_id, time_delta=300 -) -> str: - print( - datetime.now(pytz.timezone("Asia/Shanghai")).strftime("%Y-%m-%d %H:%M:%S") - + " -- Start Analysis." - ) - new_name = f"scheduling_{prj_name}" - - if have_project(new_name): - if is_project_open(new_name): - close_project(new_name) - delete_project(new_name) - # if is_project_open(prj_name): - # close_project(prj_name) - print( - datetime.now(pytz.timezone("Asia/Shanghai")).strftime("%Y-%m-%d %H:%M:%S") - + " -- Start Copying Database." - ) - # CopyProjectEx()(prj_name, new_name, - # ['operation', 'current_operation', 'restore_operation', 'batch_operation', 'operation_table']) - copy_project(prj_name + "_template", new_name) - print( - datetime.now(pytz.timezone("Asia/Shanghai")).strftime("%Y-%m-%d %H:%M:%S") - + " -- Start Opening Database." - ) - open_project(new_name) - print( - datetime.now(pytz.timezone("Asia/Shanghai")).strftime("%Y-%m-%d %H:%M:%S") - + " -- Database Loading OK." - ) - - run_simulation_ex( - new_name, "realtime", start_time, duration=0, pump_control=pump_control - ) - - if not is_project_open(new_name): - open_project(new_name) - - tank = get_tank(new_name, tank_id) # 水塔信息 - tank_floor_space = pi * pow(tank["diameter"] / 2, 2) # 水塔底面积(m^2) - tank_init_level = tank["init_level"] # 水塔初始水位(m) - tank_pipes_id = tank["links"] # pipes list - - tank_pipe_flow_direction = ( - {} - ) # 管道流向修正系数, 水塔为下游节点时为1, 水塔为上游节点时为-1 - for pipe_id in tank_pipes_id: - if get_pipe(new_name, pipe_id)["node2"] == tank_id: # 水塔为下游节点 - tank_pipe_flow_direction[pipe_id] = 1 - else: - tank_pipe_flow_direction[pipe_id] = -1 - - output = Output("./temp/{}.db.out".format(new_name)) - - node_results = ( - output.node_results() - ) # [{'node': str, 'result': [{'pressure': float}]}] - water_plant_output_pressure = 0 - for node_result in node_results: - if node_result["node"] == water_plant_output_id: # 水厂出水压力(m) - water_plant_output_pressure = node_result["result"][-1]["pressure"] - water_plant_output_pressure /= 100 # 预计水厂出水压力(Mpa) - - pipe_results = output.link_results() # [{'link': str, 'result': [{'flow': float}]}] - tank_inflow = 0 - for pipe_result in pipe_results: - for pipe_id in tank_pipes_id: # 遍历与水塔相连的管道 - if pipe_result["link"] == pipe_id: # 水塔入流流量(L/s) - tank_inflow += ( - pipe_result["result"][-1]["flow"] - * tank_pipe_flow_direction[pipe_id] - ) - tank_inflow /= 1000 # 水塔入流流量(m^3/s) - tank_level_delta = tank_inflow * time_delta / tank_floor_space # 水塔水位改变值(m) - tank_level = tank_init_level + tank_level_delta # 预计水塔水位(m) - - simulation_results = { - "water_plant_output_pressure": water_plant_output_pressure, - "tank_init_level": tank_init_level, - "tank_level": tank_level, - } - - if is_project_open(new_name): - close_project(new_name) - delete_project(new_name) - - return json.dumps(simulation_results) - - -def daily_scheduling_simulation( - prj_name, start_time, pump_control, reservoir_id, tank_id, water_plant_output_id -) -> str: - print( - datetime.now(pytz.timezone("Asia/Shanghai")).strftime("%Y-%m-%d %H:%M:%S") - + " -- Start Analysis." - ) - new_name = f"daily_scheduling_{prj_name}" - - if have_project(new_name): - if is_project_open(new_name): - close_project(new_name) - delete_project(new_name) - # if is_project_open(prj_name): - # close_project(prj_name) - print( - datetime.now(pytz.timezone("Asia/Shanghai")).strftime("%Y-%m-%d %H:%M:%S") - + " -- Start Copying Database." - ) - # CopyProjectEx()(prj_name, new_name, - # ['operation', 'current_operation', 'restore_operation', 'batch_operation', 'operation_table']) - copy_project(prj_name + "_template", new_name) - print( - datetime.now(pytz.timezone("Asia/Shanghai")).strftime("%Y-%m-%d %H:%M:%S") - + " -- Start Opening Database." - ) - open_project(new_name) - print( - datetime.now(pytz.timezone("Asia/Shanghai")).strftime("%Y-%m-%d %H:%M:%S") - + " -- Database Loading OK." - ) - - run_simulation_ex( - new_name, "realtime", start_time, duration=86400, pump_control=pump_control - ) - - if not is_project_open(new_name): - open_project(new_name) - - output = Output("./temp/{}.db.out".format(new_name)) - - node_results = ( - output.node_results() - ) # [{'node': str, 'result': [{'pressure': float, 'head': float}]}] - water_plant_output_pressure = [] - reservoir_level = [] - tank_level = [] - for node_result in node_results: - if node_result["node"] == water_plant_output_id: - for result in node_result["result"]: - water_plant_output_pressure.append( - result["pressure"] / 100 - ) # 水厂出水压力(Mpa) - elif node_result["node"] == reservoir_id: - for result in node_result["result"]: - reservoir_level.append(result["head"] - 250.35) # 清水池液位(m) - elif node_result["node"] == tank_id: - for result in node_result["result"]: - tank_level.append(result["pressure"]) # 调节池液位(m) - - simulation_results = { - "water_plant_output_pressure": water_plant_output_pressure, - "reservoir_level": reservoir_level, - "tank_level": tank_level, - } - - if is_project_open(new_name): - close_project(new_name) - delete_project(new_name) - - return json.dumps(simulation_results) - - -############################################################ -# network_update 10 -############################################################ - - -def network_update(file_path: str) -> None: - """ - 更新pg数据库中的inp文件 - :param file_path: inp文件 - :return: - """ - read_inp("szh", file_path) - - csv_path = "./history_pattern_flow.csv" - - # # 检查文件是否存在 - # if os.path.exists(csv_path): - # print(f"history_patterns_flows文件存在,开始处理...") - # - # # 读取 CSV 文件 - # df = pd.read_csv(csv_path) - # - # # 连接到 PostgreSQL 数据库(这里是数据库 "bb") - # with psycopg.connect("dbname=bb host=127.0.0.1") as conn: - # with conn.cursor() as cur: - # for index, row in df.iterrows(): - # # 直接将数据插入,不进行唯一性检查 - # insert_sql = sql.SQL(""" - # INSERT INTO history_patterns_flows (id, factor, flow) - # VALUES (%s, %s, %s); - # """) - # # 将数据插入数据库 - # cur.execute(insert_sql, (row['id'], row['factor'], row['flow'])) - # conn.commit() - # print("数据成功导入到 'history_patterns_flows' 表格。") - # else: - # print(f"history_patterns_flows文件不存在。") - # 检查文件是否存在 - if os.path.exists(csv_path): - print(f"history_patterns_flows文件存在,开始处理...") - - # 连接到 PostgreSQL 数据库(这里是数据库 "bb") - with psycopg.connect(f"dbname={project_info.name} host=127.0.0.1") as conn: - with conn.cursor() as cur: - with open(csv_path, newline="", encoding="utf-8-sig") as csvfile: - reader = csv.DictReader(csvfile) - for row in reader: - # 直接将数据插入,不进行唯一性检查 - insert_sql = sql.SQL( - """ - INSERT INTO history_patterns_flows (id, factor, flow) - VALUES (%s, %s, %s); - """ - ) - # 将数据插入数据库 - cur.execute(insert_sql, (row["id"], row["factor"], row["flow"])) - conn.commit() - print("数据成功导入到 'history_patterns_flows' 表格。") - else: - print(f"history_patterns_flows文件不存在。") - - -def submit_scada_info(name: str, coord_id: str) -> None: - """ - 将scada信息表导入pg数据库 - :param name: 项目名称(数据库名称) - :param coord_id: 坐标系的id,如4326,根据原始坐标信息输入 - :return: - """ - scada_info_path = "./scada_info.csv" - # 检查文件是否存在 - if os.path.exists(scada_info_path): - print(f"scada_info文件存在,开始处理...") - - # 自动检测文件编码 - with open(scada_info_path, "rb") as file: - raw_data = file.read() - detected = chardet.detect(raw_data) - file_encoding = detected["encoding"] - print(f"检测到的文件编码:{file_encoding}") - try: - # 动态替换数据库名称 - conn_string = get_pgconn_string(db_name=name) - - # 连接到 PostgreSQL 数据库(这里是数据库 "bb") - with psycopg.connect(conn_string) as conn: - with conn.cursor() as cur: - # 检查 scada_info 表是否为空 - cur.execute("SELECT COUNT(*) FROM scada_info;") - count = cur.fetchone()[0] - - if count > 0: - print("scada_info表中已有数据,正在清空记录...") - cur.execute("DELETE FROM scada_info;") - print("表记录已清空。") - - with open( - scada_info_path, newline="", encoding=file_encoding - ) as csvfile: - reader = csv.DictReader(csvfile) - for row in reader: - # 将CSV单元格值为空的字段转换为 None - cleaned_row = { - key: (value if value.strip() else None) - for key, value in row.items() - } - - # 处理 associated_source_outflow_id 列动态变化 - associated_columns = [ - f"associated_source_outflow_id{i}" for i in range(1, 21) - ] - associated_values = [ - ( - cleaned_row.get(col).strip() - if cleaned_row.get(col) - and cleaned_row.get(col).strip() - else None - ) - for col in associated_columns - ] - - # 将 X_coor 和 Y_coor 转换为 geometry 类型 - x_coor = ( - float(cleaned_row["X_coor"]) - if cleaned_row["X_coor"] - else None - ) - y_coor = ( - float(cleaned_row["Y_coor"]) - if cleaned_row["Y_coor"] - else None - ) - coord = ( - f"SRID={coord_id};POINT({x_coor} {y_coor})" - if x_coor and y_coor - else None - ) - - # 准备插入 SQL 语句 - insert_sql = sql.SQL( - """ - INSERT INTO scada_info ( - id, type, associated_element_id, associated_pattern, - associated_pipe_flow_id, {associated_columns}, - API_query_id, transmission_mode, transmission_frequency, - reliability, X_coor, Y_coor, coord - ) - VALUES ( - %s, %s, %s, %s, %s, {associated_placeholders}, - %s, %s, %s, %s, %s, %s, %s - ); - """ - ).format( - associated_columns=sql.SQL(", ").join( - sql.Identifier(col) for col in associated_columns - ), - associated_placeholders=sql.SQL(", ").join( - sql.Placeholder() for _ in associated_columns - ), - ) - # 将数据插入数据库 - cur.execute( - insert_sql, - ( - cleaned_row["id"], - cleaned_row["type"], - cleaned_row["associated_element_id"], - cleaned_row.get("associated_pattern"), - cleaned_row.get("associated_pipe_flow_id"), - *associated_values, - cleaned_row.get("API_query_id"), - cleaned_row["transmission_mode"], - cleaned_row["transmission_frequency"], - cleaned_row["reliability"], - x_coor, - y_coor, - coord, - ), - ) - conn.commit() - print("数据成功导入到 'scada_info' 表格。") - except Exception as e: - print(f"导入时出错:{e}") - else: - print(f"scada_info文件不存在。") - - -# 2025/03/23 -def create_user(name: str, username: str, password: str): - """ - 创建用户 - :param name: 数据库名称 - :param username: 用户名 - :param password: 密码 - :return: - """ - try: - # 动态替换数据库名称 - conn_string = get_pgconn_string(db_name=name) - # 连接到 PostgreSQL 数据库(这里是数据库 "bb") - with psycopg.connect(conn_string) as conn: - with conn.cursor() as cur: - cur.execute( - "INSERT INTO users (username, password) VALUES (%s, %s)", - (username, password), - ) - # 提交事务 - conn.commit() - print("新用户创建成功!") - except Exception as e: - print(f"创建用户出错:{e}") - - -# 2025/03/23 -def delete_user(name: str, username: str): - """ - 删除用户 - :param name: 数据库名称 - :param username: 用户名 - :return: - """ - try: - # 动态替换数据库名称 - conn_string = get_pgconn_string(db_name=name) - # 连接到 PostgreSQL 数据库(这里是数据库 "bb") - with psycopg.connect(conn_string) as conn: - with conn.cursor() as cur: - cur.execute("DELETE FROM users WHERE username = %s", (username,)) - conn.commit() - print(f"用户 {username} 删除成功!") - except Exception as e: - print(f"删除用户出错:{e}") - - -# 2025/03/23 -def scheme_name_exists(name: str, scheme_name: str) -> bool: - """ - 判断传入的 scheme_name 是否已存在于 scheme_list 表中,用于输入框判断 - :param name: 数据库名称 - :param scheme_name: 需要判断的方案名称 - :return: 如果存在返回 True,否则返回 False - """ - try: - conn_string = get_pgconn_string(db_name=name) - with psycopg.connect(conn_string) as conn: - with conn.cursor() as cur: - cur.execute( - "SELECT COUNT(*) FROM scheme_list WHERE scheme_name = %s", - (scheme_name,), - ) - result = cur.fetchone() - if result is not None and result[0] > 0: - return True - else: - return False - except Exception as e: - print(f"查询 scheme_name 时出错:{e}") - return False - - -# 2025/03/23 -def store_scheme_info( - name: str, - scheme_name: str, - scheme_type: str, - username: str, - scheme_start_time: str, - scheme_detail: dict, -): - """ - 将一条方案记录插入 scheme_list 表中 - :param name: 数据库名称 - :param scheme_name: 方案名称 - :param scheme_type: 方案类型 - :param username: 用户名(需在 users 表中已存在) - :param scheme_start_time: 方案起始时间(字符串) - :param scheme_detail: 方案详情(字典,会转换为 JSON) - :return: - """ - try: - conn_string = get_pgconn_string(db_name=name) - with psycopg.connect(conn_string) as conn: - with conn.cursor() as cur: - sql = """ - INSERT INTO scheme_list (scheme_name, scheme_type, username, scheme_start_time, scheme_detail) - VALUES (%s, %s, %s, %s, %s) - """ - # 将字典转换为 JSON 字符串 - scheme_detail_json = json.dumps(scheme_detail) - cur.execute( - sql, - ( - scheme_name, - scheme_type, - username, - scheme_start_time, - scheme_detail_json, - ), - ) - conn.commit() - print("方案信息存储成功!") - except Exception as e: - print(f"存储方案信息时出错:{e}") - - -# 2025/03/23 -def delete_scheme_info(name: str, scheme_name: str) -> None: - """ - 从 scheme_list 表中删除指定的方案 - :param name: 数据库名称 - :param scheme_name: 要删除的方案名称 - """ - try: - conn_string = get_pgconn_string(db_name=name) - with psycopg.connect(conn_string) as conn: - with conn.cursor() as cur: - # 使用参数化查询删除方案记录 - cur.execute( - "DELETE FROM scheme_list WHERE scheme_name = %s", (scheme_name,) - ) - conn.commit() - print(f"方案 {scheme_name} 删除成功!") - except Exception as e: - print(f"删除方案时出错:{e}") - - -# 2025/03/23 -def query_scheme_list(name: str) -> list: - """ - 查询pg数据库中的scheme_list,按照 create_time 降序排列,离现在时间最近的记录排在最前面 - :param name: 项目名称(数据库名称) - :return: 返回查询结果的所有行 - """ - try: - # 动态替换数据库名称 - conn_string = get_pgconn_string(db_name=name) - # 连接到 PostgreSQL 数据库(这里是数据库 "bb") - with psycopg.connect(conn_string) as conn: - with conn.cursor() as cur: - # 按 create_time 降序排列 - cur.execute("SELECT * FROM scheme_list ORDER BY create_time DESC") - rows = cur.fetchall() - return rows - - except Exception as e: - print(f"查询错误:{e}") - - -# 2025/03/23 -def upload_shp_to_pg(name: str, table_name: str, role: str, shp_file_path: str): - """ - 将 Shapefile 文件上传到 PostgreSQL 数据库 - :param name: 项目名称(数据库名称) - :param table_name: 创建表的名字 - :param role: 数据库角色名,位于c盘user中查看 - :param shp_file_path: shp文件的路径 - :return: - """ - try: - # 动态连接到指定的数据库 - conn_string = get_pgconn_string(db_name=name) - with psycopg.connect(conn_string) as conn: - # 读取 Shapefile 文件 - gdf = gpd.read_file(shp_file_path) - - # 检查投影坐标系(CRS),并确保是 EPSG:4326 - if gdf.crs.to_string() != "EPSG:4490": - gdf = gdf.to_crs(epsg=4490) - - # 使用 GeoDataFrame 的 .to_postgis 方法将数据写入 PostgreSQL - # 需要在数据库中提前安装 PostGIS 扩展 - engine = create_engine(f"postgresql+psycopg2://{role}:@127.0.0.1/{name}") - gdf.to_postgis( - table_name, engine, if_exists="replace", index=True, index_label="id" - ) - - print( - f"Shapefile 文件成功上传到 PostgreSQL 数据库 '{name}' 的表 '{table_name}'." - ) - - except Exception as e: - print(f"上传 Shapefile 到 PostgreSQL 时出错:{e}") - - -def submit_risk_probability_result(name: str, result_file_path: str) -> None: - """ - 将管网风险评估结果导入pg数据库 - :param name: 项目名称(数据库名称) - :param result_file_path: 结果文件路径 - :return: - """ - # 自动检测文件编码 - # with open({result_file_path}, 'rb') as file: - # raw_data = file.read() - # detected = chardet.detect(raw_data) - # file_encoding = detected['encoding'] - # print(f"检测到的文件编码:{file_encoding}") - - try: - # 动态替换数据库名称 - conn_string = get_pgconn_string(db_name=name) - - # 连接到 PostgreSQL 数据库 - with psycopg.connect(conn_string) as conn: - with conn.cursor() as cur: - # 检查 scada_info 表是否为空 - cur.execute("SELECT COUNT(*) FROM pipe_risk_probability;") - count = cur.fetchone()[0] - - if count > 0: - print("pipe_risk_probability表中已有数据,正在清空记录...") - cur.execute("DELETE FROM pipe_risk_probability;") - print("表记录已清空。") - - # 读取Excel并转换x/y列为列表 - df = pd.read_excel(result_file_path, sheet_name="Sheet1") - df["x"] = df["x"].apply(ast.literal_eval) - df["y"] = df["y"].apply(ast.literal_eval) - - # 批量插入数据 - for index, row in df.iterrows(): - insert_query = """ - INSERT INTO pipe_risk_probability - (pipeID, pipeage, risk_probability_now, x, y) - VALUES (%s, %s, %s, %s, %s) - """ - cur.execute( - insert_query, - ( - row["pipeID"], - row["pipeage"], - row["risk_probability_now"], - row["x"], # 直接传递列表 - row["y"], # 同上 - ), - ) - - conn.commit() - print("风险评估结果导入成功") - - except Exception as e: - print(f"导入时出错:{e}") - - -def pressure_sensor_placement_sensitivity( - name: str, scheme_name: str, sensor_number: int, min_diameter: int, username: str -) -> None: - """ - 基于改进灵敏度法进行压力监测点优化布置 - :param name: 数据库名称 - :param scheme_name: 监测优化布置方案名称 - :param sensor_number: 传感器数目 - :param min_diameter: 最小管径 - :param username: 用户名 - :return: - """ - sensor_location = sensitivity.get_ID( - name=name, sensor_num=sensor_number, min_diameter=min_diameter - ) - try: - conn_string = get_pgconn_string(db_name=name) - with psycopg.connect(conn_string) as conn: - with conn.cursor() as cur: - sql = """ - INSERT INTO sensor_placement (scheme_name, sensor_number, min_diameter, username, sensor_location) - VALUES (%s, %s, %s, %s, %s) - """ - - cur.execute( - sql, - ( - scheme_name, - sensor_number, - min_diameter, - username, - sensor_location, - ), - ) - conn.commit() - print("方案信息存储成功!") - except Exception as e: - print(f"存储方案信息时出错:{e}") - - -# 2025/08/21 -# 基于kmeans聚类法进行压力监测点优化布置 -def pressure_sensor_placement_kmeans( - name: str, scheme_name: str, sensor_number: int, min_diameter: int, username: str -) -> None: - """ - 基于聚类法进行压力监测点优化布置 - :param name: 数据库名称(注意,此处数据库名称也是inp文件名称,inp文件与pg库名要一样) - :param scheme_name: 监测优化布置方案名称 - :param sensor_number: 传感器数目 - :param min_diameter: 最小管径 - :param username: 用户名 - :return: - """ - # dump_inp - inp_name = f"./db_inp/{name}.db.inp" - dump_inp(name, inp_name, "2") - sensor_location = kmeans_sensor.kmeans_sensor_placement( - name=name, sensor_num=sensor_number, min_diameter=min_diameter - ) - try: - conn_string = get_pgconn_string(db_name=name) - with psycopg.connect(conn_string) as conn: - with conn.cursor() as cur: - sql = """ - INSERT INTO sensor_placement (scheme_name, sensor_number, min_diameter, username, sensor_location) - VALUES (%s, %s, %s, %s, %s) - """ - - cur.execute( - sql, - ( - scheme_name, - sensor_number, - min_diameter, - username, - sensor_location, - ), - ) - conn.commit() - print("方案信息存储成功!") - except Exception as e: - print(f"存储方案信息时出错:{e}") - - -############################################################ -# 流量监测数据清洗 ***卡尔曼滤波法*** -############################################################ -# 2025/08/21 hxyan - - -def flow_data_clean(input_csv_file: str) -> str: - """ - 读取 input_csv_path 中的每列时间序列,使用一维 Kalman 滤波平滑并用预测值替换基于 3σ 检测出的异常点。 - 保存输出为:_cleaned.xlsx(与输入同目录),并返回输出文件的绝对路径。如有同名文件存在,则覆盖。 - :param: input_csv_file: 输入的 CSV 文件明或路径 - :return: 输出文件的绝对路径 - """ - - # 提供的 input_csv_path 绝对路径,以下为 默认脚本目录下同名 CSV 文件,构建绝对路径,可根据情况修改 - script_dir = os.path.dirname(os.path.abspath(__file__)) - input_csv_path = os.path.join(script_dir, input_csv_file) - - # 检查文件是否存在 - if not os.path.exists(input_csv_path): - raise FileNotFoundError(f"指定的文件不存在: {input_csv_path}") - # 调用 Fdataclean.clean_flow_data_kf 函数进行数据清洗 - out_xlsx_path = Fdataclean.clean_flow_data_kf(input_csv_path) - print("清洗后的数据已保存到:", out_xlsx_path) - - -############################################################ -# 压力监测数据清洗 ***kmean++法*** -############################################################ -# 2025/08/21 hxyan - - -def pressure_data_clean(input_csv_file: str) -> str: - """ - 读取 input_csv_path 中的每列时间序列,使用Kmean++清洗数据。 - 保存输出为:_cleaned.xlsx(与输入同目录),并返回输出文件的绝对路径。如有同名文件存在,则覆盖。 - 原始数据在 sheet 'raw_pressure_data',处理后数据在 sheet 'cleaned_pressusre_data'。 - :param input_csv_path: 输入的 CSV 文件路径 - :return: 输出文件的绝对路径 - """ - - # 提供的 input_csv_path 绝对路径,以下为 默认脚本目录下同名 CSV 文件,构建绝对路径,可根据情况修改 - script_dir = os.path.dirname(os.path.abspath(__file__)) - input_csv_path = os.path.join(script_dir, input_csv_file) - - # 检查文件是否存在 - if not os.path.exists(input_csv_path): - raise FileNotFoundError(f"指定的文件不存在: {input_csv_path}") - # 调用 Fdataclean.clean_flow_data_kf 函数进行数据清洗 - out_xlsx_path = Pdataclean.clean_pressure_data_km(input_csv_path) - print("清洗后的数据已保存到:", out_xlsx_path) - - -if __name__ == "__main__": - # contaminant_simulation('bb_model','2024-06-24T00:00:00Z','ZBBDTZDP009034',30,1800) - # flushing_analysis('bb_model','2024-04-01T08:00:00Z',{'GSD230719205857733F8F5214FF','GSD230719205857C0AF65B6A170'},'GSD2307192058570DEDF28E4F73',0,duration=900) - # flushing_analysis('bb_model', '2024-08-26T08:00:00Z', ['GSD2307192058572E5C0E14D83E'], [0.5], 'ZBBDTZDP009410', 0, - # duration=1800) - # valve_close_analysis('bb_model','2024-04-01T08:00:00Z',['GSD2307192058576122D929EE99(L)'],duration=1800) - - # burst_analysis('bb','2024-04-01T08:00:00Z','ZBBGXSZW000001',burst_size=200,duration=1800) - # run_simulation('beibeizone','2024-04-01T08:00:00Z') - # str_dump=dump_output('h:\\OneDrive\\tjwaterserver\\temp\\beibeizone.db_no_burst.out') - # with open("out_dump.txt", "w") as f: - # f.write(str_dump) - # str_dump=dump_output('h:\\OneDrive\\tjwaterserver\\temp\\beibeizone.db_busrtID(ZBBGXSZW000001).out') - # with open("burst_out_dump.txt", "w") as f: - # f.write(str_dump) - - # # 更新inp文件,并插入history_patterns_flows - # network_update('fx0217-mass injection.inp') - - # # 更新scada_info文件 - # submit_scada_info(project_info.name, '4490') - - # 示例:scheme_name_exists - # if scheme_name_exists(name='bb', scheme_name='burst_scheme'): - # print(f"方案名已存在,请更改!") - # else: - # print(f"方案名不存在,可以使用。") - - # 示例1:burst_analysis - # burst_analysis(name='bb', modify_pattern_start_time='2025-04-17T00:00:00+08:00', - # burst_ID='GSD230112144241FA18292A84CB', burst_size=400, modify_total_duration=1800, scheme_Name='GSD230112144241FA18292A84CB_400') - - # 示例:create_user - # create_user(name=project_info.name, username='tjwater dev', password='123456') - - # # 示例:delete_user - # delete_user(name=project_info.name, username='admin_test') - - # # 示例:query_scheme_list - # result = query_scheme_list(name=project_info.name) - # print(result) - - # # 示例:delete_scheme_info - # delete_scheme_info(name=project_info.name, scheme_name='burst_scheme') - - # # 示例:upload_shp_to_pg - # # 这里的role是 电脑的用户名,服务器上是 Administrator - # upload_shp_to_pg(name=project_info.name, table_name='GIS_pipe', role='Administrator', shp_file_path='市政管线.shp') - - # # 示例:submit_risk_probability_result - # submit_risk_probability_result(name=project_info.name, result_file_path='./北碚市政管线风险评价结果.xlsx') - - # # 示例:pressure_sensor_placement_sensitivity - # pressure_sensor_placement_sensitivity(name=project_info.name, scheme_name='20250517', sensor_number=10, min_diameter=300, username='admin') - - # 示例:pressure_sensor_placement_kmeans - # pressure_sensor_placement_kmeans(name=project_info.name, scheme_name='sensor_1103', sensor_number=35, min_diameter=300, username='admin') - - # 测试:convert emitters coefficients - convert_to_local_unit("szh", 100) diff --git a/app/algorithms/sensors.py b/app/algorithms/sensors.py new file mode 100644 index 0000000..4f0387c --- /dev/null +++ b/app/algorithms/sensors.py @@ -0,0 +1,91 @@ +import psycopg + +import app.algorithms.api_ex.kmeans_sensor as kmeans_sensor +import app.algorithms.api_ex.sensitivity as sensitivity +from app.native.api.postgresql_info import get_pgconn_string +from app.services.tjnetwork import dump_inp + + +def pressure_sensor_placement_sensitivity( + name: str, scheme_name: str, sensor_number: int, min_diameter: int, username: str +) -> None: + """ + 基于改进灵敏度法进行压力监测点优化布置 + :param name: 数据库名称 + :param scheme_name: 监测优化布置方案名称 + :param sensor_number: 传感器数目 + :param min_diameter: 最小管径 + :param username: 用户名 + :return: + """ + sensor_location = sensitivity.get_ID( + name=name, sensor_num=sensor_number, min_diameter=min_diameter + ) + try: + conn_string = get_pgconn_string(db_name=name) + with psycopg.connect(conn_string) as conn: + with conn.cursor() as cur: + sql = """ + INSERT INTO sensor_placement (scheme_name, sensor_number, min_diameter, username, sensor_location) + VALUES (%s, %s, %s, %s, %s) + """ + + cur.execute( + sql, + ( + scheme_name, + sensor_number, + min_diameter, + username, + sensor_location, + ), + ) + conn.commit() + print("方案信息存储成功!") + except Exception as e: + print(f"存储方案信息时出错:{e}") + + +# 2025/08/21 +# 基于kmeans聚类法进行压力监测点优化布置 +def pressure_sensor_placement_kmeans( + name: str, scheme_name: str, sensor_number: int, min_diameter: int, username: str +) -> None: + """ + 基于聚类法进行压力监测点优化布置 + :param name: 数据库名称(注意,此处数据库名称也是inp文件名称,inp文件与pg库名要一样) + :param scheme_name: 监测优化布置方案名称 + :param sensor_number: 传感器数目 + :param min_diameter: 最小管径 + :param username: 用户名 + :return: + """ + # dump_inp + inp_name = f"./db_inp/{name}.db.inp" + dump_inp(name, inp_name, "2") + sensor_location = kmeans_sensor.kmeans_sensor_placement( + name=name, sensor_num=sensor_number, min_diameter=min_diameter + ) + try: + conn_string = get_pgconn_string(db_name=name) + with psycopg.connect(conn_string) as conn: + with conn.cursor() as cur: + sql = """ + INSERT INTO sensor_placement (scheme_name, sensor_number, min_diameter, username, sensor_location) + VALUES (%s, %s, %s, %s, %s) + """ + + cur.execute( + sql, + ( + scheme_name, + sensor_number, + min_diameter, + username, + sensor_location, + ), + ) + conn.commit() + print("方案信息存储成功!") + except Exception as e: + print(f"存储方案信息时出错:{e}") diff --git a/app/algorithms/simulations.py b/app/algorithms/simulations.py new file mode 100644 index 0000000..24f3d60 --- /dev/null +++ b/app/algorithms/simulations.py @@ -0,0 +1,688 @@ +import json +from datetime import datetime +from math import pi, sqrt + +import pytz + +import app.services.simulation as simulation +from app.algorithms.api_ex.run_simulation import run_simulation_ex, from_clock_to_seconds_2 +from app.native.api.project import copy_project +from app.services.epanet.epanet import Output +from app.services.scheme_management import store_scheme_info +from app.services.tjnetwork import * + + +############################################################ +# burst analysis 01 +############################################################ +def convert_to_local_unit(proj: str, emitters: float) -> float: + open_project(proj) + proj_opt = get_option(proj) + str_unit = proj_opt.get("UNITS") + + if str_unit == "CMH": + return emitters * 3.6 + elif str_unit == "LPS": + return emitters + elif str_unit == "CMS": + return emitters / 1000.0 + elif str_unit == "MGD": + return emitters * 0.0438126 + + # Unknown unit: log and return original value + print(str_unit) + return emitters + + +def burst_analysis( + name: str, + modify_pattern_start_time: str, + burst_ID: list | str = None, + burst_size: list | float | int = None, + modify_total_duration: int = 900, + modify_fixed_pump_pattern: dict[str, list] = None, + modify_variable_pump_pattern: dict[str, list] = None, + modify_valve_opening: dict[str, float] = None, + scheme_Name: str = None, +) -> None: + """ + 爆管模拟 + :param name: 模型名称,数据库中对应的名字 + :param modify_pattern_start_time: 模拟开始时间,格式为'2024-11-25T09:00:00+08:00' + :param burst_ID: 爆管管道的ID,选取的是管道,单独传入一个爆管管道,可以是str或list,传入多个爆管管道是用list + :param burst_size: 爆管管道破裂的孔口面积,和burst_ID列表各位置的ID对应,以cm*cm计算 + :param modify_total_duration: 模拟总历时,秒 + :param modify_fixed_pump_pattern: dict中包含多个水泵模式,str为工频水泵的id,list为修改后的pattern + :param modify_variable_pump_pattern: dict中包含多个水泵模式,str为变频水泵的id,list为修改后的pattern + :param modify_valve_opening: dict中包含多个阀门开启度,str为阀门的id,float为修改后的阀门开启度 + :param scheme_Name: 方案名称 + :return: + """ + scheme_detail: dict = { + "burst_ID": burst_ID, + "burst_size": burst_size, + "modify_total_duration": modify_total_duration, + "modify_fixed_pump_pattern": modify_fixed_pump_pattern, + "modify_variable_pump_pattern": modify_variable_pump_pattern, + "modify_valve_opening": modify_valve_opening, + } + print( + datetime.now(pytz.timezone("Asia/Shanghai")).strftime("%Y-%m-%d %H:%M:%S") + + " -- Start Analysis." + ) + new_name = f"burst_Anal_{name}" + if have_project(new_name): + if is_project_open(new_name): + close_project(new_name) + delete_project(new_name) + print( + datetime.now(pytz.timezone("Asia/Shanghai")).strftime("%Y-%m-%d %H:%M:%S") + + " -- Start Copying Database." + ) + # CopyProjectEx()(name, new_name, + # ['operation', 'current_operation', 'restore_operation', 'batch_operation', 'operation_table']) + copy_project(name + "_template", new_name) + print( + datetime.now(pytz.timezone("Asia/Shanghai")).strftime("%Y-%m-%d %H:%M:%S") + + " -- Start Opening Database." + ) + open_project(new_name) + simulation.run_simulation( + name=new_name, + simulation_type="manually_temporary", + modify_pattern_start_time=modify_pattern_start_time, + ) + print( + datetime.now(pytz.timezone("Asia/Shanghai")).strftime("%Y-%m-%d %H:%M:%S") + + " -- Database Loading OK." + ) + ##step 1 set the emitter coefficient of end node of busrt pipe + if isinstance(burst_ID, list): + if (burst_size is not None) and (type(burst_size) is not list): + return json.dumps("Type mismatch.") + elif isinstance(burst_ID, str): + burst_ID = [burst_ID] + if burst_size is not None: + if isinstance(burst_size, float) or isinstance(burst_size, int): + burst_size = [burst_size] + else: + return json.dumps("Type mismatch.") + else: + return json.dumps("Type mismatch.") + if burst_size is None: + burst_size = [-1] * len(burst_ID) + elif len(burst_size) < len(burst_ID): + burst_size += [-1] * (len(burst_ID) - len(burst_size)) + elif len(burst_size) > len(burst_ID): + # burst_size = burst_size[:len(burst_ID)] + return json.dumps("Length mismatch.") + for burst_ID_, burst_size_ in zip(burst_ID, burst_size): + pipe = get_pipe(new_name, burst_ID_) + str_start_node = pipe["node1"] + str_end_node = pipe["node2"] + d_pipe = pipe["diameter"] / 1000.0 + if burst_size_ <= 0: + burst_size_ = 3.14 * d_pipe * d_pipe / 4 / 8 + else: + burst_size_ = burst_size_ / 10000 + emitter_coeff = ( + 0.65 * burst_size_ * sqrt(19.6) * 1000 + ) # 1/8开口面积作为coeff,单位 L/S + emitter_coeff = convert_to_local_unit(new_name, emitter_coeff) + emitter_node = "" + if is_junction(new_name, str_end_node): + emitter_node = str_end_node + elif is_junction(new_name, str_start_node): + emitter_node = str_start_node + old_emitter = get_emitter(new_name, emitter_node) + if old_emitter != None: + old_emitter["coefficient"] = emitter_coeff # 爆管的emitter coefficient设置 + else: + old_emitter = {"junction": emitter_node, "coefficient": emitter_coeff} + new_emitter = ChangeSet() + new_emitter.append(old_emitter) + set_emitter(new_name, new_emitter) + # step 2. run simulation + # 涉及关阀计算,可能导致关阀后仍有流量,改为压力驱动PDA + options = get_option(new_name) + options["DEMAND MODEL"] = OPTION_DEMAND_MODEL_PDA + options["REQUIRED PRESSURE"] = "10.0000" + cs_options = ChangeSet() + cs_options.append(options) + set_option(new_name, cs_options) + # valve_control = None + # if modify_valve_opening is not None: + # valve_control = {} + # for valve in modify_valve_opening: + # valve_control[valve] = {'status': 'CLOSED'} + # result = run_simulation_ex(new_name,'realtime', modify_pattern_start_time, + # end_datetime=modify_pattern_start_time, + # modify_total_duration=modify_total_duration, + # modify_pump_pattern=modify_pump_pattern, + # valve_control=valve_control, + # downloading_prohibition=True) + simulation.run_simulation( + name=new_name, + simulation_type="extended", + modify_pattern_start_time=modify_pattern_start_time, + modify_total_duration=modify_total_duration, + modify_fixed_pump_pattern=modify_fixed_pump_pattern, + modify_variable_pump_pattern=modify_variable_pump_pattern, + modify_valve_opening=modify_valve_opening, + scheme_Type="burst_Analysis", + scheme_Name=scheme_Name, + ) + # step 3. restore the base model status + # execute_undo(name) #有疑惑 + if is_project_open(new_name): + close_project(new_name) + delete_project(new_name) + # return result + store_scheme_info( + name=name, + scheme_name=scheme_Name, + scheme_type="burst_Analysis", + username="admin", + scheme_start_time=modify_pattern_start_time, + scheme_detail=scheme_detail, + ) + + +############################################################ +# valve closing analysis 02 +############################################################ +def valve_close_analysis( + name: str, + modify_pattern_start_time: str, + modify_total_duration: int = 900, + modify_valve_opening: dict[str, float] = None, + scheme_Name: str = None, +) -> None: + """ + 关阀模拟 + :param name: 模型名称,数据库中对应的名字 + :param modify_pattern_start_time: 模拟开始时间,格式为'2024-11-25T09:00:00+08:00' + :param modify_total_duration: 模拟总历时,秒 + :param modify_valve_opening: dict中包含多个阀门开启度,str为阀门的id,float为修改后的阀门开启度 + :param scheme_Name: 方案名称 + :return: + """ + print( + datetime.now(pytz.timezone("Asia/Shanghai")).strftime("%Y-%m-%d %H:%M:%S") + + " -- Start Analysis." + ) + new_name = f"valve_close_Anal_{name}" + if have_project(new_name): + if is_project_open(new_name): + close_project(new_name) + delete_project(new_name) + print( + datetime.now(pytz.timezone("Asia/Shanghai")).strftime("%Y-%m-%d %H:%M:%S") + + " -- Start Copying Database." + ) + # CopyProjectEx()(name, new_name, + # ['operation', 'current_operation', 'restore_operation', 'batch_operation', 'operation_table']) + copy_project(name + "_template", new_name) + print( + datetime.now(pytz.timezone("Asia/Shanghai")).strftime("%Y-%m-%d %H:%M:%S") + + " -- Start Opening Database." + ) + open_project(new_name) + print( + datetime.now(pytz.timezone("Asia/Shanghai")).strftime("%Y-%m-%d %H:%M:%S") + + " -- Database Loading OK." + ) + # step 1. change the valves status to 'closed' + # for valve in valves: + # if not is_valve(new_name,valve): + # result='ID:{}is not a valve type'.format(valve) + # return result + # cs=ChangeSet() + # status=get_status(new_name,valve) + # status['status']='CLOSED' + # cs.append(status) + # set_status(new_name,cs) + # step 2. run simulation + # 涉及关阀计算,可能导致关阀后仍有流量,改为压力驱动PDA + options = get_option(new_name) + options["DEMAND MODEL"] = OPTION_DEMAND_MODEL_PDA + options["REQUIRED PRESSURE"] = "20.0000" + cs_options = ChangeSet() + cs_options.append(options) + set_option(new_name, cs_options) + # result = run_simulation_ex(new_name,'realtime', modify_pattern_start_time, modify_pattern_start_time, modify_total_duration, + # downloading_prohibition=True) + simulation.run_simulation( + name=new_name, + simulation_type="extended", + modify_pattern_start_time=modify_pattern_start_time, + modify_total_duration=modify_total_duration, + modify_valve_opening=modify_valve_opening, + scheme_Type="valve_close_Analysis", + scheme_Name=scheme_Name, + ) + # step 3. restore the base model + # for valve in valves: + # execute_undo(name) + if is_project_open(new_name): + close_project(new_name) + delete_project(new_name) + # return result + + +############################################################ +# flushing analysis 03 +# Pipe_Flushing_Analysis(prj_name,date_time, Valve_id_list, Drainage_Node_Id, Flushing_flow[opt], Flushing_duration[opt])->out_file:string +############################################################ +def flushing_analysis( + name: str, + modify_pattern_start_time: str, + modify_total_duration: int = 900, + modify_valve_opening: dict[str, float] = None, + drainage_node_ID: str = None, + flushing_flow: float = 0, + scheme_Name: str = None, +) -> None: + """ + 管道冲洗模拟 + :param name: 模型名称,数据库中对应的名字 + :param modify_pattern_start_time: 模拟开始时间,格式为'2024-11-25T09:00:00+08:00' + :param modify_total_duration: 模拟总历时,秒 + :param modify_valve_opening: dict中包含多个阀门开启度,str为阀门的id,float为修改后的阀门开启度 + :param drainage_node_ID: 冲洗排放口所在节点ID + :param flushing_flow: 冲洗水量,传入参数单位为m3/h + :param scheme_Name: 方案名称 + :return: + """ + print( + datetime.now(pytz.timezone("Asia/Shanghai")).strftime("%Y-%m-%d %H:%M:%S") + + " -- Start Analysis." + ) + new_name = f"flushing_Anal_{name}" + if have_project(new_name): + if is_project_open(new_name): + close_project(new_name) + delete_project(new_name) + # if is_project_open(name): + # close_project(name) + print( + datetime.now(pytz.timezone("Asia/Shanghai")).strftime("%Y-%m-%d %H:%M:%S") + + " -- Start Copying Database." + ) + # CopyProjectEx()(name, new_name, + # ['operation', 'current_operation', 'restore_operation', 'batch_operation', 'operation_table']) + copy_project(name + "_template", new_name) + print( + datetime.now(pytz.timezone("Asia/Shanghai")).strftime("%Y-%m-%d %H:%M:%S") + + " -- Start Opening Database." + ) + open_project(new_name) + print( + datetime.now(pytz.timezone("Asia/Shanghai")).strftime("%Y-%m-%d %H:%M:%S") + + " -- Database Loading OK." + ) + if not is_junction(new_name, drainage_node_ID): + return "Wrong Drainage node type" + # step 1. change the valves status to 'closed' + # for valve, valve_k in zip(valves, valves_k): + # cs=ChangeSet() + # status=get_status(new_name,valve) + # # status['status']='CLOSED' + # if valve_k == 0: + # status['status'] = 'CLOSED' + # elif valve_k < 1: + # status['status'] = 'OPEN' + # status['setting'] = 0.1036 * pow(valve_k, -3.105) + # cs.append(status) + # set_status(new_name,cs) + units = get_option(new_name) + # step 2. set the emitter coefficient of drainage node or add flush flow to the drainage node + emitter_demand = get_demand(new_name, drainage_node_ID) + cs = ChangeSet() + if flushing_flow > 0: + for r in emitter_demand["demands"]: + if units == "LPS": + r["demand"] += flushing_flow / 3.6 + elif units == "CMH": + r["demand"] += flushing_flow + cs.append(emitter_demand) + set_demand(new_name, cs) + else: + pipes = get_node_links(new_name, drainage_node_ID) + flush_diameter = 50 + for pipe in pipes: + d = get_pipe(new_name, pipe)["diameter"] + if flush_diameter < d: + flush_diameter = d + flush_diameter /= 1000 + emitter_coeff = ( + 0.65 * 3.14 * (flush_diameter * flush_diameter / 4) * sqrt(19.6) * 1000 + ) # 全开口面积作为coeff + + old_emitter = get_emitter(new_name, drainage_node_ID) + if old_emitter != None: + old_emitter["coefficient"] = emitter_coeff # 爆管的emitter coefficient设置 + else: + old_emitter = {"junction": drainage_node_ID, "coefficient": emitter_coeff} + new_emitter = ChangeSet() + new_emitter.append(old_emitter) + set_emitter(new_name, new_emitter) + # step 3. run simulation + # 涉及关阀计算,可能导致关阀后仍有流量,改为压力驱动PDA + options = get_option(new_name) + options["DEMAND MODEL"] = OPTION_DEMAND_MODEL_PDA + options["REQUIRED PRESSURE"] = "20.0000" + cs_options = ChangeSet() + cs_options.append(options) + set_option(new_name, cs_options) + # result = run_simulation_ex(new_name,'realtime', modify_pattern_start_time, modify_pattern_start_time, modify_total_duration, + # downloading_prohibition=True) + simulation.run_simulation( + name=new_name, + simulation_type="extended", + modify_pattern_start_time=modify_pattern_start_time, + modify_total_duration=modify_total_duration, + modify_valve_opening=modify_valve_opening, + scheme_Type="flushing_Analysis", + scheme_Name=scheme_Name, + ) + # step 4. restore the base model + if is_project_open(new_name): + close_project(new_name) + delete_project(new_name) + # return result + + +############################################################ +# Contaminant simulation 04 +# +############################################################ +def contaminant_simulation( + name: str, + modify_pattern_start_time: str, # 模拟开始时间,格式为'2024-11-25T09:00:00+08:00' + modify_total_duration: int = 900, # 模拟总历时,秒 + source: str = None, # 污染源节点ID + concentration: float = None, # 污染源浓度,单位mg/L + source_pattern: str = None, # 污染源时间变化模式名称 + scheme_Name: str = None, +) -> None: + """ + 污染模拟 + :param name: 模型名称,数据库中对应的名字 + :param modify_pattern_start_time: 模拟开始时间,格式为'2024-11-25T09:00:00+08:00' + :param modify_total_duration: 模拟总历时,秒 + :param source: 污染源所在的节点ID + :param concentration: 污染源位置处的浓度,单位mg/L,即默认的污染模拟setting为concentration(应改为 Set point booster) + :param source_pattern: 污染源的时间变化模式,若不传入则默认以恒定浓度持续模拟,时间长度等于duration; + 若传入,则格式为{1.0,0.5,1.1}等系数列表pattern_step模拟等于模型的hydraulic time step + :param scheme_Name: 方案名称 + :return: + """ + print( + datetime.now(pytz.timezone("Asia/Shanghai")).strftime("%Y-%m-%d %H:%M:%S") + + " -- Start Analysis." + ) + new_name = f"contaminant_Sim_{name}" + if have_project(new_name): + if is_project_open(new_name): + close_project(new_name) + delete_project(new_name) + # if is_project_open(name): + # close_project(name) + print( + datetime.now(pytz.timezone("Asia/Shanghai")).strftime("%Y-%m-%d %H:%M:%S") + + " -- Start Copying Database." + ) + # CopyProjectEx()(name, new_name, + # ['operation', 'current_operation', 'restore_operation', 'batch_operation', 'operation_table']) + copy_project(name + "_template", new_name) + print( + datetime.now(pytz.timezone("Asia/Shanghai")).strftime("%Y-%m-%d %H:%M:%S") + + " -- Start Opening Database." + ) + open_project(new_name) + print( + datetime.now(pytz.timezone("Asia/Shanghai")).strftime("%Y-%m-%d %H:%M:%S") + + " -- Database Loading OK." + ) + dic_time = get_time(new_name) + dic_time["QUALITY TIMESTEP"] = "0:05:00" + cs = ChangeSet() + cs.operations.append(dic_time) + set_time(new_name, cs) # set QUALITY TIMESTEP + time_option = get_time(new_name) + hydraulic_step = time_option["HYDRAULIC TIMESTEP"] + secs = from_clock_to_seconds_2(hydraulic_step) + operation_step = 0 + # step 1. set duration + if modify_total_duration == None: + modify_total_duration = secs + # step 2. set pattern + if source_pattern != None: + pt = get_pattern(new_name, source_pattern) + if pt == None: + str_response = str("cant find source_pattern") + return str_response + else: + cs_pattern = ChangeSet() + pt = {} + factors = [] + tmp_duration = modify_total_duration + while tmp_duration > 0: + factors.append(1.0) + tmp_duration = tmp_duration - secs + pt["id"] = "contam_pt" + pt["factors"] = factors + cs_pattern.append(pt) + add_pattern(new_name, cs_pattern) + operation_step += 1 + # step 3. set source/initial quality + # source quality + cs_source = ChangeSet() + source_schema = { + "node": source, + "s_type": SOURCE_TYPE_CONCEN, + "strength": concentration, + "pattern": pt["id"], + } + cs_source.append(source_schema) + source_node = get_source(new_name, source) + if len(source_node) == 0: + add_source(new_name, cs_source) + else: + set_source(new_name, cs_source) + dict_demand = get_demand(new_name, source) + for demands in dict_demand["demands"]: + dict_demand["demands"][dict_demand["demands"].index(demands)]["demand"] = -1 + dict_demand["demands"][dict_demand["demands"].index(demands)]["pattern"] = None + cs = ChangeSet() + cs.append(dict_demand) + set_demand(new_name, cs) # set inflow node + # # initial quality + # dict_quality = get_quality(new_name, source) + # dict_quality['quality'] = concentration + # cs = ChangeSet() + # cs.append(dict_quality) + # set_quality(new_name, cs) + operation_step += 1 + # step 4 set option of quality to chemical + opt = get_option(new_name) + opt["QUALITY"] = OPTION_QUALITY_CHEMICAL + cs_option = ChangeSet() + cs_option.append(opt) + set_option(new_name, cs_option) + operation_step += 1 + # step 5. run simulation + # result = run_simulation_ex(new_name,'realtime', modify_pattern_start_time, modify_pattern_start_time, modify_total_duration, + # downloading_prohibition=True) + simulation.run_simulation( + name=new_name, + simulation_type="extended", + modify_pattern_start_time=modify_pattern_start_time, + modify_total_duration=modify_total_duration, + scheme_Type="contaminant_Analysis", + scheme_Name=scheme_Name, + ) + + # for i in range(1,operation_step): + # execute_undo(name) + if is_project_open(new_name): + close_project(new_name) + delete_project(new_name) + # return result + + +############################################################ +# age analysis 05 ***水龄模拟目前还没和实时模拟打通,不确定是否需要,先不要使用*** +############################################################ + + +def age_analysis( + name: str, modify_pattern_start_time: str, modify_total_duration: int = 900 +) -> None: + """ + 水龄模拟 + :param name: 模型名称,数据库中对应的名字 + :param modify_pattern_start_time: 模拟开始时间,格式为'2024-11-25T09:00:00+08:00' + :param modify_total_duration: 模拟总历时,秒 + :return: + """ + print( + datetime.now(pytz.timezone("Asia/Shanghai")).strftime("%Y-%m-%d %H:%M:%S") + + " -- Start Analysis." + ) + new_name = f"age_Anal_{name}" + if have_project(new_name): + if is_project_open(new_name): + close_project(new_name) + delete_project(new_name) + # if is_project_open(name): + # close_project(name) + print( + datetime.now(pytz.timezone("Asia/Shanghai")).strftime("%Y-%m-%d %H:%M:%S") + + " -- Start Copying Database." + ) + # CopyProjectEx()(name, new_name, + # ['operation', 'current_operation', 'restore_operation', 'batch_operation', 'operation_table']) + copy_project(name + "_template", new_name) + print( + datetime.now(pytz.timezone("Asia/Shanghai")).strftime("%Y-%m-%d %H:%M:%S") + + " -- Start Opening Database." + ) + open_project(new_name) + print( + datetime.now(pytz.timezone("Asia/Shanghai")).strftime("%Y-%m-%d %H:%M:%S") + + " -- Database Loading OK." + ) + # step 1. run simulation + + result = run_simulation_ex( + new_name, + "realtime", + modify_pattern_start_time, + modify_total_duration, + downloading_prohibition=True, + ) + # step 2. restore the base model status + # execute_undo(name) #有疑惑 + if is_project_open(new_name): + close_project(new_name) + delete_project(new_name) + output = Output("./temp/{}.db.out".format(new_name)) + # element_name = output.element_name() + # node_name = element_name['nodes'] + # link_name = element_name['links'] + nodes_age = [] + node_result = output.node_results() + for node in node_result: + nodes_age.append(node["result"][-1]["quality"]) + links_age = [] + link_result = output.link_results() + for link in link_result: + links_age.append(link["result"][-1]["quality"]) + age_result = {"nodes": nodes_age, "links": links_age} + # age_result = {'nodes': nodes_age, 'links': links_age, 'nodeIDs': node_name, 'linkIDs': link_name} + return json.dumps(age_result) + + +############################################################ +# pressure regulation 06 +############################################################ + + +def pressure_regulation( + name: str, + modify_pattern_start_time: str, + modify_total_duration: int = 900, + modify_tank_initial_level: dict[str, float] = None, + modify_fixed_pump_pattern: dict[str, list] = None, + modify_variable_pump_pattern: dict[str, list] = None, + scheme_Name: str = None, +) -> None: + """ + 区域调压模拟,用来模拟未来15分钟内,开关水泵对区域压力的影响 + :param name: 模型名称,数据库中对应的名字 + :param modify_pattern_start_time: 模拟开始时间,格式为'2024-11-25T09:00:00+08:00' + :param modify_total_duration: 模拟总历时,秒 + :param modify_tank_initial_level: dict中包含多个水塔,str为水塔的id,float为修改后的initial_level + :param modify_fixed_pump_pattern: dict中包含多个水泵模式,str为工频水泵的id,list为修改后的pattern + :param modify_variable_pump_pattern: dict中包含多个水泵模式,str为变频水泵的id,list为修改后的pattern + :param scheme_Name: 模拟方案名称 + :return: + """ + print( + datetime.now(pytz.timezone("Asia/Shanghai")).strftime("%Y-%m-%d %H:%M:%S") + + " -- Start Analysis." + ) + new_name = f"pressure_regulation_{name}" + if have_project(new_name): + if is_project_open(new_name): + close_project(new_name) + delete_project(new_name) + # if is_project_open(name): + # close_project(name) + print( + datetime.now(pytz.timezone("Asia/Shanghai")).strftime("%Y-%m-%d %H:%M:%S") + + " -- Start Copying Database." + ) + # CopyProjectEx()(name, new_name, + # ['operation', 'current_operation', 'restore_operation', 'batch_operation', 'operation_table']) + copy_project(name + "_template", new_name) + print( + datetime.now(pytz.timezone("Asia/Shanghai")).strftime("%Y-%m-%d %H:%M:%S") + + " -- Start Opening Database." + ) + open_project(new_name) + print( + datetime.now(pytz.timezone("Asia/Shanghai")).strftime("%Y-%m-%d %H:%M:%S") + + " -- Database Loading OK." + ) + # 全部关泵后,压力计算不合理,改为压力驱动PDA + options = get_option(new_name) + options["DEMAND MODEL"] = OPTION_DEMAND_MODEL_PDA + options["REQUIRED PRESSURE"] = "15.0000" + cs_options = ChangeSet() + cs_options.append(options) + set_option(new_name, cs_options) + # result = run_simulation_ex(name=new_name, + # simulation_type='realtime', + # start_datetime=start_datetime, + # duration=900, + # pump_control=pump_control, + # tank_initial_level_control=tank_initial_level_control, + # downloading_prohibition=True) + simulation.run_simulation( + name=new_name, + simulation_type="extended", + modify_pattern_start_time=modify_pattern_start_time, + modify_total_duration=modify_total_duration, + modify_tank_initial_level=modify_tank_initial_level, + modify_fixed_pump_pattern=modify_fixed_pump_pattern, + modify_variable_pump_pattern=modify_variable_pump_pattern, + scheme_Type="pressure_regulation", + scheme_Name=scheme_Name, + ) + if is_project_open(new_name): + close_project(new_name) + delete_project(new_name) + # return result diff --git a/app/api/v1/endpoints/simulation.py b/app/api/v1/endpoints/simulation.py index 792d8f8..c00a5a7 100644 --- a/app/api/v1/endpoints/simulation.py +++ b/app/api/v1/endpoints/simulation.py @@ -8,7 +8,7 @@ from app.services.tjnetwork import ( run_inp, dump_output, ) -from app.algorithms.online_Analysis import ( +from app.algorithms.simulations import ( burst_analysis, valve_close_analysis, flushing_analysis, @@ -16,13 +16,13 @@ from app.algorithms.online_Analysis import ( age_analysis, # scheduling_analysis, pressure_regulation, - project_management, - # daily_scheduling_analysis, - network_update, - # pump_failure, +) +from app.algorithms.sensors import ( pressure_sensor_placement_sensitivity, pressure_sensor_placement_kmeans, ) +from app.services.network_import import network_update +from app.services.simulation_ops import project_management router = APIRouter() diff --git a/app/services/__init__.py b/app/services/__init__.py index e69de29..e4f4031 100644 --- a/app/services/__init__.py +++ b/app/services/__init__.py @@ -0,0 +1,32 @@ +from app.services.network_import import network_update, submit_scada_info +from app.services.scheme_management import ( + create_user, + delete_user, + scheme_name_exists, + store_scheme_info, + delete_scheme_info, + query_scheme_list, + upload_shp_to_pg, + submit_risk_probability_result, +) +from app.services.simulation_ops import ( + project_management, + scheduling_simulation, + daily_scheduling_simulation, +) + +__all__ = [ + "network_update", + "submit_scada_info", + "create_user", + "delete_user", + "scheme_name_exists", + "store_scheme_info", + "delete_scheme_info", + "query_scheme_list", + "upload_shp_to_pg", + "submit_risk_probability_result", + "project_management", + "scheduling_simulation", + "daily_scheduling_simulation", +] diff --git a/app/services/network_import.py b/app/services/network_import.py new file mode 100644 index 0000000..3ac24d7 --- /dev/null +++ b/app/services/network_import.py @@ -0,0 +1,197 @@ +import csv +import os + +import chardet +import psycopg +from psycopg import sql + +import app.services.project_info as project_info +from app.native.api.postgresql_info import get_pgconn_string +from app.services.tjnetwork import read_inp + + +############################################################ +# network_update 10 +############################################################ + + +def network_update(file_path: str) -> None: + """ + 更新pg数据库中的inp文件 + :param file_path: inp文件 + :return: + """ + read_inp("szh", file_path) + + csv_path = "./history_pattern_flow.csv" + + # # 检查文件是否存在 + # if os.path.exists(csv_path): + # print(f"history_patterns_flows文件存在,开始处理...") + # + # # 读取 CSV 文件 + # df = pd.read_csv(csv_path) + # + # # 连接到 PostgreSQL 数据库(这里是数据库 "bb") + # with psycopg.connect("dbname=bb host=127.0.0.1") as conn: + # with conn.cursor() as cur: + # for index, row in df.iterrows(): + # # 直接将数据插入,不进行唯一性检查 + # insert_sql = sql.SQL(""" + # INSERT INTO history_patterns_flows (id, factor, flow) + # VALUES (%s, %s, %s); + # """) + # # 将数据插入数据库 + # cur.execute(insert_sql, (row['id'], row['factor'], row['flow'])) + # conn.commit() + # print("数据成功导入到 'history_patterns_flows' 表格。") + # else: + # print(f"history_patterns_flows文件不存在。") + # 检查文件是否存在 + if os.path.exists(csv_path): + print(f"history_patterns_flows文件存在,开始处理...") + + # 连接到 PostgreSQL 数据库(这里是数据库 "bb") + with psycopg.connect(f"dbname={project_info.name} host=127.0.0.1") as conn: + with conn.cursor() as cur: + with open(csv_path, newline="", encoding="utf-8-sig") as csvfile: + reader = csv.DictReader(csvfile) + for row in reader: + # 直接将数据插入,不进行唯一性检查 + insert_sql = sql.SQL( + """ + INSERT INTO history_patterns_flows (id, factor, flow) + VALUES (%s, %s, %s); + """ + ) + # 将数据插入数据库 + cur.execute(insert_sql, (row["id"], row["factor"], row["flow"])) + conn.commit() + print("数据成功导入到 'history_patterns_flows' 表格。") + else: + print(f"history_patterns_flows文件不存在。") + + +def submit_scada_info(name: str, coord_id: str) -> None: + """ + 将scada信息表导入pg数据库 + :param name: 项目名称(数据库名称) + :param coord_id: 坐标系的id,如4326,根据原始坐标信息输入 + :return: + """ + scada_info_path = "./scada_info.csv" + # 检查文件是否存在 + if os.path.exists(scada_info_path): + print(f"scada_info文件存在,开始处理...") + + # 自动检测文件编码 + with open(scada_info_path, "rb") as file: + raw_data = file.read() + detected = chardet.detect(raw_data) + file_encoding = detected["encoding"] + print(f"检测到的文件编码:{file_encoding}") + try: + # 动态替换数据库名称 + conn_string = get_pgconn_string(db_name=name) + + # 连接到 PostgreSQL 数据库(这里是数据库 "bb") + with psycopg.connect(conn_string) as conn: + with conn.cursor() as cur: + # 检查 scada_info 表是否为空 + cur.execute("SELECT COUNT(*) FROM scada_info;") + count = cur.fetchone()[0] + + if count > 0: + print("scada_info表中已有数据,正在清空记录...") + cur.execute("DELETE FROM scada_info;") + print("表记录已清空。") + + with open( + scada_info_path, newline="", encoding=file_encoding + ) as csvfile: + reader = csv.DictReader(csvfile) + for row in reader: + # 将CSV单元格值为空的字段转换为 None + cleaned_row = { + key: (value if value.strip() else None) + for key, value in row.items() + } + + # 处理 associated_source_outflow_id 列动态变化 + associated_columns = [ + f"associated_source_outflow_id{i}" for i in range(1, 21) + ] + associated_values = [ + ( + cleaned_row.get(col).strip() + if cleaned_row.get(col) + and cleaned_row.get(col).strip() + else None + ) + for col in associated_columns + ] + + # 将 X_coor 和 Y_coor 转换为 geometry 类型 + x_coor = ( + float(cleaned_row["X_coor"]) + if cleaned_row["X_coor"] + else None + ) + y_coor = ( + float(cleaned_row["Y_coor"]) + if cleaned_row["Y_coor"] + else None + ) + coord = ( + f"SRID={coord_id};POINT({x_coor} {y_coor})" + if x_coor and y_coor + else None + ) + + # 准备插入 SQL 语句 + insert_sql = sql.SQL( + """ + INSERT INTO scada_info ( + id, type, associated_element_id, associated_pattern, + associated_pipe_flow_id, {associated_columns}, + API_query_id, transmission_mode, transmission_frequency, + reliability, X_coor, Y_coor, coord + ) + VALUES ( + %s, %s, %s, %s, %s, {associated_placeholders}, + %s, %s, %s, %s, %s, %s, %s + ); + """ + ).format( + associated_columns=sql.SQL(", ").join( + sql.Identifier(col) for col in associated_columns + ), + associated_placeholders=sql.SQL(", ").join( + sql.Placeholder() for _ in associated_columns + ), + ) + # 将数据插入数据库 + cur.execute( + insert_sql, + ( + cleaned_row["id"], + cleaned_row["type"], + cleaned_row["associated_element_id"], + cleaned_row.get("associated_pattern"), + cleaned_row.get("associated_pipe_flow_id"), + *associated_values, + cleaned_row.get("API_query_id"), + cleaned_row["transmission_mode"], + cleaned_row["transmission_frequency"], + cleaned_row["reliability"], + x_coor, + y_coor, + coord, + ), + ) + conn.commit() + print("数据成功导入到 'scada_info' 表格。") + except Exception as e: + print(f"导入时出错:{e}") + else: + print(f"scada_info文件不存在。") diff --git a/app/services/scheme_management.py b/app/services/scheme_management.py new file mode 100644 index 0000000..1fe7cc3 --- /dev/null +++ b/app/services/scheme_management.py @@ -0,0 +1,266 @@ +import ast +import json + +import geopandas as gpd +import pandas as pd +import psycopg +from sqlalchemy import create_engine + +from app.native.api.postgresql_info import get_pgconn_string + + +# 2025/03/23 +def create_user(name: str, username: str, password: str): + """ + 创建用户 + :param name: 数据库名称 + :param username: 用户名 + :param password: 密码 + :return: + """ + try: + # 动态替换数据库名称 + conn_string = get_pgconn_string(db_name=name) + # 连接到 PostgreSQL 数据库(这里是数据库 "bb") + with psycopg.connect(conn_string) as conn: + with conn.cursor() as cur: + cur.execute( + "INSERT INTO users (username, password) VALUES (%s, %s)", + (username, password), + ) + # 提交事务 + conn.commit() + print("新用户创建成功!") + except Exception as e: + print(f"创建用户出错:{e}") + + +# 2025/03/23 +def delete_user(name: str, username: str): + """ + 删除用户 + :param name: 数据库名称 + :param username: 用户名 + :return: + """ + try: + # 动态替换数据库名称 + conn_string = get_pgconn_string(db_name=name) + # 连接到 PostgreSQL 数据库(这里是数据库 "bb") + with psycopg.connect(conn_string) as conn: + with conn.cursor() as cur: + cur.execute("DELETE FROM users WHERE username = %s", (username,)) + conn.commit() + print(f"用户 {username} 删除成功!") + except Exception as e: + print(f"删除用户出错:{e}") + + +# 2025/03/23 +def scheme_name_exists(name: str, scheme_name: str) -> bool: + """ + 判断传入的 scheme_name 是否已存在于 scheme_list 表中,用于输入框判断 + :param name: 数据库名称 + :param scheme_name: 需要判断的方案名称 + :return: 如果存在返回 True,否则返回 False + """ + try: + conn_string = get_pgconn_string(db_name=name) + with psycopg.connect(conn_string) as conn: + with conn.cursor() as cur: + cur.execute( + "SELECT COUNT(*) FROM scheme_list WHERE scheme_name = %s", + (scheme_name,), + ) + result = cur.fetchone() + if result is not None and result[0] > 0: + return True + else: + return False + except Exception as e: + print(f"查询 scheme_name 时出错:{e}") + return False + + +# 2025/03/23 +def store_scheme_info( + name: str, + scheme_name: str, + scheme_type: str, + username: str, + scheme_start_time: str, + scheme_detail: dict, +): + """ + 将一条方案记录插入 scheme_list 表中 + :param name: 数据库名称 + :param scheme_name: 方案名称 + :param scheme_type: 方案类型 + :param username: 用户名(需在 users 表中已存在) + :param scheme_start_time: 方案起始时间(字符串) + :param scheme_detail: 方案详情(字典,会转换为 JSON) + :return: + """ + try: + conn_string = get_pgconn_string(db_name=name) + with psycopg.connect(conn_string) as conn: + with conn.cursor() as cur: + sql = """ + INSERT INTO scheme_list (scheme_name, scheme_type, username, scheme_start_time, scheme_detail) + VALUES (%s, %s, %s, %s, %s) + """ + # 将字典转换为 JSON 字符串 + scheme_detail_json = json.dumps(scheme_detail) + cur.execute( + sql, + ( + scheme_name, + scheme_type, + username, + scheme_start_time, + scheme_detail_json, + ), + ) + conn.commit() + print("方案信息存储成功!") + except Exception as e: + print(f"存储方案信息时出错:{e}") + + +# 2025/03/23 +def delete_scheme_info(name: str, scheme_name: str) -> None: + """ + 从 scheme_list 表中删除指定的方案 + :param name: 数据库名称 + :param scheme_name: 要删除的方案名称 + """ + try: + conn_string = get_pgconn_string(db_name=name) + with psycopg.connect(conn_string) as conn: + with conn.cursor() as cur: + # 使用参数化查询删除方案记录 + cur.execute( + "DELETE FROM scheme_list WHERE scheme_name = %s", (scheme_name,) + ) + conn.commit() + print(f"方案 {scheme_name} 删除成功!") + except Exception as e: + print(f"删除方案时出错:{e}") + + +# 2025/03/23 +def query_scheme_list(name: str) -> list: + """ + 查询pg数据库中的scheme_list,按照 create_time 降序排列,离现在时间最近的记录排在最前面 + :param name: 项目名称(数据库名称) + :return: 返回查询结果的所有行 + """ + try: + # 动态替换数据库名称 + conn_string = get_pgconn_string(db_name=name) + # 连接到 PostgreSQL 数据库(这里是数据库 "bb") + with psycopg.connect(conn_string) as conn: + with conn.cursor() as cur: + # 按 create_time 降序排列 + cur.execute("SELECT * FROM scheme_list ORDER BY create_time DESC") + rows = cur.fetchall() + return rows + + except Exception as e: + print(f"查询错误:{e}") + + +# 2025/03/23 +def upload_shp_to_pg(name: str, table_name: str, role: str, shp_file_path: str): + """ + 将 Shapefile 文件上传到 PostgreSQL 数据库 + :param name: 项目名称(数据库名称) + :param table_name: 创建表的名字 + :param role: 数据库角色名,位于c盘user中查看 + :param shp_file_path: shp文件的路径 + :return: + """ + try: + # 动态连接到指定的数据库 + conn_string = get_pgconn_string(db_name=name) + with psycopg.connect(conn_string) as conn: + # 读取 Shapefile 文件 + gdf = gpd.read_file(shp_file_path) + + # 检查投影坐标系(CRS),并确保是 EPSG:4326 + if gdf.crs.to_string() != "EPSG:4490": + gdf = gdf.to_crs(epsg=4490) + + # 使用 GeoDataFrame 的 .to_postgis 方法将数据写入 PostgreSQL + # 需要在数据库中提前安装 PostGIS 扩展 + engine = create_engine(f"postgresql+psycopg2://{role}:@127.0.0.1/{name}") + gdf.to_postgis( + table_name, engine, if_exists="replace", index=True, index_label="id" + ) + + print( + f"Shapefile 文件成功上传到 PostgreSQL 数据库 '{name}' 的表 '{table_name}'." + ) + + except Exception as e: + print(f"上传 Shapefile 到 PostgreSQL 时出错:{e}") + + +def submit_risk_probability_result(name: str, result_file_path: str) -> None: + """ + 将管网风险评估结果导入pg数据库 + :param name: 项目名称(数据库名称) + :param result_file_path: 结果文件路径 + :return: + """ + # 自动检测文件编码 + # with open({result_file_path}, 'rb') as file: + # raw_data = file.read() + # detected = chardet.detect(raw_data) + # file_encoding = detected['encoding'] + # print(f"检测到的文件编码:{file_encoding}") + + try: + # 动态替换数据库名称 + conn_string = get_pgconn_string(db_name=name) + + # 连接到 PostgreSQL 数据库 + with psycopg.connect(conn_string) as conn: + with conn.cursor() as cur: + # 检查 scada_info 表是否为空 + cur.execute("SELECT COUNT(*) FROM pipe_risk_probability;") + count = cur.fetchone()[0] + + if count > 0: + print("pipe_risk_probability表中已有数据,正在清空记录...") + cur.execute("DELETE FROM pipe_risk_probability;") + print("表记录已清空。") + + # 读取Excel并转换x/y列为列表 + df = pd.read_excel(result_file_path, sheet_name="Sheet1") + df["x"] = df["x"].apply(ast.literal_eval) + df["y"] = df["y"].apply(ast.literal_eval) + + # 批量插入数据 + for index, row in df.iterrows(): + insert_query = """ + INSERT INTO pipe_risk_probability + (pipeID, pipeage, risk_probability_now, x, y) + VALUES (%s, %s, %s, %s, %s) + """ + cur.execute( + insert_query, + ( + row["pipeID"], + row["pipeage"], + row["risk_probability_now"], + row["x"], # 直接传递列表 + row["y"], # 同上 + ), + ) + + conn.commit() + print("风险评估结果导入成功") + + except Exception as e: + print(f"导入时出错:{e}") diff --git a/app/services/simulation_ops.py b/app/services/simulation_ops.py new file mode 100644 index 0000000..d4723a6 --- /dev/null +++ b/app/services/simulation_ops.py @@ -0,0 +1,233 @@ +import json +from datetime import datetime +from math import pi + +import pytz + +from app.algorithms.api_ex.run_simulation import run_simulation_ex +from app.native.api.project import copy_project +from app.services.epanet.epanet import Output +from app.services.tjnetwork import * + + +############################################################ +# project management 07 ***暂时不使用,与业务需求无关*** +############################################################ + + +def project_management( + prj_name, + start_datetime, + pump_control, + tank_initial_level_control=None, + region_demand_control=None, +) -> str: + print( + datetime.now(pytz.timezone("Asia/Shanghai")).strftime("%Y-%m-%d %H:%M:%S") + + " -- Start Analysis." + ) + new_name = f"project_management_{prj_name}" + if have_project(new_name): + if is_project_open(new_name): + close_project(new_name) + delete_project(new_name) + # if is_project_open(prj_name): + # close_project(prj_name) + print( + datetime.now(pytz.timezone("Asia/Shanghai")).strftime("%Y-%m-%d %H:%M:%S") + + " -- Start Copying Database." + ) + # CopyProjectEx()(prj_name, new_name, + # ['operation', 'current_operation', 'restore_operation', 'batch_operation', 'operation_table']) + copy_project(prj_name + "_template", new_name) + print( + datetime.now(pytz.timezone("Asia/Shanghai")).strftime("%Y-%m-%d %H:%M:%S") + + " -- Start Opening Database." + ) + open_project(new_name) + print( + datetime.now(pytz.timezone("Asia/Shanghai")).strftime("%Y-%m-%d %H:%M:%S") + + " -- Database Loading OK." + ) + result = run_simulation_ex( + name=new_name, + simulation_type="realtime", + start_datetime=start_datetime, + duration=86400, + pump_control=pump_control, + tank_initial_level_control=tank_initial_level_control, + region_demand_control=region_demand_control, + downloading_prohibition=True, + ) + if is_project_open(new_name): + close_project(new_name) + delete_project(new_name) + return result + + +############################################################ +# scheduling analysis 08 ***暂时不使用,与业务需求无关*** +############################################################ + + +def scheduling_simulation( + prj_name, start_time, pump_control, tank_id, water_plant_output_id, time_delta=300 +) -> str: + print( + datetime.now(pytz.timezone("Asia/Shanghai")).strftime("%Y-%m-%d %H:%M:%S") + + " -- Start Analysis." + ) + new_name = f"scheduling_{prj_name}" + + if have_project(new_name): + if is_project_open(new_name): + close_project(new_name) + delete_project(new_name) + # if is_project_open(prj_name): + # close_project(prj_name) + print( + datetime.now(pytz.timezone("Asia/Shanghai")).strftime("%Y-%m-%d %H:%M:%S") + + " -- Start Copying Database." + ) + # CopyProjectEx()(prj_name, new_name, + # ['operation', 'current_operation', 'restore_operation', 'batch_operation', 'operation_table']) + copy_project(prj_name + "_template", new_name) + print( + datetime.now(pytz.timezone("Asia/Shanghai")).strftime("%Y-%m-%d %H:%M:%S") + + " -- Start Opening Database." + ) + open_project(new_name) + print( + datetime.now(pytz.timezone("Asia/Shanghai")).strftime("%Y-%m-%d %H:%M:%S") + + " -- Database Loading OK." + ) + + run_simulation_ex( + new_name, "realtime", start_time, duration=0, pump_control=pump_control + ) + + if not is_project_open(new_name): + open_project(new_name) + + tank = get_tank(new_name, tank_id) # 水塔信息 + tank_floor_space = pi * pow(tank["diameter"] / 2, 2) # 水塔底面积(m^2) + tank_init_level = tank["init_level"] # 水塔初始水位(m) + tank_pipes_id = tank["links"] # pipes list + + tank_pipe_flow_direction = ( + {} + ) # 管道流向修正系数, 水塔为下游节点时为1, 水塔为上游节点时为-1 + for pipe_id in tank_pipes_id: + if get_pipe(new_name, pipe_id)["node2"] == tank_id: # 水塔为下游节点 + tank_pipe_flow_direction[pipe_id] = 1 + else: + tank_pipe_flow_direction[pipe_id] = -1 + + output = Output("./temp/{}.db.out".format(new_name)) + + node_results = ( + output.node_results() + ) # [{'node': str, 'result': [{'pressure': float}]}] + water_plant_output_pressure = 0 + for node_result in node_results: + if node_result["node"] == water_plant_output_id: # 水厂出水压力(m) + water_plant_output_pressure = node_result["result"][-1]["pressure"] + water_plant_output_pressure /= 100 # 预计水厂出水压力(Mpa) + + pipe_results = output.link_results() # [{'link': str, 'result': [{'flow': float}]}] + tank_inflow = 0 + for pipe_result in pipe_results: + for pipe_id in tank_pipes_id: # 遍历与水塔相连的管道 + if pipe_result["link"] == pipe_id: # 水塔入流流量(L/s) + tank_inflow += ( + pipe_result["result"][-1]["flow"] + * tank_pipe_flow_direction[pipe_id] + ) + tank_inflow /= 1000 # 水塔入流流量(m^3/s) + tank_level_delta = tank_inflow * time_delta / tank_floor_space # 水塔水位改变值(m) + tank_level = tank_init_level + tank_level_delta # 预计水塔水位(m) + + simulation_results = { + "water_plant_output_pressure": water_plant_output_pressure, + "tank_init_level": tank_init_level, + "tank_level": tank_level, + } + + if is_project_open(new_name): + close_project(new_name) + delete_project(new_name) + + return json.dumps(simulation_results) + + +def daily_scheduling_simulation( + prj_name, start_time, pump_control, reservoir_id, tank_id, water_plant_output_id +) -> str: + print( + datetime.now(pytz.timezone("Asia/Shanghai")).strftime("%Y-%m-%d %H:%M:%S") + + " -- Start Analysis." + ) + new_name = f"daily_scheduling_{prj_name}" + + if have_project(new_name): + if is_project_open(new_name): + close_project(new_name) + delete_project(new_name) + # if is_project_open(prj_name): + # close_project(prj_name) + print( + datetime.now(pytz.timezone("Asia/Shanghai")).strftime("%Y-%m-%d %H:%M:%S") + + " -- Start Copying Database." + ) + # CopyProjectEx()(prj_name, new_name, + # ['operation', 'current_operation', 'restore_operation', 'batch_operation', 'operation_table']) + copy_project(prj_name + "_template", new_name) + print( + datetime.now(pytz.timezone("Asia/Shanghai")).strftime("%Y-%m-%d %H:%M:%S") + + " -- Start Opening Database." + ) + open_project(new_name) + print( + datetime.now(pytz.timezone("Asia/Shanghai")).strftime("%Y-%m-%d %H:%M:%S") + + " -- Database Loading OK." + ) + + run_simulation_ex( + new_name, "realtime", start_time, duration=86400, pump_control=pump_control + ) + + if not is_project_open(new_name): + open_project(new_name) + + output = Output("./temp/{}.db.out".format(new_name)) + + node_results = ( + output.node_results() + ) # [{'node': str, 'result': [{'pressure': float, 'head': float}]}] + water_plant_output_pressure = [] + reservoir_level = [] + tank_level = [] + for node_result in node_results: + if node_result["node"] == water_plant_output_id: + for result in node_result["result"]: + water_plant_output_pressure.append( + result["pressure"] / 100 + ) # 水厂出水压力(Mpa) + elif node_result["node"] == reservoir_id: + for result in node_result["result"]: + reservoir_level.append(result["head"] - 250.35) # 清水池液位(m) + elif node_result["node"] == tank_id: + for result in node_result["result"]: + tank_level.append(result["pressure"]) # 调节池液位(m) + + simulation_results = { + "water_plant_output_pressure": water_plant_output_pressure, + "reservoir_level": reservoir_level, + "tank_level": tank_level, + } + + if is_project_open(new_name): + close_project(new_name) + delete_project(new_name) + + return json.dumps(simulation_results)