import requests from datetime import datetime import pytz def convert_timestamp_to_beijing_time(timestamp): # 将毫秒级时间戳转换为秒级时间戳 timestamp_seconds = timestamp / 1000 # 将时间戳转换为datetime对象 utc_time = datetime.utcfromtimestamp(timestamp_seconds) # 设定UTC时区 utc_timezone = pytz.timezone('UTC') # 转换为北京时间 beijing_timezone = pytz.timezone('Asia/Shanghai') beijing_time = utc_time.replace(tzinfo=utc_timezone).astimezone(beijing_timezone) return beijing_time def conver_beingtime_to_ucttime(timestr:str): beijing_time=datetime.strptime(timestr,'%Y-%m-%d %H:%M:%S') utc_time=beijing_time.astimezone(pytz.utc) str_utc=utc_time.strftime('%Y-%m-%dT%H:%M:%SZ') #print(str_utc) return str_utc def get_hist_data(ids, begin_date,end_date)->dict[str,dict[datetime,float]]: # 数据接口的地址 url = 'http://183.64.62.100:9057/loong/api/curves/data' # 设置 GET 请求的参数 params = { 'ids': ids, 'beginDate': begin_date, 'endDate': end_date } lst_data={} try: # 发送 GET 请求获取数据 response = requests.get(url, params=params) # 检查响应状态码,200 表示请求成功 if response.status_code == 200: # 解析响应的 JSON 数据 data = response.json() # 这里可以对获取到的数据进行进一步处理 # 打印 'mpointId' 和 'mpointName' for item in data['items']: #print("mpointId:", item['mpointId']) #print("mpointName:", item['mpointName']) # 打印 'dataDate' 和 'dataValue' data_seriers={} for item_data in item['data']: # print("dataDate:", item_data['dataDate']) # 将时间戳转换为北京时间 beijing_time = convert_timestamp_to_beijing_time(item_data['dataDate']) print("dataDate (Beijing Time):", beijing_time.strftime('%Y-%m-%d %H:%M:%S')) print("dataValue:", item_data['dataValue']) print() # 打印空行分隔不同条目 r=float(item_data['dataValue']) data_seriers[beijing_time]=r lst_data[item['mpointId']]=data_seriers return lst_data else: # 如果请求不成功,打印错误信息 print("请求失败,状态码:", response.status_code) except Exception as e: # 捕获异常 print("发生异常:", e) # 使用示例 # get_hist_data(ids='2498,2500', # begin_date='2024-03-31T16:00:00Z', # end_date='2024-04-01T16:00:00Z')