import requests from datetime import datetime import pytz from typing import List, Dict, Union, Optional import csv # get_data 是用来获取 历史数据,也就是非实时数据的接口 # get_realtime 是用来获取 实时数据 def convert_timestamp_to_beijing_time(timestamp: Union[int, float]) -> datetime: # 将毫秒级时间戳转换为秒级时间戳 timestamp_seconds = timestamp / 1000 # 将时间戳转换为datetime对象 utc_time = datetime.utcfromtimestamp(timestamp_seconds) # 设定UTC时区 utc_timezone = pytz.timezone('UTC') # 转换为北京时间 beijing_timezone = pytz.timezone('Asia/Shanghai') beijing_time = utc_time.replace(tzinfo=utc_timezone).astimezone(beijing_timezone) return beijing_time def beijing_time_to_utc(beijing_time_str: str) -> str: # 定义北京时区 beijing_timezone = pytz.timezone('Asia/Shanghai') # 将字符串转换为datetime对象 beijing_time = datetime.strptime(beijing_time_str, '%Y-%m-%d %H:%M:%S') # 本地化时间对象 beijing_time = beijing_timezone.localize(beijing_time) # 转换为UTC时间 utc_time = beijing_time.astimezone(pytz.utc) # 转换为ISO 8601格式的字符串 return utc_time.strftime('%Y-%m-%dT%H:%M:%SZ') def get_history_data(ids: str, begin_date: str, end_date: str, downsample: Optional[str]) -> List[Dict[str, Union[str, datetime, int, float]]]: # def get_history_data(ids: str, begin_date: str, end_date: str, downsample: Optional[str]) -> None: # 转换输入的北京时间为UTC时间 begin_date_utc = beijing_time_to_utc(begin_date) end_date_utc = beijing_time_to_utc(end_date) # 数据接口的地址 url = 'http://183.64.62.100:9057/loong/api/curves/data' # url = 'http://10.101.15.16:9000/loong/api/curves/data' # url_path = 'http://10.101.15.16:9000/loong' # 内网 # 设置 GET 请求的参数 params = { 'ids': ids, 'beginDate': begin_date_utc, 'endDate': end_date_utc, 'downsample': downsample } history_data_list =[] try: # 发送 GET 请求获取数据 response = requests.get(url, params=params) # 检查响应状态码,200 表示请求成功 if response.status_code == 200: # 解析响应的 JSON 数据 data = response.json() # 这里可以对获取到的数据进行进一步处理 # 打印 'mpointId' 和 'mpointName' for item in data['items']: mpoint_id = str(item['mpointId']) mpoint_name = item['mpointName'] # print("mpointId:", item['mpointId']) # print("mpointName:", item['mpointName']) # 打印 'dataDate' 和 'dataValue' for item_data in item['data']: # 将时间戳转换为北京时间 beijing_time = convert_timestamp_to_beijing_time(item_data['dataDate']) data_value = item_data['dataValue'] # 创建一个字典存储每条数据 data_dict = { 'time': beijing_time, 'device_ID': str(mpoint_id), 'description': mpoint_name, # 'dataDate (Beijing Time)': beijing_time.strftime('%Y-%m-%d %H:%M:%S'), 'monitored_value': data_value # 保留原有类型 } history_data_list.append(data_dict) else: # 如果请求不成功,打印错误信息 print("请求失败,状态码:", response.status_code) except Exception as e: # 捕获异常 print("发生异常:", e) return history_data_list # 使用示例 # data_list = get_history_data(ids='9572', # begin_date='2025-02-08 06:00:00', # end_date='2025-02-08 12:00:00', # downsample='1m') # # # 打印数据列表 # for data in data_list: # print(data) # # 定义 CSV 文件的路径 # csv_file_path = './influxdb_data_4984.csv' # # 将数据写入 CSV 文件 # # with open(csv_file_path, mode='w', newline='') as file: # # writer = csv.writer(file) # # # # # 写入表头 # # writer.writerow(['measurement', 'mpointId', 'date', 'dataValue', 'datetime']) # # # # # 写入数据 # # for data in data_list: # # measurement = data['mpointName'] # # mpointId = data['mpointId'] # # date = data['datetime'].strftime('%Y-%m-%d') # # dataValue = data['dataValue'] # # datetime_str = data['datetime'] # # # # # 写入一行 # # writer.writerow([measurement, mpointId, date, dataValue, datetime_str]) # # # # # # print(f"数据已保存到 {csv_file_path}") # # filtered_csv_file_path = './filtered_influxdb_data_4984.csv' # # # # # # 读取并筛选数据 # data_list1 = [] # # with open(csv_file_path, mode='r') as file: # csv_reader = csv.DictReader(file) # for row in csv_reader: # # 将 datetime 列解析为 datetime 对象 # datetime_value = datetime.strptime(row['datetime'], '%Y-%m-%d %H:%M:%S%z') # # # 只保留时间为 15 分钟倍数的行 # if datetime_value.minute % 15 == 0: # data_list1.append(row) # # # 将筛选后的数据写入新的 CSV 文件 # with open(filtered_csv_file_path, mode='w', newline='') as file: # writer = csv.writer(file) # # # 写入表头 # writer.writerow(['measurement', 'mpointId', 'date', 'dataValue', 'datetime']) # # # 写入筛选后的数据 # for data in data_list1: # writer.writerow([data['measurement'], data['mpointId'], data['date'], data['dataValue'], data['datetime']]) # # print(f"筛选后的数据已保存到 {filtered_csv_file_path}")