173 lines
5.8 KiB
Python
173 lines
5.8 KiB
Python
import requests
|
||
from datetime import datetime
|
||
import pytz
|
||
from typing import List, Dict, Union, Optional
|
||
import csv
|
||
|
||
# get_data 是用来获取 历史数据,也就是非实时数据的接口
|
||
# get_realtime 是用来获取 实时数据
|
||
|
||
|
||
def convert_timestamp_to_beijing_time(timestamp: Union[int, float]) -> datetime:
|
||
# 将毫秒级时间戳转换为秒级时间戳
|
||
timestamp_seconds = timestamp / 1000
|
||
|
||
# 将时间戳转换为datetime对象
|
||
utc_time = datetime.fromtimestamp(timestamp_seconds)
|
||
|
||
# 设定UTC时区
|
||
utc_timezone = pytz.timezone("UTC")
|
||
|
||
# 转换为北京时间
|
||
beijing_timezone = pytz.timezone("Asia/Shanghai")
|
||
beijing_time = utc_time.replace(tzinfo=utc_timezone).astimezone(beijing_timezone)
|
||
|
||
return beijing_time
|
||
|
||
|
||
def beijing_time_to_utc(beijing_time_str: str) -> str:
|
||
# 定义北京时区
|
||
beijing_timezone = pytz.timezone("Asia/Shanghai")
|
||
|
||
# 将字符串转换为datetime对象
|
||
beijing_time = datetime.strptime(beijing_time_str, "%Y-%m-%d %H:%M:%S")
|
||
|
||
# 本地化时间对象
|
||
beijing_time = beijing_timezone.localize(beijing_time)
|
||
|
||
# 转换为UTC时间
|
||
utc_time = beijing_time.astimezone(pytz.utc)
|
||
|
||
# 转换为ISO 8601格式的字符串
|
||
return utc_time.strftime("%Y-%m-%dT%H:%M:%SZ")
|
||
|
||
|
||
def get_history_data(
|
||
ids: str, begin_date: str, end_date: str, downsample: Optional[str]
|
||
) -> List[Dict[str, Union[str, datetime, int, float]]]:
|
||
# def get_history_data(ids: str, begin_date: str, end_date: str, downsample: Optional[str]) -> None:
|
||
# 转换输入的北京时间为UTC时间
|
||
begin_date_utc = beijing_time_to_utc(begin_date)
|
||
end_date_utc = beijing_time_to_utc(end_date)
|
||
|
||
# 数据接口的地址
|
||
url = "http://183.64.62.100:9057/loong/api/curves/data"
|
||
# url = 'http://10.101.15.16:9000/loong/api/curves/data'
|
||
# url_path = 'http://10.101.15.16:9000/loong' # 内网
|
||
|
||
# 设置 GET 请求的参数
|
||
params = {
|
||
"ids": ids,
|
||
"beginDate": begin_date_utc,
|
||
"endDate": end_date_utc,
|
||
"downsample": downsample,
|
||
}
|
||
|
||
history_data_list = []
|
||
|
||
try:
|
||
# 发送 GET 请求获取数据
|
||
response = requests.get(url, params=params)
|
||
|
||
# 检查响应状态码,200 表示请求成功
|
||
if response.status_code == 200:
|
||
# 解析响应的 JSON 数据
|
||
data = response.json()
|
||
# 这里可以对获取到的数据进行进一步处理
|
||
|
||
# 打印 'mpointId' 和 'mpointName'
|
||
for item in data["items"]:
|
||
mpoint_id = str(item["mpointId"])
|
||
mpoint_name = item["mpointName"]
|
||
# print("mpointId:", item['mpointId'])
|
||
# print("mpointName:", item['mpointName'])
|
||
|
||
# 打印 'dataDate' 和 'dataValue'
|
||
for item_data in item["data"]:
|
||
# 将时间戳转换为北京时间
|
||
beijing_time = convert_timestamp_to_beijing_time(
|
||
item_data["dataDate"]
|
||
)
|
||
data_value = item_data["dataValue"]
|
||
# 创建一个字典存储每条数据
|
||
data_dict = {
|
||
"time": beijing_time,
|
||
"device_ID": str(mpoint_id),
|
||
"description": mpoint_name,
|
||
# 'dataDate (Beijing Time)': beijing_time.strftime('%Y-%m-%d %H:%M:%S'),
|
||
"monitored_value": data_value, # 保留原有类型
|
||
}
|
||
|
||
history_data_list.append(data_dict)
|
||
else:
|
||
# 如果请求不成功,打印错误信息
|
||
print("请求失败,状态码:", response.status_code)
|
||
|
||
except Exception as e:
|
||
# 捕获异常
|
||
print("发生异常:", e)
|
||
|
||
return history_data_list
|
||
|
||
|
||
# 使用示例
|
||
# data_list = get_history_data(ids='9572',
|
||
# begin_date='2025-02-08 06:00:00',
|
||
# end_date='2025-02-08 12:00:00',
|
||
# downsample='1m')
|
||
#
|
||
# # 打印数据列表
|
||
# for data in data_list:
|
||
# print(data)
|
||
|
||
# # 定义 CSV 文件的路径
|
||
# csv_file_path = './influxdb_data_4984.csv'
|
||
# # 将数据写入 CSV 文件
|
||
# # with open(csv_file_path, mode='w', newline='') as file:
|
||
# # writer = csv.writer(file)
|
||
# #
|
||
# # # 写入表头
|
||
# # writer.writerow(['measurement', 'mpointId', 'date', 'dataValue', 'datetime'])
|
||
# #
|
||
# # # 写入数据
|
||
# # for data in data_list:
|
||
# # measurement = data['mpointName']
|
||
# # mpointId = data['mpointId']
|
||
# # date = data['datetime'].strftime('%Y-%m-%d')
|
||
# # dataValue = data['dataValue']
|
||
# # datetime_str = data['datetime']
|
||
# #
|
||
# # # 写入一行
|
||
# # writer.writerow([measurement, mpointId, date, dataValue, datetime_str])
|
||
# #
|
||
# #
|
||
# # print(f"数据已保存到 {csv_file_path}")
|
||
#
|
||
# filtered_csv_file_path = './filtered_influxdb_data_4984.csv'
|
||
# #
|
||
# # # # 读取并筛选数据
|
||
# data_list1 = []
|
||
#
|
||
# with open(csv_file_path, mode='r') as file:
|
||
# csv_reader = csv.DictReader(file)
|
||
# for row in csv_reader:
|
||
# # 将 datetime 列解析为 datetime 对象
|
||
# datetime_value = datetime.strptime(row['datetime'], '%Y-%m-%d %H:%M:%S%z')
|
||
#
|
||
# # 只保留时间为 15 分钟倍数的行
|
||
# if datetime_value.minute % 15 == 0:
|
||
# data_list1.append(row)
|
||
#
|
||
# # 将筛选后的数据写入新的 CSV 文件
|
||
# with open(filtered_csv_file_path, mode='w', newline='') as file:
|
||
# writer = csv.writer(file)
|
||
#
|
||
# # 写入表头
|
||
# writer.writerow(['measurement', 'mpointId', 'date', 'dataValue', 'datetime'])
|
||
#
|
||
# # 写入筛选后的数据
|
||
# for data in data_list1:
|
||
# writer.writerow([data['measurement'], data['mpointId'], data['date'], data['dataValue'], data['datetime']])
|
||
#
|
||
# print(f"筛选后的数据已保存到 {filtered_csv_file_path}")
|