Files
TJWaterServerBinary/auto_store_non_realtime_SCADA_data.py
2025-10-26 08:54:35 +08:00

140 lines
5.8 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
import influxdb_api
import globals
from datetime import datetime, timedelta, timezone
import schedule
import os
import logging
from logging.handlers import TimedRotatingFileHandler
import time
from influxdb_client import InfluxDBClient, BucketsApi, WriteApi, OrganizationsApi, Point, QueryApi
import influxdb_info
import project_info
def setup_logger():
# 创建日志目录
log_dir = "logs"
os.makedirs(log_dir, exist_ok=True)
# 配置基础日志格式
log_format = "%(asctime)s - %(levelname)s - %(message)s"
formatter = logging.Formatter(log_format)
# 创建主 Logger
logger = logging.getLogger()
logger.setLevel(logging.INFO) # 全局日志级别
# --- 1. 按日期分割的日志文件 Handler ---
log_file = os.path.join(log_dir, "scada.log")
file_handler = TimedRotatingFileHandler(
filename=log_file,
when="midnight", # 每天午夜轮转
interval=1,
backupCount=7,
encoding="utf-8"
)
file_handler.suffix = "scada-%Y-%m-%d.log" # 文件名格式
file_handler.setFormatter(formatter)
file_handler.setLevel(logging.INFO) # 文件记录 INFO 及以上级别
# --- 2. 控制台实时输出 Handler ---
console_handler = logging.StreamHandler() # 默认输出到 sys.stderr (控制台)
console_handler.setFormatter(formatter)
console_handler.setLevel(logging.INFO) # 控制台仅显示 INFO 及以上级别
# 将 Handler 添加到 Logger
logger.addHandler(file_handler)
# logger.addHandler(console_handler)
return logger
logger = setup_logger()
# 2025/02/01
def get_next_time() -> str:
"""
获取下一个1分钟时间点返回格式为字符串'YYYY-MM-DDTHH:MM:00+08:00'
:return: 返回字符串格式的时间表示下一个1分钟的时间点
"""
# 获取当前时间,并设定为北京时间
now = datetime.now() # now 类型为 datetime表示当前本地时间
# 获取当前的分钟,并且将秒和微秒置为零
current_time = now.replace(second=0, microsecond=0) # current_time 类型为 datetime时间的秒和微秒部分被清除
return current_time.strftime('%Y-%m-%dT%H:%M:%S+08:00')
# 2025/02/06
def get_next_period_time() -> str:
"""
获取下一个6小时时间点返回格式为字符串'YYYY-MM-DDTHH:00:00+08:00'
:return: 返回字符串格式的时间表示下一个6小时执行时间点
"""
# 获取当前时间,并设定为北京时间
now = datetime.now() # now 类型为 datetime表示当前本地时间
# 获取当前的小时数并计算下一个6小时时间点
next_period_hour = (now.hour // 6 + 1) * 6 - 6 # next_period_hour 类型为 int表示下一个6小时时间点的小时部分
# 如果计算的小时大于23表示进入第二天调整为00:00
if next_period_hour >= 24:
next_period_hour = 0
now = now + timedelta(days=1) # 如果超过24小时日期增加1天
# 将秒和微秒部分清除构建出下一个6小时点的datetime对象
next_period_time = now.replace(hour=next_period_hour, minute=0, second=0, microsecond=0)
return next_period_time.strftime('%Y-%m-%dT%H:%M:%S+08:00') # 格式化为指定的字符串格式并返回
# 2025/02/06
def store_non_realtime_SCADA_data_job() -> None:
"""
定义的任务2每6小时执行一次在0点、6点、12点、18点执行执行时更新get_history_data_end_time并调用store_non_realtime_SCADA_data_to_influxdb函数
:return: None
"""
# 获取当前时间
current_time = datetime.now()
# 只在0点、6点、12点、18点执行任务
# if current_time.hour % 6 == 0 and current_time.minute == 0:
if current_time.minute % 10 == 0:
logger.info(f"{current_time.strftime('%Y-%m-%d %H:%M:%S')} -- Start store non realtime SCADA data task.")
# 获取下一个6小时的时间点并更新get_history_data_end_time
get_history_data_end_time: str = get_next_time() # get_history_data_end_time 类型为 str格式为'2025-02-06T12:00:00+08:00'
# print(get_next_time)
# 调用函数执行任务
influxdb_api.store_non_realtime_SCADA_data_to_influxdb(get_history_data_end_time)
logger.info('{} -- Successfully store non realtime SCADA data.'.format(datetime.now().strftime('%Y-%m-%d %H:%M:%S')))
else:
logger.info(f"{current_time.strftime('%Y-%m-%d %H:%M:%S')} -- Skipping store non realtime SCADA data task.")
# 2025/02/06
def store_non_realtime_SCADA_data_task() -> None:
"""
定时执行6小时的任务使用schedule库每分钟执行一次store_non_realtime_SCADA_data_job函数。
该任务会一直运行定期调用store_non_realtime_SCADA_data_job获取SCADA数据。
:return:
"""
# 等待到整分对齐
now = datetime.now()
wait_seconds = 60 - now.second
time.sleep(wait_seconds)
try:
# 每分钟检查一次执行store_non_realtime_SCADA_data_job
schedule.every(1).minute.at(":00").do(store_non_realtime_SCADA_data_job)
# 持续执行任务,检查是否有待执行的任务
while True:
schedule.run_pending() # 执行所有待处理的定时任务
time.sleep(1) # 暂停1秒避免过于频繁的任务检查
pass
except Exception as e:
logger.error(f"Error occurred in store_non_realtime_SCADA_data_task: {e}")
if __name__ == "__main__":
url = influxdb_info.url
token = influxdb_info.token
org_name = influxdb_info.org
client = InfluxDBClient(url=url, token=token)
# step2: 先查询pg数据库中scada_info的信息然后存储SCADA数据到SCADA_data这个bucket里
influxdb_api.query_pg_scada_info_non_realtime(project_info.name)
# 自动执行
store_non_realtime_SCADA_data_task()