diff --git a/api_ex/Fdataclean.py b/api_ex/Fdataclean.py new file mode 100644 index 0000000..deb2048 --- /dev/null +++ b/api_ex/Fdataclean.py @@ -0,0 +1,207 @@ +# ...existing code... +import pandas as pd +import numpy as np +import matplotlib.pyplot as plt +from pykalman import KalmanFilter +import os + + + +def clean_flow_data_kf(input_csv_path: str, show_plot: bool = False) -> str: + """ + 读取 input_csv_path 中的每列时间序列,使用一维 Kalman 滤波平滑并用预测值替换基于 3σ 检测出的异常点。 + 保存输出为:_cleaned.xlsx(与输入同目录),并返回输出文件的绝对路径。 + 仅保留输入文件路径作为参数(按要求)。 + """ + # 读取 CSV + data = pd.read_csv(input_csv_path, header=0, index_col=None, encoding="utf-8") + # 存储 Kalman 平滑结果 + data_kf = pd.DataFrame(index=data.index, columns=data.columns) + # 平滑每一列 + for col in data.columns: + observations = pd.Series(data[col].values).ffill().bfill() + if observations.isna().any(): + observations = observations.fillna(observations.mean()) + obs = observations.values.astype(float) + + kf = KalmanFilter( + transition_matrices=[1], + observation_matrices=[1], + initial_state_mean=float(obs[0]), + initial_state_covariance=1, + observation_covariance=1, + transition_covariance=0.01 + ) + # 跳过EM学习,使用固定参数以提高性能 + state_means, _ = kf.smooth(obs) + data_kf[col] = state_means.flatten() + + # 计算残差并用IQR检测异常(更稳健的方法) + residuals = data - data_kf + residual_thresholds = {} + for col in data.columns: + res_values = residuals[col].dropna().values # 移除NaN以计算IQR + q1 = np.percentile(res_values, 25) + q3 = np.percentile(res_values, 75) + iqr = q3 - q1 + lower_threshold = q1 - 1.5 * iqr + upper_threshold = q3 + 1.5 * iqr + residual_thresholds[col] = (lower_threshold, upper_threshold) + + cleaned_data = data.copy() + anomalies_info = {} + for col in data.columns: + lower, upper = residual_thresholds[col] + sensor_residuals = residuals[col] + anomaly_mask = (sensor_residuals < lower) | (sensor_residuals > upper) + anomaly_idx = data.index[anomaly_mask.fillna(False)] + anomalies_info[col] = pd.DataFrame({ + 'Observed': data.loc[anomaly_idx, col], + 'Kalman_Predicted': data_kf.loc[anomaly_idx, col], + 'Residual': sensor_residuals.loc[anomaly_idx] + }) + cleaned_data.loc[anomaly_idx, f'{col}_cleaned'] = data_kf.loc[anomaly_idx, col] + + # 构造输出文件名:在输入文件名基础上加后缀 _cleaned.xlsx + input_dir = os.path.dirname(os.path.abspath(input_csv_path)) + input_base = os.path.splitext(os.path.basename(input_csv_path))[0] + output_filename = f"{input_base}_cleaned.xlsx" + output_path = os.path.join(input_dir, output_filename) + + # 覆盖同名文件 + if os.path.exists(output_path): + os.remove(output_path) + cleaned_data.to_excel(output_path, index=False) + + # 可选可视化(第一个传感器) + plt.rcParams['font.sans-serif'] = ['SimHei'] + plt.rcParams['axes.unicode_minus'] = False + if show_plot and len(data.columns) > 0: + sensor_to_plot = data.columns[0] + plt.figure(figsize=(12, 6)) + plt.plot(data.index, data[sensor_to_plot], label="监测值", marker='o', markersize=3, alpha=0.7) + plt.plot(data.index, data_kf[sensor_to_plot], label="Kalman滤波预测值", linewidth=2) + anomaly_idx = anomalies_info[sensor_to_plot].index + if len(anomaly_idx) > 0: + plt.plot(anomaly_idx, data[sensor_to_plot].loc[anomaly_idx], 'ro', markersize=8, label="监测值异常点") + plt.plot(anomaly_idx, data_kf[sensor_to_plot].loc[anomaly_idx], 'go', markersize=8, label="Kalman修复值") + plt.xlabel("时间点(序号)") + plt.ylabel("监测值") + plt.title(f"{sensor_to_plot}:观测值与Kalman滤波预测值(异常点标记)") + plt.legend() + plt.show() + + # 返回输出文件的绝对路径 + return os.path.abspath(output_path) + +def clean_flow_data_dict(data_dict: dict, show_plot: bool = False) -> dict: + """ + 接收一个字典数据结构,其中键为列名,值为时间序列列表,使用一维 Kalman 滤波平滑并用预测值替换基于 3σ 检测出的异常点。 + 返回清洗后的字典数据结构。 + """ + # 将字典转换为 DataFrame + data = pd.DataFrame(data_dict) + # 存储 Kalman 平滑结果 + data_kf = pd.DataFrame(index=data.index, columns=data.columns) + # 平滑每一列 + for col in data.columns: + observations = pd.Series(data[col].values).ffill().bfill() + if observations.isna().any(): + observations = observations.fillna(observations.mean()) + obs = observations.values.astype(float) + + kf = KalmanFilter( + transition_matrices=[1], + observation_matrices=[1], + initial_state_mean=float(obs[0]), + initial_state_covariance=1, + observation_covariance=10, + transition_covariance=10 + ) + # 跳过EM学习,使用固定参数以提高性能 + state_means, _ = kf.smooth(obs) + data_kf[col] = state_means.flatten() + + # 计算残差并用IQR检测异常(更稳健的方法) + residuals = data - data_kf + residual_thresholds = {} + for col in data.columns: + res_values = residuals[col].dropna().values # 移除NaN以计算IQR + q1 = np.percentile(res_values, 25) + q3 = np.percentile(res_values, 75) + iqr = q3 - q1 + lower_threshold = q1 - 1.5 * iqr + upper_threshold = q3 + 1.5 * iqr + residual_thresholds[col] = (lower_threshold, upper_threshold) + + cleaned_data = data.copy() + anomalies_info = {} + for col in data.columns: + lower, upper = residual_thresholds[col] + sensor_residuals = residuals[col] + anomaly_mask = (sensor_residuals < lower) | (sensor_residuals > upper) + anomaly_idx = data.index[anomaly_mask.fillna(False)] + anomalies_info[col] = pd.DataFrame({ + 'Observed': data.loc[anomaly_idx, col], + 'Kalman_Predicted': data_kf.loc[anomaly_idx, col], + 'Residual': sensor_residuals.loc[anomaly_idx] + }) + cleaned_data.loc[anomaly_idx, f'{col}_cleaned'] = data_kf.loc[anomaly_idx, col] + + # 可选可视化(第一个传感器) + plt.rcParams['font.sans-serif'] = ['SimHei'] + plt.rcParams['axes.unicode_minus'] = False + if show_plot and len(data.columns) > 0: + sensor_to_plot = data.columns[0] + plt.figure(figsize=(12, 6)) + plt.plot(data.index, data[sensor_to_plot], label="监测值", marker='o', markersize=3, alpha=0.7) + plt.plot(data.index, data_kf[sensor_to_plot], label="Kalman滤波预测值", linewidth=2) + anomaly_idx = anomalies_info[sensor_to_plot].index + if len(anomaly_idx) > 0: + plt.plot(anomaly_idx, data[sensor_to_plot].loc[anomaly_idx], 'ro', markersize=8, label="监测值异常点") + plt.plot(anomaly_idx, data_kf[sensor_to_plot].loc[anomaly_idx], 'go', markersize=8, label="Kalman修复值") + plt.xlabel("时间点(序号)") + plt.ylabel("监测值") + plt.title(f"{sensor_to_plot}:观测值与Kalman滤波预测值(异常点标记)") + plt.legend() + plt.show() + + # 返回清洗后的字典 + return cleaned_data.to_dict(orient='list') + +# # 测试 +# if __name__ == "__main__": +# # 默认:脚本目录下同名 CSV 文件 +# script_dir = os.path.dirname(os.path.abspath(__file__)) +# default_csv = os.path.join(script_dir, "pipe_flow_data_to_clean2.0.csv") +# out = clean_flow_data_kf(default_csv) +# print("清洗后的数据已保存到:", out) + +# 测试 clean_flow_data_dict 函数 +if __name__ == "__main__": + import random + # 读取 szh_flow_scada.csv 文件 + script_dir = os.path.dirname(os.path.abspath(__file__)) + csv_path = os.path.join(script_dir, "szh_flow_scada.csv") + data = pd.read_csv(csv_path, header=0, index_col=None, encoding="utf-8") + + # 排除 Time 列,随机选择 5 列 + columns_to_exclude = ['Time'] + available_columns = [col for col in data.columns if col not in columns_to_exclude] + selected_columns = random.sample(available_columns, 1) + + # 将选中的列转换为字典 + data_dict = {col: data[col].tolist() for col in selected_columns} + + print("选中的列:", selected_columns) + print("原始数据长度:", len(data_dict[selected_columns[0]])) + + # 调用函数进行清洗 + cleaned_dict = clean_flow_data_dict(data_dict, show_plot=True) + # 将清洗后的字典写回 CSV + out_csv = os.path.join(script_dir, f"{selected_columns[0]}_clean.csv") + pd.DataFrame(cleaned_dict).to_csv(out_csv, index=False, encoding='utf-8-sig') + print("已保存清洗结果到:", out_csv) + print("清洗后的字典键:", list(cleaned_dict.keys())) + print("清洗后的数据长度:", len(cleaned_dict[selected_columns[0]])) + print("测试完成:函数运行正常") diff --git a/api_ex/Pdataclean.py b/api_ex/Pdataclean.py new file mode 100644 index 0000000..a5c0bd1 --- /dev/null +++ b/api_ex/Pdataclean.py @@ -0,0 +1,207 @@ +import pandas as pd +import numpy as np +import matplotlib.pyplot as plt +from sklearn.decomposition import PCA +from sklearn.cluster import KMeans +from sklearn.metrics import silhouette_score +import os + + + +def clean_pressure_data_km(input_csv_path: str, show_plot: bool = False) -> str: + """ + 读取输入 CSV,基于 KMeans 检测异常并用滚动平均修复。输出为 _cleaned.xlsx(同目录)。 + 原始数据在 sheet 'raw_pressure_data',处理后数据在 sheet 'cleaned_pressusre_data'。 + 返回输出文件的绝对路径。 + """ + # 读取 CSV + input_csv_path = os.path.abspath(input_csv_path) + data = pd.read_csv(input_csv_path, header=0, index_col=None, encoding="utf-8") + # 标准化 + data_norm = (data - data.mean()) / data.std() + + # 聚类与异常检测 + k = 3 + kmeans = KMeans(n_clusters=k, init="k-means++", n_init=50, random_state=42) + clusters = kmeans.fit_predict(data_norm) + centers = kmeans.cluster_centers_ + + distances = np.linalg.norm(data_norm.values - centers[clusters], axis=1) + threshold = distances.mean() + 3 * distances.std() + + anomaly_pos = np.where(distances > threshold)[0] + anomaly_indices = data.index[anomaly_pos] + + anomaly_details = {} + for pos in anomaly_pos: + row_norm = data_norm.iloc[pos] + cluster_idx = clusters[pos] + center = centers[cluster_idx] + diff = abs(row_norm - center) + main_sensor = diff.idxmax() + anomaly_details[data.index[pos]] = main_sensor + + # 修复:滚动平均(窗口可调) + data_rolled = data.rolling(window=13, center=True, min_periods=1).mean() + data_repaired = data.copy() + for pos in anomaly_pos: + label = data.index[pos] + sensor = anomaly_details[label] + data_repaired.loc[label, sensor] = data_rolled.loc[label, sensor] + + # 可选可视化(使用位置作为 x 轴) + plt.rcParams['font.sans-serif'] = ['SimHei'] + plt.rcParams['axes.unicode_minus'] = False + + if show_plot and len(data.columns) > 0: + n = len(data) + time = np.arange(n) + plt.figure(figsize=(12, 8)) + for col in data.columns: + plt.plot(time, data[col].values, marker='o', markersize=3, label=col) + for pos in anomaly_pos: + sensor = anomaly_details[data.index[pos]] + plt.plot(pos, data.iloc[pos][sensor], 'ro', markersize=8) + plt.xlabel("时间点(序号)") + plt.ylabel("压力监测值") + plt.title("各传感器折线图(红色标记主要异常点)") + plt.legend() + plt.show() + + plt.figure(figsize=(12, 8)) + for col in data_repaired.columns: + plt.plot(time, data_repaired[col].values, marker='o', markersize=3, label=col) + for pos in anomaly_pos: + sensor = anomaly_details[data.index[pos]] + plt.plot(pos, data_repaired.iloc[pos][sensor], 'go', markersize=8) + plt.xlabel("时间点(序号)") + plt.ylabel("修复后压力监测值") + plt.title("修复后各传感器折线图(绿色标记修复值)") + plt.legend() + plt.show() + + # 保存到 Excel:两个 sheet + input_dir = os.path.dirname(os.path.abspath(input_csv_path)) + input_base = os.path.splitext(os.path.basename(input_csv_path))[0] + output_filename = f"{input_base}_cleaned.xlsx" + output_path = os.path.join(input_dir, output_filename) + + if os.path.exists(output_path): + os.remove(output_path) # 覆盖同名文件 + with pd.ExcelWriter(output_path, engine='openpyxl') as writer: + data.to_excel(writer, sheet_name='raw_pressure_data', index=False) + data_repaired.to_excel(writer, sheet_name='cleaned_pressusre_data', index=False) + + # 返回输出文件的绝对路径 + return os.path.abspath(output_path) + + +def clean_pressure_data_dict_km(data_dict: dict, show_plot: bool = False) -> dict: + """ + 接收一个字典数据结构,其中键为列名,值为时间序列列表,使用KMeans聚类检测异常并用滚动平均修复。 + 返回清洗后的字典数据结构。 + """ + # 将字典转换为 DataFrame + data = pd.DataFrame(data_dict) + # 填充NaN值 + data = data.ffill().bfill() + # 标准化 + data_norm = (data - data.mean()) / data.std() + + # 聚类与异常检测 + k = 3 + kmeans = KMeans(n_clusters=k, init="k-means++", n_init=50, random_state=42) + clusters = kmeans.fit_predict(data_norm) + centers = kmeans.cluster_centers_ + + distances = np.linalg.norm(data_norm.values - centers[clusters], axis=1) + threshold = distances.mean() + 3 * distances.std() + + anomaly_pos = np.where(distances > threshold)[0] + anomaly_indices = data.index[anomaly_pos] + + anomaly_details = {} + for pos in anomaly_pos: + row_norm = data_norm.iloc[pos] + cluster_idx = clusters[pos] + center = centers[cluster_idx] + diff = abs(row_norm - center) + main_sensor = diff.idxmax() + anomaly_details[data.index[pos]] = main_sensor + + # 修复:滚动平均(窗口可调) + data_rolled = data.rolling(window=13, center=True, min_periods=1).mean() + data_repaired = data.copy() + for pos in anomaly_pos: + label = data.index[pos] + sensor = anomaly_details[label] + data_repaired.loc[label, sensor] = data_rolled.loc[label, sensor] + + # 可选可视化(使用位置作为 x 轴) + plt.rcParams['font.sans-serif'] = ['SimHei'] + plt.rcParams['axes.unicode_minus'] = False + + if show_plot and len(data.columns) > 0: + n = len(data) + time = np.arange(n) + plt.figure(figsize=(12, 8)) + for col in data.columns: + plt.plot(time, data[col].values, marker='o', markersize=3, label=col) + for pos in anomaly_pos: + sensor = anomaly_details[data.index[pos]] + plt.plot(pos, data.iloc[pos][sensor], 'ro', markersize=8) + plt.xlabel("时间点(序号)") + plt.ylabel("压力监测值") + plt.title("各传感器折线图(红色标记主要异常点)") + plt.legend() + plt.show() + + plt.figure(figsize=(12, 8)) + for col in data_repaired.columns: + plt.plot(time, data_repaired[col].values, marker='o', markersize=3, label=col) + for pos in anomaly_pos: + sensor = anomaly_details[data.index[pos]] + plt.plot(pos, data_repaired.iloc[pos][sensor], 'go', markersize=8) + plt.xlabel("时间点(序号)") + plt.ylabel("修复后压力监测值") + plt.title("修复后各传感器折线图(绿色标记修复值)") + plt.legend() + plt.show() + + # 返回清洗后的字典 + return data_repaired.to_dict(orient='list') + + +# 测试 +# if __name__ == "__main__": +# # 默认使用脚本目录下的 pressure_raw_data.csv +# script_dir = os.path.dirname(os.path.abspath(__file__)) +# default_csv = os.path.join(script_dir, "pressure_raw_data.csv") +# out_path = clean_pressure_data_km(default_csv, show_plot=False) +# print("保存路径:", out_path) + +# 测试 clean_pressure_data_dict_km 函数 +if __name__ == "__main__": + import random + # 读取 szh_pressure_scada.csv 文件 + script_dir = os.path.dirname(os.path.abspath(__file__)) + csv_path = os.path.join(script_dir, "szh_pressure_scada.csv") + data = pd.read_csv(csv_path, header=0, index_col=None, encoding="utf-8") + + # 排除 Time 列,随机选择 5 列 + columns_to_exclude = ['Time'] + available_columns = [col for col in data.columns if col not in columns_to_exclude] + selected_columns = random.sample(available_columns, 5) + + # 将选中的列转换为字典 + data_dict = {col: data[col].tolist() for col in selected_columns} + + print("选中的列:", selected_columns) + print("原始数据长度:", len(data_dict[selected_columns[0]])) + + # 调用函数进行清洗 + cleaned_dict = clean_pressure_data_dict_km(data_dict, show_plot=True) + + print("清洗后的字典键:", list(cleaned_dict.keys())) + print("清洗后的数据长度:", len(cleaned_dict[selected_columns[0]])) + print("测试完成:函数运行正常") diff --git a/api_ex/kmeans_sensor.cp312-win_amd64.pyd b/api_ex/kmeans_sensor.cp312-win_amd64.pyd new file mode 100644 index 0000000..8bbe800 Binary files /dev/null and b/api_ex/kmeans_sensor.cp312-win_amd64.pyd differ diff --git a/api_ex/remove_sb_columns.py b/api_ex/remove_sb_columns.py new file mode 100644 index 0000000..93f3f70 --- /dev/null +++ b/api_ex/remove_sb_columns.py @@ -0,0 +1,32 @@ +import csv +from pathlib import Path + +# infile = Path(r"c:\copilot codes\dataclean\Flow_Timedata2025_new_format.csv") +# outfile = Path(r"c:\copilot codes\dataclean\szh_flow_scada.csv") + +infile = Path(r"c:\copilot codes\dataclean\Pressure_Timedata2025_new_format.csv") +outfile = Path(r"c:\copilot codes\dataclean\szh_pressure_scada.csv") + +with infile.open("r", newline="", encoding="utf-8") as f_in: + reader = csv.reader(f_in) + rows = list(reader) + +if not rows: + print("input file is empty") + raise SystemExit(1) + +headers = rows[0] +# keep columns whose header does NOT contain 'SB_' +keep_indices = [i for i,h in enumerate(headers) if 'SB_' not in h] +removed = [h for i,h in enumerate(headers) if 'SB_' in h] + +with outfile.open("w", newline="", encoding="utf-8") as f_out: + writer = csv.writer(f_out) + for row in rows: + # ensure row has same length as headers + if len(row) < len(headers): + row = row + [''] * (len(headers) - len(row)) + newrow = [row[i] for i in keep_indices] + writer.writerow(newrow) + +print(f"Wrote {outfile} — removed {len(removed)} columns containing 'SB_'.") diff --git a/influxdb_api.py b/influxdb_api.py index 69480e9..c45f7e6 100644 --- a/influxdb_api.py +++ b/influxdb_api.py @@ -3513,6 +3513,129 @@ def delete_data(delete_date: str, bucket: str) -> None: delete_api: DeleteApi = client.delete_api() delete_api.delete(start=start_time, stop=stop_time, predicate=predicate, bucket=bucket) + + #2025/08/18 从文件导入scada数据,xkl +def import_data_from_file(file_path: str, bucket: str = "SCADA_data") -> None: + """ + 从指定的CSV文件导入数据到InfluxDB的指定bucket中。 + :param file_path: CSV文件的路径 + :param bucket: 数据存储的 bucket 名称,默认值为 "SCADA_data" + :return: + """ + client = get_new_client() + if not client.ping(): + print("{} -- Failed to connect to InfluxDB.".format(datetime.now().strftime('%Y-%m-%d %H:%M:%S'))) + + #清空指定bucket的数据 + # delete_api = DeleteApi(client) + # start = "1970-01-01T00:00:00Z" + # stop = "2100-01-01T00:00:00Z" + # delete_api.delete(start, stop, '', bucket="SCADA_data", org="TJWATERORG") + + df = pd.read_csv(file_path) + write_api = client.write_api(write_options=SYNCHRONOUS) + points_to_write = [] + for _, row in df.iterrows(): + scada_id = row['ScadaId'] + value = row['Value'] + time_str = row['Time'] + date_str = str(time_str)[:10] # 取前10位作为日期 + try: + raw_value = float(value) + except (ValueError, TypeError): + raw_value = 0.0 + point = Point("SCADA") \ + .tag("date", date_str) \ + .tag("description", None) \ + .tag("device_ID", scada_id) \ + .field("monitored_value", raw_value) \ + .field("datacleaning_value", 0.0) \ + .field("simulation_value", 0.0) \ + .time(time_str, write_precision='s') + points_to_write.append(point) + # 批量写入数据 + batch_size = 500 + for i in range(0, len(points_to_write), batch_size): + batch = points_to_write[i:i+batch_size] + write_api.write(bucket=bucket, record=batch) + print(f"Data imported from {file_path} to bucket {bucket} successfully.") + print(f"Total points written: {len(points_to_write)}") + write_api.close() + client.close() + +#2025/08/28 从多列格式文件导入SCADA数据,xkl +def import_multicolumn_data_from_file(file_path: str, raw: bool = True, bucket: str = "SCADA_data") -> None: + """ + 从指定的多列格式CSV文件导入数据到InfluxDB的指定bucket中。 + :param file_path: CSV文件的路径 + :param bucket: 数据存储的 bucket 名称,默认值为 "SCADA_data" + :return: + """ + client = get_new_client() + write_api = client.write_api(write_options=SYNCHRONOUS) + points_to_write = [] + if not client.ping(): + print("{} -- Failed to connect to InfluxDB.".format(datetime.now().strftime('%Y/%m/%d %H:%M'))) + def convert_to_iso(timestr): + # 假设原格式为 '2025/8/3 0:00' + dt = datetime.strptime(timestr, '%Y-%m-%d %H:%M:%S') + return dt.isoformat() + + with open(file_path, encoding='utf-8') as f: + reader = csv.reader(f) + header = next(reader) + device_ids = header[1:] # 第一列是time,后面是device_ID + if raw: + for row in reader: + time_str = row[0] + iso_time = convert_to_iso(time_str) + for idx, value in enumerate(row[1:]): + try: + raw_value = float(value) + except (ValueError, TypeError): + raw_value = 0.0 + scada_id = device_ids[idx] + # 如果是原始数据,直接使用Value列 + point = Point("SCADA") \ + .tag("date", iso_time.split('T')[0]) \ + .tag("description", None) \ + .tag("device_ID", scada_id) \ + .field("monitored_value", raw_value) \ + .field("datacleaning_value", 0.0) \ + .field("simulation_value", 0.0) \ + .time(iso_time, WritePrecision.S) + points_to_write.append(point) + else: + for row in reader: + time_str = row[0] + iso_time = convert_to_iso(time_str) + # 如果不是原始数据,直接使用datacleaning_value列 + for idx, value in enumerate(row[1:]): + scada_id = device_ids[idx] + try: + datacleaning_value = float(value) + except (ValueError, TypeError): + datacleaning_value = 0.0 + # 如果是清洗数据,直接使用datacleaning_value列 + point = Point("SCADA") \ + .tag("date", iso_time.split('T')[0]) \ + .tag("description", "None") \ + .tag("device_ID", scada_id) \ + .field("monitored_value", 0.0) \ + .field("datacleaning_value",datacleaning_value) \ + .field("simulation_value", 0.0) \ + .time(iso_time, WritePrecision.S) + points_to_write.append(point) + # 批量写入数据 + batch_size = 1000 + for i in range(0, len(points_to_write), batch_size): + batch = points_to_write[i:i+batch_size] + write_api.write(bucket=bucket, record=batch) + print(f"Data imported from {file_path} to bucket {bucket} successfully.") + print(f"Total points written: {len(points_to_write)}") + write_api.close() + client.close() + # 示例调用 @@ -3652,5 +3775,25 @@ if __name__ == "__main__": # end_time='2024-03-26T23:59:00+08:00') # print(result) + #示例:import_data_from_file + #import_data_from_file(file_path='data/Flow_Timedata.csv', bucket='SCADA_data') + # # 示例:query_all_records_by_type_date + #result = query_all__records_by_type__date(type='node', query_date='2025-08-04') + + #示例:query_all_records_by_date_hour + #result = query_all_records_by_date_hour(query_date='2025-08-04', query_hour=1) + + #示例:import_multicolumn_data_from_file + #import_multicolumn_data_from_file(file_path='data/selected_Flow_Timedata2025_new_format_cleaned.csv', raw=False, bucket='SCADA_data') + + # client = InfluxDBClient(url="http://localhost:8086", token=token, org=org_name) + # delete_api = client.delete_api() + + # start = "2025-08-02T00:00:00Z" # 要删除的起始时间 + # stop = "2025-08-11T00:00:00Z" # 结束时间(可设为未来) + # predicate = '_measurement="SCADA"' # 指定 measurement + + # delete_api.delete(start, stop, predicate, bucket="SCADA_data", org=org_name) + # client.close() diff --git a/online_Analysis.py b/online_Analysis.py index 52ea9b1..69d76f3 100644 --- a/online_Analysis.py +++ b/online_Analysis.py @@ -19,6 +19,7 @@ from sqlalchemy import create_engine import ast import sensitivity import project_info +import api_ex.kmeans_sensor ############################################################ # burst analysis 01 @@ -1064,6 +1065,85 @@ def pressure_sensor_placement_sensitivity(name: str, scheme_name: str, sensor_nu except Exception as e: print(f"存储方案信息时出错:{e}") +#2025/08/21 +# 基于kmeans聚类法进行压力监测点优化布置 +def pressure_sensor_placement_kmeans(name: str, scheme_name: str, sensor_number: int, + min_diameter: int, username: str) -> None: + """ + 基于聚类法进行压力监测点优化布置 + :param name: 数据库名称(注意,此处数据库名称也是inp文件名称,inp文件与pg库名要一样) + :param scheme_name: 监测优化布置方案名称 + :param sensor_number: 传感器数目 + :param min_diameter: 最小管径 + :param username: 用户名 + :return: + """ + sensor_location = api_ex.kmeans_sensor.kmeans_sensor_placement(name=name, sensor_num=sensor_number, min_diameter=min_diameter) + try: + conn_string = f"dbname={name} host=127.0.0.1" + with psycopg.connect(conn_string) as conn: + with conn.cursor() as cur: + sql = """ + INSERT INTO sensor_placement (scheme_name, sensor_number, min_diameter, username, sensor_location) + VALUES (%s, %s, %s, %s, %s) + """ + + cur.execute(sql, (scheme_name, sensor_number, min_diameter, username, sensor_location)) + conn.commit() + print("方案信息存储成功!") + except Exception as e: + print(f"存储方案信息时出错:{e}") + +############################################################ +# 流量监测数据清洗 ***卡尔曼滤波法*** +############################################################ +#2025/08/21 hxyan + +def flow_data_clean(input_csv_file: str) -> str: + """ + 读取 input_csv_path 中的每列时间序列,使用一维 Kalman 滤波平滑并用预测值替换基于 3σ 检测出的异常点。 + 保存输出为:_cleaned.xlsx(与输入同目录),并返回输出文件的绝对路径。如有同名文件存在,则覆盖。 + :param: input_csv_file: 输入的 CSV 文件明或路径 + :return: 输出文件的绝对路径 + """ + + + # 提供的 input_csv_path 绝对路径,以下为 默认脚本目录下同名 CSV 文件,构建绝对路径,可根据情况修改 + script_dir = os.path.dirname(os.path.abspath(__file__)) + input_csv_path= os.path.join(script_dir, input_csv_file) + + # 检查文件是否存在 + if not os.path.exists(input_csv_path): + raise FileNotFoundError(f"指定的文件不存在: {input_csv_path}") + # 调用 Fdataclean.clean_flow_data_kf 函数进行数据清洗 + out_xlsx_path = api_ex.Fdataclean.clean_flow_data_kf(input_csv_path) + print("清洗后的数据已保存到:", out_xlsx_path ) + + +############################################################ +# 压力监测数据清洗 ***kmean++法*** +############################################################ +#2025/08/21 hxyan + +def pressure_data_clean(input_csv_file: str) -> str: + """ + 读取 input_csv_path 中的每列时间序列,使用Kmean++清洗数据。 + 保存输出为:_cleaned.xlsx(与输入同目录),并返回输出文件的绝对路径。如有同名文件存在,则覆盖。 + 原始数据在 sheet 'raw_pressure_data',处理后数据在 sheet 'cleaned_pressusre_data'。 + :param input_csv_path: 输入的 CSV 文件路径 + :return: 输出文件的绝对路径 + """ + + # 提供的 input_csv_path 绝对路径,以下为 默认脚本目录下同名 CSV 文件,构建绝对路径,可根据情况修改 + script_dir = os.path.dirname(os.path.abspath(__file__)) + input_csv_path= os.path.join(script_dir, input_csv_file) + + # 检查文件是否存在 + if not os.path.exists(input_csv_path): + raise FileNotFoundError(f"指定的文件不存在: {input_csv_path}") + # 调用 Fdataclean.clean_flow_data_kf 函数进行数据清洗 + out_xlsx_path = api_ex.Pdataclean.clean_pressure_data_km(input_csv_path) + print("清洗后的数据已保存到:", out_xlsx_path ) if __name__ == '__main__': # contaminant_simulation('bb_model','2024-06-24T00:00:00Z','ZBBDTZDP009034',30,1800) @@ -1120,3 +1200,5 @@ if __name__ == '__main__': # 示例:pressure_sensor_placement_sensitivity pressure_sensor_placement_sensitivity(name=project_info.name, scheme_name='20250517', sensor_number=10, min_diameter=300, username='admin') + # 示例:pressure_sensor_placement_kmeans + pressure_sensor_placement_kmeans(name=project_info.name, scheme_name='sensor_1027', sensor_number=35, min_diameter=300, username='admin') \ No newline at end of file