Files
TJWaterServer/auto_store_non_realtime_SCADA_data.py
2025-02-08 20:11:27 +08:00

82 lines
3.7 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
import influxdb_api
import globals
from datetime import datetime, timedelta, timezone
import schedule
import time
from influxdb_client import InfluxDBClient, BucketsApi, WriteApi, OrganizationsApi, Point, QueryApi
# 2025/02/06
def get_next_period_time() -> str:
"""
获取下一个6小时时间点返回格式为字符串'YYYY-MM-DDTHH:00:00+08:00'
:return: 返回字符串格式的时间表示下一个6小时执行时间点
"""
# 获取当前时间,并设定为北京时间
now = datetime.now() # now 类型为 datetime表示当前本地时间
# 获取当前的小时数并计算下一个6小时时间点
next_period_hour = (now.hour // 6 + 1) * 6 - 6 # next_period_hour 类型为 int表示下一个6小时时间点的小时部分
# 如果计算的小时大于23表示进入第二天调整为00:00
if next_period_hour >= 24:
next_period_hour = 0
now = now + timedelta(days=1) # 如果超过24小时日期增加1天
# 将秒和微秒部分清除构建出下一个6小时点的datetime对象
next_period_time = now.replace(hour=next_period_hour, minute=0, second=0, microsecond=0)
return next_period_time.strftime('%Y-%m-%dT%H:%M:%S+08:00') # 格式化为指定的字符串格式并返回
# 2025/02/06
def store_non_realtime_SCADA_data_job() -> None:
"""
定义的任务2每6小时执行一次在0点、6点、12点、18点执行执行时更新get_history_data_end_time并调用store_non_realtime_SCADA_data_to_influxdb函数
:return: None
"""
# 获取当前时间
current_time = datetime.now()
# 只在0点、6点、12点、18点执行任务
if current_time.hour % 6 == 0 and current_time.minute == 0:
# 获取下一个6小时的时间点并更新get_history_data_end_time
get_history_data_end_time: str = get_next_period_time() # get_history_data_end_time 类型为 str格式为'2025-02-06T12:00:00+08:00'
print(get_history_data_end_time)
# 调用函数执行任务
influxdb_api.store_non_realtime_SCADA_data_to_influxdb(get_history_data_end_time)
print('{} -- Successfully store non realtime SCADA data.'.format(
datetime.now().strftime('%Y-%m-%d %H:%M:%S')))
# 2025/02/06
def store_non_realtime_SCADA_data_task() -> None:
"""
定时执行6小时的任务使用schedule库每分钟执行一次store_non_realtime_SCADA_data_job函数。
该任务会一直运行定期调用store_non_realtime_SCADA_data_job获取SCADA数据。
:return:
"""
# 等待到整分对齐
now = datetime.now()
wait_seconds = 60 - now.second
time.sleep(wait_seconds)
try:
# 每分钟检查一次执行store_non_realtime_SCADA_data_job
schedule.every(1).minute.at(":00").do(store_non_realtime_SCADA_data_job)
# 持续执行任务,检查是否有待执行的任务
while True:
schedule.run_pending() # 执行所有待处理的定时任务
time.sleep(1) # 暂停1秒避免过于频繁的任务检查
pass
except Exception as e:
print(f"Error occurred in store_non_realtime_SCADA_data_task: {e}")
if __name__ == "__main__":
url = "http://localhost:8086" # 替换为你的InfluxDB实例地址
token = "Z4UZj9HuLwLlwoApywvT2nGVP3bwLy18y-sJQ7enzZlJd8YMzMWbBA6F-q4gBiZ-7-IqdxR5aR9LvicKiSNmnA==" # 替换为你的InfluxDB Token
org_name = "beibei" # 替换为你的Organization名称
client = InfluxDBClient(url=url, token=token)
# step2: 先查询pg数据库中scada_info的信息然后存储SCADA数据到SCADA_data这个bucket里
influxdb_api.query_pg_scada_info_non_realtime('bb')
# 自动执行
store_non_realtime_SCADA_data_task()