"""管网模型读取与图构建模块。""" import copy import numpy as np import networkx as nx import pandas as pd import wntr def load_inp(inp_name, inp_location, inp_time, driven_mode, require_p, minimum_p): inp_file = inp_location + inp_name wn = wntr.network.WaterNetworkModel(inp_file) if driven_mode == "PDD": wn.options.hydraulic.demand_model = "PDD" wn.options.hydraulic.required_pressure = require_p wn.options.hydraulic.minimum_pressure = minimum_p else: wn.options.hydraulic.demand_model = "DD" return wn def read_inf_inp(wn): all_node = wn.node_name_list node_elevation = wn.query_node_attribute("elevation") node_coordinates = wn.query_node_attribute("coordinates") all_pipe = wn.pipe_name_list # 改_wz__________________________________ n_pipe = [] for p in all_pipe: pipe = wn.get_link(p) if pipe.initial_status == 0: # 状态为'Closed' n_pipe.append(p) candidate_pipe_init = sorted(set(all_pipe) - set(n_pipe)) pipe_start_node = wn.query_link_attribute( "start_node_name", link_type=wntr.network.model.Pipe ) pipe_end_node = wn.query_link_attribute( "end_node_name", link_type=wntr.network.model.Pipe ) pipe_length = wn.query_link_attribute("length") pipe_diameter = wn.query_link_attribute("diameter") return ( all_node, node_elevation, node_coordinates, candidate_pipe_init, pipe_start_node, pipe_end_node, pipe_length, pipe_diameter, ) def read_inf_inp_other(wn): all_link = wn.link_name_list pipe_start_node_all = wn.query_link_attribute("start_node_name") pipe_end_node_all = wn.query_link_attribute("end_node_name") return all_link, pipe_start_node_all, pipe_end_node_all def construct_graph(wn): length = wn.query_link_attribute("length") G = wn.get_graph(wn, link_weight=length) # 转为无向图 G0 = G.to_undirected() # A0 = np.array(nx.adjacency_graph(G0).todense()) return G0 # , A0 def cal_pipe_coordinate(all_pipe, pipe_start_node, pipe_end_node, node_coordinates): pipe_num = len(all_pipe) pipe_coordinates = np.zeros([pipe_num, 2]) pipe_x = copy.deepcopy(pipe_start_node) pipe_y = copy.deepcopy(pipe_start_node) for i in range(pipe_num): temp_pipe = all_pipe[i] pipe_x[temp_pipe] = ( node_coordinates[pipe_start_node[temp_pipe]][0] + node_coordinates[pipe_end_node[temp_pipe]][0] ) / 2 pipe_y[temp_pipe] = ( node_coordinates[pipe_start_node[temp_pipe]][1] + node_coordinates[pipe_end_node[temp_pipe]][1] ) / 2 return pipe_x, pipe_y def cal_node_coordinate(all_node, node_coordinates): node_x = copy.deepcopy(node_coordinates) node_y = copy.deepcopy(node_coordinates) for i in range(len(node_x)): temp_node = all_node[i] node_x[temp_node] = node_coordinates[temp_node][0] node_y[temp_node] = node_coordinates[temp_node][1] return node_x, node_y def produce_pattern_value(wn, all_node): wn_o = copy.deepcopy(wn) # 改_wz_____________________________ # sample_node = wn_o.get_node(all_node[0]) # num_categories = len(sample_node.demand_timeseries_list) num_categories = 1 columns = [f"D{i}" for i in range(num_categories)] basic_demand_pd = pd.DataFrame(index=all_node, columns=columns) for each in all_node: node = wn_o.get_node(each) for i in range(num_categories): basic_demand_pd.loc[each, columns[i]] = node.demand_timeseries_list[ i ].base_value return basic_demand_pd def _build_node_pipe_maps( all_nodes, candidate_pipes, pipe_start_node, pipe_end_node, pipe_length ): node_pipe_dic = {node: [] for node in all_nodes} couple_node_length = {} for pipe in candidate_pipes: start_node = pipe_start_node[pipe] end_node = pipe_end_node[pipe] if start_node in node_pipe_dic: node_pipe_dic[start_node].append(pipe) if end_node in node_pipe_dic: node_pipe_dic[end_node].append(pipe) length = float(pipe_length[pipe]) couple_node_length[f"{start_node},{end_node}"] = length couple_node_length[f"{end_node},{start_node}"] = length return node_pipe_dic, couple_node_length