Refine
This commit is contained in:
@@ -1877,6 +1877,78 @@ def query_all_record_by_date(query_date: str, bucket: str="realtime_simulation_r
|
|||||||
time_cost_end - time_cost_start))
|
time_cost_end - time_cost_start))
|
||||||
return node_records, link_records
|
return node_records, link_records
|
||||||
|
|
||||||
|
# 2025/03/15 DingZQ
|
||||||
|
def query_all_records_by_date_with_type(query_date: str, query_type: str, bucket: str="realtime_simulation_result", client: InfluxDBClient=client) -> list:
|
||||||
|
"""
|
||||||
|
查询指定日期的所有记录,包括‘node’和‘link’,分别以指定的格式返回
|
||||||
|
:param query_date: 输入的日期,格式为‘2025-02-14’
|
||||||
|
:param query_type: type 可以是 node 或者 link
|
||||||
|
:param bucket: 数据存储的bucket名称
|
||||||
|
:param client: 已初始化的InfluxDBClient 实例。
|
||||||
|
:return: dict: tuple: (node_records, link_records)
|
||||||
|
"""
|
||||||
|
# 记录开始时间
|
||||||
|
time_cost_start = time.perf_counter()
|
||||||
|
print('{} -- Hydraulic simulation started.'.format(
|
||||||
|
datetime.now(pytz.timezone('Asia/Shanghai')).strftime('%Y-%m-%d %H:%M:%S')))
|
||||||
|
|
||||||
|
if client.ping():
|
||||||
|
pass
|
||||||
|
else:
|
||||||
|
print("{} -- Failed to connect to InfluxDB.".format( datetime.now().strftime('%Y-%m-%d %H:%M:%S')))
|
||||||
|
|
||||||
|
query_api = client.query_api()
|
||||||
|
# 将 start_date 的北京时间转换为 UTC 时间
|
||||||
|
start_time = (datetime.strptime(query_date, "%Y-%m-%d") - timedelta(days=1)).replace(hour=16, minute=0, second=0, tzinfo=timezone.utc).isoformat()
|
||||||
|
# 构建 Flux 查询语句
|
||||||
|
flux_query = f'''
|
||||||
|
from(bucket: "{bucket}")
|
||||||
|
|> range(start: {start_time})
|
||||||
|
|> filter(fn: (r) => r["_measurement"] == "{query_type}" and r["date"] == "{query_date}")
|
||||||
|
|> pivot(
|
||||||
|
rowKey:["_time"],
|
||||||
|
columnKey:["_field"],
|
||||||
|
valueColumn:"_value"
|
||||||
|
)
|
||||||
|
'''
|
||||||
|
# 执行查询
|
||||||
|
tables = query_api.query(flux_query)
|
||||||
|
result_records = []
|
||||||
|
# 解析查询结果
|
||||||
|
for table in tables:
|
||||||
|
for record in table.records:
|
||||||
|
# print(record.values) # 打印完整记录内容
|
||||||
|
measurement = record["_measurement"]
|
||||||
|
# 处理 node 数据
|
||||||
|
if measurement == "node":
|
||||||
|
result_records.append({
|
||||||
|
"time": record["_time"],
|
||||||
|
"ID": record["ID"],
|
||||||
|
"head": record["head"],
|
||||||
|
"pressure": record["pressure"],
|
||||||
|
"actualdemand": record["actualdemand"],
|
||||||
|
"quality": record["quality"]
|
||||||
|
})
|
||||||
|
# 处理 link 数据
|
||||||
|
elif measurement == "link":
|
||||||
|
result_records.append({
|
||||||
|
"time": record["_time"],
|
||||||
|
"linkID": record["ID"],
|
||||||
|
"flow": record["flow"],
|
||||||
|
"velocity": record["velocity"],
|
||||||
|
"headloss": record["headloss"],
|
||||||
|
"quality": record["quality"],
|
||||||
|
"status": record["status"],
|
||||||
|
"setting": record["setting"],
|
||||||
|
"reaction": record["reaction"],
|
||||||
|
"friction": record["friction"]
|
||||||
|
})
|
||||||
|
time_cost_end = time.perf_counter()
|
||||||
|
print('{} -- Hydraulic simulation finished, cost time: {:.2f} s.'.format(
|
||||||
|
datetime.now(pytz.timezone('Asia/Shanghai')).strftime('%Y-%m-%d %H:%M:%S'),
|
||||||
|
time_cost_end - time_cost_start))
|
||||||
|
|
||||||
|
return result_records
|
||||||
|
|
||||||
# 2025/02/21
|
# 2025/02/21
|
||||||
def query_all_record_by_date_property(query_date: str, type: str, property: str,
|
def query_all_record_by_date_property(query_date: str, type: str, property: str,
|
||||||
|
|||||||
21
main.py
21
main.py
@@ -2147,6 +2147,27 @@ async def fastapi_query_all_records_by_date(querydate: str) -> dict[str, list]:
|
|||||||
return { "nodes": results[0],
|
return { "nodes": results[0],
|
||||||
"links": results[1] }
|
"links": results[1] }
|
||||||
|
|
||||||
|
#2025-03-15, DingZQ
|
||||||
|
@app.get("/queryallrecordsbydatewithtype/")
|
||||||
|
async def fastapi_query_all_records_by_date_with_type(querydate: str, querytype: str) -> list:
|
||||||
|
# 缓存查询结果提高性能
|
||||||
|
global redis_client
|
||||||
|
cache_key = f"queryallrecordsbydatewithtype_{querydate}_{querytype}"
|
||||||
|
data = redis_client.get(cache_key)
|
||||||
|
if data:
|
||||||
|
# 使用自定义的反序列化函数
|
||||||
|
loaded_dict = msgpack.unpackb(data, object_hook=object_hook)
|
||||||
|
return loaded_dict
|
||||||
|
|
||||||
|
results = influxdb_api.query_all_records_by_date_with_type(query_date=querydate, query_type=querytype, client=influx_client)
|
||||||
|
packed = msgpack.packb(result_dict, default=default_encoder)
|
||||||
|
redis_client.set(cache_key, packed)
|
||||||
|
|
||||||
|
return results
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
# 查询指定日期、类型、属性的所有记录
|
# 查询指定日期、类型、属性的所有记录
|
||||||
# 返回 [{'time': '2024-01-01T00:00:00Z', 'ID': '1', 'value': 1.0}, {'time': '2024-01-01T00:00:00Z', 'ID': '2', 'value': 2.0}]
|
# 返回 [{'time': '2024-01-01T00:00:00Z', 'ID': '1', 'value': 1.0}, {'time': '2024-01-01T00:00:00Z', 'ID': '2', 'value': 2.0}]
|
||||||
@app.get("/queryallrecordsbydateproperty/")
|
@app.get("/queryallrecordsbydateproperty/")
|
||||||
|
|||||||
Reference in New Issue
Block a user