diff --git a/online_Analysis.py b/online_Analysis.py index 729d808..83d5240 100644 --- a/online_Analysis.py +++ b/online_Analysis.py @@ -1,6 +1,6 @@ import os from tjnetwork import * -from api.project import CopyProjectEx +from api.project import copy_project from run_simulation import run_simulation_ex, from_clock_to_seconds_2 from math import sqrt, pi from epanet.epanet import Output @@ -24,21 +24,22 @@ import api_ex.Fdataclean import api_ex.Pdataclean from api.postgresql_info import get_pgconn_string + ############################################################ # burst analysis 01 ############################################################ -def convert_to_local_unit(proj:str,emitters:float)->float: +def convert_to_local_unit(proj: str, emitters: float) -> float: open_project(proj) - proj_opt=get_option(proj) - str_unit=proj_opt.get('UNITS') + proj_opt = get_option(proj) + str_unit = proj_opt.get("UNITS") - if str_unit == 'CMH': + if str_unit == "CMH": return emitters * 3.6 - elif str_unit == 'LPS': + elif str_unit == "LPS": return emitters - elif str_unit == 'CMS': + elif str_unit == "CMS": return emitters / 1000.0 - elif str_unit == 'MGD': + elif str_unit == "MGD": return emitters * 0.0438126 # Unknown unit: log and return original value @@ -46,10 +47,17 @@ def convert_to_local_unit(proj:str,emitters:float)->float: return emitters -def burst_analysis(name: str, modify_pattern_start_time: str, burst_ID: list | str = None, burst_size: list | float | int = None, - modify_total_duration: int=900, modify_fixed_pump_pattern: dict[str, list] = None, - modify_variable_pump_pattern: dict[str, list] = None, modify_valve_opening: dict[str, float] = None, - scheme_Name: str = None) -> None: +def burst_analysis( + name: str, + modify_pattern_start_time: str, + burst_ID: list | str = None, + burst_size: list | float | int = None, + modify_total_duration: int = 900, + modify_fixed_pump_pattern: dict[str, list] = None, + modify_variable_pump_pattern: dict[str, list] = None, + modify_valve_opening: dict[str, float] = None, + scheme_Name: str = None, +) -> None: """ 爆管模拟 :param name: 模型名称,数据库中对应的名字 @@ -71,68 +79,87 @@ def burst_analysis(name: str, modify_pattern_start_time: str, burst_ID: list | s "modify_variable_pump_pattern": modify_variable_pump_pattern, "modify_valve_opening": modify_valve_opening, } - print(datetime.now(pytz.timezone('Asia/Shanghai')).strftime("%Y-%m-%d %H:%M:%S") + " -- Start Analysis.") - new_name = f'burst_Anal_{name}' + print( + datetime.now(pytz.timezone("Asia/Shanghai")).strftime("%Y-%m-%d %H:%M:%S") + + " -- Start Analysis." + ) + new_name = f"burst_Anal_{name}" if have_project(new_name): if is_project_open(new_name): close_project(new_name) delete_project(new_name) - print(datetime.now(pytz.timezone('Asia/Shanghai')).strftime("%Y-%m-%d %H:%M:%S") + " -- Start Copying Database.") - CopyProjectEx()(name, new_name, - ['operation', 'current_operation', 'restore_operation', 'batch_operation', 'operation_table']) - print(datetime.now(pytz.timezone('Asia/Shanghai')).strftime("%Y-%m-%d %H:%M:%S") + " -- Start Opening Database.") + print( + datetime.now(pytz.timezone("Asia/Shanghai")).strftime("%Y-%m-%d %H:%M:%S") + + " -- Start Copying Database." + ) + # CopyProjectEx()(name, new_name, + # ['operation', 'current_operation', 'restore_operation', 'batch_operation', 'operation_table']) + copy_project(name + "_template", new_name) + print( + datetime.now(pytz.timezone("Asia/Shanghai")).strftime("%Y-%m-%d %H:%M:%S") + + " -- Start Opening Database." + ) open_project(new_name) - simulation.run_simulation(name=new_name, simulation_type='manually_temporary', modify_pattern_start_time=modify_pattern_start_time) - print(datetime.now(pytz.timezone('Asia/Shanghai')).strftime("%Y-%m-%d %H:%M:%S") + " -- Database Loading OK.") + simulation.run_simulation( + name=new_name, + simulation_type="manually_temporary", + modify_pattern_start_time=modify_pattern_start_time, + ) + print( + datetime.now(pytz.timezone("Asia/Shanghai")).strftime("%Y-%m-%d %H:%M:%S") + + " -- Database Loading OK." + ) ##step 1 set the emitter coefficient of end node of busrt pipe if isinstance(burst_ID, list): if (burst_size is not None) and (type(burst_size) is not list): - return json.dumps('Type mismatch.') + return json.dumps("Type mismatch.") elif isinstance(burst_ID, str): burst_ID = [burst_ID] if burst_size is not None: if isinstance(burst_size, float) or isinstance(burst_size, int): burst_size = [burst_size] else: - return json.dumps('Type mismatch.') + return json.dumps("Type mismatch.") else: - return json.dumps('Type mismatch.') + return json.dumps("Type mismatch.") if burst_size is None: burst_size = [-1] * len(burst_ID) elif len(burst_size) < len(burst_ID): burst_size += [-1] * (len(burst_ID) - len(burst_size)) elif len(burst_size) > len(burst_ID): # burst_size = burst_size[:len(burst_ID)] - return json.dumps('Length mismatch.') + return json.dumps("Length mismatch.") for burst_ID_, burst_size_ in zip(burst_ID, burst_size): pipe = get_pipe(new_name, burst_ID_) - str_start_node = pipe['node1'] - str_end_node = pipe['node2'] - d_pipe = pipe['diameter'] / 1000.0 + str_start_node = pipe["node1"] + str_end_node = pipe["node2"] + d_pipe = pipe["diameter"] / 1000.0 if burst_size_ <= 0: burst_size_ = 3.14 * d_pipe * d_pipe / 4 / 8 else: burst_size_ = burst_size_ / 10000 - emitter_coeff = 0.65 * burst_size_ * sqrt(19.6) * 1000#1/8开口面积作为coeff,单位 L/S - emitter_coeff =convert_to_local_unit(new_name, emitter_coeff) - emitter_node = '' + emitter_coeff = ( + 0.65 * burst_size_ * sqrt(19.6) * 1000 + ) # 1/8开口面积作为coeff,单位 L/S + emitter_coeff = convert_to_local_unit(new_name, emitter_coeff) + emitter_node = "" if is_junction(new_name, str_end_node): emitter_node = str_end_node elif is_junction(new_name, str_start_node): emitter_node = str_start_node old_emitter = get_emitter(new_name, emitter_node) - if(old_emitter != None): - old_emitter['coefficient'] = emitter_coeff #爆管的emitter coefficient设置 + if old_emitter != None: + old_emitter["coefficient"] = emitter_coeff # 爆管的emitter coefficient设置 else: - old_emitter = {'junction': emitter_node, 'coefficient': emitter_coeff} + old_emitter = {"junction": emitter_node, "coefficient": emitter_coeff} new_emitter = ChangeSet() new_emitter.append(old_emitter) set_emitter(new_name, new_emitter) - #step 2. run simulation + # step 2. run simulation # 涉及关阀计算,可能导致关阀后仍有流量,改为压力驱动PDA options = get_option(new_name) - options['DEMAND MODEL'] = OPTION_DEMAND_MODEL_PDA - options['REQUIRED PRESSURE'] = '10.0000' + options["DEMAND MODEL"] = OPTION_DEMAND_MODEL_PDA + options["REQUIRED PRESSURE"] = "10.0000" cs_options = ChangeSet() cs_options.append(options) set_option(new_name, cs_options) @@ -147,26 +174,43 @@ def burst_analysis(name: str, modify_pattern_start_time: str, burst_ID: list | s # modify_pump_pattern=modify_pump_pattern, # valve_control=valve_control, # downloading_prohibition=True) - simulation.run_simulation(name=new_name, simulation_type='extended', modify_pattern_start_time=modify_pattern_start_time, - modify_total_duration=modify_total_duration, modify_fixed_pump_pattern=modify_fixed_pump_pattern, - modify_variable_pump_pattern=modify_variable_pump_pattern, modify_valve_opening=modify_valve_opening, - scheme_Type='burst_Analysis', scheme_Name=scheme_Name) - #step 3. restore the base model status + simulation.run_simulation( + name=new_name, + simulation_type="extended", + modify_pattern_start_time=modify_pattern_start_time, + modify_total_duration=modify_total_duration, + modify_fixed_pump_pattern=modify_fixed_pump_pattern, + modify_variable_pump_pattern=modify_variable_pump_pattern, + modify_valve_opening=modify_valve_opening, + scheme_Type="burst_Analysis", + scheme_Name=scheme_Name, + ) + # step 3. restore the base model status # execute_undo(name) #有疑惑 if is_project_open(new_name): close_project(new_name) delete_project(new_name) # return result - store_scheme_info(name=name, scheme_name=scheme_Name, scheme_type='burst_Analysis', username='admin', - scheme_start_time=modify_pattern_start_time, scheme_detail=scheme_detail) - + store_scheme_info( + name=name, + scheme_name=scheme_Name, + scheme_type="burst_Analysis", + username="admin", + scheme_start_time=modify_pattern_start_time, + scheme_detail=scheme_detail, + ) ############################################################ # valve closing analysis 02 ############################################################ -def valve_close_analysis(name: str, modify_pattern_start_time: str, modify_total_duration: int = 900, - modify_valve_opening: dict[str, float] = None, scheme_Name: str = None) -> None: +def valve_close_analysis( + name: str, + modify_pattern_start_time: str, + modify_total_duration: int = 900, + modify_valve_opening: dict[str, float] = None, + scheme_Name: str = None, +) -> None: """ 关阀模拟 :param name: 模型名称,数据库中对应的名字 @@ -176,19 +220,32 @@ def valve_close_analysis(name: str, modify_pattern_start_time: str, modify_total :param scheme_Name: 方案名称 :return: """ - print(datetime.now(pytz.timezone('Asia/Shanghai')).strftime("%Y-%m-%d %H:%M:%S") + " -- Start Analysis.") - new_name = f'valve_close_Anal_{name}' + print( + datetime.now(pytz.timezone("Asia/Shanghai")).strftime("%Y-%m-%d %H:%M:%S") + + " -- Start Analysis." + ) + new_name = f"valve_close_Anal_{name}" if have_project(new_name): if is_project_open(new_name): close_project(new_name) delete_project(new_name) - print(datetime.now(pytz.timezone('Asia/Shanghai')).strftime("%Y-%m-%d %H:%M:%S") + " -- Start Copying Database.") - CopyProjectEx()(name, new_name, - ['operation', 'current_operation', 'restore_operation', 'batch_operation', 'operation_table']) - print(datetime.now(pytz.timezone('Asia/Shanghai')).strftime("%Y-%m-%d %H:%M:%S") + " -- Start Opening Database.") + print( + datetime.now(pytz.timezone("Asia/Shanghai")).strftime("%Y-%m-%d %H:%M:%S") + + " -- Start Copying Database." + ) + # CopyProjectEx()(name, new_name, + # ['operation', 'current_operation', 'restore_operation', 'batch_operation', 'operation_table']) + copy_project(name + "_template", new_name) + print( + datetime.now(pytz.timezone("Asia/Shanghai")).strftime("%Y-%m-%d %H:%M:%S") + + " -- Start Opening Database." + ) open_project(new_name) - print(datetime.now(pytz.timezone('Asia/Shanghai')).strftime("%Y-%m-%d %H:%M:%S") + " -- Database Loading OK.") - #step 1. change the valves status to 'closed' + print( + datetime.now(pytz.timezone("Asia/Shanghai")).strftime("%Y-%m-%d %H:%M:%S") + + " -- Database Loading OK." + ) + # step 1. change the valves status to 'closed' # for valve in valves: # if not is_valve(new_name,valve): # result='ID:{}is not a valve type'.format(valve) @@ -198,20 +255,26 @@ def valve_close_analysis(name: str, modify_pattern_start_time: str, modify_total # status['status']='CLOSED' # cs.append(status) # set_status(new_name,cs) - #step 2. run simulation + # step 2. run simulation # 涉及关阀计算,可能导致关阀后仍有流量,改为压力驱动PDA options = get_option(new_name) - options['DEMAND MODEL'] = OPTION_DEMAND_MODEL_PDA - options['REQUIRED PRESSURE'] = '20.0000' + options["DEMAND MODEL"] = OPTION_DEMAND_MODEL_PDA + options["REQUIRED PRESSURE"] = "20.0000" cs_options = ChangeSet() cs_options.append(options) set_option(new_name, cs_options) # result = run_simulation_ex(new_name,'realtime', modify_pattern_start_time, modify_pattern_start_time, modify_total_duration, # downloading_prohibition=True) - simulation.run_simulation(name=new_name, simulation_type='extended', modify_pattern_start_time=modify_pattern_start_time, - modify_total_duration=modify_total_duration, modify_valve_opening=modify_valve_opening, - scheme_Type='valve_close_Analysis', scheme_Name=scheme_Name) - #step 3. restore the base model + simulation.run_simulation( + name=new_name, + simulation_type="extended", + modify_pattern_start_time=modify_pattern_start_time, + modify_total_duration=modify_total_duration, + modify_valve_opening=modify_valve_opening, + scheme_Type="valve_close_Analysis", + scheme_Name=scheme_Name, + ) + # step 3. restore the base model # for valve in valves: # execute_undo(name) if is_project_open(new_name): @@ -222,11 +285,17 @@ def valve_close_analysis(name: str, modify_pattern_start_time: str, modify_total ############################################################ # flushing analysis 03 -#Pipe_Flushing_Analysis(prj_name,date_time, Valve_id_list, Drainage_Node_Id, Flushing_flow[opt], Flushing_duration[opt])->out_file:string +# Pipe_Flushing_Analysis(prj_name,date_time, Valve_id_list, Drainage_Node_Id, Flushing_flow[opt], Flushing_duration[opt])->out_file:string ############################################################ -def flushing_analysis(name: str, modify_pattern_start_time: str, modify_total_duration: int = 900, - modify_valve_opening: dict[str, float] = None, drainage_node_ID: str = None, - flushing_flow: float = 0, scheme_Name: str = None) -> None: +def flushing_analysis( + name: str, + modify_pattern_start_time: str, + modify_total_duration: int = 900, + modify_valve_opening: dict[str, float] = None, + drainage_node_ID: str = None, + flushing_flow: float = 0, + scheme_Name: str = None, +) -> None: """ 管道冲洗模拟 :param name: 模型名称,数据库中对应的名字 @@ -238,23 +307,36 @@ def flushing_analysis(name: str, modify_pattern_start_time: str, modify_total_du :param scheme_Name: 方案名称 :return: """ - print(datetime.now(pytz.timezone('Asia/Shanghai')).strftime("%Y-%m-%d %H:%M:%S") + " -- Start Analysis.") - new_name = f'flushing_Anal_{name}' + print( + datetime.now(pytz.timezone("Asia/Shanghai")).strftime("%Y-%m-%d %H:%M:%S") + + " -- Start Analysis." + ) + new_name = f"flushing_Anal_{name}" if have_project(new_name): if is_project_open(new_name): close_project(new_name) delete_project(new_name) # if is_project_open(name): # close_project(name) - print(datetime.now(pytz.timezone('Asia/Shanghai')).strftime("%Y-%m-%d %H:%M:%S") + " -- Start Copying Database.") - CopyProjectEx()(name, new_name, - ['operation', 'current_operation', 'restore_operation', 'batch_operation', 'operation_table']) - print(datetime.now(pytz.timezone('Asia/Shanghai')).strftime("%Y-%m-%d %H:%M:%S") + " -- Start Opening Database.") + print( + datetime.now(pytz.timezone("Asia/Shanghai")).strftime("%Y-%m-%d %H:%M:%S") + + " -- Start Copying Database." + ) + # CopyProjectEx()(name, new_name, + # ['operation', 'current_operation', 'restore_operation', 'batch_operation', 'operation_table']) + copy_project(name + "_template", new_name) + print( + datetime.now(pytz.timezone("Asia/Shanghai")).strftime("%Y-%m-%d %H:%M:%S") + + " -- Start Opening Database." + ) open_project(new_name) - print(datetime.now(pytz.timezone('Asia/Shanghai')).strftime("%Y-%m-%d %H:%M:%S") + " -- Database Loading OK.") - if not is_junction(new_name,drainage_node_ID): - return 'Wrong Drainage node type' - #step 1. change the valves status to 'closed' + print( + datetime.now(pytz.timezone("Asia/Shanghai")).strftime("%Y-%m-%d %H:%M:%S") + + " -- Database Loading OK." + ) + if not is_junction(new_name, drainage_node_ID): + return "Wrong Drainage node type" + # step 1. change the valves status to 'closed' # for valve, valve_k in zip(valves, valves_k): # cs=ChangeSet() # status=get_status(new_name,valve) @@ -267,61 +349,76 @@ def flushing_analysis(name: str, modify_pattern_start_time: str, modify_total_du # cs.append(status) # set_status(new_name,cs) units = get_option(new_name) - #step 2. set the emitter coefficient of drainage node or add flush flow to the drainage node - emitter_demand=get_demand(new_name,drainage_node_ID) + # step 2. set the emitter coefficient of drainage node or add flush flow to the drainage node + emitter_demand = get_demand(new_name, drainage_node_ID) cs = ChangeSet() if flushing_flow > 0: - for r in emitter_demand['demands']: - if units == 'LPS': - r['demand'] += (flushing_flow/3.6) - elif units == 'CMH': - r['demand'] += flushing_flow + for r in emitter_demand["demands"]: + if units == "LPS": + r["demand"] += flushing_flow / 3.6 + elif units == "CMH": + r["demand"] += flushing_flow cs.append(emitter_demand) - set_demand(new_name,cs) + set_demand(new_name, cs) else: - pipes=get_node_links(new_name,drainage_node_ID) - flush_diameter=50 + pipes = get_node_links(new_name, drainage_node_ID) + flush_diameter = 50 for pipe in pipes: - d=get_pipe(new_name,pipe)['diameter'] - if flush_diameter None: +def contaminant_simulation( + name: str, + modify_pattern_start_time: str, + modify_total_duration: int = 900, + source: str = None, + concentration: float = None, + source_pattern: str = None, + scheme_Name: str = None, +) -> None: """ 污染模拟 :param name: 模型名称,数据库中对应的名字 @@ -334,65 +431,83 @@ def contaminant_simulation(name: str, modify_pattern_start_time: str, modify_tot :param scheme_Name: 方案名称 :return: """ - print(datetime.now(pytz.timezone('Asia/Shanghai')).strftime("%Y-%m-%d %H:%M:%S") + " -- Start Analysis.") - new_name = f'contaminant_Sim_{name}' + print( + datetime.now(pytz.timezone("Asia/Shanghai")).strftime("%Y-%m-%d %H:%M:%S") + + " -- Start Analysis." + ) + new_name = f"contaminant_Sim_{name}" if have_project(new_name): if is_project_open(new_name): close_project(new_name) delete_project(new_name) # if is_project_open(name): # close_project(name) - print(datetime.now(pytz.timezone('Asia/Shanghai')).strftime("%Y-%m-%d %H:%M:%S") + " -- Start Copying Database.") - CopyProjectEx()(name, new_name, - ['operation', 'current_operation', 'restore_operation', 'batch_operation', 'operation_table']) - print(datetime.now(pytz.timezone('Asia/Shanghai')).strftime("%Y-%m-%d %H:%M:%S") + " -- Start Opening Database.") + print( + datetime.now(pytz.timezone("Asia/Shanghai")).strftime("%Y-%m-%d %H:%M:%S") + + " -- Start Copying Database." + ) + # CopyProjectEx()(name, new_name, + # ['operation', 'current_operation', 'restore_operation', 'batch_operation', 'operation_table']) + copy_project(name + "_template", new_name) + print( + datetime.now(pytz.timezone("Asia/Shanghai")).strftime("%Y-%m-%d %H:%M:%S") + + " -- Start Opening Database." + ) open_project(new_name) - print(datetime.now(pytz.timezone('Asia/Shanghai')).strftime("%Y-%m-%d %H:%M:%S") + " -- Database Loading OK.") + print( + datetime.now(pytz.timezone("Asia/Shanghai")).strftime("%Y-%m-%d %H:%M:%S") + + " -- Database Loading OK." + ) dic_time = get_time(new_name) - dic_time['QUALITY TIMESTEP'] = '0:05:00' + dic_time["QUALITY TIMESTEP"] = "0:05:00" cs = ChangeSet() cs.operations.append(dic_time) set_time(new_name, cs) # set QUALITY TIMESTEP - time_option=get_time(new_name) - hydraulic_step=time_option['HYDRAULIC TIMESTEP'] - secs=from_clock_to_seconds_2(hydraulic_step) - operation_step=0 - #step 1. set duration - if modify_total_duration==None: - modify_total_duration =secs - #step 2. set pattern - if source_pattern!=None: - pt=get_pattern(new_name,source_pattern) - if pt==None: - str_response=str('cant find source_pattern') + time_option = get_time(new_name) + hydraulic_step = time_option["HYDRAULIC TIMESTEP"] + secs = from_clock_to_seconds_2(hydraulic_step) + operation_step = 0 + # step 1. set duration + if modify_total_duration == None: + modify_total_duration = secs + # step 2. set pattern + if source_pattern != None: + pt = get_pattern(new_name, source_pattern) + if pt == None: + str_response = str("cant find source_pattern") return str_response else: - cs_pattern=ChangeSet() - pt={} - factors=[] - tmp_duration= modify_total_duration - while tmp_duration>0: + cs_pattern = ChangeSet() + pt = {} + factors = [] + tmp_duration = modify_total_duration + while tmp_duration > 0: factors.append(1.0) - tmp_duration=tmp_duration-secs - pt['id']='contam_pt' - pt['factors']=factors + tmp_duration = tmp_duration - secs + pt["id"] = "contam_pt" + pt["factors"] = factors cs_pattern.append(pt) - add_pattern(new_name,cs_pattern) - operation_step+=1 - #step 3. set source/initial quality + add_pattern(new_name, cs_pattern) + operation_step += 1 + # step 3. set source/initial quality # source quality - cs_source=ChangeSet() - source_schema={'node':source,'s_type':SOURCE_TYPE_CONCEN,'strength':concentration,'pattern':pt['id']} + cs_source = ChangeSet() + source_schema = { + "node": source, + "s_type": SOURCE_TYPE_CONCEN, + "strength": concentration, + "pattern": pt["id"], + } cs_source.append(source_schema) - source_node=get_source(new_name,source) - if len(source_node)==0: - add_source(new_name,cs_source) + source_node = get_source(new_name, source) + if len(source_node) == 0: + add_source(new_name, cs_source) else: - set_source(new_name,cs_source) + set_source(new_name, cs_source) dict_demand = get_demand(new_name, source) - for demands in dict_demand['demands']: - dict_demand['demands'][dict_demand['demands'].index(demands)]['demand'] = -1 - dict_demand['demands'][dict_demand['demands'].index(demands)]['pattern'] = None + for demands in dict_demand["demands"]: + dict_demand["demands"][dict_demand["demands"].index(demands)]["demand"] = -1 + dict_demand["demands"][dict_demand["demands"].index(demands)]["pattern"] = None cs = ChangeSet() cs.append(dict_demand) set_demand(new_name, cs) # set inflow node @@ -402,20 +517,26 @@ def contaminant_simulation(name: str, modify_pattern_start_time: str, modify_tot # cs = ChangeSet() # cs.append(dict_quality) # set_quality(new_name, cs) - operation_step+=1 - #step 4 set option of quality to chemical - opt=get_option(new_name) - opt['QUALITY']=OPTION_QUALITY_CHEMICAL - cs_option=ChangeSet() + operation_step += 1 + # step 4 set option of quality to chemical + opt = get_option(new_name) + opt["QUALITY"] = OPTION_QUALITY_CHEMICAL + cs_option = ChangeSet() cs_option.append(opt) - set_option(new_name,cs_option) - operation_step+=1 - #step 5. run simulation + set_option(new_name, cs_option) + operation_step += 1 + # step 5. run simulation # result = run_simulation_ex(new_name,'realtime', modify_pattern_start_time, modify_pattern_start_time, modify_total_duration, # downloading_prohibition=True) - simulation.run_simulation(name=new_name, simulation_type='extended', modify_pattern_start_time=modify_pattern_start_time, - modify_total_duration=modify_total_duration, scheme_Type="contaminant_Analysis", scheme_Name=scheme_Name) - + simulation.run_simulation( + name=new_name, + simulation_type="extended", + modify_pattern_start_time=modify_pattern_start_time, + modify_total_duration=modify_total_duration, + scheme_Type="contaminant_Analysis", + scheme_Name=scheme_Name, + ) + # for i in range(1,operation_step): # execute_undo(name) if is_project_open(new_name): @@ -423,12 +544,15 @@ def contaminant_simulation(name: str, modify_pattern_start_time: str, modify_tot delete_project(new_name) # return result + ############################################################ # age analysis 05 ***水龄模拟目前还没和实时模拟打通,不确定是否需要,先不要使用*** ############################################################ -def age_analysis(name: str, modify_pattern_start_time: str, modify_total_duration: int = 900) -> None: +def age_analysis( + name: str, modify_pattern_start_time: str, modify_total_duration: int = 900 +) -> None: """ 水龄模拟 :param name: 模型名称,数据库中对应的名字 @@ -436,24 +560,42 @@ def age_analysis(name: str, modify_pattern_start_time: str, modify_total_duratio :param modify_total_duration: 模拟总历时,秒 :return: """ - print(datetime.now(pytz.timezone('Asia/Shanghai')).strftime("%Y-%m-%d %H:%M:%S") + " -- Start Analysis.") - new_name = f'age_Anal_{name}' + print( + datetime.now(pytz.timezone("Asia/Shanghai")).strftime("%Y-%m-%d %H:%M:%S") + + " -- Start Analysis." + ) + new_name = f"age_Anal_{name}" if have_project(new_name): if is_project_open(new_name): close_project(new_name) delete_project(new_name) # if is_project_open(name): # close_project(name) - print(datetime.now(pytz.timezone('Asia/Shanghai')).strftime("%Y-%m-%d %H:%M:%S") + " -- Start Copying Database.") - CopyProjectEx()(name, new_name, - ['operation', 'current_operation', 'restore_operation', 'batch_operation', 'operation_table']) - print(datetime.now(pytz.timezone('Asia/Shanghai')).strftime("%Y-%m-%d %H:%M:%S") + " -- Start Opening Database.") + print( + datetime.now(pytz.timezone("Asia/Shanghai")).strftime("%Y-%m-%d %H:%M:%S") + + " -- Start Copying Database." + ) + # CopyProjectEx()(name, new_name, + # ['operation', 'current_operation', 'restore_operation', 'batch_operation', 'operation_table']) + copy_project(name + "_template", new_name) + print( + datetime.now(pytz.timezone("Asia/Shanghai")).strftime("%Y-%m-%d %H:%M:%S") + + " -- Start Opening Database." + ) open_project(new_name) - print(datetime.now(pytz.timezone('Asia/Shanghai')).strftime("%Y-%m-%d %H:%M:%S") + " -- Database Loading OK.") + print( + datetime.now(pytz.timezone("Asia/Shanghai")).strftime("%Y-%m-%d %H:%M:%S") + + " -- Database Loading OK." + ) # step 1. run simulation - result = run_simulation_ex(new_name, 'realtime', modify_pattern_start_time, modify_total_duration, - downloading_prohibition=True) + result = run_simulation_ex( + new_name, + "realtime", + modify_pattern_start_time, + modify_total_duration, + downloading_prohibition=True, + ) # step 2. restore the base model status # execute_undo(name) #有疑惑 if is_project_open(new_name): @@ -466,12 +608,12 @@ def age_analysis(name: str, modify_pattern_start_time: str, modify_total_duratio nodes_age = [] node_result = output.node_results() for node in node_result: - nodes_age.append(node['result'][-1]['quality']) + nodes_age.append(node["result"][-1]["quality"]) links_age = [] link_result = output.link_results() for link in link_result: - links_age.append(link['result'][-1]['quality']) - age_result = {'nodes': nodes_age, 'links': links_age} + links_age.append(link["result"][-1]["quality"]) + age_result = {"nodes": nodes_age, "links": links_age} # age_result = {'nodes': nodes_age, 'links': links_age, 'nodeIDs': node_name, 'linkIDs': link_name} return json.dumps(age_result) @@ -481,9 +623,15 @@ def age_analysis(name: str, modify_pattern_start_time: str, modify_total_duratio ############################################################ -def pressure_regulation(name: str, modify_pattern_start_time: str, modify_total_duration: int = 900, - modify_tank_initial_level: dict[str, float] = None, modify_fixed_pump_pattern: dict[str, list] = None, - modify_variable_pump_pattern: dict[str, list] = None, scheme_Name: str = None) -> None: +def pressure_regulation( + name: str, + modify_pattern_start_time: str, + modify_total_duration: int = 900, + modify_tank_initial_level: dict[str, float] = None, + modify_fixed_pump_pattern: dict[str, list] = None, + modify_variable_pump_pattern: dict[str, list] = None, + scheme_Name: str = None, +) -> None: """ 区域调压模拟,用来模拟未来15分钟内,开关水泵对区域压力的影响 :param name: 模型名称,数据库中对应的名字 @@ -495,24 +643,37 @@ def pressure_regulation(name: str, modify_pattern_start_time: str, modify_total_ :param scheme_Name: 模拟方案名称 :return: """ - print(datetime.now(pytz.timezone('Asia/Shanghai')).strftime("%Y-%m-%d %H:%M:%S") + " -- Start Analysis.") - new_name = f'pressure_regulation_{name}' + print( + datetime.now(pytz.timezone("Asia/Shanghai")).strftime("%Y-%m-%d %H:%M:%S") + + " -- Start Analysis." + ) + new_name = f"pressure_regulation_{name}" if have_project(new_name): if is_project_open(new_name): close_project(new_name) delete_project(new_name) # if is_project_open(name): # close_project(name) - print(datetime.now(pytz.timezone('Asia/Shanghai')).strftime("%Y-%m-%d %H:%M:%S") + " -- Start Copying Database.") - CopyProjectEx()(name, new_name, - ['operation', 'current_operation', 'restore_operation', 'batch_operation', 'operation_table']) - print(datetime.now(pytz.timezone('Asia/Shanghai')).strftime("%Y-%m-%d %H:%M:%S") + " -- Start Opening Database.") + print( + datetime.now(pytz.timezone("Asia/Shanghai")).strftime("%Y-%m-%d %H:%M:%S") + + " -- Start Copying Database." + ) + # CopyProjectEx()(name, new_name, + # ['operation', 'current_operation', 'restore_operation', 'batch_operation', 'operation_table']) + copy_project(name + "_template", new_name) + print( + datetime.now(pytz.timezone("Asia/Shanghai")).strftime("%Y-%m-%d %H:%M:%S") + + " -- Start Opening Database." + ) open_project(new_name) - print(datetime.now(pytz.timezone('Asia/Shanghai')).strftime("%Y-%m-%d %H:%M:%S") + " -- Database Loading OK.") + print( + datetime.now(pytz.timezone("Asia/Shanghai")).strftime("%Y-%m-%d %H:%M:%S") + + " -- Database Loading OK." + ) # 全部关泵后,压力计算不合理,改为压力驱动PDA options = get_option(new_name) - options['DEMAND MODEL'] = OPTION_DEMAND_MODEL_PDA - options['REQUIRED PRESSURE'] = '15.0000' + options["DEMAND MODEL"] = OPTION_DEMAND_MODEL_PDA + options["REQUIRED PRESSURE"] = "15.0000" cs_options = ChangeSet() cs_options.append(options) set_option(new_name, cs_options) @@ -523,10 +684,17 @@ def pressure_regulation(name: str, modify_pattern_start_time: str, modify_total_ # pump_control=pump_control, # tank_initial_level_control=tank_initial_level_control, # downloading_prohibition=True) - simulation.run_simulation(name=new_name, simulation_type='extended', modify_pattern_start_time=modify_pattern_start_time, - modify_total_duration=modify_total_duration, modify_tank_initial_level=modify_tank_initial_level, - modify_fixed_pump_pattern=modify_fixed_pump_pattern, modify_variable_pump_pattern=modify_variable_pump_pattern, - scheme_Type="pressure_regulation", scheme_Name=scheme_Name) + simulation.run_simulation( + name=new_name, + simulation_type="extended", + modify_pattern_start_time=modify_pattern_start_time, + modify_total_duration=modify_total_duration, + modify_tank_initial_level=modify_tank_initial_level, + modify_fixed_pump_pattern=modify_fixed_pump_pattern, + modify_variable_pump_pattern=modify_variable_pump_pattern, + scheme_Type="pressure_regulation", + scheme_Name=scheme_Name, + ) if is_project_open(new_name): close_project(new_name) delete_project(new_name) @@ -538,30 +706,50 @@ def pressure_regulation(name: str, modify_pattern_start_time: str, modify_total_ ############################################################ -def project_management(prj_name, start_datetime, pump_control, - tank_initial_level_control=None, region_demand_control=None) -> str: - print(datetime.now(pytz.timezone('Asia/Shanghai')).strftime("%Y-%m-%d %H:%M:%S") + " -- Start Analysis.") - new_name = f'project_management_{prj_name}' +def project_management( + prj_name, + start_datetime, + pump_control, + tank_initial_level_control=None, + region_demand_control=None, +) -> str: + print( + datetime.now(pytz.timezone("Asia/Shanghai")).strftime("%Y-%m-%d %H:%M:%S") + + " -- Start Analysis." + ) + new_name = f"project_management_{prj_name}" if have_project(new_name): if is_project_open(new_name): close_project(new_name) delete_project(new_name) # if is_project_open(prj_name): # close_project(prj_name) - print(datetime.now(pytz.timezone('Asia/Shanghai')).strftime("%Y-%m-%d %H:%M:%S") + " -- Start Copying Database.") - CopyProjectEx()(prj_name, new_name, - ['operation', 'current_operation', 'restore_operation', 'batch_operation', 'operation_table']) - print(datetime.now(pytz.timezone('Asia/Shanghai')).strftime("%Y-%m-%d %H:%M:%S") + " -- Start Opening Database.") + print( + datetime.now(pytz.timezone("Asia/Shanghai")).strftime("%Y-%m-%d %H:%M:%S") + + " -- Start Copying Database." + ) + # CopyProjectEx()(prj_name, new_name, + # ['operation', 'current_operation', 'restore_operation', 'batch_operation', 'operation_table']) + copy_project(prj_name + "_template", new_name) + print( + datetime.now(pytz.timezone("Asia/Shanghai")).strftime("%Y-%m-%d %H:%M:%S") + + " -- Start Opening Database." + ) open_project(new_name) - print(datetime.now(pytz.timezone('Asia/Shanghai')).strftime("%Y-%m-%d %H:%M:%S") + " -- Database Loading OK.") - result = run_simulation_ex(name=new_name, - simulation_type='realtime', - start_datetime=start_datetime, - duration=86400, - pump_control=pump_control, - tank_initial_level_control=tank_initial_level_control, - region_demand_control=region_demand_control, - downloading_prohibition=True) + print( + datetime.now(pytz.timezone("Asia/Shanghai")).strftime("%Y-%m-%d %H:%M:%S") + + " -- Database Loading OK." + ) + result = run_simulation_ex( + name=new_name, + simulation_type="realtime", + start_datetime=start_datetime, + duration=86400, + pump_control=pump_control, + tank_initial_level_control=tank_initial_level_control, + region_demand_control=region_demand_control, + downloading_prohibition=True, + ) if is_project_open(new_name): close_project(new_name) delete_project(new_name) @@ -573,9 +761,14 @@ def project_management(prj_name, start_datetime, pump_control, ############################################################ -def scheduling_simulation(prj_name, start_time, pump_control, tank_id, water_plant_output_id, time_delta=300) -> str: - print(datetime.now(pytz.timezone('Asia/Shanghai')).strftime("%Y-%m-%d %H:%M:%S") + " -- Start Analysis.") - new_name = f'scheduling_{prj_name}' +def scheduling_simulation( + prj_name, start_time, pump_control, tank_id, water_plant_output_id, time_delta=300 +) -> str: + print( + datetime.now(pytz.timezone("Asia/Shanghai")).strftime("%Y-%m-%d %H:%M:%S") + + " -- Start Analysis." + ) + new_name = f"scheduling_{prj_name}" if have_project(new_name): if is_project_open(new_name): @@ -583,52 +776,73 @@ def scheduling_simulation(prj_name, start_time, pump_control, tank_id, water_pla delete_project(new_name) # if is_project_open(prj_name): # close_project(prj_name) - print(datetime.now(pytz.timezone('Asia/Shanghai')).strftime("%Y-%m-%d %H:%M:%S") + " -- Start Copying Database.") - CopyProjectEx()(prj_name, new_name, - ['operation', 'current_operation', 'restore_operation', 'batch_operation', 'operation_table']) - print(datetime.now(pytz.timezone('Asia/Shanghai')).strftime("%Y-%m-%d %H:%M:%S") + " -- Start Opening Database.") + print( + datetime.now(pytz.timezone("Asia/Shanghai")).strftime("%Y-%m-%d %H:%M:%S") + + " -- Start Copying Database." + ) + # CopyProjectEx()(prj_name, new_name, + # ['operation', 'current_operation', 'restore_operation', 'batch_operation', 'operation_table']) + copy_project(prj_name + "_template", new_name) + print( + datetime.now(pytz.timezone("Asia/Shanghai")).strftime("%Y-%m-%d %H:%M:%S") + + " -- Start Opening Database." + ) open_project(new_name) - print(datetime.now(pytz.timezone('Asia/Shanghai')).strftime("%Y-%m-%d %H:%M:%S") + " -- Database Loading OK.") + print( + datetime.now(pytz.timezone("Asia/Shanghai")).strftime("%Y-%m-%d %H:%M:%S") + + " -- Database Loading OK." + ) - run_simulation_ex(new_name, 'realtime', start_time, duration=0, pump_control=pump_control) + run_simulation_ex( + new_name, "realtime", start_time, duration=0, pump_control=pump_control + ) if not is_project_open(new_name): open_project(new_name) tank = get_tank(new_name, tank_id) # 水塔信息 - tank_floor_space = pi * pow(tank['diameter'] / 2, 2) # 水塔底面积(m^2) - tank_init_level = tank['init_level'] # 水塔初始水位(m) - tank_pipes_id = tank['links'] # pipes list + tank_floor_space = pi * pow(tank["diameter"] / 2, 2) # 水塔底面积(m^2) + tank_init_level = tank["init_level"] # 水塔初始水位(m) + tank_pipes_id = tank["links"] # pipes list - tank_pipe_flow_direction = {} # 管道流向修正系数, 水塔为下游节点时为1, 水塔为上游节点时为-1 + tank_pipe_flow_direction = ( + {} + ) # 管道流向修正系数, 水塔为下游节点时为1, 水塔为上游节点时为-1 for pipe_id in tank_pipes_id: - if get_pipe(new_name, pipe_id)['node2'] == tank_id: # 水塔为下游节点 + if get_pipe(new_name, pipe_id)["node2"] == tank_id: # 水塔为下游节点 tank_pipe_flow_direction[pipe_id] = 1 else: tank_pipe_flow_direction[pipe_id] = -1 output = Output("./temp/{}.db.out".format(new_name)) - node_results = output.node_results() # [{'node': str, 'result': [{'pressure': float}]}] + node_results = ( + output.node_results() + ) # [{'node': str, 'result': [{'pressure': float}]}] water_plant_output_pressure = 0 for node_result in node_results: - if node_result['node'] == water_plant_output_id: # 水厂出水压力(m) - water_plant_output_pressure = node_result['result'][-1]['pressure'] + if node_result["node"] == water_plant_output_id: # 水厂出水压力(m) + water_plant_output_pressure = node_result["result"][-1]["pressure"] water_plant_output_pressure /= 100 # 预计水厂出水压力(Mpa) pipe_results = output.link_results() # [{'link': str, 'result': [{'flow': float}]}] tank_inflow = 0 for pipe_result in pipe_results: for pipe_id in tank_pipes_id: # 遍历与水塔相连的管道 - if pipe_result['link'] == pipe_id: # 水塔入流流量(L/s) - tank_inflow += pipe_result['result'][-1]['flow'] * tank_pipe_flow_direction[pipe_id] - tank_inflow /= 1000 # 水塔入流流量(m^3/s) + if pipe_result["link"] == pipe_id: # 水塔入流流量(L/s) + tank_inflow += ( + pipe_result["result"][-1]["flow"] + * tank_pipe_flow_direction[pipe_id] + ) + tank_inflow /= 1000 # 水塔入流流量(m^3/s) tank_level_delta = tank_inflow * time_delta / tank_floor_space # 水塔水位改变值(m) tank_level = tank_init_level + tank_level_delta # 预计水塔水位(m) - simulation_results = {'water_plant_output_pressure': water_plant_output_pressure, - 'tank_init_level': tank_init_level, - 'tank_level': tank_level} + simulation_results = { + "water_plant_output_pressure": water_plant_output_pressure, + "tank_init_level": tank_init_level, + "tank_level": tank_level, + } if is_project_open(new_name): close_project(new_name) @@ -637,10 +851,14 @@ def scheduling_simulation(prj_name, start_time, pump_control, tank_id, water_pla return json.dumps(simulation_results) -def daily_scheduling_simulation(prj_name, start_time, pump_control, - reservoir_id, tank_id, water_plant_output_id) -> str: - print(datetime.now(pytz.timezone('Asia/Shanghai')).strftime("%Y-%m-%d %H:%M:%S") + " -- Start Analysis.") - new_name = f'daily_scheduling_{prj_name}' +def daily_scheduling_simulation( + prj_name, start_time, pump_control, reservoir_id, tank_id, water_plant_output_id +) -> str: + print( + datetime.now(pytz.timezone("Asia/Shanghai")).strftime("%Y-%m-%d %H:%M:%S") + + " -- Start Analysis." + ) + new_name = f"daily_scheduling_{prj_name}" if have_project(new_name): if is_project_open(new_name): @@ -648,38 +866,56 @@ def daily_scheduling_simulation(prj_name, start_time, pump_control, delete_project(new_name) # if is_project_open(prj_name): # close_project(prj_name) - print(datetime.now(pytz.timezone('Asia/Shanghai')).strftime("%Y-%m-%d %H:%M:%S") + " -- Start Copying Database.") - CopyProjectEx()(prj_name, new_name, - ['operation', 'current_operation', 'restore_operation', 'batch_operation', 'operation_table']) - print(datetime.now(pytz.timezone('Asia/Shanghai')).strftime("%Y-%m-%d %H:%M:%S") + " -- Start Opening Database.") + print( + datetime.now(pytz.timezone("Asia/Shanghai")).strftime("%Y-%m-%d %H:%M:%S") + + " -- Start Copying Database." + ) + # CopyProjectEx()(prj_name, new_name, + # ['operation', 'current_operation', 'restore_operation', 'batch_operation', 'operation_table']) + copy_project(prj_name + "_template", new_name) + print( + datetime.now(pytz.timezone("Asia/Shanghai")).strftime("%Y-%m-%d %H:%M:%S") + + " -- Start Opening Database." + ) open_project(new_name) - print(datetime.now(pytz.timezone('Asia/Shanghai')).strftime("%Y-%m-%d %H:%M:%S") + " -- Database Loading OK.") + print( + datetime.now(pytz.timezone("Asia/Shanghai")).strftime("%Y-%m-%d %H:%M:%S") + + " -- Database Loading OK." + ) - run_simulation_ex(new_name, 'realtime', start_time, duration=86400, pump_control=pump_control) + run_simulation_ex( + new_name, "realtime", start_time, duration=86400, pump_control=pump_control + ) if not is_project_open(new_name): open_project(new_name) output = Output("./temp/{}.db.out".format(new_name)) - node_results = output.node_results() # [{'node': str, 'result': [{'pressure': float, 'head': float}]}] + node_results = ( + output.node_results() + ) # [{'node': str, 'result': [{'pressure': float, 'head': float}]}] water_plant_output_pressure = [] reservoir_level = [] tank_level = [] for node_result in node_results: - if node_result['node'] == water_plant_output_id: - for result in node_result['result']: - water_plant_output_pressure.append(result['pressure'] / 100) # 水厂出水压力(Mpa) - elif node_result['node'] == reservoir_id: - for result in node_result['result']: - reservoir_level.append(result['head'] - 250.35) # 清水池液位(m) - elif node_result['node'] == tank_id: - for result in node_result['result']: - tank_level.append(result['pressure']) # 调节池液位(m) + if node_result["node"] == water_plant_output_id: + for result in node_result["result"]: + water_plant_output_pressure.append( + result["pressure"] / 100 + ) # 水厂出水压力(Mpa) + elif node_result["node"] == reservoir_id: + for result in node_result["result"]: + reservoir_level.append(result["head"] - 250.35) # 清水池液位(m) + elif node_result["node"] == tank_id: + for result in node_result["result"]: + tank_level.append(result["pressure"]) # 调节池液位(m) - simulation_results = {'water_plant_output_pressure': water_plant_output_pressure, - 'reservoir_level': reservoir_level, - 'tank_level': tank_level} + simulation_results = { + "water_plant_output_pressure": water_plant_output_pressure, + "reservoir_level": reservoir_level, + "tank_level": tank_level, + } if is_project_open(new_name): close_project(new_name) @@ -687,19 +923,21 @@ def daily_scheduling_simulation(prj_name, start_time, pump_control, return json.dumps(simulation_results) + ############################################################ # network_update 10 ############################################################ + def network_update(file_path: str) -> None: """ 更新pg数据库中的inp文件 :param file_path: inp文件 :return: """ - read_inp('szh', file_path) + read_inp("szh", file_path) - csv_path = './history_pattern_flow.csv' + csv_path = "./history_pattern_flow.csv" # # 检查文件是否存在 # if os.path.exists(csv_path): @@ -730,16 +968,18 @@ def network_update(file_path: str) -> None: # 连接到 PostgreSQL 数据库(这里是数据库 "bb") with psycopg.connect(f"dbname={project_info.name} host=127.0.0.1") as conn: with conn.cursor() as cur: - with open(csv_path, newline='', encoding='utf-8-sig') as csvfile: + with open(csv_path, newline="", encoding="utf-8-sig") as csvfile: reader = csv.DictReader(csvfile) for row in reader: # 直接将数据插入,不进行唯一性检查 - insert_sql = sql.SQL(""" + insert_sql = sql.SQL( + """ INSERT INTO history_patterns_flows (id, factor, flow) VALUES (%s, %s, %s); - """) + """ + ) # 将数据插入数据库 - cur.execute(insert_sql, (row['id'], row['factor'], row['flow'])) + cur.execute(insert_sql, (row["id"], row["factor"], row["flow"])) conn.commit() print("数据成功导入到 'history_patterns_flows' 表格。") else: @@ -753,16 +993,16 @@ def submit_scada_info(name: str, coord_id: str) -> None: :param coord_id: 坐标系的id,如4326,根据原始坐标信息输入 :return: """ - scada_info_path = './scada_info.csv' + scada_info_path = "./scada_info.csv" # 检查文件是否存在 if os.path.exists(scada_info_path): print(f"scada_info文件存在,开始处理...") # 自动检测文件编码 - with open(scada_info_path, 'rb') as file: + with open(scada_info_path, "rb") as file: raw_data = file.read() detected = chardet.detect(raw_data) - file_encoding = detected['encoding'] + file_encoding = detected["encoding"] print(f"检测到的文件编码:{file_encoding}") try: # 动态替换数据库名称 @@ -780,27 +1020,51 @@ def submit_scada_info(name: str, coord_id: str) -> None: cur.execute("DELETE FROM scada_info;") print("表记录已清空。") - with open(scada_info_path, newline='', encoding=file_encoding) as csvfile: + with open( + scada_info_path, newline="", encoding=file_encoding + ) as csvfile: reader = csv.DictReader(csvfile) for row in reader: # 将CSV单元格值为空的字段转换为 None - cleaned_row = {key: (value if value.strip() else None) for key, value in row.items()} + cleaned_row = { + key: (value if value.strip() else None) + for key, value in row.items() + } # 处理 associated_source_outflow_id 列动态变化 - associated_columns = [f"associated_source_outflow_id{i}" for i in range(1, 21)] + associated_columns = [ + f"associated_source_outflow_id{i}" for i in range(1, 21) + ] associated_values = [ - (cleaned_row.get(col).strip() if cleaned_row.get(col) and cleaned_row.get( - col).strip() else None) + ( + cleaned_row.get(col).strip() + if cleaned_row.get(col) + and cleaned_row.get(col).strip() + else None + ) for col in associated_columns ] # 将 X_coor 和 Y_coor 转换为 geometry 类型 - x_coor = float(cleaned_row['X_coor']) if cleaned_row['X_coor'] else None - y_coor = float(cleaned_row['Y_coor']) if cleaned_row['Y_coor'] else None - coord = f"SRID={coord_id};POINT({x_coor} {y_coor})" if x_coor and y_coor else None + x_coor = ( + float(cleaned_row["X_coor"]) + if cleaned_row["X_coor"] + else None + ) + y_coor = ( + float(cleaned_row["Y_coor"]) + if cleaned_row["Y_coor"] + else None + ) + coord = ( + f"SRID={coord_id};POINT({x_coor} {y_coor})" + if x_coor and y_coor + else None + ) # 准备插入 SQL 语句 - insert_sql = sql.SQL(""" + insert_sql = sql.SQL( + """ INSERT INTO scada_info ( id, type, associated_element_id, associated_pattern, associated_pipe_flow_id, {associated_columns}, @@ -811,18 +1075,34 @@ def submit_scada_info(name: str, coord_id: str) -> None: %s, %s, %s, %s, %s, {associated_placeholders}, %s, %s, %s, %s, %s, %s, %s ); - """).format( - associated_columns=sql.SQL(", ").join(sql.Identifier(col) for col in associated_columns), - associated_placeholders=sql.SQL(", ").join(sql.Placeholder() for _ in associated_columns) + """ + ).format( + associated_columns=sql.SQL(", ").join( + sql.Identifier(col) for col in associated_columns + ), + associated_placeholders=sql.SQL(", ").join( + sql.Placeholder() for _ in associated_columns + ), ) # 将数据插入数据库 - cur.execute(insert_sql, ( - cleaned_row['id'], cleaned_row['type'], cleaned_row['associated_element_id'], - cleaned_row.get('associated_pattern'), cleaned_row.get('associated_pipe_flow_id'), - *associated_values, cleaned_row.get('API_query_id'), - cleaned_row['transmission_mode'], cleaned_row['transmission_frequency'], - cleaned_row['reliability'], x_coor, y_coor, coord - )) + cur.execute( + insert_sql, + ( + cleaned_row["id"], + cleaned_row["type"], + cleaned_row["associated_element_id"], + cleaned_row.get("associated_pattern"), + cleaned_row.get("associated_pipe_flow_id"), + *associated_values, + cleaned_row.get("API_query_id"), + cleaned_row["transmission_mode"], + cleaned_row["transmission_frequency"], + cleaned_row["reliability"], + x_coor, + y_coor, + coord, + ), + ) conn.commit() print("数据成功导入到 'scada_info' 表格。") except Exception as e: @@ -848,7 +1128,7 @@ def create_user(name: str, username: str, password: str): with conn.cursor() as cur: cur.execute( "INSERT INTO users (username, password) VALUES (%s, %s)", - (username, password) + (username, password), ) # 提交事务 conn.commit() @@ -890,7 +1170,10 @@ def scheme_name_exists(name: str, scheme_name: str) -> bool: conn_string = get_pgconn_string(db_name=name) with psycopg.connect(conn_string) as conn: with conn.cursor() as cur: - cur.execute("SELECT COUNT(*) FROM scheme_list WHERE scheme_name = %s", (scheme_name,)) + cur.execute( + "SELECT COUNT(*) FROM scheme_list WHERE scheme_name = %s", + (scheme_name,), + ) result = cur.fetchone() if result is not None and result[0] > 0: return True @@ -902,7 +1185,14 @@ def scheme_name_exists(name: str, scheme_name: str) -> bool: # 2025/03/23 -def store_scheme_info(name: str, scheme_name: str, scheme_type: str, username: str, scheme_start_time: str, scheme_detail: dict): +def store_scheme_info( + name: str, + scheme_name: str, + scheme_type: str, + username: str, + scheme_start_time: str, + scheme_detail: dict, +): """ 将一条方案记录插入 scheme_list 表中 :param name: 数据库名称 @@ -923,7 +1213,16 @@ def store_scheme_info(name: str, scheme_name: str, scheme_type: str, username: s """ # 将字典转换为 JSON 字符串 scheme_detail_json = json.dumps(scheme_detail) - cur.execute(sql, (scheme_name, scheme_type, username, scheme_start_time, scheme_detail_json)) + cur.execute( + sql, + ( + scheme_name, + scheme_type, + username, + scheme_start_time, + scheme_detail_json, + ), + ) conn.commit() print("方案信息存储成功!") except Exception as e: @@ -942,7 +1241,9 @@ def delete_scheme_info(name: str, scheme_name: str) -> None: with psycopg.connect(conn_string) as conn: with conn.cursor() as cur: # 使用参数化查询删除方案记录 - cur.execute("DELETE FROM scheme_list WHERE scheme_name = %s", (scheme_name,)) + cur.execute( + "DELETE FROM scheme_list WHERE scheme_name = %s", (scheme_name,) + ) conn.commit() print(f"方案 {scheme_name} 删除成功!") except Exception as e: @@ -989,15 +1290,19 @@ def upload_shp_to_pg(name: str, table_name: str, role: str, shp_file_path: str): gdf = gpd.read_file(shp_file_path) # 检查投影坐标系(CRS),并确保是 EPSG:4326 - if gdf.crs.to_string() != 'EPSG:4490': + if gdf.crs.to_string() != "EPSG:4490": gdf = gdf.to_crs(epsg=4490) # 使用 GeoDataFrame 的 .to_postgis 方法将数据写入 PostgreSQL # 需要在数据库中提前安装 PostGIS 扩展 engine = create_engine(f"postgresql+psycopg2://{role}:@127.0.0.1/{name}") - gdf.to_postgis(table_name, engine, if_exists='replace', index=True, index_label='id') + gdf.to_postgis( + table_name, engine, if_exists="replace", index=True, index_label="id" + ) - print(f"Shapefile 文件成功上传到 PostgreSQL 数据库 '{name}' 的表 '{table_name}'.") + print( + f"Shapefile 文件成功上传到 PostgreSQL 数据库 '{name}' 的表 '{table_name}'." + ) except Exception as e: print(f"上传 Shapefile 到 PostgreSQL 时出错:{e}") @@ -1034,9 +1339,9 @@ def submit_risk_probability_result(name: str, result_file_path: str) -> None: print("表记录已清空。") # 读取Excel并转换x/y列为列表 - df = pd.read_excel(result_file_path, sheet_name='Sheet1') - df['x'] = df['x'].apply(ast.literal_eval) - df['y'] = df['y'].apply(ast.literal_eval) + df = pd.read_excel(result_file_path, sheet_name="Sheet1") + df["x"] = df["x"].apply(ast.literal_eval) + df["y"] = df["y"].apply(ast.literal_eval) # 批量插入数据 for index, row in df.iterrows(): @@ -1045,13 +1350,16 @@ def submit_risk_probability_result(name: str, result_file_path: str) -> None: (pipeID, pipeage, risk_probability_now, x, y) VALUES (%s, %s, %s, %s, %s) """ - cur.execute(insert_query, ( - row['pipeID'], - row['pipeage'], - row['risk_probability_now'], - row['x'], # 直接传递列表 - row['y'] # 同上 - )) + cur.execute( + insert_query, + ( + row["pipeID"], + row["pipeage"], + row["risk_probability_now"], + row["x"], # 直接传递列表 + row["y"], # 同上 + ), + ) conn.commit() print("风险评估结果导入成功") @@ -1060,8 +1368,9 @@ def submit_risk_probability_result(name: str, result_file_path: str) -> None: print(f"导入时出错:{e}") -def pressure_sensor_placement_sensitivity(name: str, scheme_name: str, sensor_number: int, - min_diameter: int, username: str) -> None: +def pressure_sensor_placement_sensitivity( + name: str, scheme_name: str, sensor_number: int, min_diameter: int, username: str +) -> None: """ 基于改进灵敏度法进行压力监测点优化布置 :param name: 数据库名称 @@ -1071,7 +1380,9 @@ def pressure_sensor_placement_sensitivity(name: str, scheme_name: str, sensor_nu :param username: 用户名 :return: """ - sensor_location = sensitivity.get_ID(name=name, sensor_num=sensor_number, min_diameter=min_diameter) + sensor_location = sensitivity.get_ID( + name=name, sensor_num=sensor_number, min_diameter=min_diameter + ) try: conn_string = get_pgconn_string(db_name=name) with psycopg.connect(conn_string) as conn: @@ -1081,16 +1392,27 @@ def pressure_sensor_placement_sensitivity(name: str, scheme_name: str, sensor_nu VALUES (%s, %s, %s, %s, %s) """ - cur.execute(sql, (scheme_name, sensor_number, min_diameter, username, sensor_location)) + cur.execute( + sql, + ( + scheme_name, + sensor_number, + min_diameter, + username, + sensor_location, + ), + ) conn.commit() print("方案信息存储成功!") except Exception as e: print(f"存储方案信息时出错:{e}") -#2025/08/21 + +# 2025/08/21 # 基于kmeans聚类法进行压力监测点优化布置 -def pressure_sensor_placement_kmeans(name: str, scheme_name: str, sensor_number: int, - min_diameter: int, username: str) -> None: +def pressure_sensor_placement_kmeans( + name: str, scheme_name: str, sensor_number: int, min_diameter: int, username: str +) -> None: """ 基于聚类法进行压力监测点优化布置 :param name: 数据库名称(注意,此处数据库名称也是inp文件名称,inp文件与pg库名要一样) @@ -1100,10 +1422,12 @@ def pressure_sensor_placement_kmeans(name: str, scheme_name: str, sensor_number: :param username: 用户名 :return: """ - #dump_inp - inp_name = f'./db_inp/{name}.db.inp' - dump_inp(name,inp_name,'2') - sensor_location = api_ex.kmeans_sensor.kmeans_sensor_placement(name=name, sensor_num=sensor_number, min_diameter=min_diameter) + # dump_inp + inp_name = f"./db_inp/{name}.db.inp" + dump_inp(name, inp_name, "2") + sensor_location = api_ex.kmeans_sensor.kmeans_sensor_placement( + name=name, sensor_num=sensor_number, min_diameter=min_diameter + ) try: conn_string = get_pgconn_string(db_name=name) with psycopg.connect(conn_string) as conn: @@ -1113,78 +1437,90 @@ def pressure_sensor_placement_kmeans(name: str, scheme_name: str, sensor_number: VALUES (%s, %s, %s, %s, %s) """ - cur.execute(sql, (scheme_name, sensor_number, min_diameter, username, sensor_location)) + cur.execute( + sql, + ( + scheme_name, + sensor_number, + min_diameter, + username, + sensor_location, + ), + ) conn.commit() print("方案信息存储成功!") except Exception as e: print(f"存储方案信息时出错:{e}") + ############################################################ # 流量监测数据清洗 ***卡尔曼滤波法*** ############################################################ -#2025/08/21 hxyan +# 2025/08/21 hxyan + def flow_data_clean(input_csv_file: str) -> str: """ 读取 input_csv_path 中的每列时间序列,使用一维 Kalman 滤波平滑并用预测值替换基于 3σ 检测出的异常点。 保存输出为:_cleaned.xlsx(与输入同目录),并返回输出文件的绝对路径。如有同名文件存在,则覆盖。 - :param: input_csv_file: 输入的 CSV 文件明或路径 + :param: input_csv_file: 输入的 CSV 文件明或路径 :return: 输出文件的绝对路径 """ - # 提供的 input_csv_path 绝对路径,以下为 默认脚本目录下同名 CSV 文件,构建绝对路径,可根据情况修改 script_dir = os.path.dirname(os.path.abspath(__file__)) - input_csv_path= os.path.join(script_dir, input_csv_file) + input_csv_path = os.path.join(script_dir, input_csv_file) # 检查文件是否存在 if not os.path.exists(input_csv_path): raise FileNotFoundError(f"指定的文件不存在: {input_csv_path}") # 调用 Fdataclean.clean_flow_data_kf 函数进行数据清洗 out_xlsx_path = api_ex.Fdataclean.clean_flow_data_kf(input_csv_path) - print("清洗后的数据已保存到:", out_xlsx_path ) + print("清洗后的数据已保存到:", out_xlsx_path) ############################################################ # 压力监测数据清洗 ***kmean++法*** ############################################################ -#2025/08/21 hxyan +# 2025/08/21 hxyan + def pressure_data_clean(input_csv_file: str) -> str: """ 读取 input_csv_path 中的每列时间序列,使用Kmean++清洗数据。 保存输出为:_cleaned.xlsx(与输入同目录),并返回输出文件的绝对路径。如有同名文件存在,则覆盖。 原始数据在 sheet 'raw_pressure_data',处理后数据在 sheet 'cleaned_pressusre_data'。 - :param input_csv_path: 输入的 CSV 文件路径 + :param input_csv_path: 输入的 CSV 文件路径 :return: 输出文件的绝对路径 """ # 提供的 input_csv_path 绝对路径,以下为 默认脚本目录下同名 CSV 文件,构建绝对路径,可根据情况修改 script_dir = os.path.dirname(os.path.abspath(__file__)) - input_csv_path= os.path.join(script_dir, input_csv_file) + input_csv_path = os.path.join(script_dir, input_csv_file) # 检查文件是否存在 if not os.path.exists(input_csv_path): raise FileNotFoundError(f"指定的文件不存在: {input_csv_path}") # 调用 Fdataclean.clean_flow_data_kf 函数进行数据清洗 out_xlsx_path = api_ex.Pdataclean.clean_pressure_data_km(input_csv_path) - print("清洗后的数据已保存到:", out_xlsx_path ) + print("清洗后的数据已保存到:", out_xlsx_path) -if __name__ == '__main__': - # contaminant_simulation('bb_model','2024-06-24T00:00:00Z','ZBBDTZDP009034',30,1800) - # flushing_analysis('bb_model','2024-04-01T08:00:00Z',{'GSD230719205857733F8F5214FF','GSD230719205857C0AF65B6A170'},'GSD2307192058570DEDF28E4F73',0,duration=900) - # flushing_analysis('bb_model', '2024-08-26T08:00:00Z', ['GSD2307192058572E5C0E14D83E'], [0.5], 'ZBBDTZDP009410', 0, - # duration=1800) - # valve_close_analysis('bb_model','2024-04-01T08:00:00Z',['GSD2307192058576122D929EE99(L)'],duration=1800) - # burst_analysis('bb','2024-04-01T08:00:00Z','ZBBGXSZW000001',burst_size=200,duration=1800) - #run_simulation('beibeizone','2024-04-01T08:00:00Z') -# str_dump=dump_output('h:\\OneDrive\\tjwaterserver\\temp\\beibeizone.db_no_burst.out') -# with open("out_dump.txt", "w") as f: -# f.write(str_dump) -# str_dump=dump_output('h:\\OneDrive\\tjwaterserver\\temp\\beibeizone.db_busrtID(ZBBGXSZW000001).out') -# with open("burst_out_dump.txt", "w") as f: -# f.write(str_dump) +if __name__ == "__main__": + # contaminant_simulation('bb_model','2024-06-24T00:00:00Z','ZBBDTZDP009034',30,1800) + # flushing_analysis('bb_model','2024-04-01T08:00:00Z',{'GSD230719205857733F8F5214FF','GSD230719205857C0AF65B6A170'},'GSD2307192058570DEDF28E4F73',0,duration=900) + # flushing_analysis('bb_model', '2024-08-26T08:00:00Z', ['GSD2307192058572E5C0E14D83E'], [0.5], 'ZBBDTZDP009410', 0, + # duration=1800) + # valve_close_analysis('bb_model','2024-04-01T08:00:00Z',['GSD2307192058576122D929EE99(L)'],duration=1800) + + # burst_analysis('bb','2024-04-01T08:00:00Z','ZBBGXSZW000001',burst_size=200,duration=1800) + # run_simulation('beibeizone','2024-04-01T08:00:00Z') + # str_dump=dump_output('h:\\OneDrive\\tjwaterserver\\temp\\beibeizone.db_no_burst.out') + # with open("out_dump.txt", "w") as f: + # f.write(str_dump) + # str_dump=dump_output('h:\\OneDrive\\tjwaterserver\\temp\\beibeizone.db_busrtID(ZBBGXSZW000001).out') + # with open("burst_out_dump.txt", "w") as f: + # f.write(str_dump) # # 更新inp文件,并插入history_patterns_flows # network_update('fx0217-mass injection.inp') @@ -1229,4 +1565,4 @@ if __name__ == '__main__': # pressure_sensor_placement_kmeans(name=project_info.name, scheme_name='sensor_1103', sensor_number=35, min_diameter=300, username='admin') # 测试:convert emitters coefficients - convert_to_local_unit("szh",100) \ No newline at end of file + convert_to_local_unit("szh", 100)