This commit is contained in:
DingZQ
2025-03-24 21:33:00 +08:00
parent eedd04549d
commit 0f18355a9c
4 changed files with 2889 additions and 9 deletions

24
all_auto_task.py Normal file
View File

@@ -0,0 +1,24 @@
import auto_realtime
import auto_store_non_realtime_SCADA_data
import asyncio
import influxdb_api
# 为了让多个任务并发运行,我们可以用 asyncio.to_thread 分别启动它们
async def main():
task1 = asyncio.to_thread(auto_realtime.realtime_task)
task2 = asyncio.to_thread(auto_store_non_realtime_SCADA_data.store_non_realtime_SCADA_data_task)
await asyncio.gather(task1, task2)
if __name__ == "__main__":
url = influxdb_info.url
token = influxdb_info.token
org_name = influxdb_info.org
influxdb_api.query_pg_scada_info_realtime('bb')
influxdb_api.query_pg_scada_info_non_realtime('bb')
# 用 asyncio 并发启动两个任务
asyncio.run(main())

View File

@@ -47,7 +47,6 @@ def get_next_15minute_time() -> str:
if next_15minute == 60:
next_15minute = 0
now = now + timedelta(hours=1)
next_time = now.replace(minute=next_15minute, second=0, microsecond=0)
return next_time.strftime('%Y-%m-%dT%H:%M:%S+08:00')
@@ -62,16 +61,13 @@ def run_simulation_job() -> None:
current_time = datetime.now()
if current_time.minute % 15 == 0:
print(f"{current_time.strftime('%Y-%m-%d %H:%M:%S')} -- Start simulation task.")
# 计算前获取scada_info中的信息按照设定的方法修改pg数据库
simulation.query_corresponding_element_id_and_query_id("bb")
simulation.query_corresponding_pattern_id_and_query_id('bb')
region_result = simulation.query_non_realtime_region('bb')
globals.source_outflow_region_id = simulation.get_source_outflow_region_id('bb', region_result)
globals.realtime_region_pipe_flow_and_demand_id = simulation.query_realtime_region_pipe_flow_and_demand_id('bb', region_result)
globals.pipe_flow_region_patterns = simulation.query_pipe_flow_region_patterns('bb')
globals.non_realtime_region_patterns = simulation.query_non_realtime_region_patterns('bb', region_result)
globals.source_outflow_region_patterns, realtime_region_pipe_flow_and_demand_patterns = simulation.get_realtime_region_patterns('bb',
globals.source_outflow_region_id,
@@ -97,7 +93,6 @@ def realtime_task() -> None:
now = datetime.now()
wait_seconds = 60 - now.second
time.sleep(wait_seconds)
# 使用 .at(":00") 指定在每分钟的第0秒执行
schedule.every(1).minute.at(":00").do(store_realtime_SCADA_data_job)
# 每15分钟执行一次run_simulation_job
@@ -114,7 +109,6 @@ if __name__ == "__main__":
org_name = influxdb_info.org
client = InfluxDBClient(url=url, token=token)
# step2: 先查询pg数据库中scada_info的信息然后存储SCADA数据到SCADA_data这个bucket里
influxdb_api.query_pg_scada_info_realtime('bb')
# 自动执行

View File

@@ -61,7 +61,6 @@ def store_non_realtime_SCADA_data_job() -> None:
print(f"{current_time.strftime('%Y-%m-%d %H:%M:%S')} -- Skipping store non realtime SCADA data task.")
# 2025/02/06
def store_non_realtime_SCADA_data_task() -> None:
"""
@@ -76,7 +75,6 @@ def store_non_realtime_SCADA_data_task() -> None:
try:
# 每分钟检查一次执行store_non_realtime_SCADA_data_job
schedule.every(1).minute.at(":00").do(store_non_realtime_SCADA_data_job)
# 持续执行任务,检查是否有待执行的任务
while True:
schedule.run_pending() # 执行所有待处理的定时任务
@@ -92,7 +90,6 @@ if __name__ == "__main__":
org_name = influxdb_info.org
client = InfluxDBClient(url=url, token=token)
# step2: 先查询pg数据库中scada_info的信息然后存储SCADA数据到SCADA_data这个bucket里
influxdb_api.query_pg_scada_info_non_realtime('bb')
# 自动执行

2865
influxdb_api(2).py Normal file

File diff suppressed because it is too large Load Diff