Merge branch 'dingsu/shadell2' into TencentServer2
This commit is contained in:
74
main.py
74
main.py
@@ -1,7 +1,5 @@
|
||||
import os
|
||||
import io
|
||||
import json
|
||||
import time
|
||||
import asyncio, os, io, json, time, pickle, redis, datetime, logging, threading, uvicorn, multiprocessing, asyncio, shutil, random
|
||||
|
||||
from typing import *
|
||||
from urllib.request import Request
|
||||
from xml.dom import minicompat
|
||||
@@ -13,21 +11,13 @@ from fastapi import FastAPI, File, UploadFile, Response, status, Request, Body,
|
||||
from fastapi.responses import PlainTextResponse
|
||||
from fastapi.middleware.gzip import GZipMiddleware
|
||||
from tjnetwork import *
|
||||
import asyncio
|
||||
import threading
|
||||
import uvicorn
|
||||
from multiprocessing import Value
|
||||
import uvicorn
|
||||
import msgpack
|
||||
from run_simulation import run_simulation, run_simulation_ex
|
||||
from online_Analysis import *
|
||||
import logging
|
||||
from fastapi.middleware.cors import CORSMiddleware
|
||||
import random
|
||||
from datetime import datetime
|
||||
import shutil
|
||||
import logging
|
||||
import redis
|
||||
import datetime
|
||||
from influxdb_client import InfluxDBClient, BucketsApi, WriteApi, OrganizationsApi, Point, QueryApi
|
||||
from typing import List, Dict
|
||||
from datetime import datetime, timedelta, timezone
|
||||
@@ -48,9 +38,6 @@ tmpDir = "C:/tmpfiles/"
|
||||
|
||||
lockedPrjs = {}
|
||||
|
||||
# 缓存 influxdb 查询结果提高性能
|
||||
influxdb_cache = {}
|
||||
|
||||
if not os.path.exists(inpDir):
|
||||
os.mkdir(inpDir)
|
||||
|
||||
@@ -59,6 +46,19 @@ if not os.path.exists(tmpDir):
|
||||
|
||||
app = FastAPI()
|
||||
|
||||
# 将 Query的信息 序列号到 redis/json, 默认不支持datetime,需要自定义
|
||||
# 自定义序列化函数
|
||||
def default_encoder(obj):
|
||||
if isinstance(obj, datetime):
|
||||
return {"__datetime__": obj.isoformat()}
|
||||
raise TypeError("Type not serializable")
|
||||
|
||||
# 自定义反序列化函数
|
||||
def object_hook(dct):
|
||||
if "__datetime__" in dct:
|
||||
return datetime.fromisoformat(dct["__datetime__"])
|
||||
return dct
|
||||
|
||||
# 初始化 Redis 连接
|
||||
# 用redis 限制并发访u
|
||||
redis_client = redis.Redis(host="localhost", port=6379, db=0)
|
||||
@@ -2152,15 +2152,19 @@ async def fastapi_query_all_records_by_date(querydate: str) -> dict[str, list]:
|
||||
@app.get("/queryallrecordsbydateproperty/")
|
||||
async def fastapi_query_all_records_by_date_property(querydate: str, querytype: str, property: str) -> list[dict]:
|
||||
# 缓存查询结果提高性能
|
||||
global influxdb_cache
|
||||
global redis_client
|
||||
cache_key = f"{querydate}_{querytype}_{property}"
|
||||
if influxdb_cache.get(cache_key) is not None:
|
||||
return influxdb_cache.get(cache_key)
|
||||
data = redis_client.get(cache_key)
|
||||
if data:
|
||||
# 使用自定义的反序列化函数
|
||||
loaded_dict = msgpack.unpackb(data, object_hook=object_hook)
|
||||
return loaded_dict
|
||||
|
||||
result = influxdb_api.query_all_record_by_date_property(query_date=querydate, type=querytype, property=property, client=influx_client)
|
||||
influxdb_cache[cache_key] = result
|
||||
result_dict = influxdb_api.query_all_record_by_date_property(query_date=querydate, type=querytype, property=property, client=influx_client)
|
||||
packed = msgpack.packb(result_dict, default=default_encoder)
|
||||
redis_client.set(cache_key, packed)
|
||||
|
||||
return result
|
||||
return result_dict
|
||||
|
||||
|
||||
# def query_curve_by_ID_property_daterange(ID: str, type: str, property: str, start_date: str, end_date: str, bucket: str="realtime_data", client: InfluxDBClient=client) -> list:
|
||||
@@ -2199,15 +2203,29 @@ async def fastapi_query_scada_data_by_device_id_and_date(ids: str, querydate: st
|
||||
@app.get("/queryallscadarecordsbydate/")
|
||||
async def fastapi_query_all_scada_records_by_date(querydate: str):
|
||||
# 缓存查询结果提高性能
|
||||
global influxdb_cache
|
||||
global redis_client
|
||||
cache_key = f"{querydate}"
|
||||
if influxdb_cache.get(cache_key) is not None:
|
||||
return influxdb_cache.get(cache_key)
|
||||
data = redis_client.get(cache_key)
|
||||
if data:
|
||||
# 使用自定义的反序列化函数
|
||||
loaded_dict = msgpack.unpackb(data, object_hook=object_hook)
|
||||
return loaded_dict
|
||||
|
||||
result = influxdb_api.query_all_SCADA_records_by_date(query_date=querydate, client=influx_client)
|
||||
influxdb_cache[cache_key] = result
|
||||
result_dict = influxdb_api.query_all_SCADA_records_by_date(query_date=querydate, client=influx_client)
|
||||
packed = msgpack.packb(result_dict, default=default_encoder)
|
||||
redis_client.set(cache_key, packed)
|
||||
|
||||
return result
|
||||
return result_dict
|
||||
|
||||
|
||||
@app.post("/clearredis/")
|
||||
async def fastapi_clear_redis(key: str):
|
||||
redis_client.delete(key)
|
||||
return True
|
||||
|
||||
@app.get("/queryredis/")
|
||||
async def fastapi_query_redis():
|
||||
return redis_client.keys("*")
|
||||
|
||||
|
||||
@app.get("/queryinfluxdbbuckets/")
|
||||
|
||||
@@ -3,4 +3,5 @@ REM cd "f:\DEV\GitHub\TJWaterServer"
|
||||
|
||||
REM call startpg.bat
|
||||
cd C:\SourceCode\Server
|
||||
uvicorn main:app --host 0.0.0.0 --port 80 --reload
|
||||
REM uvicorn main:app --host 0.0.0.0 --port 80 --reload
|
||||
uvicorn main:app --host 0.0.0.0 --port 80
|
||||
|
||||
Reference in New Issue
Block a user