import os from tjnetwork import * from api.project import CopyProjectEx from run_simulation import run_simulation_ex, from_clock_to_seconds_2 from math import sqrt, pi from epanet.epanet import Output import json from datetime import datetime import time import pytz import psycopg from psycopg import sql import pandas as pd import csv import chardet import simulation ############################################################ # burst analysis 01 ############################################################ def burst_analysis(name: str, modify_pattern_start_time: str, burst_ID: list | str = None, burst_size: list | float | int = None, modify_total_duration: int=900, modify_fixed_pump_pattern: dict[str, list] = None, modify_variable_pump_pattern: dict[str, list] = None, modify_valve_opening: dict[str, float] = None, scheme_Name: str = None) -> None: """ 爆管模拟 :param name: 模型名称,数据库中对应的名字 :param modify_pattern_start_time: 模拟开始时间,格式为'2024-11-25T09:00:00+08:00' :param burst_ID: 爆管管道的ID,选取的是管道,单独传入一个爆管管道,可以是str或list,传入多个爆管管道是用list :param burst_size: 爆管管道破裂的孔口面积,和burst_ID列表各位置的ID对应,以cm*cm计算 :param modify_total_duration: 模拟总历时,秒 :param modify_fixed_pump_pattern: dict中包含多个水泵模式,str为工频水泵的id,list为修改后的pattern :param modify_variable_pump_pattern: dict中包含多个水泵模式,str为变频水泵的id,list为修改后的pattern :param modify_valve_opening: dict中包含多个阀门开启度,str为阀门的id,float为修改后的阀门开启度 :param scheme_Name: 方案名称 :return: """ print(datetime.now(pytz.timezone('Asia/Shanghai')).strftime("%Y-%m-%d %H:%M:%S") + " -- Start Analysis.") new_name = f'burst_Anal_{name}' if have_project(new_name): if is_project_open(new_name): close_project(new_name) delete_project(new_name) if is_project_open(name): close_project(name) print(datetime.now(pytz.timezone('Asia/Shanghai')).strftime("%Y-%m-%d %H:%M:%S") + " -- Start Copying Database.") CopyProjectEx()(name, new_name, ['operation', 'current_operation', 'restore_operation', 'batch_operation', 'operation_table']) print(datetime.now(pytz.timezone('Asia/Shanghai')).strftime("%Y-%m-%d %H:%M:%S") + " -- Start Opening Database.") open_project(new_name) print(datetime.now(pytz.timezone('Asia/Shanghai')).strftime("%Y-%m-%d %H:%M:%S") + " -- Database Loading OK.") ##step 1 set the emitter coefficient of end node of busrt pipe if isinstance(burst_ID, list): if (burst_size is not None) and (type(burst_size) is not list): return json.dumps('Type mismatch.') elif isinstance(burst_ID, str): burst_ID = [burst_ID] if burst_size is not None: if isinstance(burst_size, float) or isinstance(burst_size, int): burst_size = [burst_size] else: return json.dumps('Type mismatch.') else: return json.dumps('Type mismatch.') if burst_size is None: burst_size = [-1] * len(burst_ID) elif len(burst_size) < len(burst_ID): burst_size += [-1] * (len(burst_ID) - len(burst_size)) elif len(burst_size) > len(burst_ID): # burst_size = burst_size[:len(burst_ID)] return json.dumps('Length mismatch.') for burst_ID_, burst_size_ in zip(burst_ID, burst_size): pipe = get_pipe(new_name, burst_ID_) str_start_node = pipe['node1'] str_end_node = pipe['node2'] d_pipe = pipe['diameter'] / 1000.0 if burst_size_ <= 0: burst_size_ = 3.14 * d_pipe * d_pipe / 4 / 8 else: burst_size_ = burst_size_ / 10000 emitter_coeff = 0.65 * burst_size_ * sqrt(19.6) * 1000#1/8开口面积作为coeff emitter_node = '' if is_junction(new_name, str_end_node): emitter_node = str_end_node elif is_junction(new_name, str_start_node): emitter_node = str_start_node old_emitter = get_emitter(new_name, emitter_node) if(old_emitter != None): old_emitter['coefficient'] = emitter_coeff #爆管的emitter coefficient设置 else: old_emitter = {'junction': emitter_node, 'coefficient': emitter_coeff} new_emitter = ChangeSet() new_emitter.append(old_emitter) set_emitter(new_name, new_emitter) #step 2. run simulation # 涉及关阀计算,可能导致关阀后仍有流量,改为压力驱动PDA options = get_option(new_name) options['DEMAND MODEL'] = OPTION_DEMAND_MODEL_PDA options['REQUIRED PRESSURE'] = '20.0000' cs_options = ChangeSet() cs_options.append(options) set_option(new_name, cs_options) # valve_control = None # if modify_valve_opening is not None: # valve_control = {} # for valve in modify_valve_opening: # valve_control[valve] = {'status': 'CLOSED'} # result = run_simulation_ex(new_name,'realtime', modify_pattern_start_time, # end_datetime=modify_pattern_start_time, # modify_total_duration=modify_total_duration, # modify_pump_pattern=modify_pump_pattern, # valve_control=valve_control, # downloading_prohibition=True) simulation.run_simulation(name=new_name, simulation_type='extended', modify_pattern_start_time=modify_pattern_start_time, modify_total_duration=modify_total_duration, modify_fixed_pump_pattern=modify_fixed_pump_pattern, modify_variable_pump_pattern=modify_variable_pump_pattern, modify_valve_opening=modify_valve_opening, scheme_Type='burst_Analysis', scheme_Name=scheme_Name) #step 3. restore the base model status # execute_undo(name) #有疑惑 if is_project_open(new_name): close_project(new_name) delete_project(new_name) # return result ############################################################ # valve closing analysis 02 ############################################################ def valve_close_analysis(name: str, modify_pattern_start_time: str, modify_total_duration: int = 900, modify_valve_opening: dict[str, float] = None, scheme_Name: str = None) -> None: """ 关阀模拟 :param name: 模型名称,数据库中对应的名字 :param modify_pattern_start_time: 模拟开始时间,格式为'2024-11-25T09:00:00+08:00' :param modify_total_duration: 模拟总历时,秒 :param modify_valve_opening: dict中包含多个阀门开启度,str为阀门的id,float为修改后的阀门开启度 :param scheme_Name: 方案名称 :return: """ print(datetime.now(pytz.timezone('Asia/Shanghai')).strftime("%Y-%m-%d %H:%M:%S") + " -- Start Analysis.") new_name = f'valve_close_Anal_{name}' if have_project(new_name): if is_project_open(new_name): close_project(new_name) delete_project(new_name) if is_project_open(name): close_project(name) print(datetime.now(pytz.timezone('Asia/Shanghai')).strftime("%Y-%m-%d %H:%M:%S") + " -- Start Copying Database.") CopyProjectEx()(name, new_name, ['operation', 'current_operation', 'restore_operation', 'batch_operation', 'operation_table']) print(datetime.now(pytz.timezone('Asia/Shanghai')).strftime("%Y-%m-%d %H:%M:%S") + " -- Start Opening Database.") open_project(new_name) print(datetime.now(pytz.timezone('Asia/Shanghai')).strftime("%Y-%m-%d %H:%M:%S") + " -- Database Loading OK.") #step 1. change the valves status to 'closed' # for valve in valves: # if not is_valve(new_name,valve): # result='ID:{}is not a valve type'.format(valve) # return result # cs=ChangeSet() # status=get_status(new_name,valve) # status['status']='CLOSED' # cs.append(status) # set_status(new_name,cs) #step 2. run simulation # 涉及关阀计算,可能导致关阀后仍有流量,改为压力驱动PDA options = get_option(new_name) options['DEMAND MODEL'] = OPTION_DEMAND_MODEL_PDA options['REQUIRED PRESSURE'] = '20.0000' cs_options = ChangeSet() cs_options.append(options) set_option(new_name, cs_options) # result = run_simulation_ex(new_name,'realtime', modify_pattern_start_time, modify_pattern_start_time, modify_total_duration, # downloading_prohibition=True) simulation.run_simulation(name=new_name, simulation_type='extended', modify_pattern_start_time=modify_pattern_start_time, modify_total_duration=duration, modify_valve_opening=modify_valve_opening, scheme_Type='valve_close_Analysis', scheme_Name=scheme_Name) #step 3. restore the base model # for valve in valves: # execute_undo(name) if is_project_open(new_name): close_project(new_name) delete_project(new_name) # return result ############################################################ # flushing analysis 03 #Pipe_Flushing_Analysis(prj_name,date_time, Valve_id_list, Drainage_Node_Id, Flushing_flow[opt], Flushing_duration[opt])->out_file:string ############################################################ def flushing_analysis(name: str, modify_pattern_start_time: str, modify_total_duration: int = 900, modify_valve_opening: dict[str, float] = None, drainage_node_ID: str = None, flushing_flow: float = 0, scheme_Name: str = None) -> None: """ 管道冲洗模拟 :param name: 模型名称,数据库中对应的名字 :param modify_pattern_start_time: 模拟开始时间,格式为'2024-11-25T09:00:00+08:00' :param modify_total_duration: 模拟总历时,秒 :param modify_valve_opening: dict中包含多个阀门开启度,str为阀门的id,float为修改后的阀门开启度 :param drainage_node_ID: 冲洗排放口所在节点ID :param flushing_flow: 冲洗水量,传入参数单位为m3/h :param scheme_Name: 方案名称 :return: """ print(datetime.now(pytz.timezone('Asia/Shanghai')).strftime("%Y-%m-%d %H:%M:%S") + " -- Start Analysis.") new_name = f'flushing_Anal_{name}' if have_project(new_name): if is_project_open(new_name): close_project(new_name) delete_project(new_name) if is_project_open(name): close_project(name) print(datetime.now(pytz.timezone('Asia/Shanghai')).strftime("%Y-%m-%d %H:%M:%S") + " -- Start Copying Database.") CopyProjectEx()(name, new_name, ['operation', 'current_operation', 'restore_operation', 'batch_operation', 'operation_table']) print(datetime.now(pytz.timezone('Asia/Shanghai')).strftime("%Y-%m-%d %H:%M:%S") + " -- Start Opening Database.") open_project(new_name) print(datetime.now(pytz.timezone('Asia/Shanghai')).strftime("%Y-%m-%d %H:%M:%S") + " -- Database Loading OK.") if not is_junction(new_name,drainage_node_ID): return 'Wrong Drainage node type' #step 1. change the valves status to 'closed' # for valve, valve_k in zip(valves, valves_k): # cs=ChangeSet() # status=get_status(new_name,valve) # # status['status']='CLOSED' # if valve_k == 0: # status['status'] = 'CLOSED' # elif valve_k < 1: # status['status'] = 'OPEN' # status['setting'] = 0.1036 * pow(valve_k, -3.105) # cs.append(status) # set_status(new_name,cs) units = get_option(new_name) #step 2. set the emitter coefficient of drainage node or add flush flow to the drainage node emitter_demand=get_demand(new_name,drainage_node_ID) cs = ChangeSet() if flushing_flow > 0: for r in emitter_demand['demands']: if units == 'LPS': r['demand'] += (flushing_flow/3.6) elif units == 'CMH': r['demand'] += flushing_flow cs.append(emitter_demand) set_demand(new_name,cs) else: pipes=get_node_links(new_name,drainage_node_ID) flush_diameter=50 for pipe in pipes: d=get_pipe(new_name,pipe)['diameter'] if flush_diameter None: """ 污染模拟 :param name: 模型名称,数据库中对应的名字 :param modify_pattern_start_time: 模拟开始时间,格式为'2024-11-25T09:00:00+08:00' :param modify_total_duration: 模拟总历时,秒 :param source: 污染源所在的节点ID :param concentration: 污染源位置处的浓度,单位mg/L,即默认的污染模拟setting为concentration :param source_pattern: 污染源的时间变化模式,若不传入则默认以恒定浓度持续模拟,时间长度等于duration; 若传入,则格式为{1.0,0.5,1.1}等系数列表pattern_step模拟等于模型的hydraulic time step :return: """ print(datetime.now(pytz.timezone('Asia/Shanghai')).strftime("%Y-%m-%d %H:%M:%S") + " -- Start Analysis.") new_name = f'contaminant_Sim_{name}' if have_project(new_name): if is_project_open(new_name): close_project(new_name) delete_project(new_name) if is_project_open(name): close_project(name) print(datetime.now(pytz.timezone('Asia/Shanghai')).strftime("%Y-%m-%d %H:%M:%S") + " -- Start Copying Database.") CopyProjectEx()(name, new_name, ['operation', 'current_operation', 'restore_operation', 'batch_operation', 'operation_table']) print(datetime.now(pytz.timezone('Asia/Shanghai')).strftime("%Y-%m-%d %H:%M:%S") + " -- Start Opening Database.") open_project(new_name) print(datetime.now(pytz.timezone('Asia/Shanghai')).strftime("%Y-%m-%d %H:%M:%S") + " -- Database Loading OK.") dic_time = get_time(new_name) dic_time['QUALITY TIMESTEP'] = '0:05:00' cs = ChangeSet() cs.operations.append(dic_time) set_time(new_name, cs) # set QUALITY TIMESTEP time_option=get_time(new_name) hydraulic_step=time_option['HYDRAULIC TIMESTEP'] secs=from_clock_to_seconds_2(hydraulic_step) operation_step=0 #step 1. set duration if modify_total_duration==None: modify_total_duration =secs #step 2. set pattern if source_pattern!=None: pt=get_pattern(new_name,source_pattern) if pt==None: str_response=str('cant find source_pattern') return str_response else: cs_pattern=ChangeSet() pt={} factors=[] tmp_duration= modify_total_duration while tmp_duration>0: factors.append(1.0) tmp_duration=tmp_duration-secs pt['id']='contam_pt' pt['factors']=factors cs_pattern.append(pt) add_pattern(new_name,cs_pattern) operation_step+=1 #step 3. set source/initial quality # source quality cs_source=ChangeSet() source_schema={'node':source,'s_type':SOURCE_TYPE_CONCEN,'strength':concentration,'pattern':pt['id']} cs_source.append(source_schema) source_node=get_source(new_name,source) if len(source_node)==0: add_source(new_name,cs_source) else: set_source(new_name,cs_source) dict_demand = get_demand(new_name, source) for demands in dict_demand['demands']: dict_demand['demands'][dict_demand['demands'].index(demands)]['demand'] = -1 dict_demand['demands'][dict_demand['demands'].index(demands)]['pattern'] = None cs = ChangeSet() cs.append(dict_demand) set_demand(new_name, cs) # set inflow node # # initial quality # dict_quality = get_quality(new_name, source) # dict_quality['quality'] = concentration # cs = ChangeSet() # cs.append(dict_quality) # set_quality(new_name, cs) operation_step+=1 #step 4 set option of quality to chemical opt=get_option(new_name) opt['QUALITY']=OPTION_QUALITY_CHEMICAL cs_option=ChangeSet() cs_option.append(opt) set_option(new_name,cs_option) operation_step+=1 #step 5. run simulation # result = run_simulation_ex(new_name,'realtime', modify_pattern_start_time, modify_pattern_start_time, modify_total_duration, # downloading_prohibition=True) simulation.run_simulation(name=new_name, simulation_type='extended', modify_pattern_start_time=modify_pattern_start_time, modify_total_duration=modify_total_duration, scheme_Type="contaminant_Analysis", scheme_Name=scheme_Name) # for i in range(1,operation_step): # execute_undo(name) if is_project_open(new_name): close_project(new_name) delete_project(new_name) # return result ############################################################ # age analysis 05 ***水龄模拟目前还没和实时模拟打通,不确定是否需要,先不要使用*** ############################################################ def age_analysis(name: str, modify_pattern_start_time: str, modify_total_duration: int = 900) -> None: """ 水龄模拟 :param name: 模型名称,数据库中对应的名字 :param modify_pattern_start_time: 模拟开始时间,格式为'2024-11-25T09:00:00+08:00' :param modify_total_duration: 模拟总历时,秒 :return: """ print(datetime.now(pytz.timezone('Asia/Shanghai')).strftime("%Y-%m-%d %H:%M:%S") + " -- Start Analysis.") new_name = f'age_Anal_{name}' if have_project(new_name): if is_project_open(new_name): close_project(new_name) delete_project(new_name) if is_project_open(name): close_project(name) print(datetime.now(pytz.timezone('Asia/Shanghai')).strftime("%Y-%m-%d %H:%M:%S") + " -- Start Copying Database.") CopyProjectEx()(name, new_name, ['operation', 'current_operation', 'restore_operation', 'batch_operation', 'operation_table']) print(datetime.now(pytz.timezone('Asia/Shanghai')).strftime("%Y-%m-%d %H:%M:%S") + " -- Start Opening Database.") open_project(new_name) print(datetime.now(pytz.timezone('Asia/Shanghai')).strftime("%Y-%m-%d %H:%M:%S") + " -- Database Loading OK.") # step 1. run simulation result = run_simulation_ex(new_name, 'realtime', modify_pattern_start_time, modify_total_duration, downloading_prohibition=True) # step 2. restore the base model status # execute_undo(name) #有疑惑 if is_project_open(new_name): close_project(new_name) delete_project(new_name) output = Output("./temp/{}.db.out".format(new_name)) # element_name = output.element_name() # node_name = element_name['nodes'] # link_name = element_name['links'] nodes_age = [] node_result = output.node_results() for node in node_result: nodes_age.append(node['result'][-1]['quality']) links_age = [] link_result = output.link_results() for link in link_result: links_age.append(link['result'][-1]['quality']) age_result = {'nodes': nodes_age, 'links': links_age} # age_result = {'nodes': nodes_age, 'links': links_age, 'nodeIDs': node_name, 'linkIDs': link_name} return json.dumps(age_result) ############################################################ # pressure regulation 06 ############################################################ def pressure_regulation(name: str, modify_pattern_start_time: str, modify_total_duration: int = 900, modify_tank_initial_level: dict[str, float] = None, modify_fixed_pump_pattern: dict[str, list] = None, modify_variable_pump_pattern: dict[str, list] = None, scheme_Name: str = None) -> None: """ 区域调压模拟,用来模拟未来15分钟内,开关水泵对区域压力的影响 :param name: 模型名称,数据库中对应的名字 :param modify_pattern_start_time: 模拟开始时间,格式为'2024-11-25T09:00:00+08:00' :param modify_total_duration: 模拟总历时,秒 :param modify_tank_initial_level: dict中包含多个水塔,str为水塔的id,float为修改后的initial_level :param modify_fixed_pump_pattern: dict中包含多个水泵模式,str为工频水泵的id,list为修改后的pattern :param modify_variable_pump_pattern: dict中包含多个水泵模式,str为变频水泵的id,list为修改后的pattern :param scheme_Name: 模拟方案名称 :return: """ print(datetime.now(pytz.timezone('Asia/Shanghai')).strftime("%Y-%m-%d %H:%M:%S") + " -- Start Analysis.") new_name = f'pressure_regulation_{name}' if have_project(new_name): if is_project_open(new_name): close_project(new_name) delete_project(new_name) if is_project_open(name): close_project(name) print(datetime.now(pytz.timezone('Asia/Shanghai')).strftime("%Y-%m-%d %H:%M:%S") + " -- Start Copying Database.") CopyProjectEx()(name, new_name, ['operation', 'current_operation', 'restore_operation', 'batch_operation', 'operation_table']) print(datetime.now(pytz.timezone('Asia/Shanghai')).strftime("%Y-%m-%d %H:%M:%S") + " -- Start Opening Database.") open_project(new_name) print(datetime.now(pytz.timezone('Asia/Shanghai')).strftime("%Y-%m-%d %H:%M:%S") + " -- Database Loading OK.") # 全部关泵后,压力计算不合理,改为压力驱动PDA options = get_option(new_name) options['DEMAND MODEL'] = OPTION_DEMAND_MODEL_PDA options['REQUIRED PRESSURE'] = '15.0000' cs_options = ChangeSet() cs_options.append(options) set_option(new_name, cs_options) # result = run_simulation_ex(name=new_name, # simulation_type='realtime', # start_datetime=start_datetime, # duration=900, # pump_control=pump_control, # tank_initial_level_control=tank_initial_level_control, # downloading_prohibition=True) simulation.run_simulation(name=new_name, simulation_type='extended', modify_pattern_start_time=modify_pattern_start_time, modify_total_duration=modify_total_duration, modify_tank_initial_level=modify_tank_initial_level, modify_fixed_pump_pattern=modify_fixed_pump_pattern, modify_variable_pump_pattern=modify_variable_pump_pattern, scheme_Type="pressure_regulation", scheme_Name=scheme_Name) if is_project_open(new_name): close_project(new_name) delete_project(new_name) # return result ############################################################ # project management 07 ***暂时不使用,与业务需求无关*** ############################################################ def project_management(prj_name, start_datetime, pump_control, tank_initial_level_control=None, region_demand_control=None) -> str: print(datetime.now(pytz.timezone('Asia/Shanghai')).strftime("%Y-%m-%d %H:%M:%S") + " -- Start Analysis.") new_name = f'project_management_{prj_name}' if have_project(new_name): if is_project_open(new_name): close_project(new_name) delete_project(new_name) if is_project_open(prj_name): close_project(prj_name) print(datetime.now(pytz.timezone('Asia/Shanghai')).strftime("%Y-%m-%d %H:%M:%S") + " -- Start Copying Database.") CopyProjectEx()(prj_name, new_name, ['operation', 'current_operation', 'restore_operation', 'batch_operation', 'operation_table']) print(datetime.now(pytz.timezone('Asia/Shanghai')).strftime("%Y-%m-%d %H:%M:%S") + " -- Start Opening Database.") open_project(new_name) print(datetime.now(pytz.timezone('Asia/Shanghai')).strftime("%Y-%m-%d %H:%M:%S") + " -- Database Loading OK.") result = run_simulation_ex(name=new_name, simulation_type='realtime', start_datetime=start_datetime, duration=86400, pump_control=pump_control, tank_initial_level_control=tank_initial_level_control, region_demand_control=region_demand_control, downloading_prohibition=True) if is_project_open(new_name): close_project(new_name) delete_project(new_name) return result ############################################################ # scheduling analysis 08 ***暂时不使用,与业务需求无关*** ############################################################ def scheduling_simulation(prj_name, start_time, pump_control, tank_id, water_plant_output_id, time_delta=300) -> str: print(datetime.now(pytz.timezone('Asia/Shanghai')).strftime("%Y-%m-%d %H:%M:%S") + " -- Start Analysis.") new_name = f'scheduling_{prj_name}' if have_project(new_name): if is_project_open(new_name): close_project(new_name) delete_project(new_name) if is_project_open(prj_name): close_project(prj_name) print(datetime.now(pytz.timezone('Asia/Shanghai')).strftime("%Y-%m-%d %H:%M:%S") + " -- Start Copying Database.") CopyProjectEx()(prj_name, new_name, ['operation', 'current_operation', 'restore_operation', 'batch_operation', 'operation_table']) print(datetime.now(pytz.timezone('Asia/Shanghai')).strftime("%Y-%m-%d %H:%M:%S") + " -- Start Opening Database.") open_project(new_name) print(datetime.now(pytz.timezone('Asia/Shanghai')).strftime("%Y-%m-%d %H:%M:%S") + " -- Database Loading OK.") run_simulation_ex(new_name, 'realtime', start_time, duration=0, pump_control=pump_control) if not is_project_open(new_name): open_project(new_name) tank = get_tank(new_name, tank_id) # 水塔信息 tank_floor_space = pi * pow(tank['diameter'] / 2, 2) # 水塔底面积(m^2) tank_init_level = tank['init_level'] # 水塔初始水位(m) tank_pipes_id = tank['links'] # pipes list tank_pipe_flow_direction = {} # 管道流向修正系数, 水塔为下游节点时为1, 水塔为上游节点时为-1 for pipe_id in tank_pipes_id: if get_pipe(new_name, pipe_id)['node2'] == tank_id: # 水塔为下游节点 tank_pipe_flow_direction[pipe_id] = 1 else: tank_pipe_flow_direction[pipe_id] = -1 output = Output("./temp/{}.db.out".format(new_name)) node_results = output.node_results() # [{'node': str, 'result': [{'pressure': float}]}] water_plant_output_pressure = 0 for node_result in node_results: if node_result['node'] == water_plant_output_id: # 水厂出水压力(m) water_plant_output_pressure = node_result['result'][-1]['pressure'] water_plant_output_pressure /= 100 # 预计水厂出水压力(Mpa) pipe_results = output.link_results() # [{'link': str, 'result': [{'flow': float}]}] tank_inflow = 0 for pipe_result in pipe_results: for pipe_id in tank_pipes_id: # 遍历与水塔相连的管道 if pipe_result['link'] == pipe_id: # 水塔入流流量(L/s) tank_inflow += pipe_result['result'][-1]['flow'] * tank_pipe_flow_direction[pipe_id] tank_inflow /= 1000 # 水塔入流流量(m^3/s) tank_level_delta = tank_inflow * time_delta / tank_floor_space # 水塔水位改变值(m) tank_level = tank_init_level + tank_level_delta # 预计水塔水位(m) simulation_results = {'water_plant_output_pressure': water_plant_output_pressure, 'tank_init_level': tank_init_level, 'tank_level': tank_level} if is_project_open(new_name): close_project(new_name) delete_project(new_name) return json.dumps(simulation_results) def daily_scheduling_simulation(prj_name, start_time, pump_control, reservoir_id, tank_id, water_plant_output_id) -> str: print(datetime.now(pytz.timezone('Asia/Shanghai')).strftime("%Y-%m-%d %H:%M:%S") + " -- Start Analysis.") new_name = f'daily_scheduling_{prj_name}' if have_project(new_name): if is_project_open(new_name): close_project(new_name) delete_project(new_name) if is_project_open(prj_name): close_project(prj_name) print(datetime.now(pytz.timezone('Asia/Shanghai')).strftime("%Y-%m-%d %H:%M:%S") + " -- Start Copying Database.") CopyProjectEx()(prj_name, new_name, ['operation', 'current_operation', 'restore_operation', 'batch_operation', 'operation_table']) print(datetime.now(pytz.timezone('Asia/Shanghai')).strftime("%Y-%m-%d %H:%M:%S") + " -- Start Opening Database.") open_project(new_name) print(datetime.now(pytz.timezone('Asia/Shanghai')).strftime("%Y-%m-%d %H:%M:%S") + " -- Database Loading OK.") run_simulation_ex(new_name, 'realtime', start_time, duration=86400, pump_control=pump_control) if not is_project_open(new_name): open_project(new_name) output = Output("./temp/{}.db.out".format(new_name)) node_results = output.node_results() # [{'node': str, 'result': [{'pressure': float, 'head': float}]}] water_plant_output_pressure = [] reservoir_level = [] tank_level = [] for node_result in node_results: if node_result['node'] == water_plant_output_id: for result in node_result['result']: water_plant_output_pressure.append(result['pressure'] / 100) # 水厂出水压力(Mpa) elif node_result['node'] == reservoir_id: for result in node_result['result']: reservoir_level.append(result['head'] - 250.35) # 清水池液位(m) elif node_result['node'] == tank_id: for result in node_result['result']: tank_level.append(result['pressure']) # 调节池液位(m) simulation_results = {'water_plant_output_pressure': water_plant_output_pressure, 'reservoir_level': reservoir_level, 'tank_level': tank_level} if is_project_open(new_name): close_project(new_name) delete_project(new_name) return json.dumps(simulation_results) ############################################################ # network_update 10 ############################################################ def network_update(file_path: str) -> None: """ 更新pg数据库中的inp文件 :param file_path: inp文件 :return: """ read_inp('bb', file_path) csv_path = './history_pattern_flow.csv' # # 检查文件是否存在 # if os.path.exists(csv_path): # print(f"history_patterns_flows文件存在,开始处理...") # # # 读取 CSV 文件 # df = pd.read_csv(csv_path) # # # 连接到 PostgreSQL 数据库(这里是数据库 "bb") # with psycopg.connect("dbname=bb host=127.0.0.1") as conn: # with conn.cursor() as cur: # for index, row in df.iterrows(): # # 直接将数据插入,不进行唯一性检查 # insert_sql = sql.SQL(""" # INSERT INTO history_patterns_flows (id, factor, flow) # VALUES (%s, %s, %s); # """) # # 将数据插入数据库 # cur.execute(insert_sql, (row['id'], row['factor'], row['flow'])) # conn.commit() # print("数据成功导入到 'history_patterns_flows' 表格。") # else: # print(f"history_patterns_flows文件不存在。") # 检查文件是否存在 if os.path.exists(csv_path): print(f"history_patterns_flows文件存在,开始处理...") # 连接到 PostgreSQL 数据库(这里是数据库 "bb") with psycopg.connect("dbname=bb host=127.0.0.1") as conn: with conn.cursor() as cur: with open(csv_path, newline='', encoding='utf-8-sig') as csvfile: reader = csv.DictReader(csvfile) for row in reader: # 直接将数据插入,不进行唯一性检查 insert_sql = sql.SQL(""" INSERT INTO history_patterns_flows (id, factor, flow) VALUES (%s, %s, %s); """) # 将数据插入数据库 cur.execute(insert_sql, (row['id'], row['factor'], row['flow'])) conn.commit() print("数据成功导入到 'history_patterns_flows' 表格。") else: print(f"history_patterns_flows文件不存在。") def submit_scada_info(name: str, coord_id: str) -> None: """ 将scada信息表导入pg数据库 :param name: 项目名称(数据库名称) :param coord_id: 坐标系的id,如4326,根据原始坐标信息输入 :return: """ scada_info_path = './scada_info.csv' # 检查文件是否存在 if os.path.exists(scada_info_path): print(f"scada_info文件存在,开始处理...") # 自动检测文件编码 with open(scada_info_path, 'rb') as file: raw_data = file.read() detected = chardet.detect(raw_data) file_encoding = detected['encoding'] print(f"检测到的文件编码:{file_encoding}") try: # 动态替换数据库名称 conn_string = f"dbname={name} host=127.0.0.1" # 连接到 PostgreSQL 数据库(这里是数据库 "bb") with psycopg.connect(conn_string) as conn: with conn.cursor() as cur: # 检查 scada_info 表是否为空 cur.execute("SELECT COUNT(*) FROM scada_info;") count = cur.fetchone()[0] if count > 0: print("scada_info表中已有数据,正在清空记录...") cur.execute("DELETE FROM scada_info;") print("表记录已清空。") with open(scada_info_path, newline='', encoding=file_encoding) as csvfile: reader = csv.DictReader(csvfile) for row in reader: # 将CSV单元格值为空的字段转换为 None cleaned_row = {key: (value if value.strip() else None) for key, value in row.items()} # 处理 associated_source_outflow_id 列动态变化 associated_columns = [f"associated_source_outflow_id{i}" for i in range(1, 21)] associated_values = [ (cleaned_row.get(col).strip() if cleaned_row.get(col) and cleaned_row.get( col).strip() else None) for col in associated_columns ] # 将 X_coor 和 Y_coor 转换为 geometry 类型 x_coor = float(cleaned_row['X_coor']) if cleaned_row['X_coor'] else None y_coor = float(cleaned_row['Y_coor']) if cleaned_row['Y_coor'] else None coord = f"SRID={coord_id};POINT({x_coor} {y_coor})" if x_coor and y_coor else None # 准备插入 SQL 语句 insert_sql = sql.SQL(""" INSERT INTO scada_info ( id, type, associated_element_id, associated_pattern, associated_pipe_flow_id, {associated_columns}, API_query_id, transmission_mode, transmission_frequency, X_coor, Y_coor, coord ) VALUES ( %s, %s, %s, %s, %s, {associated_placeholders}, %s, %s, %s, %s, %s, %s ); """).format( associated_columns=sql.SQL(", ").join(sql.Identifier(col) for col in associated_columns), associated_placeholders=sql.SQL(", ").join(sql.Placeholder() for _ in associated_columns) ) # 将数据插入数据库 cur.execute(insert_sql, ( cleaned_row['id'], cleaned_row['type'], cleaned_row['associated_element_id'], cleaned_row.get('associated_pattern'), cleaned_row.get('associated_pipe_flow_id'), *associated_values, cleaned_row.get('API_query_id'), cleaned_row['transmission_mode'], cleaned_row['transmission_frequency'], x_coor, y_coor, coord )) conn.commit() print("数据成功导入到 'scada_info' 表格。") except Exception as e: print(f"导入时出错:{e}") else: print(f"scada_info文件不存在。") if __name__ == '__main__': # contaminant_simulation('bb_model','2024-06-24T00:00:00Z','ZBBDTZDP009034',30,1800) # flushing_analysis('bb_model','2024-04-01T08:00:00Z',{'GSD230719205857733F8F5214FF','GSD230719205857C0AF65B6A170'},'GSD2307192058570DEDF28E4F73',0,duration=900) # flushing_analysis('bb_model', '2024-08-26T08:00:00Z', ['GSD2307192058572E5C0E14D83E'], [0.5], 'ZBBDTZDP009410', 0, # duration=1800) # valve_close_analysis('bb_model','2024-04-01T08:00:00Z',['GSD2307192058576122D929EE99(L)'],duration=1800) # burst_analysis('bb','2024-04-01T08:00:00Z','ZBBGXSZW000001',burst_size=200,duration=1800) #run_simulation('beibeizone','2024-04-01T08:00:00Z') # str_dump=dump_output('h:\\OneDrive\\tjwaterserver\\temp\\beibeizone.db_no_burst.out') # with open("out_dump.txt", "w") as f: # f.write(str_dump) # str_dump=dump_output('h:\\OneDrive\\tjwaterserver\\temp\\beibeizone.db_busrtID(ZBBGXSZW000001).out') # with open("burst_out_dump.txt", "w") as f: # f.write(str_dump) # # network_update('model22_1223.inp') # submit_scada_info('bb', '4490') # 示例1:burst_analysis burst_analysis(name='bb', modify_pattern_start_time='2025-02-14T10:30:00+08:00', burst_ID='ZBBGXSZW000002', burst_size=50, modify_total_duration=1800, scheme_Name='burst_scheme')