From a3e5d55ed87518236dcf03d70bb438cd842cbd27 Mon Sep 17 00:00:00 2001 From: DingZQ Date: Sat, 15 Mar 2025 16:49:49 +0800 Subject: [PATCH] Refine --- influxdb_api.py | 72 +++++++++++++++++++++++++++++++++++++++++++++++++ main.py | 21 +++++++++++++++ 2 files changed, 93 insertions(+) diff --git a/influxdb_api.py b/influxdb_api.py index a376f8f..dc6ad84 100644 --- a/influxdb_api.py +++ b/influxdb_api.py @@ -1877,6 +1877,78 @@ def query_all_record_by_date(query_date: str, bucket: str="realtime_simulation_r time_cost_end - time_cost_start)) return node_records, link_records +# 2025/03/15 DingZQ +def query_all_records_by_date_with_type(query_date: str, query_type: str, bucket: str="realtime_simulation_result", client: InfluxDBClient=client) -> list: + """ + 查询指定日期的所有记录,包括‘node’和‘link’,分别以指定的格式返回 + :param query_date: 输入的日期,格式为‘2025-02-14’ + :param query_type: type 可以是 node 或者 link + :param bucket: 数据存储的bucket名称 + :param client: 已初始化的InfluxDBClient 实例。 + :return: dict: tuple: (node_records, link_records) + """ + # 记录开始时间 + time_cost_start = time.perf_counter() + print('{} -- Hydraulic simulation started.'.format( + datetime.now(pytz.timezone('Asia/Shanghai')).strftime('%Y-%m-%d %H:%M:%S'))) + + if client.ping(): + pass + else: + print("{} -- Failed to connect to InfluxDB.".format( datetime.now().strftime('%Y-%m-%d %H:%M:%S'))) + + query_api = client.query_api() + # 将 start_date 的北京时间转换为 UTC 时间 + start_time = (datetime.strptime(query_date, "%Y-%m-%d") - timedelta(days=1)).replace(hour=16, minute=0, second=0, tzinfo=timezone.utc).isoformat() + # 构建 Flux 查询语句 + flux_query = f''' + from(bucket: "{bucket}") + |> range(start: {start_time}) + |> filter(fn: (r) => r["_measurement"] == "{query_type}" and r["date"] == "{query_date}") + |> pivot( + rowKey:["_time"], + columnKey:["_field"], + valueColumn:"_value" + ) + ''' + # 执行查询 + tables = query_api.query(flux_query) + result_records = [] + # 解析查询结果 + for table in tables: + for record in table.records: + # print(record.values) # 打印完整记录内容 + measurement = record["_measurement"] + # 处理 node 数据 + if measurement == "node": + result_records.append({ + "time": record["_time"], + "ID": record["ID"], + "head": record["head"], + "pressure": record["pressure"], + "actualdemand": record["actualdemand"], + "quality": record["quality"] + }) + # 处理 link 数据 + elif measurement == "link": + result_records.append({ + "time": record["_time"], + "linkID": record["ID"], + "flow": record["flow"], + "velocity": record["velocity"], + "headloss": record["headloss"], + "quality": record["quality"], + "status": record["status"], + "setting": record["setting"], + "reaction": record["reaction"], + "friction": record["friction"] + }) + time_cost_end = time.perf_counter() + print('{} -- Hydraulic simulation finished, cost time: {:.2f} s.'.format( + datetime.now(pytz.timezone('Asia/Shanghai')).strftime('%Y-%m-%d %H:%M:%S'), + time_cost_end - time_cost_start)) + + return result_records # 2025/02/21 def query_all_record_by_date_property(query_date: str, type: str, property: str, diff --git a/main.py b/main.py index 39668f5..561d6c6 100644 --- a/main.py +++ b/main.py @@ -2147,6 +2147,27 @@ async def fastapi_query_all_records_by_date(querydate: str) -> dict[str, list]: return { "nodes": results[0], "links": results[1] } +#2025-03-15, DingZQ +@app.get("/queryallrecordsbydatewithtype/") +async def fastapi_query_all_records_by_date_with_type(querydate: str, querytype: str) -> list: + # 缓存查询结果提高性能 + global redis_client + cache_key = f"queryallrecordsbydatewithtype_{querydate}_{querytype}" + data = redis_client.get(cache_key) + if data: + # 使用自定义的反序列化函数 + loaded_dict = msgpack.unpackb(data, object_hook=object_hook) + return loaded_dict + + results = influxdb_api.query_all_records_by_date_with_type(query_date=querydate, query_type=querytype, client=influx_client) + packed = msgpack.packb(result_dict, default=default_encoder) + redis_client.set(cache_key, packed) + + return results + + + + # 查询指定日期、类型、属性的所有记录 # 返回 [{'time': '2024-01-01T00:00:00Z', 'ID': '1', 'value': 1.0}, {'time': '2024-01-01T00:00:00Z', 'ID': '2', 'value': 2.0}] @app.get("/queryallrecordsbydateproperty/")