重构现代化 FastAPI 后端项目框架

This commit is contained in:
2026-01-21 16:50:57 +08:00
parent 9e06e68a15
commit c56f2fd1db
352 changed files with 176 additions and 70 deletions

167
scripts/get_data.py Normal file
View File

@@ -0,0 +1,167 @@
import requests
from datetime import datetime
import pytz
from typing import List, Dict, Union, Optional
import csv
# get_data 是用来获取 历史数据,也就是非实时数据的接口
# get_realtime 是用来获取 实时数据
def convert_timestamp_to_beijing_time(timestamp: Union[int, float]) -> datetime:
# 将毫秒级时间戳转换为秒级时间戳
timestamp_seconds = timestamp / 1000
# 将时间戳转换为datetime对象
utc_time = datetime.utcfromtimestamp(timestamp_seconds)
# 设定UTC时区
utc_timezone = pytz.timezone('UTC')
# 转换为北京时间
beijing_timezone = pytz.timezone('Asia/Shanghai')
beijing_time = utc_time.replace(tzinfo=utc_timezone).astimezone(beijing_timezone)
return beijing_time
def beijing_time_to_utc(beijing_time_str: str) -> str:
# 定义北京时区
beijing_timezone = pytz.timezone('Asia/Shanghai')
# 将字符串转换为datetime对象
beijing_time = datetime.strptime(beijing_time_str, '%Y-%m-%d %H:%M:%S')
# 本地化时间对象
beijing_time = beijing_timezone.localize(beijing_time)
# 转换为UTC时间
utc_time = beijing_time.astimezone(pytz.utc)
# 转换为ISO 8601格式的字符串
return utc_time.strftime('%Y-%m-%dT%H:%M:%SZ')
def get_history_data(ids: str, begin_date: str, end_date: str, downsample: Optional[str]) -> List[Dict[str, Union[str, datetime, int, float]]]:
# def get_history_data(ids: str, begin_date: str, end_date: str, downsample: Optional[str]) -> None:
# 转换输入的北京时间为UTC时间
begin_date_utc = beijing_time_to_utc(begin_date)
end_date_utc = beijing_time_to_utc(end_date)
# 数据接口的地址
url = 'http://183.64.62.100:9057/loong/api/curves/data'
# url = 'http://10.101.15.16:9000/loong/api/curves/data'
# url_path = 'http://10.101.15.16:9000/loong' # 内网
# 设置 GET 请求的参数
params = {
'ids': ids,
'beginDate': begin_date_utc,
'endDate': end_date_utc,
'downsample': downsample
}
history_data_list =[]
try:
# 发送 GET 请求获取数据
response = requests.get(url, params=params)
# 检查响应状态码200 表示请求成功
if response.status_code == 200:
# 解析响应的 JSON 数据
data = response.json()
# 这里可以对获取到的数据进行进一步处理
# 打印 'mpointId' 和 'mpointName'
for item in data['items']:
mpoint_id = str(item['mpointId'])
mpoint_name = item['mpointName']
# print("mpointId:", item['mpointId'])
# print("mpointName:", item['mpointName'])
# 打印 'dataDate' 和 'dataValue'
for item_data in item['data']:
# 将时间戳转换为北京时间
beijing_time = convert_timestamp_to_beijing_time(item_data['dataDate'])
data_value = item_data['dataValue']
# 创建一个字典存储每条数据
data_dict = {
'time': beijing_time,
'device_ID': str(mpoint_id),
'description': mpoint_name,
# 'dataDate (Beijing Time)': beijing_time.strftime('%Y-%m-%d %H:%M:%S'),
'monitored_value': data_value # 保留原有类型
}
history_data_list.append(data_dict)
else:
# 如果请求不成功,打印错误信息
print("请求失败,状态码:", response.status_code)
except Exception as e:
# 捕获异常
print("发生异常:", e)
return history_data_list
# 使用示例
# data_list = get_history_data(ids='9572',
# begin_date='2025-02-08 06:00:00',
# end_date='2025-02-08 12:00:00',
# downsample='1m')
#
# # 打印数据列表
# for data in data_list:
# print(data)
# # 定义 CSV 文件的路径
# csv_file_path = './influxdb_data_4984.csv'
# # 将数据写入 CSV 文件
# # with open(csv_file_path, mode='w', newline='') as file:
# # writer = csv.writer(file)
# #
# # # 写入表头
# # writer.writerow(['measurement', 'mpointId', 'date', 'dataValue', 'datetime'])
# #
# # # 写入数据
# # for data in data_list:
# # measurement = data['mpointName']
# # mpointId = data['mpointId']
# # date = data['datetime'].strftime('%Y-%m-%d')
# # dataValue = data['dataValue']
# # datetime_str = data['datetime']
# #
# # # 写入一行
# # writer.writerow([measurement, mpointId, date, dataValue, datetime_str])
# #
# #
# # print(f"数据已保存到 {csv_file_path}")
#
# filtered_csv_file_path = './filtered_influxdb_data_4984.csv'
# #
# # # # 读取并筛选数据
# data_list1 = []
#
# with open(csv_file_path, mode='r') as file:
# csv_reader = csv.DictReader(file)
# for row in csv_reader:
# # 将 datetime 列解析为 datetime 对象
# datetime_value = datetime.strptime(row['datetime'], '%Y-%m-%d %H:%M:%S%z')
#
# # 只保留时间为 15 分钟倍数的行
# if datetime_value.minute % 15 == 0:
# data_list1.append(row)
#
# # 将筛选后的数据写入新的 CSV 文件
# with open(filtered_csv_file_path, mode='w', newline='') as file:
# writer = csv.writer(file)
#
# # 写入表头
# writer.writerow(['measurement', 'mpointId', 'date', 'dataValue', 'datetime'])
#
# # 写入筛选后的数据
# for data in data_list1:
# writer.writerow([data['measurement'], data['mpointId'], data['date'], data['dataValue'], data['datetime']])
#
# print(f"筛选后的数据已保存到 {filtered_csv_file_path}")