import requests from datetime import datetime import pytz from typing import List, Dict, Union, Tuple def convert_to_beijing_time(utc_time_str): # 解析UTC时间字符串为datetime对象 utc_time = datetime.strptime(utc_time_str, '%Y-%m-%dT%H:%M:%SZ') # 设定UTC时区 utc_timezone = pytz.timezone('UTC') # 转换为北京时间 beijing_timezone = pytz.timezone('Asia/Shanghai') beijing_time = utc_time.replace(tzinfo=utc_timezone).astimezone(beijing_timezone) return beijing_time def get_realValue(ids) -> List[Dict[str, Union[str, datetime, int, float]]]: # 数据接口的地址 url = 'http://183.64.62.100:9057/loong/api/mpoints/realValue' # url = 'http://10.101.15.16:9000/loong/api/mpoints/realValue' # 内网 # 设置GET请求的参数 params = { 'ids': ids } # 创建一个字典来存储数据 data_list = [] try: # 发送GET请求获取数据 response = requests.get(url, params=params) # 检查响应状态码,200表示请求成功 if response.status_code == 200: # 解析响应的JSON数据 data = response.json() # 只打印'id'、'datadt'和'realValue'数据 for realValue in data: # print("id:", realValue['id']) # print("mpointName:",realValue['mpointName']) # print("datadt:", realValue['datadt']) # 转换datadt字段为北京时间 beijing_time = convert_to_beijing_time(realValue['datadt']) # print("datadt (Beijing Time):", beijing_time.strftime('%Y-%m-%d %H:%M:%S')) # print("realValue:", realValue['realValue']) # print() # 打印空行分隔不同条目 # 将数据添加到字典中,值为一个字典,包含其他需要的字段 data_list.append({ 'device_ID': realValue['id'], 'description': realValue['mpointName'], 'time': beijing_time.strftime('%Y-%m-%d %H:%M:%S'), 'monitored_value': realValue['realValue'] }) else: # 如果请求不成功,打印错误信息 print("请求失败,状态码:", response.status_code) except Exception as e: # 捕获异常 print("发生异常:", e) return data_list # 使用示例 # data_list = get_realValue(ids='2498,2500') # print(data_list)