117 lines
3.4 KiB
Python
117 lines
3.4 KiB
Python
import schedule
|
||
import time
|
||
import datetime
|
||
import shutil
|
||
import redis
|
||
import urllib.request
|
||
import influxdb_api
|
||
import msgpack
|
||
import datetime
|
||
|
||
# Serialize query results for storage in Redis/JSON; datetime is not supported by default, so custom hooks are needed.
|
||
# 自定义序列化函数
|
||
# 序列化处理器
|
||
def encode_datetime(obj):
    """Turn a ``datetime.datetime`` into a plain, serializable dict.

    This function is passed as the ``default`` hook to ``msgpack.packb``
    elsewhere in this file.

    Args:
        obj: Any value the serializer could not handle natively.

    Returns:
        ``{'__datetime__': True, 'as_str': <formatted timestamp>}`` when
        ``obj`` is a ``datetime.datetime``; otherwise ``obj`` unchanged.
    """
    # Anything that is not a datetime is handed back untouched.
    if not isinstance(obj, datetime.datetime):
        return obj

    # The format carries no UTC offset, so only the wall-clock fields are kept.
    stamp = obj.strftime("%Y%m%dT%H:%M:%S.%f")
    return {'__datetime__': True, 'as_str': stamp}
|
||
|
||
# 反序列化处理器
|
||
def decode_datetime(obj):
    """Rebuild a ``datetime.datetime`` from the dict made by ``encode_datetime``.

    NOTE(review): nothing in the visible part of this file calls this
    function; it is presumably meant as an ``object_hook`` when unpacking
    the cached msgpack payloads -- confirm against the reader side.

    Args:
        obj: A container supporting ``in`` and item access (a dict in the
            expected use).

    Returns:
        The parsed ``datetime.datetime`` when ``obj`` contains the
        ``'__datetime__'`` marker key; otherwise ``obj`` unchanged.
    """
    # Containers without the marker key pass through as-is.
    if '__datetime__' not in obj:
        return obj

    stamp = obj['as_str']
    return datetime.datetime.strptime(stamp, "%Y%m%dT%H:%M:%S.%f")
|
||
|
||
##########################
|
||
# Run this script with Python 3.12 for better performance.
|
||
##########################
|
||
|
||
def queryallrecordsbydate(querydate: str, redis_client: redis.Redis):
    """Cache one day's records in Redis unless they are already cached.

    The records come from ``influxdb_api.query_all_records_by_date`` and are
    stored msgpack-encoded (datetimes converted via ``encode_datetime``)
    under the key ``queryallrecordsbydate_<querydate>``.

    Args:
        querydate: Date string forwarded to the InfluxDB helper and used as
            the cache-key suffix.
        redis_client: Redis connection used for the existence check and the
            write.
    """
    key = f"queryallrecordsbydate_{querydate}"

    # Already cached: skip the query.
    if redis_client.exists(key):
        return

    records = influxdb_api.query_all_records_by_date(query_date=querydate)
    payload = msgpack.packb(records, default=encode_datetime)
    redis_client.set(key, payload)
|
||
|
||
def queryallrecordsbydate_by_url(querydate: str):
    """Request the local ``queryallrecordsbydate`` HTTP endpoint for one date.

    The response body is read in full and then discarded; the function
    returns nothing. NOTE(review): the request is presumably made so the
    server fills its own cache for ``querydate`` -- confirm against the
    server side.

    Args:
        querydate: Date string placed in the ``querydate`` query parameter.

    A ``urllib.error.URLError`` (including HTTP error statuses) is reported
    on stdout and swallowed so a failing date does not stop the caller.
    """
    print(f'queryallrecordsbydateproperty: {querydate}')

    url = f"http://localhost/queryallrecordsbydate/?querydate={querydate}"
    try:
        # The context manager closes the connection even if read() raises;
        # the original never closed the response.
        with urllib.request.urlopen(url) as response:
            # Drain the body so the request completes; the content is unused.
            response.read()
    except urllib.error.URLError as e:
        # Include the reason so failures can be diagnosed from the log.
        print(f"Error: {e}")
|
||
|
||
def queryallscadarecordsbydate(querydate: str, redis_client: redis.Redis):
    """Cache one day's SCADA records in Redis unless they are already cached.

    The records come from ``influxdb_api.query_all_SCADA_records_by_date``
    and are stored msgpack-encoded (datetimes converted via
    ``encode_datetime``) under the key
    ``queryallscadarecordsbydate_<querydate>``.

    Args:
        querydate: Date string forwarded to the InfluxDB helper and used as
            the cache-key suffix.
        redis_client: Redis connection used for the existence check and the
            write.
    """
    key = f"queryallscadarecordsbydate_{querydate}"

    # Already cached: skip the query.
    if redis_client.exists(key):
        return

    scada_records = influxdb_api.query_all_SCADA_records_by_date(query_date=querydate)
    payload = msgpack.packb(scada_records, default=encode_datetime)
    redis_client.set(key, payload)
|
||
|
||
def queryallscadarecordsbydate_by_url(querydate: str):
    """Request the local SCADA-records HTTP endpoint for one date.

    The response body is read in full and then discarded; the function
    returns nothing. NOTE(review): the request is presumably made so the
    server fills its own cache for ``querydate`` -- confirm against the
    server side.

    Args:
        querydate: Date string placed in the ``querydate`` query parameter.

    A ``urllib.error.URLError`` (including HTTP error statuses) is reported
    on stdout and swallowed so a failing date does not stop the caller.
    """
    print(f'queryallscadarecordsbydate_by_url: {querydate}')

    # The original requested /queryallrecordsbydate/ here, a copy-paste of
    # the non-SCADA function, so the SCADA endpoint was never hit.
    # NOTE(review): the path is assumed to mirror the function and cache-key
    # name; confirm against the server's routes.
    url = f"http://localhost/queryallscadarecordsbydate/?querydate={querydate}"
    try:
        # The context manager closes the connection even if read() raises;
        # the original never closed the response.
        with urllib.request.urlopen(url) as response:
            # Drain the body so the request completes; the content is unused.
            response.read()
    except urllib.error.URLError as e:
        # Include the reason so failures can be diagnosed from the log.
        print(f"Error: {e}")
|
||
|
||
|
||
def auto_cache_data():
    """Fill the Redis cache for each of the previous three days.

    Opens a Redis connection to ``localhost:6379`` (db 0), then for
    yesterday, the day before and the day before that calls
    ``queryallrecordsbydate`` and ``queryallscadarecordsbydate`` with the
    date formatted as ``YYYY-MM-DD``. Each date is printed before it is
    processed.

    Any exception raised by those calls propagates to the caller; the Redis
    connection is closed either way.
    """
    # NOTE(review): the original comment here mentioned using Redis to limit
    # concurrent access; nothing in this function does that.
    redis_client = redis.Redis(host="localhost", port=6379, db=0)

    try:
        # Cache data for the last 3 days (today itself is excluded).
        today = datetime.date.today()
        for i in range(1, 4):
            prev_day = today - datetime.timedelta(days=i)
            str_prev_day = prev_day.strftime('%Y-%m-%d')
            print(str_prev_day)

            queryallrecordsbydate(str_prev_day, redis_client)
            queryallscadarecordsbydate(str_prev_day, redis_client)
    finally:
        # Previously skipped whenever a query raised, leaking the connection.
        redis_client.close()
|
||
|
||
def auto_cache_data_by_url():
    """Request both HTTP endpoints for each of the previous three days.

    For yesterday, the day before and the day before that, calls
    ``queryallrecordsbydate_by_url`` and ``queryallscadarecordsbydate_by_url``
    with the date formatted as ``YYYY-MM-DD``. Each date is printed before it
    is processed.
    """
    # The original ended with ``redis_client.close()``, but this function
    # never creates a Redis client, so every call raised NameError after the
    # loop. No Redis connection is used here, so there is nothing to close.
    today = datetime.date.today()
    for i in range(1, 4):
        prev_day = today - datetime.timedelta(days=i)
        str_prev_day = prev_day.strftime('%Y-%m-%d')
        print(str_prev_day)

        queryallrecordsbydate_by_url(str_prev_day)
        queryallscadarecordsbydate_by_url(str_prev_day)
|
||
|
||
if __name__ == "__main__":
    # Run the job once immediately at startup.
    auto_cache_data_by_url()

    # Register the same job to run every day at 03:00.
    daily_job = schedule.every().day.at("03:00")
    daily_job.do(auto_cache_data_by_url)

    # Poll the scheduler once per second, forever.
    while True:
        schedule.run_pending()
        time.sleep(1)
|