# Query simulated SCADA data: for each device, return the repaired/cleaned
# readings in the range as a list of {"time": ..., "value": ...} records.
def query_simulation_SCADA_data_by_device_ID_and_timerange(
    query_ids_list: List[str],
    start_time: str,
    end_time: str,
    bucket: str = "SCADA_data",
):
    """
    Query the repaired (simulated) values of several SCADA devices in a time range.

    :param query_ids_list: list of SCADA device IDs.
    :param start_time: Beijing local time, e.g. '2023-11-24T17:30:00+08:00'.
    :param end_time: Beijing local time, same format as start_time.
    :param bucket: InfluxDB bucket name, defaults to "SCADA_data".
        NOTE(review): the HTTP endpoint that wraps this function claims the data
        lives in the "realtime_simulation" bucket — confirm which bucket is intended.
    :return: dict mapping device_ID -> [{"time": datetime, "value": float}, ...],
        sorted by time ascending.
    """
    client = get_new_client()
    if not client.ping():
        # Best-effort behaviour preserved from the sibling query functions:
        # log the failure and continue; the query below raises if the server
        # is truly unreachable.
        print(
            "{} -- Failed to connect to InfluxDB.".format(
                datetime.now().strftime("%Y-%m-%d %H:%M:%S")
            )
        )

    try:
        query_api = client.query_api()

        # Convert the Beijing-time ISO strings to UTC for the Flux range() clause.
        utc_start_time = time_api.to_utc_time(datetime.fromisoformat(start_time))
        utc_stop_time = time_api.to_utc_time(datetime.fromisoformat(end_time))

        SCADA_dict = {}
        for device_id in query_ids_list:
            flux_query = f"""
            from(bucket: "{bucket}")
                |> range(start: {utc_start_time.isoformat()}, stop: {utc_stop_time.isoformat()})
                |> filter(fn: (r) => r["device_ID"] == "{device_id}" and r["_field"] == "datacleaning_value")
                |> sort(columns: ["_time"])
            """
            # Flatten the FluxTable list into (time, value) records per device.
            tables = query_api.query(flux_query)
            SCADA_dict[device_id] = [
                {"time": record["_time"], "value": record["_value"]}
                for table in tables
                for record in table.records
            ]
    finally:
        # Always release the HTTP connection, even when a query raises —
        # the original only closed the client on the success path.
        client.close()

    return SCADA_dict
# Query simulated SCADA data over HTTP.
# NOTE(review): the original comment says the data comes from the
# "realtime_simulation" bucket, but the wrapped query function defaults to
# the "SCADA_data" bucket — confirm which one is intended.
@app.get("/querysimulationscadadatabydeviceidandtimerange/")
async def fastapi_query_simulation_scada_data_by_device_id_and_time_range(
    ids: str, starttime: str, endtime: str
):
    """
    Query simulated SCADA data for several devices in a time range.

    :param ids: comma-separated SCADA device IDs, e.g. "id1,id2".
    :param starttime: Beijing time, e.g. '2023-11-24T17:30:00+08:00'.
    :param endtime: Beijing time, same format as starttime.
    :return: dict mapping device_ID -> list of {"time", "value"} records.
    """
    print(f"query_ids: {ids}, starttime: {starttime}, endtime: {endtime}")

    # Tolerate stray whitespace and empty segments ("a, b," -> ["a", "b"]);
    # padded IDs would otherwise silently match nothing in the Flux filter.
    query_ids = [part.strip() for part in ids.split(",") if part.strip()]
    return influxdb_api.query_simulation_SCADA_data_by_device_ID_and_timerange(
        query_ids_list=query_ids, start_time=starttime, end_time=endtime
    )
cleaned_value_df.columns if col.endswith("_cleaned") + # ] + # cleaned_value_df = cleaned_value_df[cleaned_columns] + # # 重命名列,移除 '_cleaned' 后缀 + # cleaned_value_df = cleaned_value_df.rename( + # columns={ + # col: col.replace("_cleaned", "") for col in cleaned_value_df.columns + # } + # ) cleaned_df = pd.concat([df["time"], cleaned_value_df], axis=1) # 调试输出,确认列名