From 246c6592edc62ace83eb3526e60e7cb184c41f38 Mon Sep 17 00:00:00 2001 From: DingZQ Date: Sun, 9 Mar 2025 10:18:54 +0800 Subject: [PATCH] Use redis the store the cache --- main.py | 69 +++++++++++++++++++++++++----------------- startfastapiserver.bat | 3 +- 2 files changed, 43 insertions(+), 29 deletions(-) diff --git a/main.py b/main.py index c3d597a..82db425 100644 --- a/main.py +++ b/main.py @@ -1,7 +1,5 @@ -import os -import io -import json -import time +import asyncio, os, io, json, time, pickle, redis, datetime, logging, threading, uvicorn, multiprocessing, asyncio, shutil, random + from typing import * from urllib.request import Request from xml.dom import minicompat @@ -13,21 +11,13 @@ from fastapi import FastAPI, File, UploadFile, Response, status, Request, Body, from fastapi.responses import PlainTextResponse from fastapi.middleware.gzip import GZipMiddleware from tjnetwork import * -import asyncio -import threading -import uvicorn from multiprocessing import Value import uvicorn +import msgpack from run_simulation import run_simulation, run_simulation_ex from online_Analysis import * -import logging from fastapi.middleware.cors import CORSMiddleware -import random from datetime import datetime -import shutil -import logging -import redis -import datetime from influxdb_client import InfluxDBClient, BucketsApi, WriteApi, OrganizationsApi, Point, QueryApi from typing import List, Dict from datetime import datetime, timedelta, timezone @@ -48,9 +38,6 @@ tmpDir = "C:/tmpfiles/" lockedPrjs = {} -# 缓存 influxdb 查询结果提高性能 -influxdb_cache = {} - if not os.path.exists(inpDir): os.mkdir(inpDir) @@ -59,6 +46,19 @@ if not os.path.exists(tmpDir): app = FastAPI() +# 将 Query的信息 序列号到 redis/json, 默认不支持datetime,需要自定义 +# 自定义序列化函数 +def default_encoder(obj): + if isinstance(obj, datetime): + return {"__datetime__": obj.isoformat()} + raise TypeError("Type not serializable") + +# 自定义反序列化函数 +def object_hook(dct): + if "__datetime__" in dct: + return datetime.fromisoformat(dct["__datetime__"]) + return dct + # 初始化 Redis 连接 # 用redis 限制并发访u redis_client = redis.Redis(host="localhost", port=6379, db=0) @@ -2152,15 +2152,19 @@ async def fastapi_query_all_records_by_date(querydate: str) -> dict[str, list]: @app.get("/queryallrecordsbydateproperty/") async def fastapi_query_all_records_by_date_property(querydate: str, querytype: str, property: str) -> list[dict]: # 缓存查询结果提高性能 - global influxdb_cache + global redis_client cache_key = f"{querydate}_{querytype}_{property}" - if influxdb_cache.get(cache_key) is not None: - return influxdb_cache.get(cache_key) + data = redis_client.get(cache_key) + if data: + # 使用自定义的反序列化函数 + loaded_dict = msgpack.unpackb(data, object_hook=object_hook) + return loaded_dict - result = influxdb_api.query_all_record_by_date_property(query_date=querydate, type=querytype, property=property, client=influx_client) - influxdb_cache[cache_key] = result + result_dict = influxdb_api.query_all_record_by_date_property(query_date=querydate, type=querytype, property=property, client=influx_client) + packed = msgpack.packb(result_dict, default=default_encoder) + redis_client.set(cache_key, packed) - return result + return result_dict # def query_curve_by_ID_property_daterange(ID: str, type: str, property: str, start_date: str, end_date: str, bucket: str="realtime_data", client: InfluxDBClient=client) -> list: @@ -2199,17 +2203,26 @@ async def fastapi_query_scada_data_by_device_id_and_date(ids: str, querydate: st @app.get("/queryallscadarecordsbydate/") async def fastapi_query_all_scada_records_by_date(querydate: str): # 缓存查询结果提高性能 - global influxdb_cache + global redis_client cache_key = f"{querydate}" - if influxdb_cache.get(cache_key) is not None: - return influxdb_cache.get(cache_key) + data = redis_client.get(cache_key) + if data: + # 使用自定义的反序列化函数 + loaded_dict = msgpack.unpackb(data, object_hook=object_hook) + return loaded_dict - result = influxdb_api.query_all_SCADA_records_by_date(query_date=querydate, client=influx_client) - influxdb_cache[cache_key] = result + result_dict = influxdb_api.query_all_SCADA_records_by_date(query_date=querydate, client=influx_client) + packed = msgpack.packb(result_dict, default=default_encoder) + redis_client.set(cache_key, packed) - return result + return result_dict +@app.get("/clearredis/") +async def fastapi_clear_redis(): + redis_client.flushall() + return True + @app.get("/queryinfluxdbbuckets/") async def fastapi_query_influxdb_buckets(): return influxdb_api.query_buckets() diff --git a/startfastapiserver.bat b/startfastapiserver.bat index 8c91522..07438d6 100644 --- a/startfastapiserver.bat +++ b/startfastapiserver.bat @@ -3,4 +3,5 @@ REM cd "f:\DEV\GitHub\TJWaterServer" REM call startpg.bat cd C:\SourceCode\Server -uvicorn main:app --host 0.0.0.0 --port 80 --reload +REM uvicorn main:app --host 0.0.0.0 --port 80 --reload +uvicorn main:app --host 0.0.0.0 --port 80