diff --git a/influxdb_api.py b/influxdb_api.py index afb2143..4aafdd0 100644 --- a/influxdb_api.py +++ b/influxdb_api.py @@ -1669,7 +1669,60 @@ def query_SCADA_data_by_device_ID_and_timerange(query_ids_list: List[str], start "value": record["_value"] }) SCADA_dict[device_id] = records_list + client.close() + + return SCADA_dict + +def query_cleaning_SCADA_data_by_device_ID_and_timerange(query_ids_list: List[str], start_time: str, end_time: str, bucket: str="SCADA_data"): + """ + 查询指定时间范围内,多个SCADA设备的修复的单个的数据 + :param query_ids_list: SCADA设备ID的列表 + :param start_time: 输入的北京时间,格式为 '2023-11-24T17:30:00+08:00'。 + :param end_time: 输入的北京时间,格式为 '2023-11-24T17:30:00+08:00'。 + :param bucket: InfluxDB 的 bucket 名称,默认值为 "SCADA_data"。 + :return: + """ + client = get_new_client() + if not client.ping(): + print("{} -- Failed to connect to InfluxDB.".format(datetime.now().strftime('%Y-%m-%d %H:%M:%S'))) + + query_api = client.query_api() + print('start_time', start_time) + print('end_time', end_time) + # 将北京时间转换为 UTC 时间 + beijing_start_time = datetime.fromisoformat(start_time) + print('beijing_start_time', beijing_start_time) + utc_start_time = time_api.to_utc_time(beijing_start_time) + print('utc_start_time', utc_start_time) + beijing_end_time = datetime.fromisoformat(end_time) + print('beijing_end_time', beijing_end_time) + utc_stop_time = time_api.to_utc_time(beijing_end_time) + print('utc_stop_time', utc_stop_time) + + SCADA_dict = {} + for device_id in query_ids_list: + flux_query = f''' + from(bucket: "{bucket}") + |> range(start: {utc_start_time.isoformat()}, stop: {utc_stop_time.isoformat()}) + |> filter(fn: (r) => r["device_ID"] == "{device_id}" and r["_field"] == "datacleaning_value") + |> sort(columns: ["_time"]) + ''' + # 执行查询,返回一个 FluxTable 列表 + tables = query_api.query(flux_query) + print(tables) + records_list = [] + for table in tables: + for record in table.records: + # 获取记录的时间和监测值 + records_list.append({ + "time": record["_time"], + "value": record["_value"] + }) + SCADA_dict[device_id] = records_list + + client.close() + return SCADA_dict # DingZQ, 2025-02-15 @@ -3373,6 +3426,7 @@ def upload_cleaned_SCADA_data_to_influxdb(file_path: str, bucket: str="SCADA_dat client.close() + # 示例调用 if __name__ == "__main__": url = influxdb_info.url