diff --git a/influxdb_api.py b/influxdb_api.py index f47aa7c..163a079 100644 --- a/influxdb_api.py +++ b/influxdb_api.py @@ -1823,7 +1823,7 @@ def query_all_record_by_time_property(query_time: str, type: str, property: str, # 2025/02/21 -def query_all_record_by_date(query_date: str, bucket: str="realtime_simulation_result") -> tuple: +def query_all_records_by_date(query_date: str, bucket: str="realtime_simulation_result") -> tuple: """ 查询指定日期的所有记录,包括‘node’和‘link’,分别以指定的格式返回 :param query_date: 输入的日期,格式为‘2025-02-14’ @@ -1834,7 +1834,7 @@ def query_all_record_by_date(query_date: str, bucket: str="realtime_simulation_r # 记录开始时间 time_cost_start = time.perf_counter() - print('{} -- query_all_record_by_date started.'.format(datetime.now(pytz.timezone('Asia/Shanghai')).strftime('%Y-%m-%d %H:%M:%S'))) + print('{} -- query_all_records_by_date started.'.format(datetime.now(pytz.timezone('Asia/Shanghai')).strftime('%Y-%m-%d %H:%M:%S'))) if not client.ping(): print("{} -- Failed to connect to InfluxDB.".format(datetime.now().strftime('%Y-%m-%d %H:%M:%S'))) @@ -1895,7 +1895,7 @@ def query_all_record_by_date(query_date: str, bucket: str="realtime_simulation_r }) time_cost_end = time.perf_counter() - print('{} -- query_all_record_by_date finished, cost time: {:.2f} s.'.format( datetime.now(pytz.timezone('Asia/Shanghai')).strftime('%Y-%m-%d %H:%M:%S'), time_cost_end - time_cost_start)) + print('{} -- query_all_records_by_date finished, cost time: {:.2f} s.'.format( datetime.now(pytz.timezone('Asia/Shanghai')).strftime('%Y-%m-%d %H:%M:%S'), time_cost_end - time_cost_start)) client.close() diff --git a/main.py b/main.py index d71de39..d342268 100644 --- a/main.py +++ b/main.py @@ -49,16 +49,24 @@ app = FastAPI() # 将 Query的信息 序列号到 redis/json, 默认不支持datetime,需要自定义 # 自定义序列化函数 -def default_encoder(obj): - if isinstance(obj, datetime): - return {"__datetime__": obj.isoformat()} - raise TypeError("Type not serializable") +# 序列化处理器 +def encode_datetime(obj): + """将datetime转换为可序列化的字典结构""" + if isinstance(obj, datetime.datetime): + return { + '__datetime__': True, + 'as_str': obj.strftime("%Y%m%dT%H:%M:%S.%f") + } + return obj -# 自定义反序列化函数 -def object_hook(dct): - if "__datetime__" in dct: - return datetime.fromisoformat(dct["__datetime__"]) - return dct +# 反序列化处理器 +def decode_datetime(obj): + """将字典还原为datetime对象""" + if '__datetime__' in obj: + return datetime.datetime.strptime( + obj['as_str'], "%Y%m%dT%H:%M:%S.%f" + ) + return obj # 初始化 Redis 连接 # 用redis 限制并发访u @@ -658,11 +666,11 @@ async def fastapi_get_all_junction_properties(network: str) -> list[dict[str, An data = redis_client.get(cache_key) if data: # 使用自定义的反序列化函数 - loaded_dict = msgpack.unpackb(data, object_hook=object_hook) + loaded_dict = msgpack.unpackb(data, object_hook=decode_datetime) return loaded_dict results = get_all_junctions(network) - redis_client.set(cache_key, msgpack.packb(results, default=object_hook)) + redis_client.set(cache_key, msgpack.packb(results, default=encode_datetime)) return results @@ -765,11 +773,11 @@ async def fastapi_get_all_reservoir_properties(network: str) -> list[dict[str, A data = redis_client.get(cache_key) if data: # 使用自定义的反序列化函数 - loaded_dict = msgpack.unpackb(data, object_hook=object_hook) + loaded_dict = msgpack.unpackb(data, object_hook=decode_datetime) return loaded_dict results = get_all_reservoirs(network) - redis_client.set(cache_key, msgpack.packb(results, default=object_hook)) + redis_client.set(cache_key, msgpack.packb(results, default=encode_datetime)) return results @@ -945,11 +953,11 @@ async def fastapi_get_all_tank_properties(network: str) -> list[dict[str, Any]]: data = redis_client.get(cache_key) if data: # 使用自定义的反序列化函数 - loaded_dict = msgpack.unpackb(data, object_hook=object_hook) + loaded_dict = msgpack.unpackb(data, object_hook=decode_datetime) return loaded_dict results = get_all_tanks(network) - redis_client.set(cache_key, msgpack.packb(results, default=object_hook)) + redis_client.set(cache_key, msgpack.packb(results, default=encode_datetime)) return results @@ -1080,11 +1088,11 @@ async def fastapi_get_all_pipe_properties(network: str) -> list[dict[str, Any]]: data = redis_client.get(cache_key) if data: # 使用自定义的反序列化函数 - loaded_dict = msgpack.unpackb(data, object_hook=object_hook) + loaded_dict = msgpack.unpackb(data, object_hook=decode_datetime) return loaded_dict results = get_all_pipes(network) - redis_client.set(cache_key, msgpack.packb(results, default=object_hook)) + redis_client.set(cache_key, msgpack.packb(results, default=encode_datetime)) return results @@ -1152,11 +1160,11 @@ async def fastapi_get_all_pump_properties(network: str) -> list[dict[str, Any]]: data = redis_client.get(cache_key) if data: # 使用自定义的反序列化函数 - loaded_dict = msgpack.unpackb(data, object_hook=object_hook) + loaded_dict = msgpack.unpackb(data, object_hook=decode_datetime) return loaded_dict results = get_all_pumps(network) - redis_client.set(cache_key, msgpack.packb(results, default=object_hook)) + redis_client.set(cache_key, msgpack.packb(results, default=encode_datetime)) return results @@ -1264,11 +1272,11 @@ async def fastapi_get_all_valve_properties(network: str) -> list[dict[str, Any]] data = redis_client.get(cache_key) if data: # 使用自定义的反序列化函数 - loaded_dict = msgpack.unpackb(data, object_hook=object_hook) + loaded_dict = msgpack.unpackb(data, object_hook=decode_datetime) return loaded_dict results = get_all_valves(network) - redis_client.set(cache_key, msgpack.packb(results, default=object_hook)) + redis_client.set(cache_key, msgpack.packb(results, default=encode_datetime)) return results @@ -1690,7 +1698,7 @@ async def fastapi_get_network_geometries(network: str) -> dict[str, Any] | None: data = redis_client.get(cache_key) if data: # 使用自定义的反序列化函数 - loaded_dict = msgpack.unpackb(data, object_hook=object_hook) + loaded_dict = msgpack.unpackb(data, object_hook=decode_datetime) return loaded_dict coords = get_network_node_coords(network) @@ -1710,7 +1718,7 @@ async def fastapi_get_network_geometries(network: str) -> dict[str, Any] | None: 'scadas': scadas } # 缓存查询结果提高性能 - redis_client.set(cache_key, msgpack.packb(results, default=object_hook)) + redis_client.set(cache_key, msgpack.packb(results, default=encode_datetime)) return results @@ -2320,14 +2328,14 @@ async def fastapi_query_all_records_by_date(querydate: str) -> dict[str, list]: data = redis_client.get(cache_key) if data: # 使用自定义的反序列化函数 - loaded_dict = msgpack.unpackb(data, object_hook=object_hook) + loaded_dict = msgpack.unpackb(data, object_hook=decode_datetime) return loaded_dict nodes_links: tuple = influxdb_api.query_all_record_by_date(query_date=querydate) results = { "nodes": nodes_links[0], "links": nodes_links[1] } - redis_client.set(cache_key, msgpack.packb(results, default=object_hook)) + redis_client.set(cache_key, msgpack.packb(results, default=encode_datetime)) return results @@ -2340,12 +2348,12 @@ async def fastapi_query_all_records_by_date_with_type(querydate: str, querytype: data = redis_client.get(cache_key) if data: # 使用自定义的反序列化函数 - loaded_dict = msgpack.unpackb(data, object_hook=object_hook) + loaded_dict = msgpack.unpackb(data, object_hook=decode_datetime) return loaded_dict results = influxdb_api.query_all_records_by_date_with_type(query_date=querydate, query_type=querytype) - packed = msgpack.packb(results, default=default_encoder) + packed = msgpack.packb(results, default=encode_datetime) redis_client.set(cache_key, packed) return results @@ -2359,10 +2367,10 @@ async def fastapi_query_all_records_by_ids_date_type(ids:str, querydate: str, qu results = [] if data: # 使用自定义的反序列化函数 - results = msgpack.unpackb(data, object_hook=object_hook) + results = msgpack.unpackb(data, object_hook=decode_datetime) else: results = influxdb_api.query_all_records_by_date_with_type(query_date=querydate, query_type=querytype) - packed = msgpack.packb(results, default=default_encoder) + packed = msgpack.packb(results, default=encode_datetime) redis_client.set(cache_key, packed) query_ids = ids.split(",") @@ -2382,11 +2390,11 @@ async def fastapi_query_all_records_by_date_property(querydate: str, querytype: data = redis_client.get(cache_key) if data: # 使用自定义的反序列化函数 - loaded_dict = msgpack.unpackb(data, object_hook=object_hook) + loaded_dict = msgpack.unpackb(data, object_hook=decode_datetime) return loaded_dict result_dict = influxdb_api.query_all_record_by_date_property(query_date=querydate, type=querytype, property=property) - packed = msgpack.packb(result_dict, default=default_encoder) + packed = msgpack.packb(result_dict, default=encode_datetime) redis_client.set(cache_key, packed) return result_dict @@ -2433,11 +2441,11 @@ async def fastapi_query_all_scada_records_by_date(querydate: str): data = redis_client.get(cache_key) if data: # 使用自定义的反序列化函数 - loaded_dict = msgpack.unpackb(data, object_hook=object_hook) + loaded_dict = msgpack.unpackb(data, object_hook=decode_datetime) return loaded_dict result_dict = influxdb_api.query_all_SCADA_records_by_date(query_date=querydate) - packed = msgpack.packb(result_dict, default=default_encoder) + packed = msgpack.packb(result_dict, default=encode_datetime) redis_client.set(cache_key, packed) return result_dict @@ -2452,11 +2460,11 @@ async def fastapi_query_all_scheme_all_records(schemetype: str, schemename: str, data = redis_client.get(cache_key) if data: # 使用自定义的反序列化函数 - loaded_dict = msgpack.unpackb(data, object_hook=object_hook) + loaded_dict = msgpack.unpackb(data, object_hook=decode_datetime) return loaded_dict results = influxdb_api.query_scheme_all_record(scheme_Type=schemetype, scheme_Name=schemename, query_date=querydate) - packed = msgpack.packb(results, default=default_encoder) + packed = msgpack.packb(results, default=encode_datetime) redis_client.set(cache_key, packed) return results @@ -2472,10 +2480,10 @@ async def fastapi_query_all_scheme_all_records_property(schemetype: str, schemen all_results = None if data: # 使用自定义的反序列化函数 - all_results = msgpack.unpackb(data, object_hook=object_hook) + all_results = msgpack.unpackb(data, object_hook=decode_datetime) else: all_results = influxdb_api.query_scheme_all_record(scheme_Type=schemetype, scheme_Name=schemename, query_date=querydate) - packed = msgpack.packb(all_results, default=default_encoder) + packed = msgpack.packb(all_results, default=encode_datetime) redis_client.set(cache_key, packed) results = None