import influxdb_api import globals from datetime import datetime, timedelta, timezone import schedule import time from influxdb_client import InfluxDBClient, BucketsApi, WriteApi, OrganizationsApi, Point, QueryApi import influxdb_info # 2025/02/01 def get_next_time() -> str: """ 获取下一个1分钟时间点,返回格式为字符串'YYYY-MM-DDTHH:MM:00+08:00' :return: 返回字符串格式的时间,表示下一个1分钟的时间点 """ # 获取当前时间,并设定为北京时间 now = datetime.now() # now 类型为 datetime,表示当前本地时间 # 获取当前的分钟,并且将秒和微秒置为零 current_time = now.replace(second=0, microsecond=0) # current_time 类型为 datetime,时间的秒和微秒部分被清除 return current_time.strftime('%Y-%m-%dT%H:%M:%S+08:00') # 2025/02/06 def store_realtime_SCADA_data_job() -> None: """ 定义的任务1,每分钟执行1次,每次执行时,更新get_real_value_time并调用store_realtime_SCADA_data_to_influxdb函数 :return: None """ # 获取当前时间并更新get_real_value_time,转换为字符串格式 get_real_value_time: str = get_next_time() # get_real_value_time 类型为 str,格式为'2025-02-01T18:45:00+08:00' # 调用函数执行任务 influxdb_api.store_realtime_SCADA_data_to_influxdb(get_real_value_time) print('{} -- Successfully store realtime SCADA data.'.format( datetime.now().strftime('%Y-%m-%d %H:%M:%S'))) # 2025/02/06 def store_realtime_SCADA_data_task() -> None: """ 定时执行任务1和,使用schedule库每1分钟执行一次store_realtime_SCADA_data_job函数。 该任务会一直运行,定期调用store_realtime_SCADA_data_job获取SCADA数据。 :return: """ # 每1分钟执行一次store_realtime_SCADA_data_job schedule.every(1).minute.do(store_realtime_SCADA_data_job) # 持续执行任务,检查是否有待执行的任务 while True: schedule.run_pending() # 执行所有待处理的定时任务 time.sleep(1) # 暂停1秒,避免过于频繁的任务检查 if __name__ == "__main__": url = influxdb_info.url token = influxdb_info.token org_name = influxdb_info.org client = InfluxDBClient(url=url, token=token) # step2: 先查询pg数据库中scada_info的信息,然后存储SCADA数据到SCADA_data这个bucket里 influxdb_api.query_pg_scada_info_realtime('bb') # 自动执行 store_realtime_SCADA_data_task()