Initial commit

This commit is contained in:
DingZQ
2025-10-26 08:54:35 +08:00
commit ae6510ac37
679 changed files with 3963666 additions and 0 deletions

5
.vscode/settings.json vendored Normal file
View File

@@ -0,0 +1,5 @@
{
"cSpell.words": [
"Fastapi"
]
}

79974
20250309beibei_v2.inp Normal file

File diff suppressed because it is too large Load Diff

9
AutoPullGitChanges.bat Normal file
View File

@@ -0,0 +1,9 @@
@echo off
:loop
echo 正在执行 git pull...
git pull
echo 等待10分钟...
timeout /t 600 /nobreak >nul
goto loop

14
InfluxDb Information.txt Normal file
View File

@@ -0,0 +1,14 @@
UserName tjwater
Password Tjwater@123456
Organizatio TJWATEORG
Bucket TJWATERBUCKET
API Token : xGDM5RZqRJAuzAGS-otXUdC2NFdY75qJAjRLqAB4p5WcIIAlIUpOpT8_yA16AOHmJWerwQ_08gwb84sy42jnZQ==
influx config create --config-name onboarding `
--host-url "http://localhost:8086" `
--org "TJWATERORG" `
--token "p4Hq6DQ4xI6yA2tZQgo-VGzjWObylyWd4B45vMoiae0XJeNUlL87FdEUU5cJ63O87W7-nAhhGWl_0FGJiL801w==" `
--active

4
ReadMe.txt Normal file
View File

@@ -0,0 +1,4 @@
当前 适配 szh 项目的分支 是 dingsu/szh
Binary 适配的是 代码 中dingsu/szh 的部分
当前只是把 API目录也就是TJNetwork的部分加密了

Binary file not shown.

Binary file not shown.

Binary file not shown.

Binary file not shown.

Binary file not shown.

Binary file not shown.

Binary file not shown.

Binary file not shown.

Binary file not shown.

Binary file not shown.

Binary file not shown.

Binary file not shown.

Binary file not shown.

Binary file not shown.

Binary file not shown.

Binary file not shown.

Binary file not shown.

Binary file not shown.

Binary file not shown.

Binary file not shown.

Binary file not shown.

Binary file not shown.

25
all_auto_task.py Normal file
View File

@@ -0,0 +1,25 @@
import auto_realtime
import auto_store_non_realtime_SCADA_data
import asyncio
import influxdb_api
import influxdb_info
import project_info
# 为了让多个任务并发运行,我们可以用 asyncio.to_thread 分别启动它们
async def main():
task1 = asyncio.to_thread(auto_realtime.realtime_task)
task2 = asyncio.to_thread(auto_store_non_realtime_SCADA_data.store_non_realtime_SCADA_data_task)
await asyncio.gather(task1, task2)
if __name__ == "__main__":
url = influxdb_info.url
token = influxdb_info.token
org_name = influxdb_info.org
influxdb_api.query_pg_scada_info_realtime(project_info.name)
influxdb_api.query_pg_scada_info_non_realtime(project_info.name)
# 用 asyncio 并发启动两个任务
asyncio.run(main())

Binary file not shown.

Binary file not shown.

Binary file not shown.

Binary file not shown.

Binary file not shown.

Binary file not shown.

Binary file not shown.

Binary file not shown.

Binary file not shown.

Binary file not shown.

Binary file not shown.

Binary file not shown.

Binary file not shown.

Binary file not shown.

Binary file not shown.

Binary file not shown.

Binary file not shown.

Binary file not shown.

Binary file not shown.

Binary file not shown.

Binary file not shown.

Binary file not shown.

Binary file not shown.

Binary file not shown.

Binary file not shown.

Binary file not shown.

Binary file not shown.

Binary file not shown.

Binary file not shown.

Binary file not shown.

Binary file not shown.

Binary file not shown.

Binary file not shown.

Binary file not shown.

Binary file not shown.

Binary file not shown.

Binary file not shown.

Binary file not shown.

Binary file not shown.

Binary file not shown.

Binary file not shown.

Binary file not shown.

Binary file not shown.

Binary file not shown.

Binary file not shown.

Binary file not shown.

Binary file not shown.

Binary file not shown.

Binary file not shown.

Binary file not shown.

Binary file not shown.

Binary file not shown.

Binary file not shown.

Binary file not shown.

Binary file not shown.

Binary file not shown.

Binary file not shown.

Binary file not shown.

Binary file not shown.

Binary file not shown.

Binary file not shown.

Binary file not shown.

Binary file not shown.

Binary file not shown.

Binary file not shown.

115
auto_cache.py Normal file
View File

@@ -0,0 +1,115 @@
import schedule
import time
import datetime
import shutil
import redis
import urllib.request
import influxdb_api
import msgpack
import datetime
# 将 Query的信息 序列号到 redis/json 默认不支持datetime需要自定义
# 自定义序列化函数
# 序列化处理器
def encode_datetime(obj):
"""将datetime转换为可序列化的字典结构"""
if isinstance(obj, datetime.datetime):
return {
'__datetime__': True,
'as_str': obj.strftime("%Y%m%dT%H:%M:%S.%f")
}
return obj
# 反序列化处理器
def decode_datetime(obj):
"""将字典还原为datetime对象"""
if '__datetime__' in obj:
return datetime.datetime.strptime(
obj['as_str'], "%Y%m%dT%H:%M:%S.%f"
)
return obj
##########################
# 需要用Python 3.12 来运行才能提高performance
##########################
def queryallrecordsbydate(querydate: str, redis_client: redis.Redis):
cache_key = f"queryallrecordsbydate_{querydate}"
exists = redis_client.exists(cache_key)
if not exists:
nodes_links: tuple = influxdb_api.query_all_records_by_date(query_date=querydate)
redis_client.set(cache_key, msgpack.packb(nodes_links, default=encode_datetime))
def queryallrecordsbydate_by_url(querydate: str):
print(f'queryallrecordsbydate: {querydate}')
try:
response = urllib.request.urlopen(
f"http://localhost/queryallrecordsbydate/?querydate={querydate}"
)
html = response.read().decode("utf-8")
except urllib.error.URLError as e:
print("Error")
def queryallscadarecordsbydate(querydate: str, redis_client: redis.Redis):
cache_key = f"queryallscadarecordsbydate_{querydate}"
exists = redis_client.exists(cache_key)
if not exists:
result_dict = influxdb_api.query_all_SCADA_records_by_date(query_date=querydate)
redis_client.set(cache_key, msgpack.packb(result_dict, default=encode_datetime))
def queryallscadarecordsbydate_by_url(querydate: str):
print(f'queryallscadarecordsbydate: {querydate}')
try:
response = urllib.request.urlopen(
f"http://localhost/queryallscadarecordsbydate/?querydate={querydate}"
)
html = response.read().decode("utf-8")
except urllib.error.URLError as e:
print("Error")
def auto_cache_data():
# 初始化 Redis 连接
# 用redis 限制并发访u
redis_client = redis.Redis(host="localhost", port=6379, db=0)
# auto cache data for the last 3 days
today = datetime.date.today()
for i in range(1, 4):
prev_day = today - datetime.timedelta(days=i)
str_prev_day = prev_day.strftime('%Y-%m-%d')
print(str_prev_day)
queryallrecordsbydate(str_prev_day, redis_client)
queryallscadarecordsbydate(str_prev_day, redis_client)
redis_client.close()
def auto_cache_data_by_url():
# auto cache data for the last 3 days
today = datetime.date.today()
for i in range(1, 4):
prev_day = today - datetime.timedelta(days=i)
str_prev_day = prev_day.strftime('%Y-%m-%d')
print(str_prev_day)
queryallrecordsbydate_by_url(str_prev_day)
queryallscadarecordsbydate_by_url(str_prev_day)
if __name__ == "__main__":
auto_cache_data_by_url()
# auto run in the midnight
schedule.every().day.at("03:00").do(auto_cache_data_by_url)
while True:
schedule.run_pending()
time.sleep(1)

156
auto_realtime.py Normal file
View File

@@ -0,0 +1,156 @@
from logging.handlers import TimedRotatingFileHandler
import influxdb_api
import os
import logging
import globals
from datetime import datetime, timedelta, timezone
import schedule
import time
import shutil
from influxdb_client import InfluxDBClient, BucketsApi, WriteApi, OrganizationsApi, Point, QueryApi
import simulation
import influxdb_info
import project_info
def setup_logger():
# 创建日志目录
log_dir = "logs"
os.makedirs(log_dir, exist_ok=True)
# 配置基础日志格式
log_format = "%(asctime)s - %(levelname)s - %(message)s"
formatter = logging.Formatter(log_format)
# 创建主 Logger
logger = logging.getLogger()
logger.setLevel(logging.INFO) # 全局日志级别
# --- 1. 按日期分割的日志文件 Handler ---
log_file = os.path.join(log_dir, "simulation.log")
file_handler = TimedRotatingFileHandler(
filename=log_file,
when="midnight", # 每天午夜轮转
interval=1,
backupCount=7,
encoding="utf-8"
)
file_handler.suffix = "simulation-%Y-%m-%d.log" # 文件名格式
file_handler.setFormatter(formatter)
file_handler.setLevel(logging.INFO) # 文件记录所有级别日志
# --- 2. 控制台实时输出 Handler ---
console_handler = logging.StreamHandler() # 默认输出到 sys.stderr (控制台)
console_handler.setFormatter(formatter)
console_handler.setLevel(logging.INFO) # 控制台仅显示 INFO 及以上级别
# 将 Handler 添加到 Logger
logger.addHandler(file_handler)
#logger.addHandler(console_handler)
return logger
logger = setup_logger()
# 2025/02/01
def get_next_time() -> str:
"""
获取下一个1分钟时间点返回格式为字符串'YYYY-MM-DDTHH:MM:00+08:00'
:return: 返回字符串格式的时间表示下一个1分钟的时间点
"""
# 获取当前时间,并设定为北京时间
now = datetime.now() # now 类型为 datetime表示当前本地时间
# 获取当前的分钟,并且将秒和微秒置为零
current_time = now.replace(second=0, microsecond=0) # current_time 类型为 datetime时间的秒和微秒部分被清除
return current_time.strftime('%Y-%m-%dT%H:%M:%S+08:00')
# 2025/02/06
def store_realtime_SCADA_data_job() -> None:
"""
定义的任务1每分钟执行1次每次执行时更新get_real_value_time并调用store_realtime_SCADA_data_to_influxdb函数
:return: None
"""
# 获取当前时间并更新get_real_value_time转换为字符串格式
get_real_value_time: str = get_next_time() # get_real_value_time 类型为 str格式为'2025-02-01T18:45:00+08:00'
# 调用函数执行任务
influxdb_api.store_realtime_SCADA_data_to_influxdb(get_real_value_time)
logger.info('{} -- Successfully store realtime SCADA data.'.format(datetime.now().strftime('%Y-%m-%d %H:%M:%S')))
# 2025/02/06
def get_next_15minute_time() -> str:
"""
获取下一个15分钟的时间点返回格式为字符串'YYYY-MM-DDTHH:MM:00+08:00'
:return: 返回字符串格式的时间表示下一个15分钟执行时间点
"""
now = datetime.now()
# 向上舍入到下一个15分钟
next_15minute = (now.minute // 15 + 1) * 15 - 15
if next_15minute == 60:
next_15minute = 0
now = now + timedelta(hours=1)
next_time = now.replace(minute=next_15minute, second=0, microsecond=0)
return next_time.strftime('%Y-%m-%dT%H:%M:%S+08:00')
# 2025/02/07
def run_simulation_job() -> None:
"""
定义的任务3每15分钟执行一次在store_realtime_SCADA_data_to_influxdb之后执行run_simulation。
:return: None
"""
# 获取当前时间并检查是否是整点15分钟
current_time = datetime.now()
if current_time.minute % 15 == 0:
print(f"{current_time.strftime('%Y-%m-%d %H:%M:%S')} -- Start simulation task.")
# 计算前获取scada_info中的信息按照设定的方法修改pg数据库
simulation.query_corresponding_element_id_and_query_id(project_info.name)
simulation.query_corresponding_pattern_id_and_query_id(project_info.name)
region_result = simulation.query_non_realtime_region(project_info.name)
globals.source_outflow_region_id = simulation.get_source_outflow_region_id(project_info.name, region_result)
globals.realtime_region_pipe_flow_and_demand_id = simulation.query_realtime_region_pipe_flow_and_demand_id(project_info.name, region_result)
globals.pipe_flow_region_patterns = simulation.query_pipe_flow_region_patterns(project_info.name)
globals.non_realtime_region_patterns = simulation.query_non_realtime_region_patterns(project_info.name, region_result)
globals.source_outflow_region_patterns, realtime_region_pipe_flow_and_demand_patterns = simulation.get_realtime_region_patterns(project_info.name,
globals.source_outflow_region_id,
globals.realtime_region_pipe_flow_and_demand_id)
modify_pattern_start_time: str = get_next_15minute_time() # 获取下一个15分钟时间点
# print(modify_pattern_start_time)
simulation.run_simulation(name=project_info.name, simulation_type="realtime", modify_pattern_start_time=modify_pattern_start_time)
logger.info('{} -- Successfully run simulation and store realtime simulation result.'.format(datetime.now().strftime('%Y-%m-%d %H:%M:%S')))
else:
logger.info(f"{current_time.strftime('%Y-%m-%d %H:%M:%S')} -- Skipping the simulation task.")
# 2025/02/06
def realtime_task() -> None:
"""
定时执行任务1和使用schedule库每1分钟执行一次store_realtime_SCADA_data_job函数。
该任务会一直运行定期调用store_realtime_SCADA_data_job获取SCADA数据。
:return:
"""
# 等待到整分对齐
now = datetime.now()
wait_seconds = 60 - now.second
time.sleep(wait_seconds)
# 使用 .at(":00") 指定在每分钟的第0秒执行
schedule.every(1).minute.at(":00").do(store_realtime_SCADA_data_job)
# 每15分钟执行一次run_simulation_job
schedule.every(1).minute.at(":00").do(run_simulation_job)
# 持续执行任务,检查是否有待执行的任务
while True:
schedule.run_pending() # 执行所有待处理的定时任务
time.sleep(1) # 暂停1秒避免过于频繁的任务检查
if __name__ == "__main__":
url = influxdb_info.url
token = influxdb_info.token
org_name = influxdb_info.org
client = InfluxDBClient(url=url, token=token)
# step2: 先查询pg数据库中scada_info的信息然后存储SCADA数据到SCADA_data这个bucket里
influxdb_api.query_pg_scada_info_realtime(project_info.name)
# 自动执行
realtime_task()

View File

@@ -0,0 +1,139 @@
import influxdb_api
import globals
from datetime import datetime, timedelta, timezone
import schedule
import os
import logging
from logging.handlers import TimedRotatingFileHandler
import time
from influxdb_client import InfluxDBClient, BucketsApi, WriteApi, OrganizationsApi, Point, QueryApi
import influxdb_info
import project_info
def setup_logger():
# 创建日志目录
log_dir = "logs"
os.makedirs(log_dir, exist_ok=True)
# 配置基础日志格式
log_format = "%(asctime)s - %(levelname)s - %(message)s"
formatter = logging.Formatter(log_format)
# 创建主 Logger
logger = logging.getLogger()
logger.setLevel(logging.INFO) # 全局日志级别
# --- 1. 按日期分割的日志文件 Handler ---
log_file = os.path.join(log_dir, "scada.log")
file_handler = TimedRotatingFileHandler(
filename=log_file,
when="midnight", # 每天午夜轮转
interval=1,
backupCount=7,
encoding="utf-8"
)
file_handler.suffix = "scada-%Y-%m-%d.log" # 文件名格式
file_handler.setFormatter(formatter)
file_handler.setLevel(logging.INFO) # 文件记录 INFO 及以上级别
# --- 2. 控制台实时输出 Handler ---
console_handler = logging.StreamHandler() # 默认输出到 sys.stderr (控制台)
console_handler.setFormatter(formatter)
console_handler.setLevel(logging.INFO) # 控制台仅显示 INFO 及以上级别
# 将 Handler 添加到 Logger
logger.addHandler(file_handler)
# logger.addHandler(console_handler)
return logger
logger = setup_logger()
# 2025/02/01
def get_next_time() -> str:
"""
获取下一个1分钟时间点返回格式为字符串'YYYY-MM-DDTHH:MM:00+08:00'
:return: 返回字符串格式的时间表示下一个1分钟的时间点
"""
# 获取当前时间,并设定为北京时间
now = datetime.now() # now 类型为 datetime表示当前本地时间
# 获取当前的分钟,并且将秒和微秒置为零
current_time = now.replace(second=0, microsecond=0) # current_time 类型为 datetime时间的秒和微秒部分被清除
return current_time.strftime('%Y-%m-%dT%H:%M:%S+08:00')
# 2025/02/06
def get_next_period_time() -> str:
"""
获取下一个6小时时间点返回格式为字符串'YYYY-MM-DDTHH:00:00+08:00'
:return: 返回字符串格式的时间表示下一个6小时执行时间点
"""
# 获取当前时间,并设定为北京时间
now = datetime.now() # now 类型为 datetime表示当前本地时间
# 获取当前的小时数并计算下一个6小时时间点
next_period_hour = (now.hour // 6 + 1) * 6 - 6 # next_period_hour 类型为 int表示下一个6小时时间点的小时部分
# 如果计算的小时大于23表示进入第二天调整为00:00
if next_period_hour >= 24:
next_period_hour = 0
now = now + timedelta(days=1) # 如果超过24小时日期增加1天
# 将秒和微秒部分清除构建出下一个6小时点的datetime对象
next_period_time = now.replace(hour=next_period_hour, minute=0, second=0, microsecond=0)
return next_period_time.strftime('%Y-%m-%dT%H:%M:%S+08:00') # 格式化为指定的字符串格式并返回
# 2025/02/06
def store_non_realtime_SCADA_data_job() -> None:
"""
定义的任务2每6小时执行一次在0点、6点、12点、18点执行执行时更新get_history_data_end_time并调用store_non_realtime_SCADA_data_to_influxdb函数
:return: None
"""
# 获取当前时间
current_time = datetime.now()
# 只在0点、6点、12点、18点执行任务
# if current_time.hour % 6 == 0 and current_time.minute == 0:
if current_time.minute % 10 == 0:
logger.info(f"{current_time.strftime('%Y-%m-%d %H:%M:%S')} -- Start store non realtime SCADA data task.")
# 获取下一个6小时的时间点并更新get_history_data_end_time
get_history_data_end_time: str = get_next_time() # get_history_data_end_time 类型为 str格式为'2025-02-06T12:00:00+08:00'
# print(get_next_time)
# 调用函数执行任务
influxdb_api.store_non_realtime_SCADA_data_to_influxdb(get_history_data_end_time)
logger.info('{} -- Successfully store non realtime SCADA data.'.format(datetime.now().strftime('%Y-%m-%d %H:%M:%S')))
else:
logger.info(f"{current_time.strftime('%Y-%m-%d %H:%M:%S')} -- Skipping store non realtime SCADA data task.")
# 2025/02/06
def store_non_realtime_SCADA_data_task() -> None:
"""
定时执行6小时的任务使用schedule库每分钟执行一次store_non_realtime_SCADA_data_job函数。
该任务会一直运行定期调用store_non_realtime_SCADA_data_job获取SCADA数据。
:return:
"""
# 等待到整分对齐
now = datetime.now()
wait_seconds = 60 - now.second
time.sleep(wait_seconds)
try:
# 每分钟检查一次执行store_non_realtime_SCADA_data_job
schedule.every(1).minute.at(":00").do(store_non_realtime_SCADA_data_job)
# 持续执行任务,检查是否有待执行的任务
while True:
schedule.run_pending() # 执行所有待处理的定时任务
time.sleep(1) # 暂停1秒避免过于频繁的任务检查
pass
except Exception as e:
logger.error(f"Error occurred in store_non_realtime_SCADA_data_task: {e}")
if __name__ == "__main__":
url = influxdb_info.url
token = influxdb_info.token
org_name = influxdb_info.org
client = InfluxDBClient(url=url, token=token)
# step2: 先查询pg数据库中scada_info的信息然后存储SCADA数据到SCADA_data这个bucket里
influxdb_api.query_pg_scada_info_non_realtime(project_info.name)
# 自动执行
store_non_realtime_SCADA_data_task()

Some files were not shown because too many files have changed in this diff Show More