Files
TJWaterServerBinary/auto_realtime.py
2025-10-26 08:54:35 +08:00

157 lines
6.8 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
from logging.handlers import TimedRotatingFileHandler
import influxdb_api
import os
import logging
import globals
from datetime import datetime, timedelta, timezone
import schedule
import time
import shutil
from influxdb_client import InfluxDBClient, BucketsApi, WriteApi, OrganizationsApi, Point, QueryApi
import simulation
import influxdb_info
import project_info
def setup_logger():
# 创建日志目录
log_dir = "logs"
os.makedirs(log_dir, exist_ok=True)
# 配置基础日志格式
log_format = "%(asctime)s - %(levelname)s - %(message)s"
formatter = logging.Formatter(log_format)
# 创建主 Logger
logger = logging.getLogger()
logger.setLevel(logging.INFO) # 全局日志级别
# --- 1. 按日期分割的日志文件 Handler ---
log_file = os.path.join(log_dir, "simulation.log")
file_handler = TimedRotatingFileHandler(
filename=log_file,
when="midnight", # 每天午夜轮转
interval=1,
backupCount=7,
encoding="utf-8"
)
file_handler.suffix = "simulation-%Y-%m-%d.log" # 文件名格式
file_handler.setFormatter(formatter)
file_handler.setLevel(logging.INFO) # 文件记录所有级别日志
# --- 2. 控制台实时输出 Handler ---
console_handler = logging.StreamHandler() # 默认输出到 sys.stderr (控制台)
console_handler.setFormatter(formatter)
console_handler.setLevel(logging.INFO) # 控制台仅显示 INFO 及以上级别
# 将 Handler 添加到 Logger
logger.addHandler(file_handler)
#logger.addHandler(console_handler)
return logger
logger = setup_logger()
# 2025/02/01
def get_next_time() -> str:
"""
获取下一个1分钟时间点返回格式为字符串'YYYY-MM-DDTHH:MM:00+08:00'
:return: 返回字符串格式的时间表示下一个1分钟的时间点
"""
# 获取当前时间,并设定为北京时间
now = datetime.now() # now 类型为 datetime表示当前本地时间
# 获取当前的分钟,并且将秒和微秒置为零
current_time = now.replace(second=0, microsecond=0) # current_time 类型为 datetime时间的秒和微秒部分被清除
return current_time.strftime('%Y-%m-%dT%H:%M:%S+08:00')
# 2025/02/06
def store_realtime_SCADA_data_job() -> None:
"""
定义的任务1每分钟执行1次每次执行时更新get_real_value_time并调用store_realtime_SCADA_data_to_influxdb函数
:return: None
"""
# 获取当前时间并更新get_real_value_time转换为字符串格式
get_real_value_time: str = get_next_time() # get_real_value_time 类型为 str格式为'2025-02-01T18:45:00+08:00'
# 调用函数执行任务
influxdb_api.store_realtime_SCADA_data_to_influxdb(get_real_value_time)
logger.info('{} -- Successfully store realtime SCADA data.'.format(datetime.now().strftime('%Y-%m-%d %H:%M:%S')))
# 2025/02/06
def get_next_15minute_time() -> str:
"""
获取下一个15分钟的时间点返回格式为字符串'YYYY-MM-DDTHH:MM:00+08:00'
:return: 返回字符串格式的时间表示下一个15分钟执行时间点
"""
now = datetime.now()
# 向上舍入到下一个15分钟
next_15minute = (now.minute // 15 + 1) * 15 - 15
if next_15minute == 60:
next_15minute = 0
now = now + timedelta(hours=1)
next_time = now.replace(minute=next_15minute, second=0, microsecond=0)
return next_time.strftime('%Y-%m-%dT%H:%M:%S+08:00')
# 2025/02/07
def run_simulation_job() -> None:
"""
定义的任务3每15分钟执行一次在store_realtime_SCADA_data_to_influxdb之后执行run_simulation。
:return: None
"""
# 获取当前时间并检查是否是整点15分钟
current_time = datetime.now()
if current_time.minute % 15 == 0:
print(f"{current_time.strftime('%Y-%m-%d %H:%M:%S')} -- Start simulation task.")
# 计算前获取scada_info中的信息按照设定的方法修改pg数据库
simulation.query_corresponding_element_id_and_query_id(project_info.name)
simulation.query_corresponding_pattern_id_and_query_id(project_info.name)
region_result = simulation.query_non_realtime_region(project_info.name)
globals.source_outflow_region_id = simulation.get_source_outflow_region_id(project_info.name, region_result)
globals.realtime_region_pipe_flow_and_demand_id = simulation.query_realtime_region_pipe_flow_and_demand_id(project_info.name, region_result)
globals.pipe_flow_region_patterns = simulation.query_pipe_flow_region_patterns(project_info.name)
globals.non_realtime_region_patterns = simulation.query_non_realtime_region_patterns(project_info.name, region_result)
globals.source_outflow_region_patterns, realtime_region_pipe_flow_and_demand_patterns = simulation.get_realtime_region_patterns(project_info.name,
globals.source_outflow_region_id,
globals.realtime_region_pipe_flow_and_demand_id)
modify_pattern_start_time: str = get_next_15minute_time() # 获取下一个15分钟时间点
# print(modify_pattern_start_time)
simulation.run_simulation(name=project_info.name, simulation_type="realtime", modify_pattern_start_time=modify_pattern_start_time)
logger.info('{} -- Successfully run simulation and store realtime simulation result.'.format(datetime.now().strftime('%Y-%m-%d %H:%M:%S')))
else:
logger.info(f"{current_time.strftime('%Y-%m-%d %H:%M:%S')} -- Skipping the simulation task.")
# 2025/02/06
def realtime_task() -> None:
"""
定时执行任务1和使用schedule库每1分钟执行一次store_realtime_SCADA_data_job函数。
该任务会一直运行定期调用store_realtime_SCADA_data_job获取SCADA数据。
:return:
"""
# 等待到整分对齐
now = datetime.now()
wait_seconds = 60 - now.second
time.sleep(wait_seconds)
# 使用 .at(":00") 指定在每分钟的第0秒执行
schedule.every(1).minute.at(":00").do(store_realtime_SCADA_data_job)
# 每15分钟执行一次run_simulation_job
schedule.every(1).minute.at(":00").do(run_simulation_job)
# 持续执行任务,检查是否有待执行的任务
while True:
schedule.run_pending() # 执行所有待处理的定时任务
time.sleep(1) # 暂停1秒避免过于频繁的任务检查
if __name__ == "__main__":
url = influxdb_info.url
token = influxdb_info.token
org_name = influxdb_info.org
client = InfluxDBClient(url=url, token=token)
# step2: 先查询pg数据库中scada_info的信息然后存储SCADA数据到SCADA_data这个bucket里
influxdb_api.query_pg_scada_info_realtime(project_info.name)
# 自动执行
realtime_task()