diff --git a/.gitignore b/.gitignore index 5d783af..7c04b63 100644 --- a/.gitignore +++ b/.gitignore @@ -4,3 +4,4 @@ db_inp/ temp/ data/ *.dump +api_ex/model/my_survival_forest_model_quxi.joblib diff --git a/postgresql/internal_queries.py b/postgresql/internal_queries.py new file mode 100644 index 0000000..a0209b2 --- /dev/null +++ b/postgresql/internal_queries.py @@ -0,0 +1,83 @@ +import time +from typing import List, Optional + +from fastapi.logger import logger +import postgresql_info +import psycopg + + +class InternalQueries: + @staticmethod + def get_links_by_property( + fields: Optional[List[str]] = None, + property_conditions: Optional[dict] = None, + db_name: str = None, + max_retries: int = 3, + ) -> List[dict]: + """ + 查询pg数据库中,pipes 的指定字段记录或根据属性筛选 + :param fields: 要查询的字段列表,如 ["id", "diameter", "status"],默认查询所有字段 + :param property_conditions: 可选的筛选条件字典,如 {"status": "Open"} 或 {"diameter": 300} + :param db_name: 数据库名称 + :param max_retries: 最大重试次数 + :return: 包含所有记录的列表,每条记录为一个字典 + """ + # 如果未指定字段,查询所有字段 + if not fields: + fields = [ + "id", + "node1", + "node2", + "length", + "diameter", + "roughness", + "minor_loss", + "status", + ] + + for attempt in range(max_retries): + try: + conn_string = ( + postgresql_info.get_pgconn_string(db_name=db_name) + if db_name + else postgresql_info.get_pgconn_string() + ) + with psycopg.Connection.connect(conn_string) as conn: + with conn.cursor() as cur: + # 构建SELECT子句 + select_fields = ", ".join(fields) + base_query = f""" + SELECT {select_fields} + FROM public.pipes + """ + + # 如果提供了筛选条件,构建WHERE子句 + if property_conditions: + conditions = [] + params = [] + for key, value in property_conditions.items(): + conditions.append(f"{key} = %s") + params.append(value) + + query = base_query + " WHERE " + " AND ".join(conditions) + cur.execute(query, params) + else: + cur.execute(base_query) + + records = cur.fetchall() + # 将查询结果转换为字典列表 + pipes = [] + for record in records: + pipe_dict = {} + for idx, field in enumerate(fields): + 
pipe_dict[field] = record[idx] + pipes.append(pipe_dict) + + return pipes + break # 成功 + except Exception as e: + logger.error(f"查询尝试 {attempt + 1} 失败: {e}") + if attempt < max_retries - 1: + time.sleep(1) + else: + raise diff --git a/run_server.py b/run_server.py index 958ce34..8f7d80d 100644 --- a/run_server.py +++ b/run_server.py @@ -13,6 +13,6 @@ if __name__ == "__main__": "main:app", host="0.0.0.0", port=8000, - workers=2, # 这里可以设置多进程 + # workers=2, # 这里可以设置多进程 loop="asyncio", ) diff --git a/simulation.py b/simulation.py index 61ab738..71389fd 100644 --- a/simulation.py +++ b/simulation.py @@ -21,7 +21,7 @@ import globals import uuid import project_info from api.postgresql_info import get_pgconn_string -from timescaledb.internal_queries import InternalStorage as TimescaleInternalStorage +from timescaledb.internal_queries import InternalQueries as TimescaleInternalQueries logging.basicConfig( level=logging.INFO, format="%(asctime)s - %(levelname)s - %(message)s" @@ -774,7 +774,7 @@ def run_simulation( if globals.reservoirs_id: # reservoirs_id = {'ZBBDJSCP000002': '2497', 'R00003': '2571'} # 1.获取reservoir的SCADA数据,形式如{'2497': '3.1231', '2571': '2.7387'} - reservoir_SCADA_data_dict = TimescaleInternalStorage.query_scada_by_ids_time( + reservoir_SCADA_data_dict = TimescaleInternalQueries.query_scada_by_ids_time( device_ids=list(globals.reservoirs_id.values()), query_time=modify_pattern_start_time, ) @@ -798,7 +798,7 @@ def run_simulation( set_pattern(name_c, cs) if globals.tanks_id: # 修改tank初始液位 - tank_SCADA_data_dict = TimescaleInternalStorage.query_scada_by_ids_time( + tank_SCADA_data_dict = TimescaleInternalQueries.query_scada_by_ids_time( device_ids=list(globals.tanks_id.values()), query_time=modify_pattern_start_time, ) @@ -814,7 +814,7 @@ def run_simulation( set_tank(name_c, cs) if globals.fixed_pumps_id: # 修改工频泵的pattern - fixed_pump_SCADA_data_dict = TimescaleInternalStorage.query_scada_by_ids_time( + fixed_pump_SCADA_data_dict = 
TimescaleInternalQueries.query_scada_by_ids_time( device_ids=list(globals.fixed_pumps_id.values()), query_time=modify_pattern_start_time, ) @@ -838,7 +838,7 @@ def run_simulation( if globals.variable_pumps_id: # 修改变频泵的pattern variable_pump_SCADA_data_dict = ( - TimescaleInternalStorage.query_scada_by_ids_time( + TimescaleInternalQueries.query_scada_by_ids_time( device_ids=list(globals.variable_pumps_id.values()), query_time=modify_pattern_start_time, ) @@ -858,7 +858,7 @@ def run_simulation( set_pattern(name_c, cs) if globals.demand_id: # 基于实时数据,修改大用户节点的pattern - demand_SCADA_data_dict = TimescaleInternalStorage.query_scada_by_ids_time( + demand_SCADA_data_dict = TimescaleInternalQueries.query_scada_by_ids_time( device_ids=list(globals.demand_id.values()), query_time=modify_pattern_start_time, ) @@ -885,7 +885,7 @@ def run_simulation( if globals.source_outflow_pattern_id: # 基于实时的出厂流量计数据,修改出厂流量计绑定的pattern source_outflow_SCADA_data_dict = ( - TimescaleInternalStorage.query_scada_by_ids_time( + TimescaleInternalQueries.query_scada_by_ids_time( device_ids=list(globals.source_outflow_pattern_id.values()), query_time=modify_pattern_start_time, ) @@ -923,7 +923,7 @@ def run_simulation( if globals.realtime_pipe_flow_pattern_id: # 基于实时的pipe_flow类数据,修改pipe_flow类绑定的pattern realtime_pipe_flow_SCADA_data_dict = ( - TimescaleInternalStorage.query_scada_by_ids_time( + TimescaleInternalQueries.query_scada_by_ids_time( device_ids=list(globals.realtime_pipe_flow_pattern_id.values()), query_time=modify_pattern_start_time, ) @@ -960,7 +960,7 @@ def run_simulation( query_api_id = globals.realtime_pipe_flow_pattern_id.get(pipe_flow_region) temp_realtime_pipe_flow_pattern_id[pipe_flow_region] = query_api_id temp_realtime_pipe_flow_SCADA_data_dict = ( - TimescaleInternalStorage.query_scada_by_ids_time( + TimescaleInternalQueries.query_scada_by_ids_time( device_ids=list(temp_realtime_pipe_flow_pattern_id.values()), query_time=modify_pattern_start_time, ) @@ -1018,13 +1018,13 @@ def 
run_simulation( globals.non_realtime_region_patterns.get(region, []) ) region_source_outflow_data_dict = ( - TimescaleInternalStorage.query_scada_by_ids_time( + TimescaleInternalQueries.query_scada_by_ids_time( device_ids=temp_source_outflow_region_id, query_time=modify_pattern_start_time, ) ) region_realtime_region_pipe_flow_and_demand_data_dict = ( - TimescaleInternalStorage.query_scada_by_ids_time( + TimescaleInternalQueries.query_scada_by_ids_time( device_ids=temp_realtime_region_pipe_flow_and_demand_id, query_time=modify_pattern_start_time, ) @@ -1229,11 +1229,11 @@ def run_simulation( # 存储 starttime = time.time() if simulation_type.upper() == "REALTIME": - TimescaleInternalStorage.store_realtime_simulation( + TimescaleInternalQueries.store_realtime_simulation( node_result, link_result, modify_pattern_start_time ) elif simulation_type.upper() == "EXTENDED": - TimescaleInternalStorage.store_scheme_simulation( + TimescaleInternalQueries.store_scheme_simulation( scheme_Type, scheme_Name, node_result, @@ -1244,7 +1244,7 @@ def run_simulation( endtime = time.time() logging.info("store time: %f", endtime - starttime) # 暂不需要再次存储 SCADA 模拟信息 - # TimescaleInternalStorage.fill_scheme_simulation_result_to_SCADA(scheme_Type=scheme_Type, scheme_Name=scheme_Name) + # TimescaleInternalQueries.fill_scheme_simulation_result_to_SCADA(scheme_Type=scheme_Type, scheme_Name=scheme_Name) # if simulation_type.upper() == "REALTIME": # influxdb_api.store_realtime_simulation_result_to_influxdb( diff --git a/timescaledb/composite_queries.py b/timescaledb/composite_queries.py index a37e242..620da6b 100644 --- a/timescaledb/composite_queries.py +++ b/timescaledb/composite_queries.py @@ -1,10 +1,14 @@ +import time from typing import List, Optional, Any, Dict, Tuple -from datetime import datetime +from datetime import datetime, timedelta from psycopg import AsyncConnection import pandas as pd +import numpy as np from api_ex.Fdataclean import clean_flow_data_df_kf from api_ex.Pdataclean import 
clean_pressure_data_df_km +from api_ex.pipeline_health_analyzer import PipelineHealthAnalyzer +from postgresql.internal_queries import InternalQueries from postgresql.scada_info import ScadaRepository as PostgreScadaRepository from timescaledb.schemas.realtime import RealtimeRepository from timescaledb.schemas.scheme import SchemeRepository @@ -450,3 +454,158 @@ class CompositeQueries: return "success" except Exception as e: return f"error: {str(e)}" + + @staticmethod + async def predict_pipeline_health( + timescale_conn: AsyncConnection, + db_name: str, + query_time: datetime, + ) -> List[Dict[str, Any]]: + """ + 预测管道健康状况 + + 根据管网名称和当前时间,查询管道信息和实时数据, + 使用随机生存森林模型预测管道的生存概率 + + Args: + timescale_conn: TimescaleDB 异步连接 + db_name: 管网数据库名称 + query_time: 查询时间 + property_conditions: 可选的管道筛选条件,如 {"diameter": 300} + + Returns: + 预测结果列表,每个元素包含 link_id 和对应的生存函数 + + Raises: + ValueError: 当参数无效或数据不足时 + FileNotFoundError: 当模型文件未找到时 + """ + try: + perf_start_time = time.time() + # 1. 准备时间范围(查询时间前后1秒) + start_time = query_time - timedelta(seconds=1) + end_time = query_time + timedelta(seconds=1) + + # 2. 先查询流速数据(velocity),获取有数据的管道ID列表 + velocity_data = await RealtimeRepository.get_links_field_by_time_range( + timescale_conn, start_time, end_time, "velocity" + ) + + if not velocity_data: + raise ValueError("未找到流速数据") + + # 3. 只查询有流速数据的管道的基本信息 + valid_link_ids = list(velocity_data.keys()) + + # 批量查询这些管道的详细信息 + fields = ["id", "diameter", "node1", "node2"] + all_links = InternalQueries.get_links_by_property( + fields=fields, + db_name=db_name, + ) + + # 转换为字典以快速查找 + links_dict = {link["id"]: link for link in all_links} + + # 获取所有需要查询的节点ID + node_ids = set() + for link_id in valid_link_ids: + if link_id in links_dict: + link = links_dict[link_id] + node_ids.add(link["node1"]) + node_ids.add(link["node2"]) + + # 4. 批量查询压力数据(pressure) + pressure_data = await RealtimeRepository.get_nodes_field_by_time_range( + timescale_conn, start_time, end_time, "pressure" + ) + + # 5. 
组合数据结构 + materials = [] + diameters = [] + velocities = [] + pressures = [] + link_ids = [] + + for link_id in valid_link_ids: + # 跳过不在管道字典中的ID(如泵等其他元素) + if link_id not in links_dict: + continue + + link = links_dict[link_id] + diameter = link["diameter"] + node1 = link["node1"] + node2 = link["node2"] + + # 获取流速数据 + velocity_values = velocity_data[link_id] + velocity = velocity_values[-1]["value"] if velocity_values else 0 + + # 获取node1和node2的压力数据,计算平均值 + node1_pressure = 0 + node2_pressure = 0 + + if node1 in pressure_data and pressure_data[node1]: + pressure_values = pressure_data[node1] + node1_pressure = ( + pressure_values[-1]["value"] if pressure_values else 0 + ) + + if node2 in pressure_data and pressure_data[node2]: + pressure_values = pressure_data[node2] + node2_pressure = ( + pressure_values[-1]["value"] if pressure_values else 0 + ) + + # 计算平均压力 + avg_pressure = (node1_pressure + node2_pressure) / 2 + + # 添加到列表 + link_ids.append(link_id) + materials.append(7) # 默认材料类型为7,可根据实际情况调整 + diameters.append(diameter) + velocities.append(velocity) + pressures.append(avg_pressure) + + if not link_ids: + raise ValueError("没有找到有效的管道数据用于预测") + + # 6. 创建DataFrame + data = pd.DataFrame( + { + "Material": materials, + "Diameter": diameters, + "Flow Velocity": velocities, + "Pressure": pressures, + } + ) + + # 7. 使用PipelineHealthAnalyzer进行预测 + analyzer = PipelineHealthAnalyzer( + model_path="api_ex/model/my_survival_forest_model_quxi.joblib" + ) + survival_functions = analyzer.predict_survival(data) + print("预测管道健康耗时: {:.2f} 秒".format(time.time() - perf_start_time)) + # 8. 
组合结果 + results = [] + for i, link_id in enumerate(link_ids): + sf = survival_functions[i] + results.append( + { + "link_id": link_id, + "diameter": diameters[i], + "velocity": velocities[i], + "pressure": pressures[i], + "survival_function": { + "x": sf.x.tolist(), # 时间点(年) + "y": sf.y.tolist(), # 生存概率 + "a": float(sf.a), + "b": float(sf.b), + }, + } + ) + + return results + + except Exception as e: + raise ValueError(f"管道健康预测失败: {str(e)}") diff --git a/timescaledb/internal_queries.py b/timescaledb/internal_queries.py index 9bbc724..652bfd0 100644 --- a/timescaledb/internal_queries.py +++ b/timescaledb/internal_queries.py @@ -9,8 +9,6 @@ from timescaledb.schemas.scada import ScadaRepository import psycopg import time -# 内部使用存储类 - class InternalStorage: @staticmethod @@ -78,6 +76,8 @@ class InternalStorage: else: raise # 达到最大重试次数后抛出异常 + +class InternalQueries: @staticmethod def query_scada_by_ids_time( device_ids: List[str], diff --git a/timescaledb/router.py b/timescaledb/router.py index 4bab5c4..4d4ea19 100644 --- a/timescaledb/router.py +++ b/timescaledb/router.py @@ -594,3 +594,39 @@ async def clean_scada_data( ) except ValueError as e: raise HTTPException(status_code=400, detail=str(e)) + + +@router.get("/composite/pipeline-health-prediction") +async def predict_pipeline_health( + query_time: datetime = Query(..., description="查询时间"), + db_name: str = Query(..., description="管网数据库名称"), + timescale_conn: AsyncConnection = Depends(get_database_connection), +): + """ + 预测管道健康状况 + + 根据管网名称和当前时间,查询管道信息和实时数据, + 使用随机生存森林模型预测管道的生存概率 + + Args: + query_time: 查询时间 + db_name: 管网数据库名称 + + Returns: + 预测结果列表,每个元素包含 link_id 和对应的生存函数 + """ + try: + result = await CompositeQueries.predict_pipeline_health( + timescale_conn, db_name, query_time + ) + + return { + "success": True, + "result": result, + } + except ValueError as e: + raise HTTPException(status_code=400, detail=str(e)) + except FileNotFoundError as e: + raise HTTPException(status_code=404, detail=str(e)) + except 
Exception as e: + raise HTTPException(status_code=500, detail=f"内部服务器错误: {str(e)}")