"""相似性计算模块。""" import math import numpy as np import pandas as pd def cal_similarity_simple_return_dd( similarity_mode, monitor_p, predict_p, normal_p, leak_p, monitor_p_all, predict_p_all, normal_p_all, leak_p_all, important_sensor, mean_dpressure, dpressure_std, dpressure_std_all, if_gy=0, cos_or_flow=1, ): # cos_or_flow 用于 CAF dpressure_s = normal_p - leak_p dpressure = predict_p - monitor_p act_dpressure = pd.Series(dtype=object) for i in range(len(leak_p.index)): if dpressure_std.iloc[i] > -200: # 0.001: if if_gy == 1: act_dpressure[leak_p.index[i]] = ( leak_p.iloc[i] - monitor_p.iloc[i] ) / dpressure_std.iloc[i] else: act_dpressure[leak_p.index[i]] = leak_p.iloc[i] - monitor_p.iloc[i] if similarity_mode == "COS" or (similarity_mode == "CAF" and cos_or_flow == 1): """if leak_p.min()<0: none_flag = 1 similarity_cos = 0 similarity_dis = 0 else:""" none_flag = 0 sensor_for_cos = sorted( set(dpressure_s.index).intersection(set(act_dpressure.index)) ) """if len(dpressure_s) ==0 or len(dpressure) ==0: jj=9 else:""" try: s1 = np.dot( np.transpose(dpressure_s.loc[sensor_for_cos]), dpressure.loc[sensor_for_cos], ) s2 = np.linalg.norm(dpressure_s.loc[sensor_for_cos]) * np.linalg.norm( dpressure.loc[sensor_for_cos] ) if s2 == 0: s2 = s2 + 0.0001 similarity_cos = s1 / s2 similarity_dis = 0 except Exception as e: print(dpressure_s) print(sensor_for_cos) print(act_dpressure) print(dpressure_std) print(dpressure) elif similarity_mode == "DIS" or (similarity_mode == "CAF" and cos_or_flow == 2): """if leak_p.min()<0: none_flag = 1 else:""" none_flag = 0 important_sensor = sorted( set(important_sensor).intersection(set(act_dpressure.index)) ) part_dpressure = dpressure_s[important_sensor] - dpressure[important_sensor] similarity_pre_DIS = np.linalg.norm(part_dpressure) # similarity_pre_DIS_later = 1 / (1 + similarity_pre_DIS) similarity_dis = similarity_pre_DIS similarity_cos = 0 elif similarity_mode == "CAD_new": act_dpressure = leak_p - monitor_p """if leak_p.min() < 0: none_flag = 1 similarity_cos = 0 similarity_dis =0 else:""" none_flag = 0 # cos s1 = np.dot(np.transpose(dpressure_s), dpressure) s2 = np.linalg.norm(dpressure_s) * np.linalg.norm(dpressure) if s2 == 0: s2 = s2 + 0.0001 similarity_cos = s1 / s2 # DIS part_dpressure = act_dpressure.loc[important_sensor] similarity_pre_DIS = np.linalg.norm(part_dpressure) similarity_pre_DIS_later = 1 / (1 + similarity_pre_DIS) similarity_dis = similarity_pre_DIS elif similarity_mode == "CAD_new_gy" or similarity_mode == "CDF": # cos sensor_for_cos = sorted( set(dpressure_s.index).intersection(set(act_dpressure.index)) ) if len(sensor_for_cos) == 0 and len(dpressure_s) == 0: similarity_cos = 0 elif len(sensor_for_cos) == 0 and len(dpressure_s) > 0: sensor_for_cos = list(dpressure_s.index) none_flag = 0 s1 = np.dot( np.transpose(dpressure_s.loc[sensor_for_cos]), dpressure.loc[sensor_for_cos], ) s2 = np.linalg.norm(dpressure_s.loc[sensor_for_cos]) * np.linalg.norm( dpressure.loc[sensor_for_cos] ) if s2 == 0: s2 = s2 + 0.0001 similarity_cos = s1 / s2 else: none_flag = 0 s1 = np.dot( np.transpose(dpressure_s.loc[sensor_for_cos]), dpressure.loc[sensor_for_cos], ) s2 = np.linalg.norm(dpressure_s.loc[sensor_for_cos]) * np.linalg.norm( dpressure.loc[sensor_for_cos] ) if s2 == 0: s2 = s2 + 0.0001 similarity_cos = s1 / s2 # DIS important_sensor_new = sorted( set(important_sensor).intersection(set(act_dpressure.index)) ) if len(important_sensor_new) == 0: important_sensor_new = important_sensor act_dpressure = pd.Series(dtype=object) for i in range(len(leak_p_all.index)): # if dpressure_std.iloc [i] > -200: # 0.001: if if_gy == 1: act_dpressure[leak_p_all.index[i]] = ( leak_p_all.iloc[i] - monitor_p_all.iloc[i] ) / dpressure_std_all.iloc[i] else: act_dpressure[leak_p_all.index[i]] = ( leak_p_all.iloc[i] - monitor_p_all.iloc[i] ) # part_dpressure = act_dpressure.loc[important_sensor_new] part_dpressure = ( dpressure.loc[important_sensor_new] - dpressure_s.loc[important_sensor_new] ) similarity_pre_DIS = np.linalg.norm(part_dpressure) ## chang test # part_dpressure = dpressure_s.loc[important_sensor]-dpressure.loc[important_sensor] # similarity_pre_DIS = np.linalg.norm(part_dpressure) # similarity_pre_DIS_later = 1 / (1 + similarity_pre_DIS) similarity_dis = similarity_pre_DIS elif similarity_mode == "OF": # cos similarity_cos = 0 none_flag = 0 # DIS important_sensor_new = sorted( set(important_sensor).intersection(set(act_dpressure.index)) ) if len(important_sensor_new) == 0: important_sensor_new = important_sensor act_dpressure = pd.Series(dtype=object) for i in range(len(leak_p_all.index)): # if dpressure_std.iloc [i] > -200: # 0.001: if if_gy == 1: act_dpressure[leak_p_all.index[i]] = ( leak_p_all.iloc[i] - monitor_p_all.iloc[i] ) / dpressure_std_all.iloc[i] else: act_dpressure[leak_p_all.index[i]] = ( leak_p_all.iloc[i] - monitor_p_all.iloc[i] ) # part_dpressure = act_dpressure.loc[important_sensor_new] part_dpressure = ( dpressure.loc[important_sensor_new] - dpressure_s.loc[important_sensor_new] ) similarity_pre_DIS = np.linalg.norm(part_dpressure) ## chang test # part_dpressure = dpressure_s.loc[important_sensor]-dpressure.loc[important_sensor] # similarity_pre_DIS = np.linalg.norm(part_dpressure) # similarity_pre_DIS_later = 1 / (1 + similarity_pre_DIS) similarity_dis = similarity_pre_DIS return similarity_cos, similarity_dis, none_flag def adjust( similarity_cos, similarity_dis, record_success_candidate, record_success_no_candidate, ): if len(record_success_no_candidate) > 0: for each in record_success_no_candidate: similarity_cos[each] = similarity_cos[record_success_candidate].min() * 0.9 similarity_dis[each] = similarity_dis[record_success_candidate].max() * 1.1 return similarity_cos, similarity_dis def cal_sq_all_multi( similarity_cos, similarity_dis, similarity_f, candidate_pipe, timestep_list_spc, if_flow, if_only_cos, if_only_flow, cos_h_input, dis_h_input, dis_f_h_input, if_compalsive, cos_sensor_num, flow_sensor_num, ): """融合多种相似性并输出按时刻与候选管段组织的综合相似度。 该函数会根据模式开关(是否仅流量、是否仅 COS、是否包含流量)对 `similarity_cos`、`similarity_dis`、`similarity_f` 做标准化,并计算 权重 `sq_cos/sq_dis/sq_f` 后进行加权融合。 Args: similarity_cos: 压力余弦相似性(DataFrame/Series,通常为时刻 x 候选管段)。 similarity_dis: 压力距离相似性(DataFrame/Series,通常为时刻 x 候选管段)。 similarity_f: 流量距离相似性(DataFrame/Series,通常为时刻 x 候选管段)。 candidate_pipe: 候选管段列表,用于输出列索引。 timestep_list_spc: 时刻列表,用于输出行索引。 if_flow: 是否启用流量相似性(1 启用,0 禁用)。 if_only_cos: 相似性模式标识(0: COS+DIS;1: COS;其他值按分支定义处理)。 if_only_flow: 是否仅使用流量相似性(1 是,0 否)。 cos_h_input: 外部给定的 COS 权重(强制权重模式下使用)。 dis_h_input: 外部给定的 DIS 权重(强制权重模式下使用)。 dis_f_h_input: 外部给定的流量权重(强制权重模式下使用)。 if_compalsive: 是否使用外部强制权重(1 使用输入权重,0 自动计算权重)。 cos_sensor_num: 压力传感器数量,用于权重调整。 flow_sensor_num: 流量传感器数量,用于权重调整。 Returns: tuple[pd.DataFrame | pd.Series, float, float, float]: - output_similarity_pd: 综合相似性结果。 - sq_cos: 最终 COS 权重。 - sq_dis: 最终 DIS 权重。 - sq_f: 最终流量权重。 """ if if_only_flow == 1: similarity_f, h_f = cal_sq_single_array( similarity_f.values.reshape((-1, 1)), if_direct=2 ) sq_cos = 0 sq_dis = 0 sq_f = 1 similarity_all = similarity_f * sq_f output_similarity = similarity_all.reshape((-1, len(candidate_pipe))) output_similarity_pd = pd.DataFrame( output_similarity, index=timestep_list_spc, columns=candidate_pipe ) else: if if_only_cos == 0: if if_flow == 1: # standerdize similarity_cos, h_cos = cal_sq_single_array( similarity_cos.values.reshape((-1, 1)), if_direct=1 ) similarity_dis, h_dis = cal_sq_single_array( similarity_dis.values.reshape((-1, 1)), if_direct=2 ) similarity_f, h_f = cal_sq_single_array( similarity_f.values.reshape((-1, 1)), if_direct=2 ) if if_compalsive == 1: sq_cos = cos_h_input sq_dis = dis_h_input sq_f = dis_f_h_input else: """sq_cos = h_cos/(h_cos +h_dis +h_f ) sq_dis = h_dis/(h_cos +h_dis +h_f ) sq_f = h_f/(h_cos +h_dis +h_f )""" sq_cos, sq_dis, sq_f = add_weight_for_SQ( h_cos, h_dis, h_f, cos_sensor_num, flow_sensor_num ) """if cos_sensor_num == 2 and sq_cos>0.2: sq_cos = 0.2 sq_dis = 0.8*h_dis / (h_dis + h_f) sq_f = 0.8*h_f / (h_dis + h_f) if cos_sensor_num == 1 and sq_dis > 0.3: sq_cos = 0.1 sq_dis = 0.3 sq_f = 0.6""" sq_cos, sq_dis, sq_f = adjust_ratio("CDF", sq_cos, sq_dis, sq_f) if cos_sensor_num <= 1: sq_cos = 0 # similarity similarity_all = ( similarity_cos * sq_cos + similarity_dis * sq_dis + similarity_f * sq_f ) output_similarity = similarity_all.reshape((-1, len(candidate_pipe))) output_similarity_pd = pd.DataFrame( output_similarity, index=timestep_list_spc, columns=candidate_pipe ) else: # standerdize similarity_cos, h_cos = cal_sq_single_array( similarity_cos.values.reshape((-1, 1)), if_direct=1 ) similarity_dis, h_dis = cal_sq_single_array( similarity_dis.values.reshape((-1, 1)), if_direct=2 ) if if_compalsive == 1: sq_cos = cos_h_input sq_dis = dis_h_input else: sq_cos = h_cos / (h_cos + h_dis) sq_dis = h_dis / (h_cos + h_dis) if cos_sensor_num == 2 and sq_cos > 0.5: sq_cos = 0.5 sq_dis = 0.5 sq_cos, sq_dis, sq_f = adjust_ratio("CAD_new_gy", sq_cos, sq_dis, 0) sq_f = 0 # similarity similarity_all = similarity_cos * sq_cos + similarity_dis * sq_dis output_similarity = similarity_all.reshape((-1, len(candidate_pipe))) output_similarity_pd = pd.DataFrame( output_similarity, index=timestep_list_spc, columns=candidate_pipe ) elif if_only_cos == 1: if if_flow == 1: # standerdize similarity_cos, h_cos = cal_sq_single_array( similarity_cos.values.reshape((-1, 1)), if_direct=1 ) similarity_f, h_f = cal_sq_single_array( similarity_f.values.reshape((-1, 1)), if_direct=2 ) if if_compalsive == 1: sq_cos = cos_h_input sq_f = dis_f_h_input else: sq_cos = h_cos / (h_cos + h_f) sq_f = h_f / (h_cos + h_f) sq_cos, sq_dis, sq_f = adjust_ratio("CAF", sq_cos, 0, sq_f) sq_dis = 0 # similarity similarity_all = similarity_cos * sq_cos + similarity_f * sq_f output_similarity = similarity_all.reshape((-1, len(candidate_pipe))) output_similarity_pd = pd.DataFrame( output_similarity, index=timestep_list_spc, columns=candidate_pipe ) else: sq_cos = cos_h_input sq_dis = dis_h_input sq_f = dis_f_h_input output_similarity_pd = similarity_cos else: sq_cos = cos_h_input sq_dis = dis_h_input sq_f = dis_f_h_input output_similarity_pd = 1 / (similarity_dis + 1) return output_similarity_pd, sq_cos, sq_dis, sq_f def add_weight_for_SQ(h_cos, h_dis, h_f, sensor_cos_num, sensor_f_num): h_f_new = h_f * sensor_f_num if sensor_cos_num <= 1: h_cos_new = 0 h_dis_new = h_dis * sensor_cos_num else: h_cos_new = h_cos * sensor_cos_num # / 2 h_dis_new = h_dis * sensor_cos_num # / 2 cos_sq = h_cos_new / (h_cos_new + h_dis_new + h_f_new) dis_sq = h_dis_new / (h_cos_new + h_dis_new + h_f_new) f_sq = h_f_new / (h_cos_new + h_dis_new + h_f_new) if sensor_cos_num == 2 and cos_sq > 0.2: cos_sq = 0.2 dis_sq = 0.8 * h_dis_new / (h_dis_new + h_f_new) f_sq = 0.8 * h_f_new / (h_dis_new + h_f_new) """if sensor_cos_num == 1: if dis_sq / f_sq > sensor_cos_num/sensor_f_num: dis_sq = sensor_cos_num/sensor_f_num f_sq=1-dis_sq""" # if h_dis_new/h_f_new > sensor_cos_num/sensor_f_num return cos_sq, dis_sq, f_sq def cal_sq_single_array(similarity_pre, if_direct): if similarity_pre.max() - similarity_pre.min() == 0: similarity_pre = np.ones(similarity_pre.shape) else: if if_direct == 1: similarity_pre = ( 0.998 * (similarity_pre - similarity_pre.min()) / (similarity_pre.max() - similarity_pre.min()) + 0.002 ) else: similarity_pre = ( 0.998 * (similarity_pre.max() - similarity_pre) / (similarity_pre.max() - similarity_pre.min()) + 0.002 ) # calculate pij similarity_p = similarity_pre / similarity_pre.sum() # cal xinxishang similarity_lnp = np.zeros((len(similarity_pre), 1)) for j in range(len(similarity_p)): similarity_lnp[j] = -similarity_p[j] * math.log(similarity_p[j], math.e) h = 1 - 1 / math.log(len(similarity_pre), math.e) * similarity_lnp.sum() return similarity_pre, h def cal_similarity_all_multi_new_sq_improve_double_lzr( candidate_pipe, similarity_mode, pressure_leak, monitor_p, predict_p, normal_p, if_flow, if_only_cos, if_only_flow, flow_leak, monitor_f, predict_f, normal_f, timestep_list, Top_sensor_num, if_gy, effective_sensor, cos_h, dis_h, dis_f_h, if_compalsive, max_flow, ): similarity = pd.Series(dtype=float, index=candidate_pipe) similarity_detail: pd.DataFrame | None = None important_p_sensor = cal_top_sensors(monitor_p, predict_p, Top_sensor_num) # important_f_sensor, basic_f = cal_top_f_sensor(normal_f) important_f_sensor = monitor_f.columns if ( len(important_p_sensor) > 0 or len(important_f_sensor) > 0 ): # if len(important_p_sensor) > 0 break_flag = 0 pressure_leak_new = pressure_leak.swaplevel() # flow_leak_new = flow_leak.swaplevel() if isinstance(flow_leak, pd.DataFrame) and len(flow_leak) > 0: flow_leak_new = flow_leak.swaplevel() else: flow_leak_new = None total_similarity_cos = pd.DataFrame(index=timestep_list, columns=candidate_pipe) total_similarity_dis = pd.DataFrame(index=timestep_list, columns=candidate_pipe) total_similarity_dis_f = pd.DataFrame( index=timestep_list, columns=candidate_pipe ) for timestep in timestep_list: # cal p_cos, p_dis, f_dis if if_only_flow != 1: pressure_leak_temp = pressure_leak_new.loc[timestep].loc[ :, effective_sensor ] monitor_p_temp = monitor_p.loc[timestep, effective_sensor] predict_p_temp = predict_p.loc[timestep, effective_sensor] normal_p_temp = normal_p.loc[timestep, effective_sensor] ( total_similarity_cos.loc[timestep, :], total_similarity_dis.loc[timestep, :], ) = cal_similarity_all_cos_dis( candidate_pipe, pressure_leak_temp, similarity_mode, monitor_p_temp, predict_p_temp, normal_p_temp, pressure_leak_new.loc[timestep].loc[:, monitor_p.columns], monitor_p.loc[timestep, :], predict_p.loc[timestep, :], normal_p.loc[timestep, :], important_p_sensor, if_gy, cos_or_flow=1, ) if if_flow == 1: if len(timestep_list) == 1: leak_f_temp = flow_leak_new.loc[timestep].loc[:, important_f_sensor] monitor_f_temp = monitor_f.loc[timestep, important_f_sensor] predict_f_temp = predict_f.loc[timestep, important_f_sensor] normal_f_temp = normal_f.loc[timestep, important_f_sensor] basic_normal_f_temp = abs(max_flow.loc[important_f_sensor]) leak_f_temp = leak_f_temp / basic_normal_f_temp monitor_f_temp = monitor_f_temp / basic_normal_f_temp predict_f_temp = predict_f_temp / basic_normal_f_temp normal_f_temp = normal_f_temp / basic_normal_f_temp else: basic_f = abs(max_flow.loc[important_f_sensor]) leak_f_temp = ( flow_leak_new.loc[timestep].loc[:, important_f_sensor] / basic_f ) monitor_f_temp = ( monitor_f.loc[timestep, important_f_sensor] / basic_f ) predict_f_temp = ( predict_f.loc[timestep, important_f_sensor] / basic_f ) normal_f_temp = normal_f.loc[timestep, important_f_sensor] / basic_f _, total_similarity_dis_f.loc[timestep, :] = cal_similarity_all_cos_dis( candidate_pipe, leak_f_temp, similarity_mode, monitor_f_temp, predict_f_temp, normal_f_temp, flow_leak_new.loc[timestep].loc[:, monitor_f.columns], monitor_f.loc[timestep, :], predict_f.loc[timestep, :], normal_f.loc[timestep, :], important_f_sensor, if_gy, cos_or_flow=2, ) else: total_similarity_dis_f = [] similarity_all, cos_h, dis_h, dis_f_h = cal_sq_all_multi( total_similarity_cos, total_similarity_dis, total_similarity_dis_f, candidate_pipe, timestep_list, if_flow, if_only_cos, if_only_flow, cos_h, dis_h, dis_f_h, if_compalsive, len(important_p_sensor), len(important_f_sensor), ) if len(timestep_list) == 1: similarity = similarity_all.iloc[0] elif len(timestep_list) > 3: for each_candidate in candidate_pipe: similarity[each_candidate] = remove_3_sigma( similarity_all.loc[:, each_candidate] ) else: for each_candidate in candidate_pipe: similarity[each_candidate] = similarity_all.loc[ :, each_candidate ].mean() similarity = similarity.sort_values(ascending=False, kind="mergesort") detail_index = [str(pipe) for pipe in candidate_pipe] similarity_detail = pd.DataFrame(index=detail_index) similarity_detail.index.name = "pipe_id" if isinstance(total_similarity_cos, pd.DataFrame) and len(total_similarity_cos) > 0: pressure_cos_mean = ( total_similarity_cos.mean(axis=0) .reindex(candidate_pipe) .to_numpy(dtype=float) ) else: pressure_cos_mean = np.full(len(candidate_pipe), np.nan) if isinstance(total_similarity_dis, pd.DataFrame) and len(total_similarity_dis) > 0: pressure_dis_mean = ( total_similarity_dis.mean(axis=0) .reindex(candidate_pipe) .to_numpy(dtype=float) ) else: pressure_dis_mean = np.full(len(candidate_pipe), np.nan) if isinstance(total_similarity_dis_f, pd.DataFrame) and len(total_similarity_dis_f) > 0: flow_dis_mean = ( total_similarity_dis_f.mean(axis=0) .reindex(candidate_pipe) .to_numpy(dtype=float) ) else: flow_dis_mean = np.full(len(candidate_pipe), np.nan) similarity_detail["pressure_cos_mean"] = pressure_cos_mean similarity_detail["pressure_dis_mean"] = pressure_dis_mean similarity_detail["flow_dis_mean"] = flow_dis_mean similarity_detail["weight_cos"] = float(cos_h) similarity_detail["weight_dis"] = float(dis_h) similarity_detail["weight_flow"] = float(dis_f_h) similarity_detail["final_similarity"] = ( similarity.reindex(candidate_pipe).to_numpy(dtype=float) ) similarity_detail["similarity_rank"] = ( similarity_detail["final_similarity"].rank(method="dense", ascending=False) ).astype(int) similarity_detail["pressure_sensor_count"] = int(len(important_p_sensor)) similarity_detail["flow_sensor_count"] = int(len(important_f_sensor)) similarity_detail = similarity_detail.sort_values( by="final_similarity", ascending=False, kind="mergesort" ) else: break_flag = 1 similarity = 0 cos_h = 0 dis_h = 0 dis_f_h = 0 return similarity, cos_h, dis_h, dis_f_h, break_flag, similarity_detail def cal_similarity_all_cos_dis( candidate_pipe, pressure_leak, similarity_mode, monitor_p, predict_p, normal_p, pressure_leak_all, monitor_p_all, predict_p_all, normal_p_all, important_sensor, if_gy, cos_or_flow, ): similarity_cos = pd.Series(dtype=float, index=candidate_pipe) similarity_dis = pd.Series(dtype=float, index=candidate_pipe) dpressure = normal_p - pressure_leak # 无用 ---------------------------------------------- mean_dpressure = dpressure.mean() monitor_new = pd.DataFrame(index=["monitor"], columns=monitor_p.index) monitor_new.iloc[0] = monitor_p add_m_leak_pressure = [pressure_leak, monitor_p] add_m_leak_pressure = pd.concat(add_m_leak_pressure) pressure_leak_std = add_m_leak_pressure.std(axis=0, ddof=1) pressure_leak_std = pd.Series(pressure_leak_std, index=pressure_leak.columns) add_m_leak_pressure_all = [pressure_leak_all, monitor_p_all] add_m_leak_pressure_all = pd.concat(add_m_leak_pressure_all) pressure_leak_std_all = add_m_leak_pressure_all.std(axis=0, ddof=1) pressure_leak_std_all = pd.Series( pressure_leak_std_all, index=pressure_leak.columns ) # 无用 ---------------------------------------------- monitor_p_temp = monitor_p predict_p_temp = predict_p normal_p_temp = normal_p monitor_p_temp_all = monitor_p_all predict_p_temp_all = predict_p_all normal_p_temp_all = normal_p_all record_success_candidate = [] record_success_no_candidate = [] for i in range(len(candidate_pipe)): leak_p = pressure_leak.iloc[i, :] leak_p_all = pressure_leak_all.iloc[i, :] similarity_cos.iloc[i], similarity_dis.iloc[i], none_flag = ( cal_similarity_simple_return_dd( similarity_mode, monitor_p_temp, predict_p_temp, normal_p_temp, leak_p, monitor_p_temp_all, predict_p_temp_all, normal_p_temp_all, leak_p_all, important_sensor, mean_dpressure, pressure_leak_std, pressure_leak_std_all, if_gy, cos_or_flow, ) ) if none_flag == 0: record_success_candidate.append(candidate_pipe[i]) else: record_success_no_candidate.append(candidate_pipe[i]) similarity_cos, similarity_dis = adjust( similarity_cos, similarity_dis, record_success_candidate, record_success_no_candidate, ) return similarity_cos, similarity_dis def cal_top_f_sensor(normal_f): if type(normal_f) == pd.core.frame.DataFrame: mean_f = normal_f.mean() else: mean_f = normal_f output_sensor = [] output_normal_f = pd.Series(dtype=object) for i in range(len(mean_f.index)): if abs(mean_f.iloc[i]) > 0.01 / 3600: output_sensor.append(mean_f.index[i]) output_normal_f[mean_f.index[i]] = mean_f.iloc[i] return output_sensor, output_normal_f def cal_top_sensors(monitor_p, predict_p, Top_sensor_num): dpressure = abs(predict_p - monitor_p) if type(dpressure) == pd.core.frame.DataFrame: dpressure = dpressure.mean() dpressure_rank = dpressure.sort_values(ascending=False, kind="mergesort") return list(dpressure_rank.index[:Top_sensor_num]) def remove_3_sigma(similarity_t): all_sample = len(similarity_t.index) apart_sample = math.ceil(all_sample * 0.6) similarity = similarity_t.astype("float") mean_t = similarity.mean() std_t = similarity.std() new_similarity = similarity[ (similarity <= mean_t + 3 * std_t) & (similarity >= mean_t - 3 * std_t) ] mean_t_new = new_similarity.mean() return mean_t_new def update_similarity(leak_candidate_center, similarity, leak_center_dict): similarity_new = pd.Series(dtype=float) for each_center in leak_candidate_center: houxuan_center = leak_center_dict[each_center] if len(houxuan_center) > 1: temp_similarity = similarity[houxuan_center] similarity_new[each_center] = temp_similarity.max() else: if type(similarity[each_center]) == pd.core.series.Series: similarity_new[each_center] = similarity[each_center].mean() else: similarity_new[each_center] = similarity[each_center] similarity_new = similarity_new.sort_values(ascending=False, kind="mergesort") return similarity_new def extra_judge( similarity, min_candidates_to_prune: int = 200, std_relax_factor: float = 0.5 ): if len(similarity.index) == 0: return 1.0, similarity if len(similarity.index) < int(min_candidates_to_prune): return 1.0, similarity mean_similarity = float(similarity.mean()) std_similarity = float(similarity.std()) if not math.isfinite(std_similarity): std_similarity = 0.0 threshold = mean_similarity - float(std_relax_factor) * std_similarity out_put_similarity = similarity[similarity >= threshold - 1e-10] if len(out_put_similarity.index) == 0: out_put_similarity = similarity.iloc[:1] cut_ratio = len(out_put_similarity.index) / len(similarity.index) return cut_ratio, out_put_similarity def adjust_ratio(similarity_mode, cos_h, dis_h, dis_f_h, low_limit=0.1): if similarity_mode == "CAF": if cos_h < low_limit: cos_h = low_limit dis_f_h = 1 - cos_h elif dis_f_h < low_limit: dis_f_h = low_limit cos_h = 1 - dis_f_h elif similarity_mode == "CAD_new_gy": if dis_h < low_limit: dis_h = low_limit cos_h = 1 - dis_h elif cos_h < low_limit: cos_h = low_limit dis_h = 1 - cos_h elif similarity_mode == "CDF": normal_index = [0, 1, 2] h_list = [cos_h, dis_h, dis_f_h] if cos_h < low_limit: h_list[0] = low_limit normal_index.remove(0) if dis_h < low_limit: h_list[1] = low_limit normal_index.remove(1) if dis_f_h < low_limit: h_list[2] = low_limit normal_index.remove(2) if len(normal_index) == 1: h_list[normal_index[0]] = h_list[normal_index[0]] - (sum(h_list) - 1) elif len(normal_index) == 2: sum_list = sum(h_list) multiper = 1 - (sum_list - 1) / ( h_list[normal_index[0]] + h_list[normal_index[1]] ) h_list[normal_index[0]] = h_list[normal_index[0]] * multiper h_list[normal_index[1]] = h_list[normal_index[1]] * multiper cos_h, dis_h, dis_f_h = h_list[0], h_list[1], h_list[2] return cos_h, dis_h, dis_f_h # 返回相似性计算的模式(不同权重),是否计算流量相似性,是否只计算cos相似性,是否只计算流量相似性。 def decode_mode(similarity_mode): if similarity_mode == "COS": if_flow = 0 if_only_cos = 1 if_only_flow = 0 elif similarity_mode == "CAD_new_gy": if_flow = 0 if_only_cos = 0 if_only_flow = 0 elif similarity_mode == "CDF": if_flow = 1 if_only_cos = 0 if_only_flow = 0 elif similarity_mode == "CAF": if_flow = 1 if_only_cos = 1 if_only_flow = 0 elif similarity_mode == "DIS": if_flow = 1 if_only_cos = 2 if_only_flow = 0 elif similarity_mode == "OF": if_flow = 1 if_only_cos = 0 if_only_flow = 1 return if_flow, if_only_cos, if_only_flow class SimilarityCalculator: @staticmethod def cal_similarity_simple_return_dd(*args, **kwargs): return cal_similarity_simple_return_dd(*args, **kwargs) @staticmethod def cal_similarity_all_cos_dis(*args, **kwargs): return cal_similarity_all_cos_dis(*args, **kwargs) @staticmethod def cal_similarity_all_multi_new_sq_improve_double_lzr(*args, **kwargs): return cal_similarity_all_multi_new_sq_improve_double_lzr(*args, **kwargs) @staticmethod def cal_sq_all_multi(*args, **kwargs): return cal_sq_all_multi(*args, **kwargs) @staticmethod def cal_sq_single_array(*args, **kwargs): return cal_sq_single_array(*args, **kwargs) @staticmethod def add_weight_for_SQ(*args, **kwargs): return add_weight_for_SQ(*args, **kwargs) @staticmethod def adjust_ratio(*args, **kwargs): return adjust_ratio(*args, **kwargs) @staticmethod def adjust(*args, **kwargs): return adjust(*args, **kwargs) @staticmethod def cal_top_sensors(*args, **kwargs): return cal_top_sensors(*args, **kwargs) @staticmethod def cal_top_f_sensor(*args, **kwargs): return cal_top_f_sensor(*args, **kwargs) @staticmethod def remove_3_sigma(*args, **kwargs): return remove_3_sigma(*args, **kwargs) @staticmethod def update_similarity(*args, **kwargs): return update_similarity(*args, **kwargs) @staticmethod def extra_judge(*args, **kwargs): return extra_judge(*args, **kwargs) @staticmethod def decode_mode(*args, **kwargs): return decode_mode(*args, **kwargs)