From 0b72ac959adcd1bbfddd3afa1b7da84a76bd5733 Mon Sep 17 00:00:00 2001 From: Jiang Date: Mon, 9 Mar 2026 17:26:39 +0800 Subject: [PATCH] =?UTF-8?q?=E9=87=8D=E6=9E=84=20app/algorithms/api=5Fex=20?= =?UTF-8?q?=E7=9B=AE=E5=BD=95=E7=BB=93=E6=9E=84?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .gitignore | 2 +- app/algorithms/__init__.py | 8 +- app/algorithms/_utils.py | 88 +++ app/algorithms/api_ex/__init__.py | 3 - app/algorithms/api_ex/sensor_placement.py | 557 ------------------ .../burst_location/leak_simulator.py | 12 +- .../__init__.py} | 18 +- .../flow_data_clean.py => cleaning/flow.py} | 75 +-- .../pressure.py} | 74 +-- app/algorithms/health/__init__.py | 3 + .../analyzer.py} | 0 .../model/my_survival_forest_model_quxi.zip | Bin app/algorithms/isolation/__init__.py | 3 + .../valve.py} | 0 app/algorithms/leakage/__init__.py | 3 + .../identifier.py} | 428 +++++++------- .../{sensors.py => sensor/__init__.py} | 4 +- .../kmeans_sensor.py => sensor/kmeans.py} | 0 .../{api_ex => sensor}/sensitivity.py | 0 app/algorithms/simulation/__init__.py | 19 + .../runner.py} | 0 .../scenarios.py} | 2 +- app/api/v1/endpoints/simulation.py | 8 +- app/infra/db/timescaledb/composite_queries.py | 6 +- app/services/leakage_identifier.py | 2 +- app/services/simulation_ops.py | 2 +- app/services/valve_isolation.py | 2 +- scripts/build_pyd.py | 1 - scripts/online_Analysis.py | 10 +- tests/unit/test_pipeline_health_analyzer.py | 2 +- 30 files changed, 364 insertions(+), 968 deletions(-) create mode 100644 app/algorithms/_utils.py delete mode 100644 app/algorithms/api_ex/__init__.py delete mode 100644 app/algorithms/api_ex/sensor_placement.py rename app/algorithms/{data_cleaning.py => cleaning/__init__.py} (76%) rename app/algorithms/{api_ex/flow_data_clean.py => cleaning/flow.py} (82%) rename app/algorithms/{api_ex/pressure_data_clean.py => cleaning/pressure.py} (80%) create mode 100644 app/algorithms/health/__init__.py rename app/algorithms/{api_ex/pipeline_health_analyzer.py => health/analyzer.py} (100%) rename app/algorithms/{api_ex => health}/model/my_survival_forest_model_quxi.zip (100%) create mode 100644 app/algorithms/isolation/__init__.py rename app/algorithms/{valve_isolation.py => isolation/valve.py} (100%) create mode 100644 app/algorithms/leakage/__init__.py rename app/algorithms/{leakage_identifier.py => leakage/identifier.py} (96%) rename app/algorithms/{sensors.py => sensor/__init__.py} (96%) rename app/algorithms/{api_ex/kmeans_sensor.py => sensor/kmeans.py} (100%) rename app/algorithms/{api_ex => sensor}/sensitivity.py (100%) create mode 100644 app/algorithms/simulation/__init__.py rename app/algorithms/{api_ex/run_simulation.py => simulation/runner.py} (100%) rename app/algorithms/{simulations.py => simulation/scenarios.py} (99%) diff --git a/.gitignore b/.gitignore index 88f3135..10754c5 100644 --- a/.gitignore +++ b/.gitignore @@ -5,5 +5,5 @@ build/ *.pyc .env *.dump -app/algorithms/api_ex/model/my_survival_forest_model_quxi.joblib .vscode/ +app/algorithms/health/model/my_survival_forest_model_quxi.joblib diff --git a/app/algorithms/__init__.py b/app/algorithms/__init__.py index b96c814..340653e 100644 --- a/app/algorithms/__init__.py +++ b/app/algorithms/__init__.py @@ -1,10 +1,10 @@ -from app.algorithms.data_cleaning import flow_data_clean, pressure_data_clean -from app.algorithms.sensors import ( +from app.algorithms.cleaning import flow_data_clean, pressure_data_clean +from app.algorithms.sensor import ( pressure_sensor_placement_sensitivity, pressure_sensor_placement_kmeans, ) -from app.algorithms.valve_isolation import valve_isolation_analysis -from app.algorithms.simulations import ( +from app.algorithms.isolation.valve import valve_isolation_analysis +from app.algorithms.simulation.scenarios import ( convert_to_local_unit, burst_analysis, valve_close_analysis, diff --git a/app/algorithms/_utils.py b/app/algorithms/_utils.py new file mode 100644 index 0000000..5137943 --- /dev/null +++ b/app/algorithms/_utils.py @@ -0,0 +1,88 @@ +import os + +import pandas as pd + + +def fill_time_gaps( + data: pd.DataFrame, + time_col: str = "time", + freq: str = "1min", + short_gap_threshold: int = 10, +) -> pd.DataFrame: + """ + 补齐缺失时间戳并填补数据缺口。 + + Args: + data: 包含时间列的 DataFrame + time_col: 时间列名(默认 'time') + freq: 重采样频率(默认 '1min') + short_gap_threshold: 短缺口阈值(分钟),<=此值用线性插值,>此值用前向填充 + + Returns: + 补齐时间后的 DataFrame(保留原时间列格式) + """ + if time_col not in data.columns: + raise ValueError(f"时间列 '{time_col}' 不存在于数据中") + + # 解析时间列并设为索引 + data = data.copy() + data[time_col] = pd.to_datetime(data[time_col], utc=True) + data_indexed = data.set_index(time_col) + + # 生成完整时间范围 + full_range = pd.date_range( + start=data_indexed.index.min(), end=data_indexed.index.max(), freq=freq + ) + + # 重索引以补齐缺失时间点,同时保留原始时间戳 + combined_index = data_indexed.index.union(full_range).sort_values().unique() + data_reindexed = data_indexed.reindex(combined_index) + + # 按列处理缺口 + for col in data_reindexed.columns: + # 识别缺失值位置 + is_missing = data_reindexed[col].isna() + + # 计算连续缺失的长度 + missing_groups = (is_missing != is_missing.shift()).cumsum() + gap_lengths = is_missing.groupby(missing_groups).transform("sum") + + # 短缺口:时间插值 + short_gap_mask = is_missing & (gap_lengths <= short_gap_threshold) + if short_gap_mask.any(): + data_reindexed.loc[short_gap_mask, col] = ( + data_reindexed[col] + .interpolate(method="time", limit_area="inside") + .loc[short_gap_mask] + ) + + # 长缺口:前向填充 + long_gap_mask = is_missing & (gap_lengths > short_gap_threshold) + if long_gap_mask.any(): + data_reindexed.loc[long_gap_mask, col] = ( + data_reindexed[col].ffill().loc[long_gap_mask] + ) + + # 重置索引并恢复时间列(保留原格式) + data_result = data_reindexed.reset_index() + data_result.rename(columns={"index": time_col}, inplace=True) + + # 保留时区信息 + data_result[time_col] = data_result[time_col].dt.strftime("%Y-%m-%dT%H:%M:%S%z") + # 修正时区格式(Python的%z输出为+0000,需转为+00:00) + data_result[time_col] = data_result[time_col].str.replace( + r"(\+\d{2})(\d{2})$", r"\1:\2", regex=True + ) + + return data_result + + +def _cleanup_temp_files(prefix: str) -> None: + """清理 EPANET 仿真产生的临时文件。""" + for ext in [".inp", ".rpt", ".bin", ".out"]: + temp_file = prefix + ext + if os.path.exists(temp_file): + try: + os.remove(temp_file) + except OSError: + pass diff --git a/app/algorithms/api_ex/__init__.py b/app/algorithms/api_ex/__init__.py deleted file mode 100644 index fbc5e67..0000000 --- a/app/algorithms/api_ex/__init__.py +++ /dev/null @@ -1,3 +0,0 @@ -from .flow_data_clean import * -from .pressure_data_clean import * -from .pipeline_health_analyzer import * \ No newline at end of file diff --git a/app/algorithms/api_ex/sensor_placement.py b/app/algorithms/api_ex/sensor_placement.py deleted file mode 100644 index 99b717a..0000000 --- a/app/algorithms/api_ex/sensor_placement.py +++ /dev/null @@ -1,557 +0,0 @@ -# 改进灵敏度法 -import networkx -import numpy as np -import pandas -import wntr -import pandas as pd -import copy -import matplotlib.pyplot as plt -import networkx as nx -from sklearn.cluster import KMeans -from wntr.epanet.toolkit import EpanetException -from numpy.linalg import slogdet -import random -from app.services.tjnetwork import * -import app.services.project_info as project_info - -# 2025/03/12 -# Step1: 获取节点坐标 -def getCoor(wn: wntr.network.WaterNetworkModel) -> pandas.DataFrame: - """ - 获取管网模型的节点坐标 - :param wn: 由wntr生成的模型 - :return: 节点坐标 - """ - # site: pandas.Series - # index:节点名称(wn.node_name_list) - # values:每个节点的坐标,格式为 tuple(如 (x, y) 或 (x, y, z)) - site = wn.query_node_attribute('coordinates') - # Coor: pandas.Series - # index:与site相同(节点名称)。 - # values:坐标转换为numpy.ndarray(如array([10.5, 20.3])) - Coor = site.apply(lambda x: np.array(x)) # 将节点坐标转换为numpy数组 - # x, y: list[float] - x = [] # 存储所有节点的 x 坐标 - y = [] # 存储所有节点的 y 坐标 - for i in range(0, len(Coor)): - x.append(Coor.values[i][0]) # 将 x 坐标存入 x 列表。 - y.append(Coor.values[i][1]) # 将 y 坐标存入 y 列表 - # xy: dict[str, list], x、y 坐标的字典 - xy = {'x': x, 'y': y} - # Coor_node: pandas.DataFrame, 存储节点 x, y 坐标的 DataFrame - Coor_node = pd.DataFrame(xy, index=wn.node_name_list, columns=['x', 'y']) - return Coor_node - - -# 2025/03/12 -# Step2: KMeans 聚类 -# 将节点用kmeans根据坐标分为k组,存入字典g -def kgroup(coor: pandas.DataFrame, knum: int) -> dict[int, list[str]]: - """ - 使用KMeans聚类,将节点坐标分组 - :param coor: 存储所有节点的坐标数据 - :param knum: 需要分成的聚类数 - :return: 聚类结果字典 - """ - g = {} - # estimator: sklearn.cluster.KMeans,KMeans 聚类模型 - estimator = KMeans(n_clusters=knum) - estimator.fit(coor) - # label_pred: numpy.ndarray(int),每个点的类别标签 - label_pred = estimator.labels_ - for i in range(0, knum): - g[i] = coor[label_pred == i].index.tolist() - return g - - -# 2025/03/12 -# Step3: wn_func类,水力计算 -# wn_func 主要用于计算: -# 水力距离(hydraulic length):即节点之间的水力阻力。 -# 灵敏度分析(sensitivity analysis):用于优化测压点的布置。 -# 一些与水力相关的函数,包括 CtoS:求水力距离,stafun:求状态函数F -# # diff:求F对P的导数,返回灵敏度矩阵A -# # sensitivity:返回灵敏度和总灵敏度 -class wn_func(object): - - # Step3.1: 初始化 - def __init__(self, wn: wntr.network.WaterNetworkModel): - """ - 获取管网模型信息 - :param wn: 由wntr生成的模型 - """ - # self.results: wntr.sim.results.SimulationResults,仿真结果,包含压力、流量、水头等数据 - self.results = wntr.sim.EpanetSimulator(wn).run_sim() # 存储运行结果 - self.wn = wn - # self.q:pandas.DataFrame,管道流量,索引为时间步长,列为管道名称 - self.q = self.results.link['flowrate'] - # ReservoirIndex / Tankindex: list[str],水库 / 水箱节点名称列表 - ReservoirIndex = wn.reservoir_name_list - Tankindex = wn.tank_name_list - # 删除水库节点,删除与直接水库相连的虚拟管道 - # self.pipes: list[str],所有管道的名称 - self.pipes = wn.pipe_name_list - # self.nodes: list[str],所有节点的名称 - self.nodes = wn.node_name_list - # self.coordinates:pandas.Series,节点坐标,索引为节点名,值为 (x, y) 坐标的 tuple - self.coordinates = wn.query_node_attribute('coordinates') - # allpumps / allvalves: list[str],所有泵/阀门名称列表 - allpumps = wn.pump_name_list - allvalves = wn.valve_name_list - # pumpstnode / pumpednode / valvestnode / valveednode: list[str],存储泵和阀门 起终点节点的名称 - pumpstnode = [] - pumpednode = [] - valvestnode = [] - valveednode = [] - # Reservoirpipe / Reservoirednode: list[str],记录与水库相关的管道和节点 - Reservoirpipe = [] - Reservoirednode = [] - for pump in allpumps: - pumpstnode.append(wn.links[pump].start_node.name) - pumpednode.append(wn.links[pump].end_node.name) - for valve in allvalves: - valvestnode.append(wn.links[valve].start_node.name) - valveednode.append(wn.links[valve].end_node.name) - for pipe in self.pipes: - if wn.links[pipe].start_node.name in ReservoirIndex: - Reservoirpipe.append(pipe) - Reservoirednode.append(wn.links[pipe].end_node.name) - if wn.links[pipe].start_node.name in Tankindex: - Reservoirpipe.append(pipe) - Reservoirednode.append(wn.links[pipe].end_node.name) - if wn.links[pipe].end_node.name in Tankindex: - Reservoirpipe.append(pipe) - Reservoirednode.append(wn.links[pipe].start_node.name) - # 泵的起终点、tank、reservoir - # self.delnodes: list[str],需要删除的节点(包括水库、泵、阀门连接的节点) - self.delnodes = list( - set(ReservoirIndex).union(Tankindex, pumpstnode, pumpednode, valvestnode, valveednode, Reservoirednode)) - # 泵、起终点为tank、reservoir的管道 - # self.delpipes: list[str],需要删除的管道(包括水库、泵、阀门连接的管道) - self.delpipes = list(set(wn.pump_name_list).union(wn.valve_name_list).union(Reservoirpipe)) - self.pipes = [pipe for pipe in wn.pipe_name_list if pipe not in self.delpipes] - # self.L: list[float],所有管道的长度(以米为单位) - self.L = wn.query_link_attribute('length')[self.pipes].tolist() - self.n = len(self.nodes) - self.m = len(self.pipes) - # self.unit_headloss: list[float],单位水头损失(headloss 数据的第一行,单位:米/km) - self.unit_headloss = self.results.link['headloss'].iloc[0, :].tolist() - ## - self.delnodes1 = list(set(ReservoirIndex).union(Tankindex)) - - # Step3.2: 计算水力距离 - def CtoS(self): - """ - 计算水力距离矩阵 - :return: - """ - # 水力距离:当行索引对应的节点为控制点时,列索引对应的节点距离控制点的(路径*水头损失)的最小值 - # nodes:list[str](节点名称) - nodes = copy.deepcopy(self.nodes) - # pipes:list[str](管道名称) - pipes = self.pipes - wn = self.wn - # n / m:int(节点数 / 管道数) - n = self.n - m = self.m - s1 = [0] * m - q = self.q - L = self.L - # H1:pandas.DataFrame,水头数据,索引为时间步长,列为节点名 - H1 = self.results.node['head'].T - # hh:list[float],计算管道两端水头之差 - hh = [] - # 水头损失 - for p in pipes: - h1 = self.wn.links[p].start_node.name - h1 = H1.loc[str(h1)] - h2 = self.wn.links[p].end_node.name - h2 = H1.loc[str(h2)] - hh.append(abs(h1 - h2)) - hh = np.array(hh) - # headloss:pandas.DataFrame,管道水头损失矩阵 - headloss = pd.DataFrame(hh, index=pipes).T - # s1:管道阻力系数,s2:将管道阻力系数与管道的起始节点和终止节点对应 - hf = pd.DataFrame(np.array([0] * (n ** 2)).reshape(n, n), index=nodes, columns=nodes, dtype=float) - weightL = pd.DataFrame(np.array([0] * (n ** 2)).reshape(n, n), index=nodes, columns=nodes, dtype=float) - # s2为对应管道起始节点与终止节点的粗糙度系数矩阵,index代表起始节点,columns代表终止节点 - G = nx.DiGraph() - for i in range(0, m): - pipe = pipes[i] - a = wn.links[pipe].start_node.name - b = wn.links[pipe].end_node.name - if q.loc[0, pipe] > 0: - hf.loc[a, b] = headloss.loc[0, pipe] - weightL.loc[a, b] = headloss.loc[0, pipe] * L[i] - G.add_weighted_edges_from([(a, b, weightL.loc[a, b])]) - - else: - hf.loc[b, a] = headloss.loc[0, pipe] - weightL.loc[b, a] = headloss.loc[0, pipe] * L[i] - G.add_weighted_edges_from([(b, a, weightL.loc[b, a])]) - - hydraulicL = pd.DataFrame(np.array([0] * (n ** 2)).reshape(n, n), index=nodes, columns=nodes, dtype=float) - - for a in nodes: - if a in G.nodes: - d = nx.shortest_path_length(G, source=a, weight='weight') - for b in list(d.keys()): - hydraulicL.loc[a, b] = d[b] - - hydraulicL = hydraulicL.drop(self.delnodes) - hydraulicL = hydraulicL.drop(self.delnodes, axis=1) - - # 求加权水力距离 - return hydraulicL, G - - # Step3.3: 计算灵敏度矩阵 - # 获取关系矩阵 - def get_Conn(self): - """ - 计算管网连接关系矩阵 - :return: - """ - m = self.wn.num_links - n = self.wn.num_nodes - p = self.wn.num_pumps - v = self.wn.num_valves - - self.nonjunc_index = [] - self.non_link_index = [] - for r in self.wn.reservoirs(): - self.nonjunc_index.append(r[0]) - for t in self.wn.tanks(): - self.nonjunc_index.append(t[0]) - # Conn:numpy.matrix,节点-管道连接矩阵,起点 -1,终点 1 - Conn = np.mat(np.zeros([n, m - p - v])) # 节点和管道的关系矩阵,行为节点,列为管道,起点为-1,终点为1 - # NConn:numpy.matrix,节点-节点连接矩阵,有管道相连的地方设为 1 - NConn = np.mat(np.zeros([n, n])) # 节点之间的关系,之间有管道为1,反之为0 - # pipes:list[str],去除泵和阀门的管道列表 - pipes = [pipe for pipe in self.wn.pipes() if pipe not in self.wn.pumps() and pipe not in self.wn.valves()] - for pipe_name, pipe in pipes: - start = self.wn.node_name_list.index(pipe.start_node_name) - end = self.wn.node_name_list.index(pipe.end_node_name) - p_index = self.wn.link_name_list.index(pipe_name) - Conn[start, p_index] = -1 - Conn[end, p_index] = 1 - NConn[start, end] = 1 - NConn[end, start] = 1 - self.A = Conn - link_name_list = [link for link in self.wn.link_name_list if - link not in self.wn.pump_name_list and link not in self.wn.valve_name_list] - self.A2 = pd.DataFrame(self.A, index=self.wn.node_name_list, columns=link_name_list) - self.A2 = self.A2.drop(self.delnodes) - for pipe in self.delpipes: - if pipe not in self.wn.pump_name_list and pipe not in self.wn.valve_name_list: - self.A2 = self.A2.drop(columns=pipe) - self.junc_list = self.A2.index - self.A2 = np.mat(self.A2) # 节点管道关系 - self.A3 = NConn - - def Jaco(self, hL: pandas.DataFrame): - """ - 计算灵敏度矩阵(节点压力对粗糙度变化的响应) - :param hL: 水力距离矩阵 - :return: - """ - # global result - # A:numpy.matrix, 节点-管道关系矩阵 - A = self.A2 - wn = self.wn - - try: - result = wntr.sim.EpanetSimulator(wn).run_sim() - except EpanetException: - pass - finally: - h = result.link['headloss'][self.pipes].values[0] - q = result.link['flowrate'][self.pipes].values[0] - l = self.wn.query_link_attribute('length')[self.pipes] - C = self.wn.query_link_attribute('roughness')[self.pipes] - # headloss:numpy.ndarray,水头损失数组 - headloss = np.array(h) - # 调整流量方向 - for i in range(0, len(q)): - if q[i] < 0: - A[:, i] = -A[:, i] - # q:numpy.ndarray,流量数组 - q = np.abs(q) - # 两个灵敏度矩阵 - # B / S:numpy.matrix,灵敏度计算的中间矩阵 - B = np.mat(np.diag(q / ((1.852 * headloss) + 1e-10))) - S = np.mat(np.diag(q / C)) - # X:numpy.matrix, 灵敏度矩阵 - X = A * B * A.T - try: - det = np.linalg.det(X) - except RuntimeError as e: - sign, logdet = slogdet(X) # 防止溢出 - det = sign * np.exp(logdet) - if det != 0: - J_H_Cw = X.I * A * S - # J_H_Q = -X.I - J_q_Cw = S - B * A.T * X.I * A * S # 去掉了delnodes和delpipes - # J_q_Q = B * A.T * X.I - else: # 当X不可逆 - J_H_Cw = np.linalg.pinv(X) @ A @ S - # J_H_Q = -np.linalg.pinv(X) - J_q_Cw = S - B * A.T * np.linalg.pinv(X) * A * S - # J_q_Q = B * A.T * np.linalg.pinv(X) - - Sen_pressure = [] - S_pressure = np.abs(J_H_Cw).sum(axis=1).tolist() # 修改为绝对值 - for ss in S_pressure: - Sen_pressure.append(ss[0]) - # 求总灵敏度 - SS_pressure = copy.deepcopy(hL) - for i in range(0, len(Sen_pressure)): - SS_pressure.iloc[i, :] = SS_pressure.iloc[i, :] * Sen_pressure[i] - SS = copy.deepcopy(hL) - for i in range(0, len(Sen_pressure)): - SS.iloc[i, :] = SS.iloc[i, :] * Sen_pressure[i] - # SS[i,j]:节点nodes[i]的灵敏度*该节点到nodes[j]的水力距离 - return SS - - -# 2025/03/12 -# Step4: 传感器布置优化 -# Sensorplacement -# weight:分配权重 -# sensor:传感器布置的位置 -class Sensorplacement(wn_func): - """ - Sensorplacement 类继承了 wn_func 类,并且用于计算和优化传感器布置的位置。 - """ - def __init__(self, wn: wntr.network.WaterNetworkModel, sensornum: int): - """ - - :param wn: 由wntr生成的模型 - :param sensornum: 传感器的数量 - """ - wn_func.__init__(self, wn) - self.sensornum = sensornum - - # 1.某个节点到所有节点的加权距离之和 - # 2.某个节点到该组内所有节点的加权距离之和 - def sensor(self, SS: pandas.DataFrame, G: networkx.Graph, group: dict[int, list[str]]): - """ - sensor 方法是用来根据灵敏度矩阵 SS 和加权图 G 来确定传感器布置位置的 - :param SS: 灵敏度矩阵,每个节点的行和列代表不同节点,矩阵元素表示节点间的灵敏度。SS.iloc[i, :] 表示第 i 行对应节点 i 到所有其他节点的灵敏度 - :param G: 加权图,表示管网的拓扑结构,每个节点通过管道连接。图的边的权重通常是根据水力距离或者流量等计算的 - :param group: 节点分组,字典的键是分组编号,值是该组的节点名称列表 - :return: - """ - # 传感器布置个数以及位置 - # W = self.weight() - n = self.n - len(self.delnodes) - nodes = copy.deepcopy(self.nodes) - for node in self.delnodes: - nodes.remove(node) - # sumSS:list[float],每个节点到其他节点的灵敏度之和。SS.iloc[i, :] 返回第 i 个节点与所有其他节点的灵敏度值,sum(SS.iloc[i, :]) 计算这些灵敏度值的总和。 - sumSS = [] - for i in range(0, n): - sumSS.append(sum(SS.iloc[i, :])) - # 一个整数范围,表示每个节点的索引,用作sumSS_ DataFrame的索引 - indices = range(0, n) - # sumSS_:pandas.DataFrame,将 sumSS 转换成 DataFrame 格式,并且将节点的总灵敏度保存到 CSV 文件 sumSS_data.csv 中 - sumSS_ = pd.DataFrame(np.array(sumSS), index=indices) - sumSS_.to_csv('sumSS_data.csv') # 存储节点总灵敏度 - # sumSS:pandas.DataFrame,sumSS 被转换为 DataFrame 类型,并且按总灵敏度(即灵敏度之和)降序排列。此时,sumSS 是按节点的灵敏度之和排序的 DataFrame - sumSS = pd.DataFrame(np.array(sumSS), index=nodes) - sumSS = sumSS.sort_values(by=[0], ascending=[False]) - # sensorindex:list[str],用于存储根据灵敏度排序选出的传感器位置的节点名称,存储根据总灵敏度排序的节点列表,用于传感器布置 - sensorindex = [] - # sensorindex_2:list[str],用于存储每组内根据灵敏度排序选出的传感器位置的节点名称,存储每个组内根据灵敏度排序选择的传感器节点 - sensorindex_2 = [] - # group_S:dict[int, pandas.DataFrame],存储每个组内的灵敏度矩阵 - group_S = {} - # group_sumSS:dict[int, list[float]],存储每个组内节点的总灵敏度,值为每个组内节点灵敏度之和的列表 - group_sumSS = {} - for i in range(0, len(group)): - for node in self.delnodes: - # 这里的group[i]是每个组的节点列表,代码首先去除已经被标记为删除的节点self.delnodes - if node in group[i]: - group[i].remove(node) - group_S[i] = SS.loc[group[i], group[i]] - # 对每个组内的节点,计算组内节点的总灵敏度(group_sumSS[i])。它将每个组内节点的灵敏度值相加,并且按灵敏度降序排序 - group_sumSS[i] = [] - for j in range(0, len(group[i])): - group_sumSS[i].append(sum(group_S[i].iloc[j, :])) - group_sumSS[i] = pd.DataFrame(np.array(group_sumSS[i]), index=group[i]) - group_sumSS[i] = group_sumSS[i].sort_values(by=[0], ascending=[False]) - pass - - # 1.选sumSS最大的节点,然后把这个节点所在的那个组删掉,就可以不再从这个组选点。再重新排序选sumSS最大的; - # 2.在每组内选group_sumSS最大的节点 - # 在这个循环中,首先选择灵敏度最高的节点Smaxnode并添加到sensorindex。然后根据灵敏度排序,删除已选的节点并继续选择下一个灵敏度最大的节点。这个过程用于选择传感器的位置 - sensornum = self.sensornum - for i in range(0, sensornum): - # Smaxnode:str,最大灵敏度节点,sumSS.index[0] 表示灵敏度最高的节点 - Smaxnode = sumSS.index[0] - sensorindex.append(Smaxnode) - sensorindex_2.append(group_sumSS[i].index[0]) - - for key, value in group.items(): - if Smaxnode in value: - sumSS = sumSS.drop(index=group[key]) - continue - - sumSS = sumSS.sort_values(by=[0], ascending=[False]) - - return sensorindex, sensorindex_2 - - -# 2025/03/13 -def get_sensor_coord(name: str, sensor_num: int) -> dict[str, float]: - """ - 获取布置测压点的坐标,初始测压点布置根据灵敏度来布置,计算初始情况下的校准过程的error - :param name: 数据库名称 - :param sensor_num: 测压点数目 - :return: 测压点坐标字典 - """ - # inp_file_real:str,输入文件名,表示原始水力模型文件的路径,该文件格式为 EPANET 输入文件(.inp),包含管网的结构信息、节点、管道、泵等数据 - inp_file_real = f'./db_inp/{name}.db.inp' - # sensornum:int,需要布置的传感器数量 - # sensornum = sensor_num - # wn_real:wntr.network.WaterNetworkModel,加载 EPANET 水力模型 - wn_real = wntr.network.WaterNetworkModel(inp_file_real) # 真实粗糙度的原始管网 - # sim_real:wntr.sim.EpanetSimulator,创建一个水力仿真器对象 - sim_real = wntr.sim.EpanetSimulator(wn_real) - # results_real:wntr.sim.results.SimulationResults,运行仿真并返回结果 - results_real = sim_real.run_sim() - - # real_C:list[float],包含所有管道粗糙度的列表 - real_C = wn_real.query_link_attribute('roughness').tolist() - # wn_fun1:wn_func(继承自 object),创建 wn_func 类的实例,传入 wn_real 水力模型对象。wn_func 用于计算管网相关的水力属性,比如水力距离、灵敏度等 - wn_fun1 = wn_func(wn_real) - # nodes:list[str],管网的节点名称列表 - nodes = wn_fun1.nodes - # delnodes:list[str],被删除的节点(如水库、泵、阀门连接的节点等) - delnodes = wn_fun1.delnodes - # Coor_node:pandas.DataFrame - Coor_node = getCoor(wn_real) - Coor_node = Coor_node.drop(wn_fun1.delnodes) - nodes = [node for node in wn_fun1.nodes if node not in delnodes] - # coordinates:pandas.Series,存储所有节点的坐标,类型为 Series,索引为节点名称,值为 (x, y) 坐标对 - coordinates = wn_fun1.coordinates - - # 随机产生监测点 - # junctionnum:int,nodes 的长度,表示节点的数量 - junctionnum = len(nodes) - # random_numbers:list[int],使用 random.sample 随机选择 sensornum(20)个节点的编号。它返回一个不重复的随机编号列表 - # random_numbers = random.sample(range(junctionnum), sensor_num) - # for i in range(sensor_num): - # # print(random_numbers[i]) - - wn_fun1.get_Conn() - # hL:pandas.DataFrame,水力距离矩阵,表示每个节点到其他节点的水力阻力 - # G:networkx.DiGraph,加权有向图,表示管网的拓扑结构,节点之间的边带有权重 - hL, G = wn_fun1.CtoS() - # SS:pandas.DataFrame,灵敏度矩阵,表示每个节点对管网变化(如粗糙度、流量等)的响应 - SS = wn_fun1.Jaco(hL) - # group:dict[int, list[str]],使用 kgroup 函数将节点按坐标分成若干组,每组包含的节点数不一定相同。group 是一个字典,键为分组编号,值为节点名列表 - group = kgroup(Coor_node, sensor_num) - # wn_fun:Sensorplacement(继承自wn_func) - # 创建Sensorplacement类的实例,传入水力网络模型wn_real和传感器数量sensornum。Sensorplacement用于计算和布置传感器 - wn_fun = Sensorplacement(wn_real, sensor_num) - wn_fun.__dict__.update(wn_fun1.__dict__) - # sensorindex:list[str],初始传感器布置位置的节点名称 - # sensorindex_2:list[str],根据分组选择的传感器位置 - sensorindex, sensorindex_2 = wn_fun.sensor(SS, G, group) # 初始的sensorindex - # print(str(sensor_num), "个测压点,测压点位置:", sensorindex) - sensor_coord = {} - # 重新打开数据库 - if is_project_open(name=name): - close_project(name=name) - open_project(name=name) - for node_id in sensorindex: - sensor_coord[node_id] = get_node_coord(name=name, node_id=node_id) - close_project(name=name) - # print(sensor_coord) - return sensor_coord - - -if __name__ == '__main__': - sensor_coord = get_sensor_coord(name=project_info.name, sensor_num=20) - print(sensor_coord) - # ''' - # 初始测压点布置根据灵敏度来布置,计算初始情况下的校准过程的error - # ''' - # - # # inp_file_real:str,输入文件名,表示原始水力模型文件的路径,该文件格式为 EPANET 输入文件(.inp),包含管网的结构信息、节点、管道、泵等数据 - # inp_file_real = './db_inp/bb.db.inp' - # # sensornum:int,需要布置的传感器数量 - # sensornum = 20 - # # wn_real:wntr.network.WaterNetworkModel,加载 EPANET 水力模型 - # wn_real = wntr.network.WaterNetworkModel(inp_file_real) # 真实粗糙度的原始管网 - # # sim_real:wntr.sim.EpanetSimulator,创建一个水力仿真器对象 - # sim_real = wntr.sim.EpanetSimulator(wn_real) - # # results_real:wntr.sim.results.SimulationResults,运行仿真并返回结果 - # results_real = sim_real.run_sim() - # - # # real_C:list[float],包含所有管道粗糙度的列表 - # real_C = wn_real.query_link_attribute('roughness').tolist() - # # wn_fun1:wn_func(继承自 object),创建 wn_func 类的实例,传入 wn_real 水力模型对象。wn_func 用于计算管网相关的水力属性,比如水力距离、灵敏度等 - # wn_fun1 = wn_func(wn_real) - # # nodes:list[str],管网的节点名称列表 - # nodes = wn_fun1.nodes - # # delnodes:list[str],被删除的节点(如水库、泵、阀门连接的节点等) - # delnodes = wn_fun1.delnodes - # # Coor_node:pandas.DataFrame - # Coor_node = getCoor(wn_real) - # Coor_node = Coor_node.drop(wn_fun1.delnodes) - # nodes = [node for node in wn_fun1.nodes if node not in delnodes] - # # coordinates:pandas.Series,存储所有节点的坐标,类型为 Series,索引为节点名称,值为 (x, y) 坐标对 - # coordinates = wn_fun1.coordinates - # - # # 随机产生监测点 - # # junctionnum:int,nodes 的长度,表示节点的数量 - # junctionnum = len(nodes) - # # random_numbers:list[int],使用 random.sample 随机选择 sensornum(20)个节点的编号。它返回一个不重复的随机编号列表 - # random_numbers = random.sample(range(junctionnum), sensornum) - # for i in range(sensornum): - # print(random_numbers[i]) - # - # wn_fun1.get_Conn() - # # hL:pandas.DataFrame,水力距离矩阵,表示每个节点到其他节点的水力阻力 - # # G:networkx.DiGraph,加权有向图,表示管网的拓扑结构,节点之间的边带有权重 - # hL, G = wn_fun1.CtoS() - # # SS:pandas.DataFrame,灵敏度矩阵,表示每个节点对管网变化(如粗糙度、流量等)的响应 - # SS = wn_fun1.Jaco(hL) - # # group:dict[int, list[str]],使用 kgroup 函数将节点按坐标分成若干组,每组包含的节点数不一定相同。group 是一个字典,键为分组编号,值为节点名列表 - # group = kgroup(Coor_node, sensornum) - # # wn_fun:Sensorplacement(继承自wn_func) - # # 创建Sensorplacement类的实例,传入水力网络模型wn_real和传感器数量sensornum。Sensorplacement用于计算和布置传感器 - # wn_fun = Sensorplacement(wn_real, sensornum) - # wn_fun.__dict__.update(wn_fun1.__dict__) - # # sensorindex:list[str],初始传感器布置位置的节点名称 - # # sensorindex_2:list[str],根据分组选择的传感器位置 - # sensorindex, sensorindex_2 = wn_fun.sensor(SS, G, group) # 初始的sensorindex - # print(str(sensornum), "个测压点,测压点位置:", sensorindex) - - # # 分区画图 - # colorlist = ['lightpink', 'coral', 'rosybrown', 'olive', 'powderblue', 'lightskyblue', 'steelblue', 'peachpuff','brown','silver','indigo','lime','gold','violet','maroon','navy','teal','magenta','cyan', - # 'burlywood', 'tan', 'slategrey', 'thistle', 'lightseagreen', 'lightgreen', 'red','blue','yellow','orange','purple','grey','green','pink','lightblue','beige','chartreuse','turquoise','lavender','fuchsia','coral'] - # G = wn_real.to_graph() - # G = G.to_undirected() # 变为无向图 - # pos = nx.get_node_attributes(G, 'pos') - # pass - # for i in range(0, sensornum): - # ax = plt.gca() - # ax.set_title(inp_file_real + str(sensornum)) - # nodes = nx.draw_networkx_nodes(G, pos, nodelist=group[i], node_color=colorlist[i], node_size=20) - # nodes = nx.draw_networkx_nodes(G, pos, - # nodelist=sensorindex_2, node_color='black', node_size=70, node_shape='*' - # ) - # edges = nx.draw_networkx_edges(G, pos) - # ax.spines['top'].set_visible(False) - # ax.spines['right'].set_visible(False) - # ax.spines['bottom'].set_visible(False) - # ax.spines['left'].set_visible(False) - # plt.savefig(inp_file_real + str(sensornum) + ".png") - # plt.show() - # - # wntr.graphics.plot_network(wn_real, node_attribute=sensorindex_2, node_size=50, node_labels=False, - # title=inp_file_real + '_Projetion' + str(sensornum)) - # plt.savefig(inp_file_real + '_S' + str(sensornum) + ".png") - # plt.show() \ No newline at end of file diff --git a/app/algorithms/burst_location/leak_simulator.py b/app/algorithms/burst_location/leak_simulator.py index 4b8547d..fd09b2f 100644 --- a/app/algorithms/burst_location/leak_simulator.py +++ b/app/algorithms/burst_location/leak_simulator.py @@ -8,20 +8,12 @@ import sys import pandas as pd import wntr +from app.algorithms._utils import _cleanup_temp_files + _PIPE2LEAKNODE = None _SIGNATURE_WORKER_DATA = {} -def _cleanup_temp_files(prefix): - for ext in [".inp", ".rpt", ".bin", ".out"]: - temp_file = prefix + ext - if os.path.exists(temp_file): - try: - os.remove(temp_file) - except OSError: - pass - - def _make_temp_prefix(tag): temp_dir = os.path.abspath(os.path.join("temp", "burst_location")) os.makedirs(temp_dir, exist_ok=True) diff --git a/app/algorithms/data_cleaning.py b/app/algorithms/cleaning/__init__.py similarity index 76% rename from app/algorithms/data_cleaning.py rename to app/algorithms/cleaning/__init__.py index 781c25e..c6f9a28 100644 --- a/app/algorithms/data_cleaning.py +++ b/app/algorithms/cleaning/__init__.py @@ -1,7 +1,7 @@ import os -import app.algorithms.api_ex.flow_data_clean as flow_data_clean -import app.algorithms.api_ex.pressure_data_clean as pressure_data_clean +from app.algorithms.cleaning import flow as _flow_module +from app.algorithms.cleaning import pressure as _pressure_module ############################################################ @@ -19,14 +19,15 @@ def flow_data_clean(input_csv_file: str) -> str: """ # 提供的 input_csv_path 绝对路径,以下为 默认脚本目录下同名 CSV 文件,构建绝对路径,可根据情况修改 - script_dir = os.path.dirname(os.path.abspath(__file__)) + # 使用 algorithms 根目录保持与原 data_cleaning.py 一致的行为 + script_dir = os.path.dirname(os.path.dirname(os.path.abspath(__file__))) input_csv_path = os.path.join(script_dir, input_csv_file) # 检查文件是否存在 if not os.path.exists(input_csv_path): raise FileNotFoundError(f"指定的文件不存在: {input_csv_path}") - # 调用 Fdataclean.clean_flow_data_kf 函数进行数据清洗 - out_xlsx_path = flow_data_clean.clean_flow_data_kf(input_csv_path) + # 调用 clean_flow_data_kf 函数进行数据清洗 + out_xlsx_path = _flow_module.clean_flow_data_kf(input_csv_path) print("清洗后的数据已保存到:", out_xlsx_path) @@ -46,12 +47,13 @@ def pressure_data_clean(input_csv_file: str) -> str: """ # 提供的 input_csv_path 绝对路径,以下为 默认脚本目录下同名 CSV 文件,构建绝对路径,可根据情况修改 - script_dir = os.path.dirname(os.path.abspath(__file__)) + # 使用 algorithms 根目录保持与原 data_cleaning.py 一致的行为 + script_dir = os.path.dirname(os.path.dirname(os.path.abspath(__file__))) input_csv_path = os.path.join(script_dir, input_csv_file) # 检查文件是否存在 if not os.path.exists(input_csv_path): raise FileNotFoundError(f"指定的文件不存在: {input_csv_path}") - # 调用 Fdataclean.clean_flow_data_kf 函数进行数据清洗 - out_xlsx_path = pressure_data_clean.clean_pressure_data_km(input_csv_path) + # 调用 clean_pressure_data_km 函数进行数据清洗 + out_xlsx_path = _pressure_module.clean_pressure_data_km(input_csv_path) print("清洗后的数据已保存到:", out_xlsx_path) diff --git a/app/algorithms/api_ex/flow_data_clean.py b/app/algorithms/cleaning/flow.py similarity index 82% rename from app/algorithms/api_ex/flow_data_clean.py rename to app/algorithms/cleaning/flow.py index 526275c..ee361b9 100644 --- a/app/algorithms/api_ex/flow_data_clean.py +++ b/app/algorithms/cleaning/flow.py @@ -1,83 +1,10 @@ -# ...existing code... import pandas as pd import numpy as np import matplotlib.pyplot as plt from pykalman import KalmanFilter import os - -def fill_time_gaps( - data: pd.DataFrame, - time_col: str = "time", - freq: str = "1min", - short_gap_threshold: int = 10, -) -> pd.DataFrame: - """ - 补齐缺失时间戳并填补数据缺口。 - - Args: - data: 包含时间列的 DataFrame - time_col: 时间列名(默认 'time') - freq: 重采样频率(默认 '1min') - short_gap_threshold: 短缺口阈值(分钟),<=此值用线性插值,>此值用前向填充 - - Returns: - 补齐时间后的 DataFrame(保留原时间列格式) - """ - if time_col not in data.columns: - raise ValueError(f"时间列 '{time_col}' 不存在于数据中") - - # 解析时间列并设为索引 - data = data.copy() - data[time_col] = pd.to_datetime(data[time_col], utc=True) - data_indexed = data.set_index(time_col) - - # 生成完整时间范围 - full_range = pd.date_range( - start=data_indexed.index.min(), end=data_indexed.index.max(), freq=freq - ) - - # 重索引以补齐缺失时间点,同时保留原始时间戳 - combined_index = data_indexed.index.union(full_range).sort_values().unique() - data_reindexed = data_indexed.reindex(combined_index) - - # 按列处理缺口 - for col in data_reindexed.columns: - # 识别缺失值位置 - is_missing = data_reindexed[col].isna() - - # 计算连续缺失的长度 - missing_groups = (is_missing != is_missing.shift()).cumsum() - gap_lengths = is_missing.groupby(missing_groups).transform("sum") - - # 短缺口:时间插值 - short_gap_mask = is_missing & (gap_lengths <= short_gap_threshold) - if short_gap_mask.any(): - data_reindexed.loc[short_gap_mask, col] = ( - data_reindexed[col] - .interpolate(method="time", limit_area="inside") - .loc[short_gap_mask] - ) - - # 长缺口:前向填充 - long_gap_mask = is_missing & (gap_lengths > short_gap_threshold) - if long_gap_mask.any(): - data_reindexed.loc[long_gap_mask, col] = ( - data_reindexed[col].ffill().loc[long_gap_mask] - ) - - # 重置索引并恢复时间列(保留原格式) - data_result = data_reindexed.reset_index() - data_result.rename(columns={"index": time_col}, inplace=True) - - # 保留时区信息 - data_result[time_col] = data_result[time_col].dt.strftime("%Y-%m-%dT%H:%M:%S%z") - # 修正时区格式(Python的%z输出为+0000,需转为+00:00) - data_result[time_col] = data_result[time_col].str.replace( - r"(\+\d{2})(\d{2})$", r"\1:\2", regex=True - ) - - return data_result +from app.algorithms._utils import fill_time_gaps def clean_flow_data_kf( diff --git a/app/algorithms/api_ex/pressure_data_clean.py b/app/algorithms/cleaning/pressure.py similarity index 80% rename from app/algorithms/api_ex/pressure_data_clean.py rename to app/algorithms/cleaning/pressure.py index fcfa20c..6fc545e 100644 --- a/app/algorithms/api_ex/pressure_data_clean.py +++ b/app/algorithms/cleaning/pressure.py @@ -5,79 +5,7 @@ from sklearn.cluster import KMeans from sklearn.impute import SimpleImputer import os - -def fill_time_gaps( - data: pd.DataFrame, - time_col: str = "time", - freq: str = "1min", - short_gap_threshold: int = 10, -) -> pd.DataFrame: - """ - 补齐缺失时间戳并填补数据缺口。 - - Args: - data: 包含时间列的 DataFrame - time_col: 时间列名(默认 'time') - freq: 重采样频率(默认 '1min') - short_gap_threshold: 短缺口阈值(分钟),<=此值用线性插值,>此值用前向填充 - - Returns: - 补齐时间后的 DataFrame(保留原时间列格式) - """ - if time_col not in data.columns: - raise ValueError(f"时间列 '{time_col}' 不存在于数据中") - - # 解析时间列并设为索引 - data = data.copy() - data[time_col] = pd.to_datetime(data[time_col], utc=True) - data_indexed = data.set_index(time_col) - - # 生成完整时间范围 - full_range = pd.date_range( - start=data_indexed.index.min(), end=data_indexed.index.max(), freq=freq - ) - - # 重索引以补齐缺失时间点,同时保留原始时间戳 - combined_index = data_indexed.index.union(full_range).sort_values().unique() - data_reindexed = data_indexed.reindex(combined_index) - - # 按列处理缺口 - for col in data_reindexed.columns: - # 识别缺失值位置 - is_missing = data_reindexed[col].isna() - - # 计算连续缺失的长度 - missing_groups = (is_missing != is_missing.shift()).cumsum() - gap_lengths = is_missing.groupby(missing_groups).transform("sum") - - # 短缺口:时间插值 - short_gap_mask = is_missing & (gap_lengths <= short_gap_threshold) - if short_gap_mask.any(): - data_reindexed.loc[short_gap_mask, col] = ( - data_reindexed[col] - .interpolate(method="time", limit_area="inside") - .loc[short_gap_mask] - ) - - # 长缺口:前向填充 - long_gap_mask = is_missing & (gap_lengths > short_gap_threshold) - if long_gap_mask.any(): - data_reindexed.loc[long_gap_mask, col] = ( - data_reindexed[col].ffill().loc[long_gap_mask] - ) - - # 重置索引并恢复时间列(保留原格式) - data_result = data_reindexed.reset_index() - data_result.rename(columns={"index": time_col}, inplace=True) - - # 保留时区信息 - data_result[time_col] = data_result[time_col].dt.strftime("%Y-%m-%dT%H:%M:%S%z") - # 修正时区格式(Python的%z输出为+0000,需转为+00:00) - data_result[time_col] = data_result[time_col].str.replace( - r"(\+\d{2})(\d{2})$", r"\1:\2", regex=True - ) - - return data_result +from app.algorithms._utils import fill_time_gaps def clean_pressure_data_km( diff --git a/app/algorithms/health/__init__.py b/app/algorithms/health/__init__.py new file mode 100644 index 0000000..d0e216f --- /dev/null +++ b/app/algorithms/health/__init__.py @@ -0,0 +1,3 @@ +from app.algorithms.health.analyzer import PipelineHealthAnalyzer + +__all__ = ["PipelineHealthAnalyzer"] diff --git a/app/algorithms/api_ex/pipeline_health_analyzer.py b/app/algorithms/health/analyzer.py similarity index 100% rename from app/algorithms/api_ex/pipeline_health_analyzer.py rename to app/algorithms/health/analyzer.py diff --git a/app/algorithms/api_ex/model/my_survival_forest_model_quxi.zip b/app/algorithms/health/model/my_survival_forest_model_quxi.zip similarity index 100% rename from app/algorithms/api_ex/model/my_survival_forest_model_quxi.zip rename to app/algorithms/health/model/my_survival_forest_model_quxi.zip diff --git a/app/algorithms/isolation/__init__.py b/app/algorithms/isolation/__init__.py new file mode 100644 index 0000000..12d5a35 --- /dev/null +++ b/app/algorithms/isolation/__init__.py @@ -0,0 +1,3 @@ +from app.algorithms.isolation.valve import valve_isolation_analysis + +__all__ = ["valve_isolation_analysis"] diff --git a/app/algorithms/valve_isolation.py b/app/algorithms/isolation/valve.py similarity index 100% rename from app/algorithms/valve_isolation.py rename to app/algorithms/isolation/valve.py diff --git a/app/algorithms/leakage/__init__.py b/app/algorithms/leakage/__init__.py new file mode 100644 index 0000000..9ec8b91 --- /dev/null +++ b/app/algorithms/leakage/__init__.py @@ -0,0 +1,3 @@ +from app.algorithms.leakage.identifier import LeakageIdentifier + +__all__ = ["LeakageIdentifier"] diff --git a/app/algorithms/leakage_identifier.py b/app/algorithms/leakage/identifier.py similarity index 96% rename from app/algorithms/leakage_identifier.py rename to app/algorithms/leakage/identifier.py index 328ec63..c2044b1 100644 --- a/app/algorithms/leakage_identifier.py +++ b/app/algorithms/leakage/identifier.py @@ -1,11 +1,11 @@ import wntr import numpy as np -import pandas as pd -import os -import time -import argparse -from multiprocessing import Pool, cpu_count -from typing import Any, List, Dict, Union +import pandas as pd +import os +import time +import argparse +from multiprocessing import Pool, cpu_count +from typing import Any, List, Dict, Union from pymoo.core.problem import Problem from pymoo.core.callback import Callback @@ -13,123 +13,115 @@ from pymoo.algorithms.soo.nonconvex.ga import GA from pymoo.operators.crossover.sbx import SBX from pymoo.operators.mutation.pm import PM from pymoo.optimize import minimize as pymoo_minimize -from pymoo.termination.default import DefaultSingleObjectiveTermination - - -_worker_data: dict[str, Any] = {} -DEFAULT_N_WORKERS = max(1, min(cpu_count() - 1, 4)) - - -def _cleanup_temp_files(prefix: str) -> None: - for ext in [".inp", ".rpt", ".bin", ".out"]: - temp_file = prefix + ext - if os.path.exists(temp_file): - try: - os.remove(temp_file) - except OSError: - pass - - -def _worker_init( - inp_path: str, - sensor_nodes: list[str], - area_ids: list[str], - nodes_by_area: dict[str, list[str]], - obs_matrix: np.ndarray, - q_sum: float, - duration_sec: float, - timestep_sec: float, -) -> None: - global _worker_data - wn = wntr.network.WaterNetworkModel(inp_path) - wn.options.hydraulic.demand_model = "DD" - wn.options.time.duration = duration_sec - wn.options.time.hydraulic_timestep = timestep_sec - wn.options.time.pattern_timestep = timestep_sec - wn.options.time.report_timestep = timestep_sec - - demand_objs_by_area = {} - allocatable_counts = {} - for area_id in area_ids: - demand_objs = [] - for node_name in nodes_by_area.get(area_id, []): - if node_name not in wn.node_name_list: - continue - node = wn.get_node(node_name) - if ( - hasattr(node, "demand_timeseries_list") - and len(node.demand_timeseries_list) > 0 - ): - demand_objs.append(node.demand_timeseries_list[0]) - demand_objs_by_area[area_id] = demand_objs - allocatable_counts[area_id] = len(demand_objs) - - _worker_data = { - "wn": wn, - "sensor_nodes": sensor_nodes, - "area_ids": area_ids, - "nodes_by_area": nodes_by_area, - "demand_objs_by_area": demand_objs_by_area, - "allocatable_counts": allocatable_counts, - "obs_matrix": obs_matrix, - "q_sum": q_sum, - } - - -def _worker_evaluate(raw_ratios: np.ndarray) -> float: - d = _worker_data - effective_ratio_map = LeakageIdentifier._effective_area_ratios( - raw_ratios, - d["area_ids"], - d["nodes_by_area"], - allocatable_counts=d["allocatable_counts"], - ) - - modifications = [] - for area_id in d["area_ids"]: - ratio = effective_ratio_map.get(area_id, 0.0) - if ratio <= 0: - continue - - demand_objs = d["demand_objs_by_area"].get(area_id, []) - if not demand_objs: - continue - - per_node_leak = d["q_sum"] * ratio / len(demand_objs) - for demand_obj in demand_objs: - original_val = demand_obj.base_value - demand_obj.base_value = original_val + per_node_leak - modifications.append((demand_obj, original_val)) - - temp_dir = os.path.abspath(os.path.join("temp", "leakage")) - os.makedirs(temp_dir, exist_ok=True) - prefix = os.path.join(temp_dir, f"temp_{os.getpid()}") - - try: - sim = wntr.sim.EpanetSimulator(d["wn"]) - results = sim.run_sim(file_prefix=prefix) - sim_pressure = results.node["pressure"].loc[:, d["sensor_nodes"]] - - n_steps = min(sim_pressure.shape[0], d["obs_matrix"].shape[0]) - sim_vals = sim_pressure.values[:n_steps, :] - obs_vals = d["obs_matrix"][:n_steps, :] - diff = sim_vals - obs_vals - - row_max = np.max(np.abs(diff), axis=1, keepdims=True) - row_max[row_max == 0] = 1.0 - normalized_diff = diff / row_max - return float(np.linalg.norm(normalized_diff)) - - except Exception: - return 1e9 - - finally: - for demand_obj, original_val in modifications: - demand_obj.base_value = original_val - _cleanup_temp_files(prefix) - - -class LeakageIdentifier: +from pymoo.termination.default import DefaultSingleObjectiveTermination + + +from app.algorithms._utils import _cleanup_temp_files + +_worker_data: dict[str, Any] = {} +DEFAULT_N_WORKERS = max(1, min(cpu_count() - 1, 4)) + + +def _worker_init( + inp_path: str, + sensor_nodes: list[str], + area_ids: list[str], + nodes_by_area: dict[str, list[str]], + obs_matrix: np.ndarray, + q_sum: float, + duration_sec: float, + timestep_sec: float, +) -> None: + global _worker_data + wn = wntr.network.WaterNetworkModel(inp_path) + wn.options.hydraulic.demand_model = "DD" + wn.options.time.duration = duration_sec + wn.options.time.hydraulic_timestep = timestep_sec + wn.options.time.pattern_timestep = timestep_sec + wn.options.time.report_timestep = timestep_sec + + demand_objs_by_area = {} + allocatable_counts = {} + for area_id in area_ids: + demand_objs = [] + for node_name in nodes_by_area.get(area_id, []): + if node_name not in wn.node_name_list: + continue + node = wn.get_node(node_name) + if ( + hasattr(node, "demand_timeseries_list") + and len(node.demand_timeseries_list) > 0 + ): + demand_objs.append(node.demand_timeseries_list[0]) + demand_objs_by_area[area_id] = demand_objs + allocatable_counts[area_id] = len(demand_objs) + + _worker_data = { + "wn": wn, + "sensor_nodes": sensor_nodes, + "area_ids": area_ids, + "nodes_by_area": nodes_by_area, + "demand_objs_by_area": demand_objs_by_area, + "allocatable_counts": allocatable_counts, + "obs_matrix": obs_matrix, + "q_sum": q_sum, + } + + +def _worker_evaluate(raw_ratios: np.ndarray) -> float: + d = _worker_data + effective_ratio_map = LeakageIdentifier._effective_area_ratios( + raw_ratios, + d["area_ids"], + d["nodes_by_area"], + allocatable_counts=d["allocatable_counts"], + ) + + modifications = [] + for area_id in d["area_ids"]: + ratio = effective_ratio_map.get(area_id, 0.0) + if ratio <= 0: + continue + + demand_objs = d["demand_objs_by_area"].get(area_id, []) + if not demand_objs: + continue + + per_node_leak = d["q_sum"] * ratio / len(demand_objs) + for demand_obj in demand_objs: + original_val = demand_obj.base_value + demand_obj.base_value = original_val + per_node_leak + modifications.append((demand_obj, original_val)) + + temp_dir = os.path.abspath(os.path.join("temp", "leakage")) + os.makedirs(temp_dir, exist_ok=True) + prefix = os.path.join(temp_dir, f"temp_{os.getpid()}") + + try: + sim = wntr.sim.EpanetSimulator(d["wn"]) + results = sim.run_sim(file_prefix=prefix) + sim_pressure = results.node["pressure"].loc[:, d["sensor_nodes"]] + + n_steps = min(sim_pressure.shape[0], d["obs_matrix"].shape[0]) + sim_vals = sim_pressure.values[:n_steps, :] + obs_vals = d["obs_matrix"][:n_steps, :] + diff = sim_vals - obs_vals + + row_max = np.max(np.abs(diff), axis=1, keepdims=True) + row_max[row_max == 0] = 1.0 + normalized_diff = diff / row_max + return float(np.linalg.norm(normalized_diff)) + + except Exception: + return 1e9 + + finally: + for demand_obj, original_val in modifications: + demand_obj.base_value = original_val + _cleanup_temp_files(prefix) + + +class LeakageIdentifier: FLOW_UNIT_TO_M3S = { "m3/s": 1.0, "m3/h": 1.0 / 3600.0, @@ -279,20 +271,20 @@ class LeakageIdentifier: df = pd.read_csv(path, dtype={"ID": str, "Area": str}) return self._normalize_area_map_df(df) - def run_identification( - self, - observed_pressure_data: Union[ - str, pd.DataFrame, Dict[str, List[Any]], List[Dict[str, Any]] - ], + def run_identification( + self, + observed_pressure_data: Union[ + str, pd.DataFrame, Dict[str, List[Any]], List[Dict[str, Any]] + ], output_dir: str = "Results", pop_size: int = 50, max_gen: int = 100, output_flow_unit: str = "m3/s", - save_result: bool = True, - ftol: float = 1e-3, - ftol_period: int = 15, - n_workers: int = DEFAULT_N_WORKERS, - ): + save_result: bool = True, + ftol: float = 1e-3, + ftol_period: int = 15, + n_workers: int = DEFAULT_N_WORKERS, + ): """ 运行遗传算法以识别漏损分布。 @@ -302,11 +294,11 @@ class LeakageIdentifier: pop_size: GA 的种群大小。 max_gen: GA 的最大代数。 output_flow_unit: 输出漏损流量的单位。 - save_result: 是否保存识别结果到本地 CSV。 - ftol: 目标值收敛容差(连续 ftol_period 代改善 < ftol 则停止)。 - ftol_period: 收敛检测的窗口代数。 - n_workers: 并行工作进程数(1=串行,>1=并行评估)。 - """ + save_result: 是否保存识别结果到本地 CSV。 + ftol: 目标值收敛容差(连续 ftol_period 代改善 < ftol 则停止)。 + ftol_period: 收敛检测的窗口代数。 + n_workers: 并行工作进程数(1=串行,>1=并行评估)。 + """ if save_result: os.makedirs(output_dir, exist_ok=True) @@ -322,16 +314,16 @@ class LeakageIdentifier: observed_name = "observed_pressure.csv" # 准备 pymoo 问题实例 - problem = LeakageProblem( - self.wn, - self.nodes_by_area, - self.area_ids, - self.sensor_nodes, - obs_df, - q_sum=self.q_sum, - n_workers=n_workers, - inp_path=os.path.abspath(self.inp_path), - ) + problem = LeakageProblem( + self.wn, + self.nodes_by_area, + self.area_ids, + self.sensor_nodes, + obs_df, + q_sum=self.q_sum, + n_workers=n_workers, + inp_path=os.path.abspath(self.inp_path), + ) # 配置 pymoo GA 算法 n_var = self.num_areas @@ -352,19 +344,19 @@ class LeakageIdentifier: # 回调:记录每代信息 callback = _ProgressCallback() - t0 = time.time() - try: - res = pymoo_minimize( - problem, - algorithm, - termination, - seed=42, - verbose=True, - callback=callback, - ) - finally: - problem.close() - elapsed = time.time() - t0 + t0 = time.time() + try: + res = pymoo_minimize( + problem, + algorithm, + termination, + seed=42, + verbose=True, + callback=callback, + ) + finally: + problem.close() + elapsed = time.time() - t0 # 提取最优解 best_ind = res.X # 最优个体(漏损比例原始值) @@ -426,7 +418,7 @@ class _ProgressCallback(Callback): self._t_last = now -class LeakageProblem(Problem): +class LeakageProblem(Problem): """pymoo 批量评估问题定义。 搜索空间:n 维 [0, 1] 实数 -> 通过 _effective_area_ratios 归一化到单纯形。 @@ -434,17 +426,17 @@ class LeakageProblem(Problem): 无显式约束(sum=1 由归一化自动保证)。 """ - def __init__( - self, - wn, - nodes_by_area, - area_ids, - sensor_nodes, - observed_data, - q_sum: float = 0.2, - n_workers: int = DEFAULT_N_WORKERS, - inp_path: str | None = None, - ): + def __init__( + self, + wn, + nodes_by_area, + area_ids, + sensor_nodes, + observed_data, + q_sum: float = 0.2, + n_workers: int = DEFAULT_N_WORKERS, + inp_path: str | None = None, + ): n_var = len(area_ids) super().__init__( @@ -458,10 +450,10 @@ class LeakageProblem(Problem): self.wn = wn self.nodes_by_area = nodes_by_area self.area_ids = area_ids - self.sensor_nodes = sensor_nodes - self.q_sum = q_sum - self.n_workers = max(1, int(n_workers)) - self.inp_path = inp_path + self.sensor_nodes = sensor_nodes + self.q_sum = q_sum + self.n_workers = max(1, int(n_workers)) + self.inp_path = inp_path # 预处理观测数据以匹配模拟格式 try: @@ -500,31 +492,31 @@ class LeakageProblem(Problem): area_id: len(self.demand_objs_by_area.get(area_id, [])) for area_id in self.area_ids } - if not any(count > 0 for count in self.allocatable_counts.values()): - raise ValueError("没有可分配漏损的有效分区,无法满足漏损总量约束。") - - # 评估计数器(诊断用) - self._eval_count = 0 - self._pool = None - if self.n_workers > 1: - if not self.inp_path: - raise ValueError("并行评估需要提供 inp_path。") - duration_sec = float(self.wn.options.time.duration) - timestep_sec = float(self.wn.options.time.hydraulic_timestep) - self._pool = Pool( - processes=self.n_workers, - initializer=_worker_init, - initargs=( - self.inp_path, - list(self.sensor_nodes), - list(self.area_ids), - {k: list(v) for k, v in self.nodes_by_area.items()}, - self.obs_matrix.copy(), - self.q_sum, - duration_sec, - timestep_sec, - ), - ) + if not any(count > 0 for count in self.allocatable_counts.values()): + raise ValueError("没有可分配漏损的有效分区,无法满足漏损总量约束。") + + # 评估计数器(诊断用) + self._eval_count = 0 + self._pool = None + if self.n_workers > 1: + if not self.inp_path: + raise ValueError("并行评估需要提供 inp_path。") + duration_sec = float(self.wn.options.time.duration) + timestep_sec = float(self.wn.options.time.hydraulic_timestep) + self._pool = Pool( + processes=self.n_workers, + initializer=_worker_init, + initargs=( + self.inp_path, + list(self.sensor_nodes), + list(self.area_ids), + {k: list(v) for k, v in self.nodes_by_area.items()}, + self.obs_matrix.copy(), + self.q_sum, + duration_sec, + timestep_sec, + ), + ) def _evaluate(self, X, out, *args, **kwargs): """批量评估种群。 @@ -534,15 +526,15 @@ class LeakageProblem(Problem): n_pop = X.shape[0] self._eval_count += n_pop - if self._pool is not None: - results = self._pool.map(_worker_evaluate, [X[i] for i in range(n_pop)]) - out["F"] = np.array(results, dtype=float).reshape(-1, 1) - return - - F = np.zeros((n_pop, 1)) - for i in range(n_pop): - F[i, 0] = self._evaluate_single(X[i]) - out["F"] = F + if self._pool is not None: + results = self._pool.map(_worker_evaluate, [X[i] for i in range(n_pop)]) + out["F"] = np.array(results, dtype=float).reshape(-1, 1) + return + + F = np.zeros((n_pop, 1)) + for i in range(n_pop): + F[i, 0] = self._evaluate_single(X[i]) + out["F"] = F def _evaluate_single(self, x): """评估单个个体,返回归一化误差范数。""" @@ -607,13 +599,13 @@ class LeakageProblem(Problem): for demand_obj, original_val in modifications: demand_obj.base_value = original_val - _cleanup_temp_files(prefix) - - def close(self) -> None: - if self._pool is not None: - self._pool.close() - self._pool.join() - self._pool = None + _cleanup_temp_files(prefix) + + def close(self) -> None: + if self._pool is not None: + self._pool.close() + self._pool.join() + self._pool = None def main() -> int: diff --git a/app/algorithms/sensors.py b/app/algorithms/sensor/__init__.py similarity index 96% rename from app/algorithms/sensors.py rename to app/algorithms/sensor/__init__.py index 0286dcc..d6c1a48 100644 --- a/app/algorithms/sensors.py +++ b/app/algorithms/sensor/__init__.py @@ -1,7 +1,7 @@ import psycopg -import app.algorithms.api_ex.kmeans_sensor as kmeans_sensor -import app.algorithms.api_ex.sensitivity as sensitivity +from app.algorithms.sensor import kmeans as kmeans_sensor +from app.algorithms.sensor import sensitivity from app.core.config import get_pgconn_string from app.services.tjnetwork import dump_inp diff --git a/app/algorithms/api_ex/kmeans_sensor.py b/app/algorithms/sensor/kmeans.py similarity index 100% rename from app/algorithms/api_ex/kmeans_sensor.py rename to app/algorithms/sensor/kmeans.py diff --git a/app/algorithms/api_ex/sensitivity.py b/app/algorithms/sensor/sensitivity.py similarity index 100% rename from app/algorithms/api_ex/sensitivity.py rename to app/algorithms/sensor/sensitivity.py diff --git a/app/algorithms/simulation/__init__.py b/app/algorithms/simulation/__init__.py new file mode 100644 index 0000000..b8e0fa6 --- /dev/null +++ b/app/algorithms/simulation/__init__.py @@ -0,0 +1,19 @@ +from app.algorithms.simulation.scenarios import ( + convert_to_local_unit, + burst_analysis, + valve_close_analysis, + flushing_analysis, + contaminant_simulation, + age_analysis, + pressure_regulation, +) + +__all__ = [ + "convert_to_local_unit", + "burst_analysis", + "valve_close_analysis", + "flushing_analysis", + "contaminant_simulation", + "age_analysis", + "pressure_regulation", +] diff --git a/app/algorithms/api_ex/run_simulation.py b/app/algorithms/simulation/runner.py similarity index 100% rename from app/algorithms/api_ex/run_simulation.py rename to app/algorithms/simulation/runner.py diff --git a/app/algorithms/simulations.py b/app/algorithms/simulation/scenarios.py similarity index 99% rename from app/algorithms/simulations.py rename to app/algorithms/simulation/scenarios.py index d84f0b5..546bf53 100644 --- a/app/algorithms/simulations.py +++ b/app/algorithms/simulation/scenarios.py @@ -5,7 +5,7 @@ from math import pi, sqrt import pytz import app.services.simulation as simulation -from app.algorithms.api_ex.run_simulation import ( +from app.algorithms.simulation.runner import ( run_simulation_ex, from_clock_to_seconds_2, ) diff --git a/app/api/v1/endpoints/simulation.py b/app/api/v1/endpoints/simulation.py index 74168ff..56dea9f 100644 --- a/app/api/v1/endpoints/simulation.py +++ b/app/api/v1/endpoints/simulation.py @@ -17,7 +17,7 @@ from app.services.tjnetwork import ( run_inp, dump_output, ) -from app.algorithms.simulations import ( +from app.algorithms.simulation.scenarios import ( burst_analysis, valve_close_analysis, flushing_analysis, @@ -26,12 +26,12 @@ from app.algorithms.simulations import ( # scheduling_analysis, pressure_regulation, ) -from app.algorithms.sensors import ( +from app.algorithms.sensor import ( pressure_sensor_placement_sensitivity, pressure_sensor_placement_kmeans, ) -import app.algorithms.api_ex.flow_data_clean as flow_data_clean -import app.algorithms.api_ex.pressure_data_clean as pressure_data_clean +import app.algorithms.cleaning.flow as flow_data_clean +import app.algorithms.cleaning.pressure as pressure_data_clean from app.services.network_import import network_update from app.services.simulation_ops import ( project_management, diff --git a/app/infra/db/timescaledb/composite_queries.py b/app/infra/db/timescaledb/composite_queries.py index c62efe1..bfcee73 100644 --- a/app/infra/db/timescaledb/composite_queries.py +++ b/app/infra/db/timescaledb/composite_queries.py @@ -4,9 +4,9 @@ from datetime import datetime, timedelta from psycopg import AsyncConnection import pandas as pd import numpy as np -from app.algorithms.api_ex.flow_data_clean import clean_flow_data_df_kf -from app.algorithms.api_ex.pressure_data_clean import clean_pressure_data_df_km -from app.algorithms.api_ex.pipeline_health_analyzer import PipelineHealthAnalyzer +from app.algorithms.cleaning.flow import clean_flow_data_df_kf +from app.algorithms.cleaning.pressure import clean_pressure_data_df_km +from app.algorithms.health.analyzer import PipelineHealthAnalyzer from app.infra.db.postgresql.internal_queries import InternalQueries from app.infra.db.postgresql.scada_info import ScadaRepository as PostgreScadaRepository diff --git a/app/services/leakage_identifier.py b/app/services/leakage_identifier.py index 38b9663..e90cb24 100644 --- a/app/services/leakage_identifier.py +++ b/app/services/leakage_identifier.py @@ -8,7 +8,7 @@ import numpy as np import pandas as pd import wntr -from app.algorithms.leakage_identifier import LeakageIdentifier +from app.algorithms.leakage.identifier import LeakageIdentifier from app.infra.db.timescaledb.internal_queries import InternalQueries from app.services.scheme_management import ( query_leakage_identify_scheme_detail, diff --git a/app/services/simulation_ops.py b/app/services/simulation_ops.py index 6b2f3b5..5c0dd38 100644 --- a/app/services/simulation_ops.py +++ b/app/services/simulation_ops.py @@ -4,7 +4,7 @@ from math import pi import pytz -from app.algorithms.api_ex.run_simulation import run_simulation_ex +from app.algorithms.simulation.runner import run_simulation_ex from app.infra.epanet.epanet import Output from app.services.tjnetwork import * diff --git a/app/services/valve_isolation.py b/app/services/valve_isolation.py index dc41c05..993b23e 100644 --- a/app/services/valve_isolation.py +++ b/app/services/valve_isolation.py @@ -1,6 +1,6 @@ from typing import Any -from app.algorithms.valve_isolation import valve_isolation_analysis +from app.algorithms.isolation.valve import valve_isolation_analysis def analyze_valve_isolation( diff --git a/scripts/build_pyd.py b/scripts/build_pyd.py index 564f217..fa91129 100644 --- a/scripts/build_pyd.py +++ b/scripts/build_pyd.py @@ -17,7 +17,6 @@ setup(ext_modules=cythonize([ "get_current_status.py", "influxdb_api.py", "influxdb_query_SCADA_data.py", - "sensor_placement.py", "simulation.py", "time_api.py", "api/*.py", diff --git a/scripts/online_Analysis.py b/scripts/online_Analysis.py index e1f9414..34edfe0 100644 --- a/scripts/online_Analysis.py +++ b/scripts/online_Analysis.py @@ -1,7 +1,7 @@ import os from app.services.tjnetwork import * from app.native.wndb.project import copy_project -from app.algorithms.api_ex.run_simulation import run_simulation_ex, from_clock_to_seconds_2 +from app.algorithms.simulation.runner import run_simulation_ex, from_clock_to_seconds_2 from math import sqrt, pi from app.infra.epanet.epanet import Output import json @@ -18,10 +18,10 @@ import geopandas as gpd from sqlalchemy import create_engine import ast import app.services.project_info as project_info -import app.algorithms.api_ex.kmeans_sensor as kmeans_sensor -import app.algorithms.api_ex.flow_data_clean as flow_data_clean -import app.algorithms.api_ex.pressure_data_clean as pressure_data_clean -import app.algorithms.api_ex.sensitivity as sensitivity +import app.algorithms.sensor.kmeans as kmeans_sensor +import app.algorithms.cleaning.flow as flow_data_clean +import app.algorithms.cleaning.pressure as pressure_data_clean +import app.algorithms.sensor.sensitivity as sensitivity from app.core.config import get_pgconn_string diff --git a/tests/unit/test_pipeline_health_analyzer.py b/tests/unit/test_pipeline_health_analyzer.py index 518f51f..6342295 100644 --- a/tests/unit/test_pipeline_health_analyzer.py +++ b/tests/unit/test_pipeline_health_analyzer.py @@ -4,7 +4,7 @@ tests.unit.test_pipeline_health_analyzer 的 Docstring def test_pipeline_health_analyzer(): - from app.algorithms.api_ex.pipeline_health_analyzer import PipelineHealthAnalyzer + from app.algorithms.health.analyzer import PipelineHealthAnalyzer # 初始化分析器,假设模型文件路径为'models/rsf_model.joblib' analyzer = PipelineHealthAnalyzer()