Add API query_all_records_by_time_range
This commit is contained in:
@@ -1931,6 +1931,93 @@ def query_all_records_by_date(query_date: str, bucket: str="realtime_simulation_
|
||||
|
||||
return node_records, link_records
|
||||
|
||||
# 2025/04/12 DingZQ
|
||||
def query_all_records_by_time_range(starttime: str, endtime: str, bucket: str="realtime_simulation_result") -> tuple:
|
||||
"""
|
||||
查询指定时间范围内的所有记录,包括‘node’和‘link’,分别以指定的格式返回
|
||||
:param starttime: 输入的开始时间,格式为‘2025-02-14T16:00:00+08:00’
|
||||
:param endtime: 输入的结束时间,格式为‘2025-02-14T16:00:00+08:00’
|
||||
:param bucket: 数据存储的bucket名称
|
||||
:return: dict: tuple: (node_records, link_records)
|
||||
"""
|
||||
client = get_new_client()
|
||||
|
||||
# 记录开始时间
|
||||
time_cost_start = time.perf_counter()
|
||||
print('{} -- query_all_records_by_date started.'.format(datetime.now(pytz.timezone('Asia/Shanghai')).strftime('%Y-%m-%d %H:%M:%S')))
|
||||
|
||||
if not client.ping():
|
||||
print("{} -- Failed to connect to InfluxDB.".format(datetime.now().strftime('%Y-%m-%d %H:%M:%S')))
|
||||
|
||||
query_api = client.query_api()
|
||||
|
||||
bg_start_time = time_api.parse_beijing_time(starttime)
|
||||
bg_end_time = time_api.parse_beijing_time(endtime)
|
||||
utc_start_time = time_api.to_utc_time(bg_start_time)
|
||||
utc_stop_time = time_api.to_utc_time(bg_end_time)
|
||||
|
||||
print("bg_start_time", bg_start_time)
|
||||
print("bg_end_time", bg_end_time)
|
||||
print('utc_start_time', utc_start_time)
|
||||
print('utc_stop_time', utc_stop_time)
|
||||
|
||||
print('utc_start_time.isoformat', utc_start_time.isoformat())
|
||||
print('utc_stop_time.isoformat', utc_stop_time.isoformat())
|
||||
|
||||
# 构建 Flux 查询语句
|
||||
flux_query = f'''
|
||||
from(bucket: "{bucket}")
|
||||
|> range(start: {utc_start_time.isoformat()}, stop: {utc_stop_time.isoformat()})
|
||||
|> filter(fn: (r) => r["_measurement"] == "node" or r["_measurement"] == "link" and r["date"] == "{query_date}")
|
||||
|> pivot(
|
||||
rowKey:["_time"],
|
||||
columnKey:["_field"],
|
||||
valueColumn:"_value"
|
||||
)
|
||||
'''
|
||||
|
||||
# 执行查询
|
||||
tables = query_api.query(flux_query)
|
||||
|
||||
node_records = []
|
||||
link_records = []
|
||||
# 解析查询结果
|
||||
for table in tables:
|
||||
for record in table.records:
|
||||
# print(record.values) # 打印完整记录内容
|
||||
measurement = record["_measurement"]
|
||||
# 处理 node 数据
|
||||
if measurement == "node":
|
||||
node_records.append({
|
||||
"time": record["_time"],
|
||||
"ID": record["ID"],
|
||||
"head": record["head"],
|
||||
"pressure": record["pressure"],
|
||||
"actualdemand": record["actualdemand"],
|
||||
"quality": record["quality"]
|
||||
})
|
||||
# 处理 link 数据
|
||||
elif measurement == "link":
|
||||
link_records.append({
|
||||
"time": record["_time"],
|
||||
"ID": record["ID"],
|
||||
"flow": record["flow"],
|
||||
"velocity": record["velocity"],
|
||||
"headloss": record["headloss"],
|
||||
"quality": record["quality"],
|
||||
"status": record["status"],
|
||||
"setting": record["setting"],
|
||||
"reaction": record["reaction"],
|
||||
"friction": record["friction"]
|
||||
})
|
||||
|
||||
time_cost_end = time.perf_counter()
|
||||
print('{} -- query_all_records_by_date finished, cost time: {:.2f} s.'.format( datetime.now(pytz.timezone('Asia/Shanghai')).strftime('%Y-%m-%d %H:%M:%S'), time_cost_end - time_cost_start))
|
||||
|
||||
client.close()
|
||||
|
||||
return node_records, link_records
|
||||
|
||||
# 2025/03/15 DingZQ
|
||||
def query_all_records_by_date_with_type(query_date: str, query_type: str, bucket: str="realtime_simulation_result") -> list:
|
||||
"""
|
||||
|
||||
Reference in New Issue
Block a user