新增管道健康风险预测 api

This commit is contained in:
JIANG
2025-12-17 17:02:22 +08:00
parent 0865bdb2ae
commit e3a04ec297
7 changed files with 297 additions and 18 deletions

1
.gitignore vendored
View File

@@ -4,3 +4,4 @@ db_inp/
temp/
data/
*.dump
api_ex/model/my_survival_forest_model_quxi.joblib

View File

@@ -0,0 +1,83 @@
import time
from typing import List, Optional
from fastapi.logger import logger
import postgresql_info
import psycopg
class InternalQueries:
    """Internal read-only queries against the PostgreSQL pipe-network schema."""

    @staticmethod
    def get_links_by_property(
        fields: Optional[List[str]] = None,
        property_conditions: Optional[dict] = None,
        db_name: str = None,
        max_retries: int = 3,
    ) -> List[dict]:
        """Fetch rows from ``public.pipes``, optionally filtered by column values.

        :param fields: columns to select, e.g. ``["id", "diameter", "status"]``;
            defaults to the full standard pipe column set.
        :param property_conditions: optional equality filters, e.g.
            ``{"status": "Open"}`` or ``{"diameter": 300}``; multiple
            conditions are AND-combined.
        :param db_name: target database name; falls back to the default
            connection configured in ``postgresql_info`` when omitted.
        :param max_retries: number of connect/query attempts before the last
            exception is re-raised.
        :return: one dict per row, keyed by the requested field names.
        """
        # Default to the full column set when the caller does not narrow it.
        if not fields:
            fields = [
                "id",
                "node1",
                "node2",
                "length",
                "diameter",
                "roughness",
                "minor_loss",
                "status",
            ]
        for attempt in range(max_retries):
            try:
                conn_string = (
                    postgresql_info.get_pgconn_string(db_name=db_name)
                    if db_name
                    else postgresql_info.get_pgconn_string()
                )
                with psycopg.Connection.connect(conn_string) as conn:
                    with conn.cursor() as cur:
                        # NOTE(review): field names and condition keys are
                        # interpolated directly into the SQL text, so callers
                        # must never pass untrusted identifiers here; only the
                        # condition *values* go through query parameters.
                        select_fields = ", ".join(fields)
                        base_query = f"""
                            SELECT {select_fields}
                            FROM public.pipes
                        """
                        if property_conditions:
                            conditions = []
                            params = []
                            for key, value in property_conditions.items():
                                conditions.append(f"{key} = %s")
                                params.append(value)
                            query = base_query + " WHERE " + " AND ".join(conditions)
                            cur.execute(query, params)
                        else:
                            cur.execute(base_query)
                        records = cur.fetchall()
                        # Map each row tuple onto the requested field names.
                        # (The original `break` after this return was dead code.)
                        return [dict(zip(fields, record)) for record in records]
            except Exception as e:
                logger.error(f"查询尝试 {attempt + 1} 失败: {e}")
                if attempt < max_retries - 1:
                    time.sleep(1)  # brief back-off before retrying
                else:
                    raise  # out of retries: surface the last failure

View File

@@ -13,6 +13,6 @@ if __name__ == "__main__":
"main:app",
host="0.0.0.0",
port=8000,
workers=2, # 这里可以设置多进程
# workers=2, # 这里可以设置多进程
loop="asyncio",
)

View File

@@ -21,7 +21,7 @@ import globals
import uuid
import project_info
from api.postgresql_info import get_pgconn_string
from timescaledb.internal_queries import InternalStorage as TimescaleInternalStorage
from timescaledb.internal_queries import InternalQueries as TimescaleInternalQueries
logging.basicConfig(
level=logging.INFO, format="%(asctime)s - %(levelname)s - %(message)s"
@@ -774,7 +774,7 @@ def run_simulation(
if globals.reservoirs_id:
# reservoirs_id = {'ZBBDJSCP000002': '2497', 'R00003': '2571'}
# 1.获取reservoir的SCADA数据,形式如{'2497': '3.1231', '2571': '2.7387'}
reservoir_SCADA_data_dict = TimescaleInternalStorage.query_scada_by_ids_time(
reservoir_SCADA_data_dict = TimescaleInternalQueries.query_scada_by_ids_time(
device_ids=list(globals.reservoirs_id.values()),
query_time=modify_pattern_start_time,
)
@@ -798,7 +798,7 @@ def run_simulation(
set_pattern(name_c, cs)
if globals.tanks_id:
# 修改tank初始液位
tank_SCADA_data_dict = TimescaleInternalStorage.query_scada_by_ids_time(
tank_SCADA_data_dict = TimescaleInternalQueries.query_scada_by_ids_time(
device_ids=list(globals.tanks_id.values()),
query_time=modify_pattern_start_time,
)
@@ -814,7 +814,7 @@ def run_simulation(
set_tank(name_c, cs)
if globals.fixed_pumps_id:
# 修改工频泵的pattern
fixed_pump_SCADA_data_dict = TimescaleInternalStorage.query_scada_by_ids_time(
fixed_pump_SCADA_data_dict = TimescaleInternalQueries.query_scada_by_ids_time(
device_ids=list(globals.fixed_pumps_id.values()),
query_time=modify_pattern_start_time,
)
@@ -838,7 +838,7 @@ def run_simulation(
if globals.variable_pumps_id:
# 修改变频泵的pattern
variable_pump_SCADA_data_dict = (
TimescaleInternalStorage.query_scada_by_ids_time(
TimescaleInternalQueries.query_scada_by_ids_time(
device_ids=list(globals.variable_pumps_id.values()),
query_time=modify_pattern_start_time,
)
@@ -858,7 +858,7 @@ def run_simulation(
set_pattern(name_c, cs)
if globals.demand_id:
# 基于实时数据修改大用户节点的pattern
demand_SCADA_data_dict = TimescaleInternalStorage.query_scada_by_ids_time(
demand_SCADA_data_dict = TimescaleInternalQueries.query_scada_by_ids_time(
device_ids=list(globals.demand_id.values()),
query_time=modify_pattern_start_time,
)
@@ -885,7 +885,7 @@ def run_simulation(
if globals.source_outflow_pattern_id:
# 基于实时的出厂流量计数据修改出厂流量计绑定的pattern
source_outflow_SCADA_data_dict = (
TimescaleInternalStorage.query_scada_by_ids_time(
TimescaleInternalQueries.query_scada_by_ids_time(
device_ids=list(globals.source_outflow_pattern_id.values()),
query_time=modify_pattern_start_time,
)
@@ -923,7 +923,7 @@ def run_simulation(
if globals.realtime_pipe_flow_pattern_id:
# 基于实时的pipe_flow类数据修改pipe_flow类绑定的pattern
realtime_pipe_flow_SCADA_data_dict = (
TimescaleInternalStorage.query_scada_by_ids_time(
TimescaleInternalQueries.query_scada_by_ids_time(
device_ids=list(globals.realtime_pipe_flow_pattern_id.values()),
query_time=modify_pattern_start_time,
)
@@ -960,7 +960,7 @@ def run_simulation(
query_api_id = globals.realtime_pipe_flow_pattern_id.get(pipe_flow_region)
temp_realtime_pipe_flow_pattern_id[pipe_flow_region] = query_api_id
temp_realtime_pipe_flow_SCADA_data_dict = (
TimescaleInternalStorage.query_scada_by_ids_time(
TimescaleInternalQueries.query_scada_by_ids_time(
device_ids=list(temp_realtime_pipe_flow_pattern_id.values()),
query_time=modify_pattern_start_time,
)
@@ -1018,13 +1018,13 @@ def run_simulation(
globals.non_realtime_region_patterns.get(region, [])
)
region_source_outflow_data_dict = (
TimescaleInternalStorage.query_scada_by_ids_time(
TimescaleInternalQueries.query_scada_by_ids_time(
device_ids=temp_source_outflow_region_id,
query_time=modify_pattern_start_time,
)
)
region_realtime_region_pipe_flow_and_demand_data_dict = (
TimescaleInternalStorage.query_scada_by_ids_time(
TimescaleInternalQueries.query_scada_by_ids_time(
device_ids=temp_realtime_region_pipe_flow_and_demand_id,
query_time=modify_pattern_start_time,
)
@@ -1229,11 +1229,11 @@ def run_simulation(
# 存储
starttime = time.time()
if simulation_type.upper() == "REALTIME":
TimescaleInternalStorage.store_realtime_simulation(
TimescaleInternalQueries.store_realtime_simulation(
node_result, link_result, modify_pattern_start_time
)
elif simulation_type.upper() == "EXTENDED":
TimescaleInternalStorage.store_scheme_simulation(
TimescaleInternalQueries.store_scheme_simulation(
scheme_Type,
scheme_Name,
node_result,
@@ -1244,7 +1244,7 @@ def run_simulation(
endtime = time.time()
logging.info("store time: %f", endtime - starttime)
# 暂不需要再次存储 SCADA 模拟信息
# TimescaleInternalStorage.fill_scheme_simulation_result_to_SCADA(scheme_Type=scheme_Type, scheme_Name=scheme_Name)
# TimescaleInternalQueries.fill_scheme_simulation_result_to_SCADA(scheme_Type=scheme_Type, scheme_Name=scheme_Name)
# if simulation_type.upper() == "REALTIME":
# influxdb_api.store_realtime_simulation_result_to_influxdb(

View File

@@ -1,10 +1,14 @@
import time
from typing import List, Optional, Any, Dict, Tuple
from datetime import datetime
from datetime import datetime, timedelta
from psycopg import AsyncConnection
import pandas as pd
import numpy as np
from api_ex.Fdataclean import clean_flow_data_df_kf
from api_ex.Pdataclean import clean_pressure_data_df_km
from api_ex.pipeline_health_analyzer import PipelineHealthAnalyzer
from postgresql.internal_queries import InternalQueries
from postgresql.scada_info import ScadaRepository as PostgreScadaRepository
from timescaledb.schemas.realtime import RealtimeRepository
from timescaledb.schemas.scheme import SchemeRepository
@@ -450,3 +454,158 @@ class CompositeQueries:
return "success"
except Exception as e:
return f"error: {str(e)}"
@staticmethod
async def predict_pipeline_health(
    timescale_conn: AsyncConnection,
    db_name: str,
    query_time: datetime,
) -> List[Dict[str, Any]]:
    """Predict pipeline health for all pipes with live velocity data.

    Looks up static pipe attributes and real-time velocity/pressure readings
    around *query_time*, then scores each pipe with the random survival
    forest model wrapped by :class:`PipelineHealthAnalyzer`.

    Args:
        timescale_conn: TimescaleDB async connection.
        db_name: pipe-network database name (PostgreSQL side).
        query_time: point in time to sample readings at.

    Returns:
        One dict per pipe: ``link_id``, ``diameter``, ``velocity``,
        ``pressure``, and the model's ``survival_function`` (x/y/a/b).

    Raises:
        ValueError: on invalid input, missing data, or any wrapped
            internal failure.
        FileNotFoundError: when the model file cannot be found
            (propagated unchanged so the API layer can answer 404).
    """
    try:
        perf_start_time = time.time()
        # 1. Sample a one-second window around the query time.
        start_time = query_time - timedelta(seconds=1)
        end_time = query_time + timedelta(seconds=1)
        # 2. Velocity first: it determines which links have live data at all.
        velocity_data = await RealtimeRepository.get_links_field_by_time_range(
            timescale_conn, start_time, end_time, "velocity"
        )
        if not velocity_data:
            raise ValueError("未找到流速数据")
        valid_link_ids = list(velocity_data.keys())
        # 3. Static attributes for all pipes; filtered against velocity ids below.
        fields = ["id", "diameter", "node1", "node2"]
        all_links = InternalQueries.get_links_by_property(
            fields=fields,
            db_name=db_name,
        )
        # Index by id for O(1) lookup per link.
        links_dict = {link["id"]: link for link in all_links}
        # 4. Pressure readings for the same window (keyed by node id).
        #    (The repository returns all nodes, so no node-id list is needed.)
        pressure_data = await RealtimeRepository.get_nodes_field_by_time_range(
            timescale_conn, start_time, end_time, "pressure"
        )
        # 5. Assemble the model's feature columns, one entry per valid pipe.
        materials = []
        diameters = []
        velocities = []
        pressures = []
        link_ids = []
        for link_id in valid_link_ids:
            # Skip ids that are not pipes (pumps, valves and other elements).
            link = links_dict.get(link_id)
            if link is None:
                continue
            # Latest velocity sample in the window; 0 when the series is empty.
            velocity_values = velocity_data[link_id]
            velocity = velocity_values[-1]["value"] if velocity_values else 0
            # Average of the latest pressure at each endpoint (0 when absent).
            endpoint_pressures = []
            for node_id in (link["node1"], link["node2"]):
                samples = pressure_data.get(node_id)
                endpoint_pressures.append(samples[-1]["value"] if samples else 0)
            avg_pressure = (endpoint_pressures[0] + endpoint_pressures[1]) / 2
            link_ids.append(link_id)
            # TODO(review): material is not stored in the database; 7 is a
            # placeholder material class — confirm against the model's encoding.
            materials.append(7)
            diameters.append(link["diameter"])
            velocities.append(velocity)
            pressures.append(avg_pressure)
        if not link_ids:
            raise ValueError("没有找到有效的管道数据用于预测")
        # 6. Feature frame in the column order the model was trained on.
        data = pd.DataFrame(
            {
                "Material": materials,
                "Diameter": diameters,
                "Flow Velocity": velocities,
                "Pressure": pressures,
            }
        )
        # 7. Score every pipe with the survival forest model.
        analyzer = PipelineHealthAnalyzer(
            model_path="api_ex/model/my_survival_forest_model_quxi.joblib"
        )
        survival_functions = analyzer.predict_survival(data)
        print("预测管道健康耗时: {:.2f}".format(time.time() - perf_start_time))
        # 8. Shape the per-pipe response payload.
        results = []
        for i, link_id in enumerate(link_ids):
            sf = survival_functions[i]
            results.append(
                {
                    "link_id": link_id,
                    "diameter": diameters[i],
                    "velocity": velocities[i],
                    "pressure": pressures[i],
                    "survival_function": {
                        "x": sf.x.tolist(),  # time points (years)
                        "y": sf.y.tolist(),  # survival probabilities
                        "a": float(sf.a),
                        "b": float(sf.b),
                    },
                }
            )
        return results
    except (ValueError, FileNotFoundError):
        # Propagate unchanged so the API layer can map them to 400 / 404.
        # (Previously everything was re-wrapped as ValueError, which made the
        # router's FileNotFoundError -> 404 branch unreachable.)
        raise
    except Exception as e:
        raise ValueError(f"管道健康预测失败: {str(e)}") from e

View File

@@ -9,8 +9,6 @@ from timescaledb.schemas.scada import ScadaRepository
import psycopg
import time
# 内部使用存储类
class InternalStorage:
@staticmethod
@@ -78,6 +76,8 @@ class InternalStorage:
else:
raise # 达到最大重试次数后抛出异常
class InternalQueries:
@staticmethod
def query_scada_by_ids_time(
device_ids: List[str],

View File

@@ -594,3 +594,39 @@ async def clean_scada_data(
)
except ValueError as e:
raise HTTPException(status_code=400, detail=str(e))
@router.get("/composite/pipeline-health-prediction")
async def predict_pipeline_health(
    query_time: datetime = Query(..., description="查询时间"),
    db_name: str = Query(..., description="管网数据库名称"),
    timescale_conn: AsyncConnection = Depends(get_database_connection),
):
    """Predict pipeline health for the given network at the given time.

    Delegates to :meth:`CompositeQueries.predict_pipeline_health`, which joins
    static pipe attributes with real-time readings and scores each pipe with a
    random survival forest model.

    Args:
        query_time: point in time to sample real-time data at.
        db_name: name of the pipe-network database.

    Returns:
        ``{"success": True, "result": [...]}`` where each item carries a
        ``link_id`` and its survival function.
    """
    try:
        prediction = await CompositeQueries.predict_pipeline_health(
            timescale_conn, db_name, query_time
        )
    except ValueError as exc:
        # Bad request: invalid parameters or insufficient data.
        raise HTTPException(status_code=400, detail=str(exc))
    except FileNotFoundError as exc:
        # Model artifact missing on disk.
        raise HTTPException(status_code=404, detail=str(exc))
    except Exception as exc:
        raise HTTPException(status_code=500, detail=f"内部服务器错误: {str(exc)}")
    return {
        "success": True,
        "result": prediction,
    }