diff --git a/influxdb_api.py b/influxdb_api.py index d7aae55..b3e0468 100644 --- a/influxdb_api.py +++ b/influxdb_api.py @@ -1487,35 +1487,39 @@ def query_SCADA_data_by_device_ID_and_timerange(query_ids_list: List[str], start beijing_end_time = datetime.fromisoformat(end_time) print('beijing_end_time', beijing_end_time) - utc_end_time = time_api.to_utc_time(beijing_end_time) - print('utc_end_time', utc_end_time) + utc_stop_time = time_api.to_utc_time(beijing_end_time) + print('utc_stop_time', utc_stop_time) SCADA_dict = {} for device_id in query_ids_list: - - print('device_id', device_id) - print('utc_start_time', utc_start_time.isoformat()) - print('utc_end_time', utc_end_time.isoformat()) - + # 构建 Flux 查询语句 flux_query = f''' - from(bucket: "{bucket}") - |> range(start: {utc_start_time.isoformat()}, stop: {utc_end_time.isoformat()}) - |> filter(fn: (r) => r["device_ID"] = {device_id} and r["_field"] == "monitored_value") - |> sort(columns: ["_time"]) - ''' - - # 执行查询,返回一个 FluxTable 列表 - tables = query_api.query(flux_query) - - records_list = [] - for table in tables: - for record in table.records: - # 获取记录的时间和监测值 - records_list.append({ - "time": record["_time"], - "value": record["_value"] - }) - SCADA_dict[device_id] = records_list + from(bucket: "{bucket}") + |> range(start: {utc_start_time.isoformat()}, stop: {utc_stop_time.isoformat()}) + |> filter(fn: (r) => r["device_ID"] == "{device_id}" and r["_field"] == "monitored_value") + |> sort(columns: ["_time"]) + ''' + # 执行查询 + try: + result = query_api.query(flux_query) + # 从查询结果中提取 monitored_value + if result: + # 假设返回的结果为一行数据 + records_list = [] + + for table in result: + for record in table.records: + # 获取记录的时间和监测值 + records_list.append({ + "time": record["_time"], + "value": record["_value"] + }) + + SCADA_dict[device_id] = records_list + + except Exception as e: + print(f"Error querying InfluxDB for device ID {device_id}: {e}") + SCADA_result_dict[device_id] = None client.close()