import datetime
import shutil
import time
import urllib.request

import msgpack
import redis
import schedule

import influxdb_api


# Query results are serialized into redis/json. datetime objects are not
# supported by the default serializers, so custom hooks are defined below.

# Wire format shared by encode_datetime / decode_datetime.
# NOTE: the format carries no UTC offset, so tzinfo is not preserved.
_DATETIME_FORMAT = "%Y%m%dT%H:%M:%S.%f"


# Serialization hook
def encode_datetime(obj):
    """Convert a ``datetime.datetime`` into a msgpack-serializable dict.

    Intended to be passed as ``default=`` to ``msgpack.packb``.

    Args:
        obj: The object the serializer could not handle natively.

    Returns:
        ``{'__datetime__': True, 'as_str': <formatted string>}`` when *obj*
        is a ``datetime.datetime``; otherwise *obj* unchanged.
    """
    # ``datetime`` is imported as a module here, so the class must be
    # referenced as ``datetime.datetime``; passing the module itself to
    # isinstance() raises TypeError.
    if isinstance(obj, datetime.datetime):
        return {
            '__datetime__': True,
            'as_str': obj.strftime(_DATETIME_FORMAT),
        }
    return obj


# Deserialization hook
def decode_datetime(obj):
    """Restore a dict produced by ``encode_datetime`` to a datetime object.

    Args:
        obj: A decoded mapping.

    Returns:
        A ``datetime.datetime`` parsed from ``obj['as_str']`` when *obj*
        contains the ``'__datetime__'`` marker key; otherwise *obj* unchanged.
    """
    if '__datetime__' in obj:
        # strptime lives on the datetime class, not on the datetime module.
        return datetime.datetime.strptime(obj['as_str'], _DATETIME_FORMAT)
    return obj


##########################
# Needs to run on Python 3.12 for better performance
##########################

def queryallrecordsbydate(querydate: str, redis_client: redis.Redis):
    """Make sure the records for *querydate* are present in the Redis cache.

    If the key ``queryallrecordsbydate_<querydate>`` is missing (or holds an
    empty value), the records are fetched with
    ``influxdb_api.query_all_records_by_date`` and stored msgpack-encoded.

    Args:
        querydate: Date string forwarded to the query and used in the key.
        redis_client: Redis client used to read and write the cache entry.
    """
    cache_key = f"queryallrecordsbydate_{querydate}"
    data = redis_client.get(cache_key)

    if not data:
        nodes_links: tuple = influxdb_api.query_all_records_by_date(query_date=querydate)
        redis_client.set(cache_key, msgpack.packb(nodes_links, default=encode_datetime))


def queryallscadarecordsbydate(querydate: str, redis_client: redis.Redis):
    """Make sure the SCADA records for *querydate* are present in the Redis cache.

    If the key ``queryallscadarecordsbydate_<querydate>`` is missing (or
    holds an empty value), the records are fetched with
    ``influxdb_api.query_all_SCADA_records_by_date`` and stored
    msgpack-encoded.

    Args:
        querydate: Date string forwarded to the query and used in the key.
        redis_client: Redis client used to read and write the cache entry.
    """
    cache_key = f"queryallscadarecordsbydate_{querydate}"
    data = redis_client.get(cache_key)

    if not data:
        result_dict = influxdb_api.query_all_SCADA_records_by_date(query_date=querydate)
        redis_client.set(cache_key, msgpack.packb(result_dict, default=encode_datetime))


def auto_cache_data():
    """Pre-populate the Redis cache for the three days preceding today.

    For each of those days (formatted ``YYYY-MM-DD``) both the regular and
    the SCADA record caches are filled if they are not already present.
    The Redis connection is closed even when a query or cache write fails;
    the exception itself still propagates to the caller.
    """
    # Initialize the Redis connection
    # NOTE(review): the original comment mentioned using Redis to limit
    # concurrent access; no such limiting is visible here -- confirm.
    redis_client = redis.Redis(host="localhost", port=6379, db=0)

    try:
        # auto cache data for the last 3 days
        today = datetime.date.today()
        for i in range(1, 4):
            prev_day = today - datetime.timedelta(days=i)
            str_prev_day = prev_day.strftime('%Y-%m-%d')
            print(str_prev_day)
            queryallrecordsbydate(str_prev_day, redis_client)
            queryallscadarecordsbydate(str_prev_day, redis_client)
    finally:
        # Release the connection even if one of the queries raised.
        redis_client.close()


if __name__ == "__main__":
    # Warm the cache once at start-up.
    auto_cache_data()

    # Then refresh it automatically every day at 03:00.
    schedule.every().day.at("03:00").do(auto_cache_data)

    while True:
        schedule.run_pending()
        time.sleep(1)