Files

84 lines
2.8 KiB
Python

import requests
from datetime import datetime
import pytz
def convert_timestamp_to_beijing_time(timestamp):
# 将毫秒级时间戳转换为秒级时间戳
timestamp_seconds = timestamp / 1000
# 将时间戳转换为datetime对象
utc_time = datetime.fromtimestamp(timestamp_seconds)
# 设定UTC时区
utc_timezone = pytz.timezone("UTC")
# 转换为北京时间
beijing_timezone = pytz.timezone("Asia/Shanghai")
beijing_time = utc_time.replace(tzinfo=utc_timezone).astimezone(beijing_timezone)
return beijing_time
def conver_beingtime_to_ucttime(timestr: str):
beijing_time = datetime.strptime(timestr, "%Y-%m-%d %H:%M:%S")
utc_time = beijing_time.astimezone(pytz.utc)
str_utc = utc_time.strftime("%Y-%m-%dT%H:%M:%SZ")
# print(str_utc)
return str_utc
def get_hist_data(ids, begin_date, end_date) -> dict[str, dict[datetime, float]]:
# 数据接口的地址
url = "http://183.64.62.100:9057/loong/api/curves/data"
# 设置 GET 请求的参数
params = {"ids": ids, "beginDate": begin_date, "endDate": end_date}
lst_data = {}
try:
# 发送 GET 请求获取数据
response = requests.get(url, params=params)
# 检查响应状态码,200 表示请求成功
if response.status_code == 200:
# 解析响应的 JSON 数据
data = response.json()
# 这里可以对获取到的数据进行进一步处理
# 打印 'mpointId' 和 'mpointName'
for item in data["items"]:
# print("mpointId:", item['mpointId'])
# print("mpointName:", item['mpointName'])
# 打印 'dataDate' 和 'dataValue'
data_seriers = {}
for item_data in item["data"]:
# print("dataDate:", item_data['dataDate'])
# 将时间戳转换为北京时间
beijing_time = convert_timestamp_to_beijing_time(
item_data["dataDate"]
)
print(
"dataDate (Beijing Time):",
beijing_time.strftime("%Y-%m-%d %H:%M:%S"),
)
print("dataValue:", item_data["dataValue"])
print() # 打印空行分隔不同条目
r = float(item_data["dataValue"])
data_seriers[beijing_time] = r
lst_data[item["mpointId"]] = data_seriers
return lst_data
else:
# 如果请求不成功,打印错误信息
print("请求失败,状态码:", response.status_code)
except Exception as e:
# 捕获异常
print("发生异常:", e)
# 使用示例
# get_hist_data(ids='2498,2500',
# begin_date='2024-03-31T16:00:00Z',
# end_date='2024-04-01T16:00:00Z')