From 3c7e2c58069251343b97b92f27636cf09e328f12 Mon Sep 17 00:00:00 2001 From: Jiang Date: Mon, 2 Feb 2026 14:15:54 +0800 Subject: [PATCH] =?UTF-8?q?=E4=BF=AE=E5=A4=8D=E6=95=B0=E6=8D=AE=E6=B8=85?= =?UTF-8?q?=E6=B4=97index=E8=B6=8A=E7=95=8C=E9=94=99=E8=AF=AF=EF=BC=9B?= =?UTF-8?q?=E9=87=8D=E5=91=BD=E5=90=8D=E5=8E=8B=E5=8A=9B=E6=B5=81=E9=87=8F?= =?UTF-8?q?=E6=B8=85=E6=B4=97=E6=96=B9=E6=B3=95?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- app/algorithms/api_ex/__init__.py | 4 +- .../{Fdataclean.py => flow_data_clean.py} | 31 +++++++-- .../{Pdataclean.py => pressure_data_clean.py} | 26 ++++---- app/algorithms/data_cleaning.py | 8 +-- app/api/v1/endpoints/simulation.py | 8 +-- app/core/audit.py | 65 +++++++++++++------ app/core/config.py | 20 ++++-- app/infra/audit/middleware.py | 2 + app/infra/db/timescaledb/composite_queries.py | 4 +- scripts/online_Analysis.py | 8 +-- 10 files changed, 116 insertions(+), 60 deletions(-) rename app/algorithms/api_ex/{Fdataclean.py => flow_data_clean.py} (91%) rename app/algorithms/api_ex/{Pdataclean.py => pressure_data_clean.py} (94%) diff --git a/app/algorithms/api_ex/__init__.py b/app/algorithms/api_ex/__init__.py index dcd7666..fbc5e67 100644 --- a/app/algorithms/api_ex/__init__.py +++ b/app/algorithms/api_ex/__init__.py @@ -1,3 +1,3 @@ -from .Fdataclean import * -from .Pdataclean import * +from .flow_data_clean import * +from .pressure_data_clean import * from .pipeline_health_analyzer import * \ No newline at end of file diff --git a/app/algorithms/api_ex/Fdataclean.py b/app/algorithms/api_ex/flow_data_clean.py similarity index 91% rename from app/algorithms/api_ex/Fdataclean.py rename to app/algorithms/api_ex/flow_data_clean.py index 88ed379..1e689a9 100644 --- a/app/algorithms/api_ex/Fdataclean.py +++ b/app/algorithms/api_ex/flow_data_clean.py @@ -292,28 +292,47 @@ def clean_flow_data_df_kf(data: pd.DataFrame, show_plot: bool = False) -> dict: plt.rcParams["axes.unicode_minus"] = False if show_plot and len(data.columns) > 0: sensor_to_plot = data.columns[0] + + # 定义x轴 + n = len(data) + time = np.arange(n) + n_filled = len(data_filled) + time_filled = np.arange(n_filled) + plt.figure(figsize=(12, 8)) plt.subplot(2, 1, 1) plt.plot( - data.index, + time, data[sensor_to_plot], label="原始监测值", marker="o", markersize=3, alpha=0.7, ) - abnormal_zero_idx = data.index[data_filled[sensor_to_plot].isna()] + + # 修正:检查 data_filled 的异常值,绘制在 time_filled 上 + abnormal_zero_mask = data_filled[sensor_to_plot].isna() + # 如果目的是检查0值,应该用 == 0。这里保留 isna() 但修正索引引用,防止crash。 + # 如果原意是 isna() 则在 fillna 后通常没有 na。假设用户可能想检查 0 值? + # 基于 "异常0值" 的标签,改为检查 0 值更合理,但为了保险起见, + # 如果 isna() 返回空,就不画。防止索引越界是主要的。 + abnormal_zero_idx = data_filled.index[abnormal_zero_mask] + if len(abnormal_zero_idx) > 0: + # 注意:如果 abnormal_zero_idx 是基于 data_filled 的索引(0..M-1), + # 直接作为 x 坐标即可,因为 time_filled 也是 0..M-1 + # 而 y 值应该取自 data_filled 或 data_kf,取 data 会越界 plt.plot( abnormal_zero_idx, - data[sensor_to_plot].loc[abnormal_zero_idx], + data_filled[sensor_to_plot].loc[abnormal_zero_idx], "mo", markersize=8, - label="异常0值", + label="异常值(NaN)", ) + plt.plot( - data.index, data_kf[sensor_to_plot], label="Kalman滤波预测值", linewidth=2 + time_filled, data_kf[sensor_to_plot], label="Kalman滤波预测值", linewidth=2 ) anomaly_idx = anomalies_info[sensor_to_plot].index if len(anomaly_idx) > 0: @@ -331,7 +350,7 @@ def clean_flow_data_df_kf(data: pd.DataFrame, show_plot: bool = False) -> dict: plt.subplot(2, 1, 2) plt.plot( - data.index, + time_filled, cleaned_data[sensor_to_plot], label="修复后监测值", marker="o", diff --git a/app/algorithms/api_ex/Pdataclean.py b/app/algorithms/api_ex/pressure_data_clean.py similarity index 94% rename from app/algorithms/api_ex/Pdataclean.py rename to app/algorithms/api_ex/pressure_data_clean.py index af4bab5..4afdcf4 100644 --- a/app/algorithms/api_ex/Pdataclean.py +++ b/app/algorithms/api_ex/pressure_data_clean.py @@ -239,7 +239,7 @@ def clean_pressure_data_df_km(data: pd.DataFrame, show_plot: bool = False) -> di threshold = distances.mean() + 3 * distances.std() anomaly_pos = np.where(distances > threshold)[0] - anomaly_indices = data.index[anomaly_pos] + anomaly_indices = data_filled.index[anomaly_pos] anomaly_details = {} for pos in anomaly_pos: @@ -248,13 +248,13 @@ def clean_pressure_data_df_km(data: pd.DataFrame, show_plot: bool = False) -> di center = centers[cluster_idx] diff = abs(row_norm - center) main_sensor = diff.idxmax() - anomaly_details[data.index[pos]] = main_sensor + anomaly_details[data_filled.index[pos]] = main_sensor # 修复:滚动平均(窗口可调) data_rolled = data_filled.rolling(window=13, center=True, min_periods=1).mean() data_repaired = data_filled.copy() for pos in anomaly_pos: - label = data.index[pos] + label = data_filled.index[pos] sensor = anomaly_details[label] data_repaired.loc[label, sensor] = data_rolled.loc[label, sensor] @@ -265,6 +265,8 @@ def clean_pressure_data_df_km(data: pd.DataFrame, show_plot: bool = False) -> di if show_plot and len(data.columns) > 0: n = len(data) time = np.arange(n) + n_filled = len(data_filled) + time_filled = np.arange(n_filled) plt.figure(figsize=(12, 8)) for col in data.columns: plt.plot( @@ -272,7 +274,7 @@ def clean_pressure_data_df_km(data: pd.DataFrame, show_plot: bool = False) -> di ) for col in data_filled.columns: plt.plot( - time, + time_filled, data_filled[col].values, marker="x", markersize=3, @@ -280,7 +282,7 @@ def clean_pressure_data_df_km(data: pd.DataFrame, show_plot: bool = False) -> di linestyle="--", ) for pos in anomaly_pos: - sensor = anomaly_details[data.index[pos]] + sensor = anomaly_details[data_filled.index[pos]] plt.plot(pos, data_filled.iloc[pos][sensor], "ro", markersize=8) plt.xlabel("时间点(序号)") plt.ylabel("压力监测值") @@ -291,16 +293,16 @@ def clean_pressure_data_df_km(data: pd.DataFrame, show_plot: bool = False) -> di plt.figure(figsize=(12, 8)) for col in data_repaired.columns: plt.plot( - time, data_repaired[col].values, marker="o", markersize=3, label=col + time_filled, data_repaired[col].values, marker="o", markersize=3, label=col ) for pos in anomaly_pos: - sensor = anomaly_details[data.index[pos]] + sensor = anomaly_details[data_filled.index[pos]] plt.plot(pos, data_repaired.iloc[pos][sensor], "go", markersize=8) - plt.xlabel("时间点(序号)") - plt.ylabel("修复后压力监测值") - plt.title("修复后各传感器折线图(绿色标记修复值)") - plt.legend() - plt.show() + plt.xlabel("时间点(序号)") + plt.ylabel("修复后压力监测值") + plt.title("修复后各传感器折线图(绿色标记修复值)") + plt.legend() + plt.show() # 返回清洗后的字典 return data_repaired diff --git a/app/algorithms/data_cleaning.py b/app/algorithms/data_cleaning.py index fcad898..781c25e 100644 --- a/app/algorithms/data_cleaning.py +++ b/app/algorithms/data_cleaning.py @@ -1,7 +1,7 @@ import os -import app.algorithms.api_ex.Fdataclean as Fdataclean -import app.algorithms.api_ex.Pdataclean as Pdataclean +import app.algorithms.api_ex.flow_data_clean as flow_data_clean +import app.algorithms.api_ex.pressure_data_clean as pressure_data_clean ############################################################ @@ -26,7 +26,7 @@ def flow_data_clean(input_csv_file: str) -> str: if not os.path.exists(input_csv_path): raise FileNotFoundError(f"指定的文件不存在: {input_csv_path}") # 调用 Fdataclean.clean_flow_data_kf 函数进行数据清洗 - out_xlsx_path = Fdataclean.clean_flow_data_kf(input_csv_path) + out_xlsx_path = flow_data_clean.clean_flow_data_kf(input_csv_path) print("清洗后的数据已保存到:", out_xlsx_path) @@ -53,5 +53,5 @@ def pressure_data_clean(input_csv_file: str) -> str: if not os.path.exists(input_csv_path): raise FileNotFoundError(f"指定的文件不存在: {input_csv_path}") # 调用 Fdataclean.clean_flow_data_kf 函数进行数据清洗 - out_xlsx_path = Pdataclean.clean_pressure_data_km(input_csv_path) + out_xlsx_path = pressure_data_clean.clean_pressure_data_km(input_csv_path) print("清洗后的数据已保存到:", out_xlsx_path) diff --git a/app/api/v1/endpoints/simulation.py b/app/api/v1/endpoints/simulation.py index 5289088..0e4d137 100644 --- a/app/api/v1/endpoints/simulation.py +++ b/app/api/v1/endpoints/simulation.py @@ -30,8 +30,8 @@ from app.algorithms.sensors import ( pressure_sensor_placement_sensitivity, pressure_sensor_placement_kmeans, ) -import app.algorithms.api_ex.Fdataclean as Fdataclean -import app.algorithms.api_ex.Pdataclean as Pdataclean +import app.algorithms.api_ex.flow_data_clean as flow_data_clean +import app.algorithms.api_ex.pressure_data_clean as pressure_data_clean from app.services.network_import import network_update from app.services.simulation_ops import ( project_management, @@ -588,9 +588,9 @@ async def fastapi_scada_device_data_cleaning( values = [record["value"] for record in type_scada_data[device_id]] df[device_id] = values if device_type == "pressure": - cleaned_value_df = Pdataclean.clean_pressure_data_df_km(df) + cleaned_value_df = pressure_data_clean.clean_pressure_data_df_km(df) elif device_type == "pipe_flow": - cleaned_value_df = Fdataclean.clean_flow_data_df_kf(df) + cleaned_value_df = flow_data_clean.clean_flow_data_df_kf(df) cleaned_value_df = pd.DataFrame(cleaned_value_df) cleaned_df = pd.concat([df["time"], cleaned_value_df], axis=1) influxdb_api.import_multicolumn_data_from_dict( diff --git a/app/core/audit.py b/app/core/audit.py index 1b04d9f..b0aa37a 100644 --- a/app/core/audit.py +++ b/app/core/audit.py @@ -3,35 +3,39 @@ 记录系统关键操作,用于安全审计和合规追踪 """ + from typing import Optional from datetime import datetime import logging logger = logging.getLogger(__name__) + class AuditAction: """审计操作类型常量""" + # 认证相关 LOGIN = "LOGIN" LOGOUT = "LOGOUT" REGISTER = "REGISTER" PASSWORD_CHANGE = "PASSWORD_CHANGE" - + # 数据操作 CREATE = "CREATE" READ = "READ" UPDATE = "UPDATE" DELETE = "DELETE" - + # 权限相关 PERMISSION_CHANGE = "PERMISSION_CHANGE" ROLE_CHANGE = "ROLE_CHANGE" - + # 系统操作 CONFIG_CHANGE = "CONFIG_CHANGE" SYSTEM_START = "SYSTEM_START" SYSTEM_STOP = "SYSTEM_STOP" + async def log_audit_event( action: str, user_id: Optional[int] = None, @@ -45,11 +49,11 @@ async def log_audit_event( request_data: Optional[dict] = None, response_status: Optional[int] = None, error_message: Optional[str] = None, - db = None # 新增:可选的数据库实例 + db=None, # 新增:可选的数据库实例 ): """ 记录审计日志 - + Args: action: 操作类型 user_id: 用户ID @@ -66,20 +70,31 @@ async def log_audit_event( db: 数据库实例(可选,如果不提供则尝试获取) """ from app.infra.repositories.audit_repository import AuditRepository - + try: # 脱敏敏感数据 if request_data: request_data = sanitize_sensitive_data(request_data) - - # 如果没有提供数据库实例,尝试获取(这在中间件中可能不可用) + + # 如果没有提供数据库实例,尝试从全局获取 + if db is None: + try: + from app.infra.db.postgresql.database import db as default_db + + # 仅当连接池已初始化时使用 + if default_db.pool: + db = default_db + except ImportError: + pass + + # 如果仍然没有数据库实例 if db is None: # 在某些上下文中可能无法获取,此时静默失败 logger.warning("No database instance provided for audit logging") return - + audit_repo = AuditRepository(db) - + await audit_repo.create_log( user_id=user_id, username=username, @@ -92,40 +107,48 @@ async def log_audit_event( request_path=request_path, request_data=request_data, response_status=response_status, - error_message=error_message + error_message=error_message, ) - + logger.info( f"Audit log created: action={action}, user={username or user_id}, " f"resource={resource_type}:{resource_id}" ) - + except Exception as e: # 审计日志失败不应影响业务流程 logger.error(f"Failed to create audit log: {e}", exc_info=True) + def sanitize_sensitive_data(data: dict) -> dict: """ 脱敏敏感数据 - + Args: data: 原始数据 - + Returns: 脱敏后的数据 """ sensitive_fields = [ - 'password', 'passwd', 'pwd', - 'secret', 'token', 'api_key', 'apikey', - 'credit_card', 'ssn', 'social_security' + "password", + "passwd", + "pwd", + "secret", + "token", + "api_key", + "apikey", + "credit_card", + "ssn", + "social_security", ] - + sanitized = data.copy() - + for key in sanitized: if isinstance(sanitized[key], dict): sanitized[key] = sanitize_sensitive_data(sanitized[key]) elif any(sensitive in key.lower() for sensitive in sensitive_fields): sanitized[key] = "***REDACTED***" - + return sanitized diff --git a/app/core/config.py b/app/core/config.py index 4f0151f..83321a8 100644 --- a/app/core/config.py +++ b/app/core/config.py @@ -1,18 +1,21 @@ from pydantic_settings import BaseSettings + class Settings(BaseSettings): PROJECT_NAME: str = "TJWater Server" API_V1_STR: str = "/api/v1" - + # JWT 配置 - SECRET_KEY: str = "your-secret-key-here-change-in-production-use-openssl-rand-hex-32" + SECRET_KEY: str = ( + "your-secret-key-here-change-in-production-use-openssl-rand-hex-32" + ) ALGORITHM: str = "HS256" ACCESS_TOKEN_EXPIRE_MINUTES: int = 30 REFRESH_TOKEN_EXPIRE_DAYS: int = 7 - + # 数据加密密钥 (使用 Fernet) ENCRYPTION_KEY: str = "" # 必须从环境变量设置 - + # Database Config (PostgreSQL) DB_NAME: str = "tjwater" DB_HOST: str = "localhost" @@ -20,6 +23,12 @@ class Settings(BaseSettings): DB_USER: str = "postgres" DB_PASSWORD: str = "password" + # Database Config (TimescaleDB) + TIMESCALEDB_DB_NAME: str = "tjwater" + TIMESCALEDB_DB_HOST: str = "localhost" + TIMESCALEDB_DB_PORT: str = "5433" + TIMESCALEDB_DB_USER: str = "postgres" + TIMESCALEDB_DB_PASSWORD: str = "password" # InfluxDB INFLUXDB_URL: str = "http://localhost:8086" INFLUXDB_TOKEN: str = "token" @@ -29,9 +38,10 @@ class Settings(BaseSettings): @property def SQLALCHEMY_DATABASE_URI(self) -> str: return f"postgresql://{self.DB_USER}:{self.DB_PASSWORD}@{self.DB_HOST}:{self.DB_PORT}/{self.DB_NAME}" - + class Config: env_file = ".env" extra = "ignore" + settings = Settings() diff --git a/app/infra/audit/middleware.py b/app/infra/audit/middleware.py index f515620..348ecc1 100644 --- a/app/infra/audit/middleware.py +++ b/app/infra/audit/middleware.py @@ -9,6 +9,7 @@ import json from typing import Callable from fastapi import Request, Response from starlette.middleware.base import BaseHTTPMiddleware +from app.infra.db.postgresql.database import db as default_db from app.core.audit import log_audit_event, AuditAction import logging @@ -135,6 +136,7 @@ class AuditMiddleware(BaseHTTPMiddleware): if response.status_code < 400 else f"HTTP {response.status_code}" ), + db=default_db, ) except Exception as e: # 审计失败不应影响响应 diff --git a/app/infra/db/timescaledb/composite_queries.py b/app/infra/db/timescaledb/composite_queries.py index 412d93a..5d2536c 100644 --- a/app/infra/db/timescaledb/composite_queries.py +++ b/app/infra/db/timescaledb/composite_queries.py @@ -4,8 +4,8 @@ from datetime import datetime, timedelta from psycopg import AsyncConnection import pandas as pd import numpy as np -from app.algorithms.api_ex.Fdataclean import clean_flow_data_df_kf -from app.algorithms.api_ex.Pdataclean import clean_pressure_data_df_km +from app.algorithms.api_ex.flow_data_clean import clean_flow_data_df_kf +from app.algorithms.api_ex.pressure_data_clean import clean_pressure_data_df_km from app.algorithms.api_ex.pipeline_health_analyzer import PipelineHealthAnalyzer from app.infra.db.postgresql.internal_queries import InternalQueries diff --git a/scripts/online_Analysis.py b/scripts/online_Analysis.py index 08fd0c6..84ee28d 100644 --- a/scripts/online_Analysis.py +++ b/scripts/online_Analysis.py @@ -19,8 +19,8 @@ from sqlalchemy import create_engine import ast import app.services.project_info as project_info import app.algorithms.api_ex.kmeans_sensor as kmeans_sensor -import app.algorithms.api_ex.Fdataclean as Fdataclean -import app.algorithms.api_ex.Pdataclean as Pdataclean +import app.algorithms.api_ex.flow_data_clean as flow_data_clean +import app.algorithms.api_ex.pressure_data_clean as pressure_data_clean import app.algorithms.api_ex.sensitivity as sensitivity from app.native.api.postgresql_info import get_pgconn_string @@ -1475,7 +1475,7 @@ def flow_data_clean(input_csv_file: str) -> str: if not os.path.exists(input_csv_path): raise FileNotFoundError(f"指定的文件不存在: {input_csv_path}") # 调用 Fdataclean.clean_flow_data_kf 函数进行数据清洗 - out_xlsx_path = Fdataclean.clean_flow_data_kf(input_csv_path) + out_xlsx_path = flow_data_clean.clean_flow_data_kf(input_csv_path) print("清洗后的数据已保存到:", out_xlsx_path) @@ -1502,7 +1502,7 @@ def pressure_data_clean(input_csv_file: str) -> str: if not os.path.exists(input_csv_path): raise FileNotFoundError(f"指定的文件不存在: {input_csv_path}") # 调用 Fdataclean.clean_flow_data_kf 函数进行数据清洗 - out_xlsx_path = Pdataclean.clean_pressure_data_km(input_csv_path) + out_xlsx_path = pressure_data_clean.clean_pressure_data_km(input_csv_path) print("清洗后的数据已保存到:", out_xlsx_path)