diff --git a/influxdb_api.py b/influxdb_api.py index 9ed12c0..676b18d 100644 --- a/influxdb_api.py +++ b/influxdb_api.py @@ -1321,6 +1321,67 @@ def download_history_data_manually(begin_time: str, end_time: str, bucket: str = write_api.write(bucket=bucket, org=org_name, record=points_to_write) write_api.flush() # 刷新缓存一次 +########################SCADA############################################################################################################ + +# DingZQ, 2025-03-08 +def query_all_SCADA_records_by_date(query_date: str, bucket: str="SCADA_data", client: InfluxDBClient=client) -> list[dict[str, float]]: + + """ + 根据日期查询所有SCADA数据 + + :param query_date: 输入的日期,格式为 '2024-11-24', 日期是北京时间的日期 + :param bucket: InfluxDB 的 bucket 名称,默认值为 "SCADA_data"。 + :param client: 已初始化的 InfluxDBClient 实例。 + + :return: + """ + + if client.ping(): print("{} -- Successfully connected to InfluxDB.".format( datetime.now().strftime('%Y-%m-%d %H:%M:%S'))) + else: print("{} -- Failed to connect to InfluxDB.".format( datetime.now().strftime('%Y-%m-%d %H:%M:%S'))) + + query_api = client.query_api() + # 将北京时间转换为 UTC 时间 + + bg_start_time, bg_end_time = time_api.parse_beijing_date_range(query_date) + # bg_end_time = bg_start_time + timedelta(hours=2) # 服务器性能不行,暂时返回2个小时的数据 + utc_start_time = bg_start_time.astimezone(timezone.utc) + utc_end_time = bg_end_time.astimezone(timezone.utc) + + print(f"utc_start_time: {utc_start_time}, utc_end_time: {utc_end_time}") + + # 构建查询字典 + SCADA_results = [] + + # 构建 Flux 查询语句 + flux_query = f''' + from(bucket: "{bucket}") + |> range(start: {utc_start_time.isoformat()}, stop: {utc_end_time.isoformat()}) + |> filter(fn: (r) => r["_field"] == "monitored_value") + |> sort(columns: ["_time"], desc: false) + ''' + + # 执行查询 + try: + result = query_api.query(flux_query) + + # 从查询结果中提取 monitored_value + if result: + # 假设返回的结果为一行数据 + for table in result: + for record in table.records: + # 获取字段 "_value" 即为 monitored_value + monitored_value = record.get_value() + rec = { + "ID": record['device_ID'], # 是api_query 而不是 普通的Id + "time": record.get_time(), + record['_measurement']: monitored_value + } + SCADA_results.append(rec) + + except Exception as e: + print(f"Error querying InfluxDB for date {query_date}: {e}") + + return SCADA_results def query_SCADA_data_by_device_ID_and_time(query_ids_list: List[str], query_time: str, bucket: str="SCADA_data", client: InfluxDBClient=client) -> Dict[str, float]: """