from logging.handlers import TimedRotatingFileHandler import influxdb_api import os import logging import globals from datetime import datetime, timedelta, timezone import schedule import time import shutil from influxdb_client import InfluxDBClient, BucketsApi, WriteApi, OrganizationsApi, Point, QueryApi import simulation import influxdb_info def setup_logger(): # 创建日志目录 log_dir = "logs" os.makedirs(log_dir, exist_ok=True) # 配置基础日志格式 log_format = "%(asctime)s - %(levelname)s - %(message)s" formatter = logging.Formatter(log_format) # 创建主 Logger logger = logging.getLogger() logger.setLevel(logging.DEBUG) # 全局日志级别 # --- 1. 按日期分割的日志文件 Handler --- log_file = os.path.join(log_dir, "simulation.log") file_handler = TimedRotatingFileHandler( filename=log_file, when="midnight", # 每天午夜轮转 interval=1, backupCount=7, encoding="utf-8" ) file_handler.suffix = "simulation-%Y-%m-%d.log" # 文件名格式 file_handler.setFormatter(formatter) file_handler.setLevel(logging.DEBUG) # 文件记录所有级别日志 # --- 2. 控制台实时输出 Handler --- console_handler = logging.StreamHandler() # 默认输出到 sys.stderr (控制台) console_handler.setFormatter(formatter) console_handler.setLevel(logging.INFO) # 控制台仅显示 INFO 及以上级别 # 将 Handler 添加到 Logger logger.addHandler(file_handler) logger.addHandler(console_handler) return logger logger = setup_logger() # 2025/02/01 def get_next_time() -> str: """ 获取下一个1分钟时间点,返回格式为字符串'YYYY-MM-DDTHH:MM:00+08:00' :return: 返回字符串格式的时间,表示下一个1分钟的时间点 """ # 获取当前时间,并设定为北京时间 now = datetime.now() # now 类型为 datetime,表示当前本地时间 # 获取当前的分钟,并且将秒和微秒置为零 current_time = now.replace(second=0, microsecond=0) # current_time 类型为 datetime,时间的秒和微秒部分被清除 return current_time.strftime('%Y-%m-%dT%H:%M:%S+08:00') # 2025/02/06 def store_realtime_SCADA_data_job() -> None: """ 定义的任务1,每分钟执行1次,每次执行时,更新get_real_value_time并调用store_realtime_SCADA_data_to_influxdb函数 :return: None """ # 获取当前时间并更新get_real_value_time,转换为字符串格式 get_real_value_time: str = get_next_time() # get_real_value_time 类型为 str,格式为'2025-02-01T18:45:00+08:00' # 调用函数执行任务 influxdb_api.store_realtime_SCADA_data_to_influxdb(get_real_value_time) logger.info('{} -- Successfully store realtime SCADA data.'.format(datetime.now().strftime('%Y-%m-%d %H:%M:%S'))) # 2025/02/06 def get_next_15minute_time() -> str: """ 获取下一个15分钟的时间点,返回格式为字符串'YYYY-MM-DDTHH:MM:00+08:00' :return: 返回字符串格式的时间,表示下一个15分钟执行时间点 """ now = datetime.now() # 向上舍入到下一个15分钟 next_15minute = (now.minute // 15 + 1) * 15 - 15 if next_15minute == 60: next_15minute = 0 now = now + timedelta(hours=1) next_time = now.replace(minute=next_15minute, second=0, microsecond=0) return next_time.strftime('%Y-%m-%dT%H:%M:%S+08:00') # 2025/02/07 def run_simulation_job() -> None: """ 定义的任务3,每15分钟执行一次在store_realtime_SCADA_data_to_influxdb之后执行run_simulation。 :return: None """ # 获取当前时间,并检查是否是整点15分钟 current_time = datetime.now() if current_time.minute % 15 == 0: print(f"{current_time.strftime('%Y-%m-%d %H:%M:%S')} -- Start simulation task.") # 计算前,获取scada_info中的信息,按照设定的方法修改pg数据库 simulation.query_corresponding_element_id_and_query_id("bb") simulation.query_corresponding_pattern_id_and_query_id('bb') region_result = simulation.query_non_realtime_region('bb') globals.source_outflow_region_id = simulation.get_source_outflow_region_id('bb', region_result) globals.realtime_region_pipe_flow_and_demand_id = simulation.query_realtime_region_pipe_flow_and_demand_id('bb', region_result) globals.pipe_flow_region_patterns = simulation.query_pipe_flow_region_patterns('bb') globals.non_realtime_region_patterns = simulation.query_non_realtime_region_patterns('bb', region_result) globals.source_outflow_region_patterns, realtime_region_pipe_flow_and_demand_patterns = simulation.get_realtime_region_patterns('bb', globals.source_outflow_region_id, globals.realtime_region_pipe_flow_and_demand_id) modify_pattern_start_time: str = get_next_15minute_time() # 获取下一个15分钟时间点 # print(modify_pattern_start_time) simulation.run_simulation(name='bb', simulation_type="realtime", modify_pattern_start_time=modify_pattern_start_time) logger.info('{} -- Successfully run simulation and store realtime simulation result.'.format(datetime.now().strftime('%Y-%m-%d %H:%M:%S'))) else: logger.info(f"{current_time.strftime('%Y-%m-%d %H:%M:%S')} -- Skipping the simulation task.") # 2025/02/06 def realtime_task() -> None: """ 定时执行任务1和,使用schedule库每1分钟执行一次store_realtime_SCADA_data_job函数。 该任务会一直运行,定期调用store_realtime_SCADA_data_job获取SCADA数据。 :return: """ # 等待到整分对齐 now = datetime.now() wait_seconds = 60 - now.second time.sleep(wait_seconds) # 使用 .at(":00") 指定在每分钟的第0秒执行 schedule.every(1).minute.at(":00").do(store_realtime_SCADA_data_job) # 每15分钟执行一次run_simulation_job schedule.every(1).minute.at(":00").do(run_simulation_job) # 持续执行任务,检查是否有待执行的任务 while True: schedule.run_pending() # 执行所有待处理的定时任务 time.sleep(1) # 暂停1秒,避免过于频繁的任务检查 if __name__ == "__main__": url = influxdb_info.url token = influxdb_info.token org_name = influxdb_info.org client = InfluxDBClient(url=url, token=token) # step2: 先查询pg数据库中scada_info的信息,然后存储SCADA数据到SCADA_data这个bucket里 influxdb_api.query_pg_scada_info_realtime('bb') # 自动执行 realtime_task()