# File: TJWaterServerBinary/app/algorithms/burst_location/network_model.py
# (170 lines, 5.2 KiB, Python)

"""管网模型读取与图构建模块。"""
import copy
import numpy as np
import networkx as nx
import pandas as pd
import wntr
def load_inp(inp_name, inp_location, inp_time, driven_mode, require_p, minimum_p):
    """Load an INP file into a WNTR model and select its demand model.

    Args:
        inp_name: File name of the INP file. It is appended directly to
            ``inp_location`` (plain string concatenation).
        inp_location: Path prefix of the INP file. Because the two strings
            are concatenated as-is, it must already end with a separator.
        inp_time: Not used by this function; kept so existing callers do not
            break.
        driven_mode: ``"PDD"`` selects the ``"PDD"`` demand model; any other
            value selects ``"DD"``.
        require_p: Value stored as the required pressure (PDD mode only).
        minimum_p: Value stored as the minimum pressure (PDD mode only).

    Returns:
        The ``wntr.network.WaterNetworkModel`` with its hydraulic options set.
    """
    wn = wntr.network.WaterNetworkModel(inp_location + inp_name)
    hydraulic_options = wn.options.hydraulic

    # Anything other than "PDD" falls back to the demand-driven model and
    # leaves the pressure thresholds untouched.
    if driven_mode != "PDD":
        hydraulic_options.demand_model = "DD"
        return wn

    hydraulic_options.demand_model = "PDD"
    hydraulic_options.required_pressure = require_p
    hydraulic_options.minimum_pressure = minimum_p
    return wn
def read_inf_inp(wn):
    """Collect the node and pipe attributes used by the burst-location code.

    Args:
        wn: Water network model exposing ``node_name_list``,
            ``pipe_name_list``, ``get_link``, ``query_node_attribute`` and
            ``query_link_attribute`` (a WNTR ``WaterNetworkModel``).

    Returns:
        An 8-tuple, in this order:
            all_node: ``wn.node_name_list``.
            node_elevation: result of ``query_node_attribute("elevation")``.
            node_coordinates: result of
                ``query_node_attribute("coordinates")``.
            candidate_pipe_init: names of the pipes whose ``initial_status``
                is not 0, in the same order as ``wn.pipe_name_list``.
            pipe_start_node: ``start_node_name`` of links of type ``Pipe``.
            pipe_end_node: ``end_node_name`` of links of type ``Pipe``.
            pipe_length: result of ``query_link_attribute("length")``.
            pipe_diameter: result of ``query_link_attribute("diameter")``.
            NOTE(review): length and diameter are queried without a
            ``link_type`` filter, unlike the start/end node queries.
    """
    all_node = wn.node_name_list
    node_elevation = wn.query_node_attribute("elevation")
    node_coordinates = wn.query_node_attribute("coordinates")

    # Drop pipes whose initial status is 0 ('Closed'). A set difference was
    # used here before, which returned the candidates in hash order and so
    # could differ from run to run; filtering the name list keeps the model's
    # own pipe order and makes the result reproducible.
    candidate_pipe_init = [
        name
        for name in wn.pipe_name_list
        if wn.get_link(name).initial_status != 0
    ]

    pipe_start_node = wn.query_link_attribute(
        "start_node_name", link_type=wntr.network.model.Pipe
    )
    pipe_end_node = wn.query_link_attribute(
        "end_node_name", link_type=wntr.network.model.Pipe
    )
    pipe_length = wn.query_link_attribute("length")
    pipe_diameter = wn.query_link_attribute("diameter")

    return (
        all_node,
        node_elevation,
        node_coordinates,
        candidate_pipe_init,
        pipe_start_node,
        pipe_end_node,
        pipe_length,
        pipe_diameter,
    )
def read_inf_inp_other(wn):
    """Return the names and end nodes of every link in the model.

    Unlike ``read_inf_inp``, no link-type filter is applied to the queries.

    Args:
        wn: Water network model exposing ``link_name_list`` and
            ``query_link_attribute``.

    Returns:
        A 3-tuple ``(all_link, pipe_start_node_all, pipe_end_node_all)``:
        the link name list, then the results of querying
        ``"start_node_name"`` and ``"end_node_name"``.
    """
    return (
        wn.link_name_list,
        wn.query_link_attribute("start_node_name"),
        wn.query_link_attribute("end_node_name"),
    )
def construct_graph(wn):
    """Build an undirected graph of the network weighted by link length.

    Args:
        wn: Water network model exposing ``query_link_attribute`` and
            ``get_graph``.

    Returns:
        The graph returned by ``wn.get_graph`` converted with
        ``to_undirected()``.
    """
    length = wn.query_link_attribute("length")

    # ``get_graph`` is a bound method, so the model must not be passed again:
    # a positional ``wn`` would be bound to the first optional parameter
    # (``node_weight`` in WNTR) instead of being ignored.
    directed_graph = wn.get_graph(link_weight=length)

    # Convert to an undirected graph.
    return directed_graph.to_undirected()
def cal_pipe_coordinate(all_pipe, pipe_start_node, pipe_end_node, node_coordinates):
    """Compute the midpoint coordinates of each listed pipe.

    Args:
        all_pipe: Iterable of pipe names to process.
        pipe_start_node: Mapping from pipe name to start node name.
        pipe_end_node: Mapping from pipe name to end node name.
        node_coordinates: Mapping from node name to an indexable coordinate
            whose items 0 and 1 are x and y.

    Returns:
        ``(pipe_x, pipe_y)``: two deep copies of ``pipe_start_node`` in which
        the entry of every pipe in ``all_pipe`` is replaced by the midpoint x
        (respectively y) of its two end nodes. Entries for pipes that are not
        in ``all_pipe`` keep the copied start-node value. The inputs are not
        modified.

    Raises:
        KeyError: If a pipe or node name is missing from the mappings.
    """
    # The containers are copied only to inherit the type and keys of
    # ``pipe_start_node``; the values are overwritten below.
    pipe_x = copy.deepcopy(pipe_start_node)
    pipe_y = copy.deepcopy(pipe_start_node)

    for pipe in all_pipe:
        start_xy = node_coordinates[pipe_start_node[pipe]]
        end_xy = node_coordinates[pipe_end_node[pipe]]
        pipe_x[pipe] = (start_xy[0] + end_xy[0]) / 2
        pipe_y[pipe] = (start_xy[1] + end_xy[1]) / 2

    return pipe_x, pipe_y
def cal_node_coordinate(all_node, node_coordinates):
    """Split node coordinates into separate x and y mappings.

    Args:
        all_node: Iterable of node names to process.
        node_coordinates: Mapping from node name to an indexable coordinate
            whose items 0 and 1 are x and y.

    Returns:
        ``(node_x, node_y)``: two deep copies of ``node_coordinates`` in which
        the entry of every node in ``all_node`` is replaced by its x
        (respectively y) value. Entries for nodes that are not in
        ``all_node`` keep the copied coordinate. The input is not modified.

    Raises:
        KeyError: If a name in ``all_node`` has no entry in
            ``node_coordinates``.
    """
    node_x = copy.deepcopy(node_coordinates)
    node_y = copy.deepcopy(node_coordinates)

    # Iterate the names themselves. The loop used to run
    # ``len(node_coordinates)`` times while indexing ``all_node``, which
    # raised IndexError whenever ``all_node`` was a subset of the nodes.
    for node in all_node:
        coordinate = node_coordinates[node]
        node_x[node] = coordinate[0]
        node_y[node] = coordinate[1]

    return node_x, node_y
def produce_pattern_value(wn, all_node):
    """Tabulate the base demand value of each listed node.

    Args:
        wn: Water network model exposing ``get_node``. It is only read.
        all_node: Node names; each node must expose a
            ``demand_timeseries_list`` with at least one entry.

    Returns:
        A ``pandas.DataFrame`` indexed by ``all_node`` with a single column
        ``"D0"`` holding ``demand_timeseries_list[0].base_value`` of each
        node.

    Raises:
        AttributeError: If a node has no ``demand_timeseries_list``.
        IndexError: If a node's ``demand_timeseries_list`` is empty.
    """
    # Only the first demand category is exported; the per-node category count
    # is deliberately fixed at 1.
    num_categories = 1
    columns = [f"D{i}" for i in range(num_categories)]
    basic_demand_pd = pd.DataFrame(index=all_node, columns=columns)

    for name in all_node:
        # The model is read directly: nothing here mutates it, so the deep
        # copy of the whole network that used to be taken first was pure
        # overhead.
        demands = wn.get_node(name).demand_timeseries_list
        for i, column in enumerate(columns):
            basic_demand_pd.loc[name, column] = demands[i].base_value

    return basic_demand_pd
def _build_node_pipe_maps(
    all_nodes, candidate_pipes, pipe_start_node, pipe_end_node, pipe_length
):
    """Build node-to-pipes and node-pair-to-length lookup tables.

    Args:
        all_nodes: Node names; each gets an entry in the first mapping.
        candidate_pipes: Pipe names to register.
        pipe_start_node: Mapping from pipe name to start node name.
        pipe_end_node: Mapping from pipe name to end node name.
        pipe_length: Mapping from pipe name to a value accepted by
            ``float()``.

    Returns:
        ``(node_pipe_dic, couple_node_length)``:
            node_pipe_dic: node name -> list of candidate pipes touching it.
                End nodes that are not in ``all_nodes`` are ignored.
            couple_node_length: ``"start,end"`` and ``"end,start"`` string
                keys -> pipe length as ``float``. A later pipe between the
                same two nodes overwrites an earlier one.
    """
    pipes_by_node = {}
    for node in all_nodes:
        pipes_by_node[node] = []

    length_by_pair = {}
    for pipe in candidate_pipes:
        start_node, end_node = pipe_start_node[pipe], pipe_end_node[pipe]

        # Register the pipe on each endpoint that is a known node.
        for endpoint in (start_node, end_node):
            if endpoint in pipes_by_node:
                pipes_by_node[endpoint].append(pipe)

        # Store the length under both orderings so lookups are symmetric.
        length = float(pipe_length[pipe])
        for key in (f"{start_node},{end_node}", f"{end_node},{start_node}"):
            length_by_pair[key] = length

    return pipes_by_node, length_by_pair
class NetworkModelReader:
    """Namespace exposing the module-level network helpers as static methods.

    Every method forwards its positional and keyword arguments unchanged to
    the module function it wraps and returns that function's result.
    """

    # --- model loading and attribute extraction ---

    @staticmethod
    def load_inp(*args, **kwargs):
        """Forward to the module-level ``load_inp``."""
        return load_inp(*args, **kwargs)

    @staticmethod
    def read_inf_inp(*args, **kwargs):
        """Forward to the module-level ``read_inf_inp``."""
        return read_inf_inp(*args, **kwargs)

    @staticmethod
    def read_inf_inp_other(*args, **kwargs):
        """Forward to the module-level ``read_inf_inp_other``."""
        return read_inf_inp_other(*args, **kwargs)

    # --- graph and coordinate helpers ---

    @staticmethod
    def construct_graph(*args, **kwargs):
        """Forward to the module-level ``construct_graph``."""
        return construct_graph(*args, **kwargs)

    @staticmethod
    def cal_pipe_coordinate(*args, **kwargs):
        """Forward to the module-level ``cal_pipe_coordinate``."""
        return cal_pipe_coordinate(*args, **kwargs)

    @staticmethod
    def cal_node_coordinate(*args, **kwargs):
        """Forward to the module-level ``cal_node_coordinate``."""
        return cal_node_coordinate(*args, **kwargs)

    # --- demand and topology tables ---

    @staticmethod
    def produce_pattern_value(*args, **kwargs):
        """Forward to the module-level ``produce_pattern_value``."""
        return produce_pattern_value(*args, **kwargs)

    @staticmethod
    def build_node_pipe_maps(*args, **kwargs):
        """Forward to the private module-level ``_build_node_pipe_maps``."""
        return _build_node_pipe_maps(*args, **kwargs)