Refine msgpack usage for datetime

This commit is contained in:
DingZQ
2025-04-04 10:56:40 +08:00
parent 2f14395d1e
commit b8deadbd11
2 changed files with 48 additions and 40 deletions

View File

@@ -1823,7 +1823,7 @@ def query_all_record_by_time_property(query_time: str, type: str, property: str,
# 2025/02/21 # 2025/02/21
def query_all_record_by_date(query_date: str, bucket: str="realtime_simulation_result") -> tuple: def query_all_records_by_date(query_date: str, bucket: str="realtime_simulation_result") -> tuple:
""" """
查询指定日期的所有记录包括nodelink分别以指定的格式返回 查询指定日期的所有记录包括nodelink分别以指定的格式返回
:param query_date: 输入的日期格式为2025-02-14 :param query_date: 输入的日期格式为2025-02-14
@@ -1834,7 +1834,7 @@ def query_all_record_by_date(query_date: str, bucket: str="realtime_simulation_r
# 记录开始时间 # 记录开始时间
time_cost_start = time.perf_counter() time_cost_start = time.perf_counter()
print('{} -- query_all_record_by_date started.'.format(datetime.now(pytz.timezone('Asia/Shanghai')).strftime('%Y-%m-%d %H:%M:%S'))) print('{} -- query_all_records_by_date started.'.format(datetime.now(pytz.timezone('Asia/Shanghai')).strftime('%Y-%m-%d %H:%M:%S')))
if not client.ping(): if not client.ping():
print("{} -- Failed to connect to InfluxDB.".format(datetime.now().strftime('%Y-%m-%d %H:%M:%S'))) print("{} -- Failed to connect to InfluxDB.".format(datetime.now().strftime('%Y-%m-%d %H:%M:%S')))
@@ -1895,7 +1895,7 @@ def query_all_record_by_date(query_date: str, bucket: str="realtime_simulation_r
}) })
time_cost_end = time.perf_counter() time_cost_end = time.perf_counter()
print('{} -- query_all_record_by_date finished, cost time: {:.2f} s.'.format( datetime.now(pytz.timezone('Asia/Shanghai')).strftime('%Y-%m-%d %H:%M:%S'), time_cost_end - time_cost_start)) print('{} -- query_all_records_by_date finished, cost time: {:.2f} s.'.format( datetime.now(pytz.timezone('Asia/Shanghai')).strftime('%Y-%m-%d %H:%M:%S'), time_cost_end - time_cost_start))
client.close() client.close()

82
main.py
View File

@@ -49,16 +49,24 @@ app = FastAPI()
# 将 Query的信息 序列号到 redis/json 默认不支持datetime需要自定义 # 将 Query的信息 序列号到 redis/json 默认不支持datetime需要自定义
# 自定义序列化函数 # 自定义序列化函数
def default_encoder(obj): # 序列化处理器
if isinstance(obj, datetime): def encode_datetime(obj):
return {"__datetime__": obj.isoformat()} """datetime转换为可序列化的字典结构"""
raise TypeError("Type not serializable") if isinstance(obj, datetime.datetime):
return {
'__datetime__': True,
'as_str': obj.strftime("%Y%m%dT%H:%M:%S.%f")
}
return obj
# 自定义反序列化函数 # 反序列化处理器
def object_hook(dct): def decode_datetime(obj):
if "__datetime__" in dct: """将字典还原为datetime对象"""
return datetime.fromisoformat(dct["__datetime__"]) if '__datetime__' in obj:
return dct return datetime.datetime.strptime(
obj['as_str'], "%Y%m%dT%H:%M:%S.%f"
)
return obj
# 初始化 Redis 连接 # 初始化 Redis 连接
# 用redis 限制并发访问 # 用redis 限制并发访问
@@ -658,11 +666,11 @@ async def fastapi_get_all_junction_properties(network: str) -> list[dict[str, An
data = redis_client.get(cache_key) data = redis_client.get(cache_key)
if data: if data:
# 使用自定义的反序列化函数 # 使用自定义的反序列化函数
loaded_dict = msgpack.unpackb(data, object_hook=object_hook) loaded_dict = msgpack.unpackb(data, object_hook=decode_datetime)
return loaded_dict return loaded_dict
results = get_all_junctions(network) results = get_all_junctions(network)
redis_client.set(cache_key, msgpack.packb(results, default=object_hook)) redis_client.set(cache_key, msgpack.packb(results, default=encode_datetime))
return results return results
@@ -765,11 +773,11 @@ async def fastapi_get_all_reservoir_properties(network: str) -> list[dict[str, A
data = redis_client.get(cache_key) data = redis_client.get(cache_key)
if data: if data:
# 使用自定义的反序列化函数 # 使用自定义的反序列化函数
loaded_dict = msgpack.unpackb(data, object_hook=object_hook) loaded_dict = msgpack.unpackb(data, object_hook=decode_datetime)
return loaded_dict return loaded_dict
results = get_all_reservoirs(network) results = get_all_reservoirs(network)
redis_client.set(cache_key, msgpack.packb(results, default=object_hook)) redis_client.set(cache_key, msgpack.packb(results, default=encode_datetime))
return results return results
@@ -945,11 +953,11 @@ async def fastapi_get_all_tank_properties(network: str) -> list[dict[str, Any]]:
data = redis_client.get(cache_key) data = redis_client.get(cache_key)
if data: if data:
# 使用自定义的反序列化函数 # 使用自定义的反序列化函数
loaded_dict = msgpack.unpackb(data, object_hook=object_hook) loaded_dict = msgpack.unpackb(data, object_hook=decode_datetime)
return loaded_dict return loaded_dict
results = get_all_tanks(network) results = get_all_tanks(network)
redis_client.set(cache_key, msgpack.packb(results, default=object_hook)) redis_client.set(cache_key, msgpack.packb(results, default=encode_datetime))
return results return results
@@ -1080,11 +1088,11 @@ async def fastapi_get_all_pipe_properties(network: str) -> list[dict[str, Any]]:
data = redis_client.get(cache_key) data = redis_client.get(cache_key)
if data: if data:
# 使用自定义的反序列化函数 # 使用自定义的反序列化函数
loaded_dict = msgpack.unpackb(data, object_hook=object_hook) loaded_dict = msgpack.unpackb(data, object_hook=decode_datetime)
return loaded_dict return loaded_dict
results = get_all_pipes(network) results = get_all_pipes(network)
redis_client.set(cache_key, msgpack.packb(results, default=object_hook)) redis_client.set(cache_key, msgpack.packb(results, default=encode_datetime))
return results return results
@@ -1152,11 +1160,11 @@ async def fastapi_get_all_pump_properties(network: str) -> list[dict[str, Any]]:
data = redis_client.get(cache_key) data = redis_client.get(cache_key)
if data: if data:
# 使用自定义的反序列化函数 # 使用自定义的反序列化函数
loaded_dict = msgpack.unpackb(data, object_hook=object_hook) loaded_dict = msgpack.unpackb(data, object_hook=decode_datetime)
return loaded_dict return loaded_dict
results = get_all_pumps(network) results = get_all_pumps(network)
redis_client.set(cache_key, msgpack.packb(results, default=object_hook)) redis_client.set(cache_key, msgpack.packb(results, default=encode_datetime))
return results return results
@@ -1264,11 +1272,11 @@ async def fastapi_get_all_valve_properties(network: str) -> list[dict[str, Any]]
data = redis_client.get(cache_key) data = redis_client.get(cache_key)
if data: if data:
# 使用自定义的反序列化函数 # 使用自定义的反序列化函数
loaded_dict = msgpack.unpackb(data, object_hook=object_hook) loaded_dict = msgpack.unpackb(data, object_hook=decode_datetime)
return loaded_dict return loaded_dict
results = get_all_valves(network) results = get_all_valves(network)
redis_client.set(cache_key, msgpack.packb(results, default=object_hook)) redis_client.set(cache_key, msgpack.packb(results, default=encode_datetime))
return results return results
@@ -1690,7 +1698,7 @@ async def fastapi_get_network_geometries(network: str) -> dict[str, Any] | None:
data = redis_client.get(cache_key) data = redis_client.get(cache_key)
if data: if data:
# 使用自定义的反序列化函数 # 使用自定义的反序列化函数
loaded_dict = msgpack.unpackb(data, object_hook=object_hook) loaded_dict = msgpack.unpackb(data, object_hook=decode_datetime)
return loaded_dict return loaded_dict
coords = get_network_node_coords(network) coords = get_network_node_coords(network)
@@ -1710,7 +1718,7 @@ async def fastapi_get_network_geometries(network: str) -> dict[str, Any] | None:
'scadas': scadas } 'scadas': scadas }
# 缓存查询结果提高性能 # 缓存查询结果提高性能
redis_client.set(cache_key, msgpack.packb(results, default=object_hook)) redis_client.set(cache_key, msgpack.packb(results, default=encode_datetime))
return results return results
@@ -2320,14 +2328,14 @@ async def fastapi_query_all_records_by_date(querydate: str) -> dict[str, list]:
data = redis_client.get(cache_key) data = redis_client.get(cache_key)
if data: if data:
# 使用自定义的反序列化函数 # 使用自定义的反序列化函数
loaded_dict = msgpack.unpackb(data, object_hook=object_hook) loaded_dict = msgpack.unpackb(data, object_hook=decode_datetime)
return loaded_dict return loaded_dict
nodes_links: tuple = influxdb_api.query_all_record_by_date(query_date=querydate) nodes_links: tuple = influxdb_api.query_all_record_by_date(query_date=querydate)
results = { "nodes": nodes_links[0], results = { "nodes": nodes_links[0],
"links": nodes_links[1] } "links": nodes_links[1] }
redis_client.set(cache_key, msgpack.packb(results, default=object_hook)) redis_client.set(cache_key, msgpack.packb(results, default=encode_datetime))
return results return results
@@ -2340,12 +2348,12 @@ async def fastapi_query_all_records_by_date_with_type(querydate: str, querytype:
data = redis_client.get(cache_key) data = redis_client.get(cache_key)
if data: if data:
# 使用自定义的反序列化函数 # 使用自定义的反序列化函数
loaded_dict = msgpack.unpackb(data, object_hook=object_hook) loaded_dict = msgpack.unpackb(data, object_hook=decode_datetime)
return loaded_dict return loaded_dict
results = influxdb_api.query_all_records_by_date_with_type(query_date=querydate, query_type=querytype) results = influxdb_api.query_all_records_by_date_with_type(query_date=querydate, query_type=querytype)
packed = msgpack.packb(results, default=default_encoder) packed = msgpack.packb(results, default=encode_datetime)
redis_client.set(cache_key, packed) redis_client.set(cache_key, packed)
return results return results
@@ -2359,10 +2367,10 @@ async def fastapi_query_all_records_by_ids_date_type(ids:str, querydate: str, qu
results = [] results = []
if data: if data:
# 使用自定义的反序列化函数 # 使用自定义的反序列化函数
results = msgpack.unpackb(data, object_hook=object_hook) results = msgpack.unpackb(data, object_hook=decode_datetime)
else: else:
results = influxdb_api.query_all_records_by_date_with_type(query_date=querydate, query_type=querytype) results = influxdb_api.query_all_records_by_date_with_type(query_date=querydate, query_type=querytype)
packed = msgpack.packb(results, default=default_encoder) packed = msgpack.packb(results, default=encode_datetime)
redis_client.set(cache_key, packed) redis_client.set(cache_key, packed)
query_ids = ids.split(",") query_ids = ids.split(",")
@@ -2382,11 +2390,11 @@ async def fastapi_query_all_records_by_date_property(querydate: str, querytype:
data = redis_client.get(cache_key) data = redis_client.get(cache_key)
if data: if data:
# 使用自定义的反序列化函数 # 使用自定义的反序列化函数
loaded_dict = msgpack.unpackb(data, object_hook=object_hook) loaded_dict = msgpack.unpackb(data, object_hook=decode_datetime)
return loaded_dict return loaded_dict
result_dict = influxdb_api.query_all_record_by_date_property(query_date=querydate, type=querytype, property=property) result_dict = influxdb_api.query_all_record_by_date_property(query_date=querydate, type=querytype, property=property)
packed = msgpack.packb(result_dict, default=default_encoder) packed = msgpack.packb(result_dict, default=encode_datetime)
redis_client.set(cache_key, packed) redis_client.set(cache_key, packed)
return result_dict return result_dict
@@ -2433,11 +2441,11 @@ async def fastapi_query_all_scada_records_by_date(querydate: str):
data = redis_client.get(cache_key) data = redis_client.get(cache_key)
if data: if data:
# 使用自定义的反序列化函数 # 使用自定义的反序列化函数
loaded_dict = msgpack.unpackb(data, object_hook=object_hook) loaded_dict = msgpack.unpackb(data, object_hook=decode_datetime)
return loaded_dict return loaded_dict
result_dict = influxdb_api.query_all_SCADA_records_by_date(query_date=querydate) result_dict = influxdb_api.query_all_SCADA_records_by_date(query_date=querydate)
packed = msgpack.packb(result_dict, default=default_encoder) packed = msgpack.packb(result_dict, default=encode_datetime)
redis_client.set(cache_key, packed) redis_client.set(cache_key, packed)
return result_dict return result_dict
@@ -2452,11 +2460,11 @@ async def fastapi_query_all_scheme_all_records(schemetype: str, schemename: str,
data = redis_client.get(cache_key) data = redis_client.get(cache_key)
if data: if data:
# 使用自定义的反序列化函数 # 使用自定义的反序列化函数
loaded_dict = msgpack.unpackb(data, object_hook=object_hook) loaded_dict = msgpack.unpackb(data, object_hook=decode_datetime)
return loaded_dict return loaded_dict
results = influxdb_api.query_scheme_all_record(scheme_Type=schemetype, scheme_Name=schemename, query_date=querydate) results = influxdb_api.query_scheme_all_record(scheme_Type=schemetype, scheme_Name=schemename, query_date=querydate)
packed = msgpack.packb(results, default=default_encoder) packed = msgpack.packb(results, default=encode_datetime)
redis_client.set(cache_key, packed) redis_client.set(cache_key, packed)
return results return results
@@ -2472,10 +2480,10 @@ async def fastapi_query_all_scheme_all_records_property(schemetype: str, schemen
all_results = None all_results = None
if data: if data:
# 使用自定义的反序列化函数 # 使用自定义的反序列化函数
all_results = msgpack.unpackb(data, object_hook=object_hook) all_results = msgpack.unpackb(data, object_hook=decode_datetime)
else: else:
all_results = influxdb_api.query_scheme_all_record(scheme_Type=schemetype, scheme_Name=schemename, query_date=querydate) all_results = influxdb_api.query_scheme_all_record(scheme_Type=schemetype, scheme_Name=schemename, query_date=querydate)
packed = msgpack.packb(all_results, default=default_encoder) packed = msgpack.packb(all_results, default=encode_datetime)
redis_client.set(cache_key, packed) redis_client.set(cache_key, packed)
results = None results = None