From 3012ddab064dccda85c77a3f2c8b578aedfa7af3 Mon Sep 17 00:00:00 2001 From: DingZQ Date: Sun, 4 May 2025 17:31:10 +0800 Subject: [PATCH] Add more SCADA API --- influxdb_api.py | 102 ++++++++++++++++++++++++++++++++++++++---------- main.py | 33 ++++++++++++++++ 2 files changed, 115 insertions(+), 20 deletions(-) diff --git a/influxdb_api.py b/influxdb_api.py index 4aafdd0..379397c 100644 --- a/influxdb_api.py +++ b/influxdb_api.py @@ -1612,6 +1612,10 @@ def query_scheme_SCADA_data_by_device_ID_and_time(query_ids_list: List[str], que return SCADA_result_dict # 2025/03/14 +# DingZQ +# 返回SCADA数据的原始值,其中可能包含了异常值跟缺失值,我们需要再后续曲线中修复 +# 缺失值 +# 异常值 def query_SCADA_data_by_device_ID_and_timerange(query_ids_list: List[str], start_time: str, end_time: str, bucket: str="SCADA_data"): """ 查询指定时间范围内,多个SCADA设备的数据,用于漏损定位 @@ -1674,6 +1678,8 @@ def query_SCADA_data_by_device_ID_and_timerange(query_ids_list: List[str], start return SCADA_dict +# 2025/05/04 DingZQ +# SCADA 原始数据有异常偏离,返回的是一个list,list的内容是清洗后的正常值,表示为 time + value def query_cleaning_SCADA_data_by_device_ID_and_timerange(query_ids_list: List[str], start_time: str, end_time: str, bucket: str="SCADA_data"): """ 查询指定时间范围内,多个SCADA设备的修复的单个的数据 @@ -1725,26 +1731,11 @@ def query_cleaning_SCADA_data_by_device_ID_and_timerange(query_ids_list: List[st return SCADA_dict -# DingZQ, 2025-02-15 -def query_SCADA_data_by_device_ID_and_date(query_ids_list: List[str], query_date: str, bucket: str="SCADA_data") -> list[dict[str, float]]: +# 2025/05/04 DingZQ +# SCADA 数据原版缺失,根据历史数据的平均值补上缺失的部分 +def query_filling_SCADA_data_by_device_ID_and_timerange(query_ids_list: List[str], start_time: str, end_time: str, bucket: str="SCADA_data"): """ - 根据SCADA设备的ID和日期查询值 - :param query_ids_list: SCADA设备ID的列表, 是api_query 而不是 普通的Id - :param query_date: 输入的日期,格式为 '2024-11-24', 日期是北京时间的日期 - :param bucket: InfluxDB 的 bucket 名称,默认值为 "SCADA_data"。 - :param client: 已初始化的 InfluxDBClient 实例。 - :return: - """ - - start_time, end_time = time_api.parse_beijing_date_range(query_date) - - return query_SCADA_data_by_device_ID_and_time_range(query_ids_list, str(start_time), str(end_time), bucket) - - -# 2025/04/17 -def query_cleaned_SCADA_data_by_device_ID_and_timerange(query_ids_list: List[str], start_time: str, end_time: str, bucket: str="SCADA_data"): - """ - 查询指定时间范围内,多个SCADA设备的清洗后的数据 + 查询指定时间范围内,多个SCADA设备的填补的单个的数据 :param query_ids_list: SCADA设备ID的列表 :param start_time: 输入的北京时间,格式为 '2024-11-24T17:30:00+08:00'。 :param end_time: 输入的北京时间,格式为 '2024-11-24T17:30:00+08:00'。 @@ -1772,7 +1763,7 @@ def query_cleaned_SCADA_data_by_device_ID_and_timerange(query_ids_list: List[str flux_query = f''' from(bucket: "{bucket}") |> range(start: {utc_start_time.isoformat()}, stop: {utc_stop_time.isoformat()}) - |> filter(fn: (r) => r["device_ID"] == "{device_id}" and r["_field"] == "datacleaning_value") + |> filter(fn: (r) => r["device_ID"] == "{device_id}" and r["_field"] == "datafilling_value") |> sort(columns: ["_time"]) ''' # 执行查询,返回一个 FluxTable 列表 @@ -1787,9 +1778,80 @@ def query_cleaned_SCADA_data_by_device_ID_and_timerange(query_ids_list: List[str "value": record["_value"] }) SCADA_dict[device_id] = records_list + client.close() + return SCADA_dict +# 2025/05/04 DingZQ +# 是把原始数据跟清洗后的数据合并到一起,暂时不需要用这个API +def query_cleaned_SCADA_data_by_device_ID_and_timerange(query_ids_list: List[str], start_time: str, end_time: str, bucket: str="SCADA_data"): + """ + 查询指定时间范围内,多个SCADA设备的清洗完毕后的完整数据 + :param query_ids_list: SCADA设备ID的列表 + :param start_time: 输入的北京时间,格式为 '2024-11-24T17:30:00+08:00'。 + :param end_time: 输入的北京时间,格式为 '2024-11-24T17:30:00+08:00'。 + :param bucket: InfluxDB 的 bucket 名称,默认值为 "SCADA_data"。 + :return: + """ + client = get_new_client() + if not client.ping(): + print("{} -- Failed to connect to InfluxDB.".format(datetime.now().strftime('%Y-%m-%d %H:%M:%S'))) + + query_api = client.query_api() + print('start_time', start_time) + print('end_time', end_time) + # 将北京时间转换为 UTC 时间 + beijing_start_time = datetime.fromisoformat(start_time) + print('beijing_start_time', beijing_start_time) + utc_start_time = time_api.to_utc_time(beijing_start_time) + print('utc_start_time', utc_start_time) + beijing_end_time = datetime.fromisoformat(end_time) + print('beijing_end_time', beijing_end_time) + utc_stop_time = time_api.to_utc_time(beijing_end_time) + print('utc_stop_time', utc_stop_time) + SCADA_dict = {} + for device_id in query_ids_list: + flux_query = f''' + from(bucket: "{bucket}") + |> range(start: {utc_start_time.isoformat()}, stop: {utc_stop_time.isoformat()}) + |> filter(fn: (r) => r["device_ID"] == "{device_id}" and r["_field"] == "cleaned_value") + |> sort(columns: ["_time"]) + ''' + # 执行查询,返回一个 FluxTable 列表 + tables = query_api.query(flux_query) + print(tables) + records_list = [] + for table in tables: + for record in table.records: + # 获取记录的时间和监测值 + records_list.append({ + "time": record["_time"], + "value": record["_value"] + }) + SCADA_dict[device_id] = records_list + + client.close() + + return SCADA_dict + + +# DingZQ, 2025-02-15 +def query_SCADA_data_by_device_ID_and_date(query_ids_list: List[str], query_date: str, bucket: str="SCADA_data") -> list[dict[str, float]]: + """ + 根据SCADA设备的ID和日期查询值 + :param query_ids_list: SCADA设备ID的列表, 是api_query 而不是 普通的Id + :param query_date: 输入的日期,格式为 '2024-11-24', 日期是北京时间的日期 + :param bucket: InfluxDB 的 bucket 名称,默认值为 "SCADA_data"。 + :param client: 已初始化的 InfluxDBClient 实例。 + :return: + """ + + start_time, end_time = time_api.parse_beijing_date_range(query_date) + + return query_SCADA_data_by_device_ID_and_timerange(query_ids_list, str(start_time), str(end_time), bucket) + + # 2025/02/01 def store_realtime_simulation_result_to_influxdb(node_result_list: List[Dict[str, any]], link_result_list: List[Dict[str, any]], diff --git a/main.py b/main.py index dbfc572..f821497 100644 --- a/main.py +++ b/main.py @@ -2495,6 +2495,15 @@ async def fastapi_query_scada_data_by_device_id_and_time(ids: str, querytime: st logger.info(querytime) return influxdb_api.query_SCADA_data_by_device_ID_and_time(query_ids_list=query_ids, query_time=querytime) +# 2025/05/04 DingZQ +# 对于SCAD的曲线数据,我们需要有4 套数据值 +# 1. 原始数据 +# 2. 补充的数据 (补充前面第一步缺失的数据) +# 3. 清洗后的数据点 (用五角星表示) +# 4. 模拟曲线 + +# 查询到的SCADA原始数据 +# 数据1 @app.get("/queryscadadatabydeviceidandtimerange/") async def fastapi_query_scada_data_by_device_id_and_time_range(ids: str, starttime: str, endtime: str): @@ -2503,8 +2512,32 @@ async def fastapi_query_scada_data_by_device_id_and_time_range(ids: str, startti query_ids = ids.split(',') return influxdb_api.query_SCADA_data_by_device_ID_and_timerange(query_ids_list=query_ids, start_time=starttime, end_time=endtime) +# 查询到的SCADA补充的数据 +# 数据2 +@app.get("/queryfillingscadadatabydeviceidandtimerange/") +async def fastapi_query_filling_scada_data_by_device_id_and_time_range(ids: str, starttime: str, endtime: str): + + print(f"query_ids: {ids}, starttime: {starttime}, endtime: {endtime}") + + query_ids = ids.split(',') + return influxdb_api.query_filling_SCADA_data_by_device_ID_and_timerange(query_ids_list=query_ids, start_time=starttime, end_time=endtime) + +# 查询到的SCADA清洗后的数据点 +# 数据3 +@app.get("/querycleanedscadadatabydeviceidandtimerange/") +async def fastapi_query_cleaned_scada_data_by_device_id_and_time_range(ids: str, starttime: str, endtime: str): + + print(f"query_ids: {ids}, starttime: {starttime}, endtime: {endtime}") + + query_ids = ids.split(',') + return influxdb_api.query_cleaning_SCADA_data_by_device_ID_and_timerange(query_ids_list=query_ids, start_time=starttime, end_time=endtime) + + + + # 查询指定时间范围内,多个SCADA设备的清洗后的数据 # DingZQ, 2025-04-19 +# 2025/05/04 DingZQ 这个是将原始数据跟清洗后的数据合并到一起,暂时不需要用这个API @app.get("/querycleanedscadadatabydeviceidandtimerange/") async def fastapi_query_cleaned_scada_data_by_device_id_and_time_range(ids: str, starttime: str, endtime: str):