import requests from datetime import datetime import pytz def convert_timestamp_to_beijing_time(timestamp): # 将毫秒级时间戳转换为秒级时间戳 timestamp_seconds = timestamp / 1000 # 将时间戳转换为datetime对象 utc_time = datetime.fromtimestamp(timestamp_seconds) # 设定UTC时区 utc_timezone = pytz.timezone("UTC") # 转换为北京时间 beijing_timezone = pytz.timezone("Asia/Shanghai") beijing_time = utc_time.replace(tzinfo=utc_timezone).astimezone(beijing_timezone) return beijing_time def conver_beingtime_to_ucttime(timestr: str): beijing_time = datetime.strptime(timestr, "%Y-%m-%d %H:%M:%S") utc_time = beijing_time.astimezone(pytz.utc) str_utc = utc_time.strftime("%Y-%m-%dT%H:%M:%SZ") # print(str_utc) return str_utc def get_hist_data(ids, begin_date, end_date) -> dict[str, dict[datetime, float]]: # 数据接口的地址 url = "http://183.64.62.100:9057/loong/api/curves/data" # 设置 GET 请求的参数 params = {"ids": ids, "beginDate": begin_date, "endDate": end_date} lst_data = {} try: # 发送 GET 请求获取数据 response = requests.get(url, params=params) # 检查响应状态码,200 表示请求成功 if response.status_code == 200: # 解析响应的 JSON 数据 data = response.json() # 这里可以对获取到的数据进行进一步处理 # 打印 'mpointId' 和 'mpointName' for item in data["items"]: # print("mpointId:", item['mpointId']) # print("mpointName:", item['mpointName']) # 打印 'dataDate' 和 'dataValue' data_seriers = {} for item_data in item["data"]: # print("dataDate:", item_data['dataDate']) # 将时间戳转换为北京时间 beijing_time = convert_timestamp_to_beijing_time( item_data["dataDate"] ) print( "dataDate (Beijing Time):", beijing_time.strftime("%Y-%m-%d %H:%M:%S"), ) print("dataValue:", item_data["dataValue"]) print() # 打印空行分隔不同条目 r = float(item_data["dataValue"]) data_seriers[beijing_time] = r lst_data[item["mpointId"]] = data_seriers return lst_data else: # 如果请求不成功,打印错误信息 print("请求失败,状态码:", response.status_code) except Exception as e: # 捕获异常 print("发生异常:", e) # 使用示例 # get_hist_data(ids='2498,2500', # begin_date='2024-03-31T16:00:00Z', # end_date='2024-04-01T16:00:00Z')