From ab787d22c654173077e95b195bf59d75ef8c8535 Mon Sep 17 00:00:00 2001 From: DingZQ Date: Sat, 12 Apr 2025 20:45:17 +0800 Subject: [PATCH] Add API query_all_records_by_time_range --- influxdb_api.py | 87 +++++++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 87 insertions(+) diff --git a/influxdb_api.py b/influxdb_api.py index 5718a83..0ca239b 100644 --- a/influxdb_api.py +++ b/influxdb_api.py @@ -1931,6 +1931,93 @@ def query_all_records_by_date(query_date: str, bucket: str="realtime_simulation_ return node_records, link_records +# 2025/04/12 DingZQ +def query_all_records_by_time_range(starttime: str, endtime: str, bucket: str="realtime_simulation_result") -> tuple: + """ + 查询指定时间范围内的所有记录,包括‘node’和‘link’,分别以指定的格式返回 + :param starttime: 输入的开始时间,格式为‘2025-02-14T16:00:00+08:00’ + :param endtime: 输入的结束时间,格式为‘2025-02-14T16:00:00+08:00’ + :param bucket: 数据存储的bucket名称 + :return: dict: tuple: (node_records, link_records) + """ + client = get_new_client() + + # 记录开始时间 + time_cost_start = time.perf_counter() + print('{} -- query_all_records_by_date started.'.format(datetime.now(pytz.timezone('Asia/Shanghai')).strftime('%Y-%m-%d %H:%M:%S'))) + + if not client.ping(): + print("{} -- Failed to connect to InfluxDB.".format(datetime.now().strftime('%Y-%m-%d %H:%M:%S'))) + + query_api = client.query_api() + + bg_start_time = time_api.parse_beijing_time(starttime) + bg_end_time = time_api.parse_beijing_time(endtime) + utc_start_time = time_api.to_utc_time(bg_start_time) + utc_stop_time = time_api.to_utc_time(bg_end_time) + + print("bg_start_time", bg_start_time) + print("bg_end_time", bg_end_time) + print('utc_start_time', utc_start_time) + print('utc_stop_time', utc_stop_time) + + print('utc_start_time.isoformat', utc_start_time.isoformat()) + print('utc_stop_time.isoformat', utc_stop_time.isoformat()) + + # 构建 Flux 查询语句 + flux_query = f''' + from(bucket: "{bucket}") + |> range(start: {utc_start_time.isoformat()}, stop: {utc_stop_time.isoformat()}) + |> filter(fn: (r) => r["_measurement"] == "node" or r["_measurement"] == "link" and r["date"] == "{query_date}") + |> pivot( + rowKey:["_time"], + columnKey:["_field"], + valueColumn:"_value" + ) + ''' + + # 执行查询 + tables = query_api.query(flux_query) + + node_records = [] + link_records = [] + # 解析查询结果 + for table in tables: + for record in table.records: + # print(record.values) # 打印完整记录内容 + measurement = record["_measurement"] + # 处理 node 数据 + if measurement == "node": + node_records.append({ + "time": record["_time"], + "ID": record["ID"], + "head": record["head"], + "pressure": record["pressure"], + "actualdemand": record["actualdemand"], + "quality": record["quality"] + }) + # 处理 link 数据 + elif measurement == "link": + link_records.append({ + "time": record["_time"], + "ID": record["ID"], + "flow": record["flow"], + "velocity": record["velocity"], + "headloss": record["headloss"], + "quality": record["quality"], + "status": record["status"], + "setting": record["setting"], + "reaction": record["reaction"], + "friction": record["friction"] + }) + + time_cost_end = time.perf_counter() + print('{} -- query_all_records_by_date finished, cost time: {:.2f} s.'.format( datetime.now(pytz.timezone('Asia/Shanghai')).strftime('%Y-%m-%d %H:%M:%S'), time_cost_end - time_cost_start)) + + client.close() + + return node_records, link_records + # 2025/03/15 DingZQ def query_all_records_by_date_with_type(query_date: str, query_type: str, bucket: str="realtime_simulation_result") -> list: """