Add more code from WMH

This commit is contained in:
DingZQ
2025-02-08 20:11:27 +08:00
parent f6f37d012b
commit e360540989
6 changed files with 382 additions and 61 deletions

120
auto_realtime.py Normal file
View File

@@ -0,0 +1,120 @@
import influxdb_api
import globals
from datetime import datetime, timedelta, timezone
import schedule
import time
from influxdb_client import InfluxDBClient, BucketsApi, WriteApi, OrganizationsApi, Point, QueryApi
import simulation
# 2025/02/01
def get_next_time() -> str:
"""
获取下一个1分钟时间点返回格式为字符串'YYYY-MM-DDTHH:MM:00+08:00'
:return: 返回字符串格式的时间表示下一个1分钟的时间点
"""
# 获取当前时间,并设定为北京时间
now = datetime.now() # now 类型为 datetime表示当前本地时间
# 获取当前的分钟,并且将秒和微秒置为零
current_time = now.replace(second=0, microsecond=0) # current_time 类型为 datetime时间的秒和微秒部分被清除
return current_time.strftime('%Y-%m-%dT%H:%M:%S+08:00')
# 2025/02/06
def store_realtime_SCADA_data_job() -> None:
"""
定义的任务1每分钟执行1次每次执行时更新get_real_value_time并调用store_realtime_SCADA_data_to_influxdb函数
:return: None
"""
# 获取当前时间并更新get_real_value_time转换为字符串格式
get_real_value_time: str = get_next_time() # get_real_value_time 类型为 str格式为'2025-02-01T18:45:00+08:00'
# 调用函数执行任务
influxdb_api.store_realtime_SCADA_data_to_influxdb(get_real_value_time)
print('{} -- Successfully store realtime SCADA data.'.format(
datetime.now().strftime('%Y-%m-%d %H:%M:%S')))
# 2025/02/06
def get_next_15minute_time() -> str:
"""
获取下一个15分钟的时间点返回格式为字符串'YYYY-MM-DDTHH:MM:00+08:00'
:return: 返回字符串格式的时间表示下一个15分钟执行时间点
"""
now = datetime.now()
# 向上舍入到下一个15分钟
next_15minute = (now.minute // 15 + 1) * 15 - 15
if next_15minute == 60:
next_15minute = 0
now = now + timedelta(hours=1)
next_time = now.replace(minute=next_15minute, second=0, microsecond=0)
return next_time.strftime('%Y-%m-%dT%H:%M:%S+08:00')
# 2025/02/07
def run_simulation_job() -> None:
"""
定义的任务3每15分钟执行一次在store_realtime_SCADA_data_to_influxdb之后执行run_simulation。
:return: None
"""
# 获取当前时间并检查是否是整点15分钟
current_time = datetime.now()
if current_time.minute % 15 == 0:
print(f"{current_time.strftime('%Y-%m-%d %H:%M:%S')} -- Start simulation task.")
# 计算前获取scada_info中的信息按照设定的方法修改pg数据库
simulation.query_corresponding_element_id_and_query_id("bb")
simulation.query_corresponding_pattern_id_and_query_id('bb')
region_result = simulation.query_non_realtime_region('bb')
globals.source_outflow_region_id = simulation.get_source_outflow_region_id('bb', region_result)
globals.realtime_region_pipe_flow_and_demand_id = simulation.query_realtime_region_pipe_flow_and_demand_id('bb', region_result)
globals.pipe_flow_region_patterns = simulation.query_pipe_flow_region_patterns('bb')
globals.non_realtime_region_patterns = simulation.query_non_realtime_region_patterns('bb', region_result)
globals.source_outflow_region_patterns, realtime_region_pipe_flow_and_demand_patterns = simulation.get_realtime_region_patterns('bb',
globals.source_outflow_region_id,
globals.realtime_region_pipe_flow_and_demand_id)
modify_pattern_start_time: str = get_next_15minute_time() # 获取下一个15分钟时间点
# print(modify_pattern_start_time)
simulation.run_simulation(name='bb', simulation_type="realtime",
modify_pattern_start_time=modify_pattern_start_time)
print('{} -- Successfully run simulation and store realtime simulation result.'.format(
datetime.now().strftime('%Y-%m-%d %H:%M:%S')))
else:
print(f"{current_time.strftime('%Y-%m-%d %H:%M:%S')} -- Skipping the simulation task.")
# 2025/02/06
def realtime_task() -> None:
"""
定时执行任务1和使用schedule库每1分钟执行一次store_realtime_SCADA_data_job函数。
该任务会一直运行定期调用store_realtime_SCADA_data_job获取SCADA数据。
:return:
"""
# 等待到整分对齐
now = datetime.now()
wait_seconds = 60 - now.second
time.sleep(wait_seconds)
# 使用 .at(":00") 指定在每分钟的第0秒执行
schedule.every(1).minute.at(":00").do(store_realtime_SCADA_data_job)
# 每15分钟执行一次run_simulation_job
schedule.every(1).minute.at(":00").do(run_simulation_job)
# 持续执行任务,检查是否有待执行的任务
while True:
schedule.run_pending() # 执行所有待处理的定时任务
time.sleep(1) # 暂停1秒避免过于频繁的任务检查
if __name__ == "__main__":
url = "http://localhost:8086" # 替换为你的InfluxDB实例地址
token = "Z4UZj9HuLwLlwoApywvT2nGVP3bwLy18y-sJQ7enzZlJd8YMzMWbBA6F-q4gBiZ-7-IqdxR5aR9LvicKiSNmnA==" # 替换为你的InfluxDB Token
org_name = "beibei" # 替换为你的Organization名称
client = InfluxDBClient(url=url, token=token)
# step2: 先查询pg数据库中scada_info的信息然后存储SCADA数据到SCADA_data这个bucket里
influxdb_api.query_pg_scada_info_realtime('bb')
# 自动执行
realtime_task()

View File

@@ -15,7 +15,7 @@ def get_next_period_time() -> str:
# 获取当前时间,并设定为北京时间 # 获取当前时间,并设定为北京时间
now = datetime.now() # now 类型为 datetime表示当前本地时间 now = datetime.now() # now 类型为 datetime表示当前本地时间
# 获取当前的小时数并计算下一个6小时时间点 # 获取当前的小时数并计算下一个6小时时间点
next_period_hour = (now.hour // 6 + 1) * 6 # next_period_hour 类型为 int表示下一个6小时时间点的小时部分 next_period_hour = (now.hour // 6 + 1) * 6 - 6 # next_period_hour 类型为 int表示下一个6小时时间点的小时部分
# 如果计算的小时大于23表示进入第二天调整为00:00 # 如果计算的小时大于23表示进入第二天调整为00:00
if next_period_hour >= 24: if next_period_hour >= 24:
next_period_hour = 0 next_period_hour = 0
@@ -37,6 +37,7 @@ def store_non_realtime_SCADA_data_job() -> None:
if current_time.hour % 6 == 0 and current_time.minute == 0: if current_time.hour % 6 == 0 and current_time.minute == 0:
# 获取下一个6小时的时间点并更新get_history_data_end_time # 获取下一个6小时的时间点并更新get_history_data_end_time
get_history_data_end_time: str = get_next_period_time() # get_history_data_end_time 类型为 str格式为'2025-02-06T12:00:00+08:00' get_history_data_end_time: str = get_next_period_time() # get_history_data_end_time 类型为 str格式为'2025-02-06T12:00:00+08:00'
print(get_history_data_end_time)
# 调用函数执行任务 # 调用函数执行任务
influxdb_api.store_non_realtime_SCADA_data_to_influxdb(get_history_data_end_time) influxdb_api.store_non_realtime_SCADA_data_to_influxdb(get_history_data_end_time)
print('{} -- Successfully store non realtime SCADA data.'.format( print('{} -- Successfully store non realtime SCADA data.'.format(
@@ -50,9 +51,13 @@ def store_non_realtime_SCADA_data_task() -> None:
该任务会一直运行定期调用store_non_realtime_SCADA_data_job获取SCADA数据。 该任务会一直运行定期调用store_non_realtime_SCADA_data_job获取SCADA数据。
:return: :return:
""" """
# 等待到整分对齐
now = datetime.now()
wait_seconds = 60 - now.second
time.sleep(wait_seconds)
try: try:
# 每分钟检查一次执行store_non_realtime_SCADA_data_job # 每分钟检查一次执行store_non_realtime_SCADA_data_job
schedule.every(1).minute.do(store_non_realtime_SCADA_data_job) schedule.every(1).minute.at(":00").do(store_non_realtime_SCADA_data_job)
# 持续执行任务,检查是否有待执行的任务 # 持续执行任务,检查是否有待执行的任务
while True: while True:

165
get_data.py Normal file
View File

@@ -0,0 +1,165 @@
import requests
from datetime import datetime
import pytz
from typing import List, Dict, Union, Optional
import csv
def convert_timestamp_to_beijing_time(timestamp: Union[int, float]) -> datetime:
# 将毫秒级时间戳转换为秒级时间戳
timestamp_seconds = timestamp / 1000
# 将时间戳转换为datetime对象
utc_time = datetime.utcfromtimestamp(timestamp_seconds)
# 设定UTC时区
utc_timezone = pytz.timezone('UTC')
# 转换为北京时间
beijing_timezone = pytz.timezone('Asia/Shanghai')
beijing_time = utc_time.replace(tzinfo=utc_timezone).astimezone(beijing_timezone)
return beijing_time
def beijing_time_to_utc(beijing_time_str: str) -> str:
# 定义北京时区
beijing_timezone = pytz.timezone('Asia/Shanghai')
# 将字符串转换为datetime对象
beijing_time = datetime.strptime(beijing_time_str, '%Y-%m-%d %H:%M:%S')
# 本地化时间对象
beijing_time = beijing_timezone.localize(beijing_time)
# 转换为UTC时间
utc_time = beijing_time.astimezone(pytz.utc)
# 转换为ISO 8601格式的字符串
return utc_time.strftime('%Y-%m-%dT%H:%M:%SZ')
def get_history_data(ids: str, begin_date: str, end_date: str, downsample: Optional[str]) -> List[Dict[str, Union[str, datetime, int, float]]]:
# def get_history_data(ids: str, begin_date: str, end_date: str, downsample: Optional[str]) -> None:
# 转换输入的北京时间为UTC时间
begin_date_utc = beijing_time_to_utc(begin_date)
end_date_utc = beijing_time_to_utc(end_date)
# 数据接口的地址
url = 'http://183.64.62.100:9057/loong/api/curves/data'
# url = 'http://10.101.15.16:9000/loong/api/curves/data'
# url_path = 'http://10.101.15.16:9000/loong' # 内网
# 设置 GET 请求的参数
params = {
'ids': ids,
'beginDate': begin_date_utc,
'endDate': end_date_utc,
'downsample': downsample
}
history_data_list =[]
try:
# 发送 GET 请求获取数据
response = requests.get(url, params=params)
# 检查响应状态码200 表示请求成功
if response.status_code == 200:
# 解析响应的 JSON 数据
data = response.json()
# 这里可以对获取到的数据进行进一步处理
# 打印 'mpointId' 和 'mpointName'
for item in data['items']:
mpoint_id = str(item['mpointId'])
mpoint_name = item['mpointName']
# print("mpointId:", item['mpointId'])
# print("mpointName:", item['mpointName'])
# 打印 'dataDate' 和 'dataValue'
for item_data in item['data']:
# 将时间戳转换为北京时间
beijing_time = convert_timestamp_to_beijing_time(item_data['dataDate'])
data_value = item_data['dataValue']
# 创建一个字典存储每条数据
data_dict = {
'time': beijing_time,
'device_ID': str(mpoint_id),
'description': mpoint_name,
# 'dataDate (Beijing Time)': beijing_time.strftime('%Y-%m-%d %H:%M:%S'),
'monitored_value': data_value # 保留原有类型
}
history_data_list.append(data_dict)
else:
# 如果请求不成功,打印错误信息
print("请求失败,状态码:", response.status_code)
except Exception as e:
# 捕获异常
print("发生异常:", e)
return history_data_list
# 使用示例
# data_list = get_history_data(ids='9572',
# begin_date='2025-02-08 06:00:00',
# end_date='2025-02-08 12:00:00',
# downsample='1m')
#
# # 打印数据列表
# for data in data_list:
# print(data)
# # 定义 CSV 文件的路径
# csv_file_path = './influxdb_data_4984.csv'
# # 将数据写入 CSV 文件
# # with open(csv_file_path, mode='w', newline='') as file:
# # writer = csv.writer(file)
# #
# # # 写入表头
# # writer.writerow(['measurement', 'mpointId', 'date', 'dataValue', 'datetime'])
# #
# # # 写入数据
# # for data in data_list:
# # measurement = data['mpointName']
# # mpointId = data['mpointId']
# # date = data['datetime'].strftime('%Y-%m-%d')
# # dataValue = data['dataValue']
# # datetime_str = data['datetime']
# #
# # # 写入一行
# # writer.writerow([measurement, mpointId, date, dataValue, datetime_str])
# #
# #
# # print(f"数据已保存到 {csv_file_path}")
#
# filtered_csv_file_path = './filtered_influxdb_data_4984.csv'
# #
# # # # 读取并筛选数据
# data_list1 = []
#
# with open(csv_file_path, mode='r') as file:
# csv_reader = csv.DictReader(file)
# for row in csv_reader:
# # 将 datetime 列解析为 datetime 对象
# datetime_value = datetime.strptime(row['datetime'], '%Y-%m-%d %H:%M:%S%z')
#
# # 只保留时间为 15 分钟倍数的行
# if datetime_value.minute % 15 == 0:
# data_list1.append(row)
#
# # 将筛选后的数据写入新的 CSV 文件
# with open(filtered_csv_file_path, mode='w', newline='') as file:
# writer = csv.writer(file)
#
# # 写入表头
# writer.writerow(['measurement', 'mpointId', 'date', 'dataValue', 'datetime'])
#
# # 写入筛选后的数据
# for data in data_list1:
# writer.writerow([data['measurement'], data['mpointId'], data['date'], data['dataValue'], data['datetime']])
#
# print(f"筛选后的数据已保存到 {filtered_csv_file_path}")

View File

@@ -1,6 +1,7 @@
import requests import requests
from datetime import datetime from datetime import datetime
import pytz import pytz
from typing import List, Dict, Union, Tuple
def convert_to_beijing_time(utc_time_str): def convert_to_beijing_time(utc_time_str):
@@ -17,15 +18,18 @@ def convert_to_beijing_time(utc_time_str):
return beijing_time return beijing_time
def get_realValue(ids)->dict[str,float]: def get_realValue(ids) -> List[Dict[str, Union[str, datetime, int, float]]]:
# 数据接口的地址 # 数据接口的地址
url = 'http://183.64.62.100:9057/loong/api/mpoints/realValue' url = 'http://183.64.62.100:9057/loong/api/mpoints/realValue'
# url = 'http://10.101.15.16:9000/loong/api/mpoints/realValue' # 内网
# 设置GET请求的参数 # 设置GET请求的参数
params = { params = {
'ids': ids 'ids': ids
} }
lst_data={} # 创建一个字典来存储数据
data_list = []
try: try:
# 发送GET请求获取数据 # 发送GET请求获取数据
response = requests.get(url, params=params) response = requests.get(url, params=params)
@@ -37,24 +41,33 @@ def get_realValue(ids)->dict[str,float]:
# 只打印'id'、'datadt'和'realValue'数据 # 只打印'id'、'datadt'和'realValue'数据
for realValue in data: for realValue in data:
#print("id:", realValue['id']) # print("id:", realValue['id'])
#print("mpointName:",realValue['mpointName']) # print("mpointName:",realValue['mpointName'])
# print("datadt:", realValue['datadt']) # print("datadt:", realValue['datadt'])
# 转换datadt字段为北京时间 # 转换datadt字段为北京时间
#beijing_time = convert_to_beijing_time(realValue['datadt']) beijing_time = convert_to_beijing_time(realValue['datadt'])
#print("datadt (Beijing Time):", beijing_time.strftime('%Y-%m-%d %H:%M:%S')) # print("datadt (Beijing Time):", beijing_time.strftime('%Y-%m-%d %H:%M:%S'))
#print("realValue:", realValue['realValue']) # print("realValue:", realValue['realValue'])
#print() # 打印空行分隔不同条目 # print() # 打印空行分隔不同条目
r=float(realValue['realValue']) # 将数据添加到字典中,值为一个字典,包含其他需要的字段
lst_data[str(realValue['id'])]=r data_list.append({
'device_ID': realValue['id'],
'description': realValue['mpointName'],
'time': beijing_time.strftime('%Y-%m-%d %H:%M:%S'),
'monitored_value': realValue['realValue']
})
else: else:
# 如果请求不成功,打印错误信息 # 如果请求不成功,打印错误信息
print("请求失败,状态码:", response.status_code) print("请求失败,状态码:", response.status_code)
return lst_data
except Exception as e: except Exception as e:
# 捕获异常 # 捕获异常
print("发生异常:", e) print("发生异常:", e)
return data_list
# 使用示例 # 使用示例
#get_realValue(ids='2498,2500') # data_list = get_realValue(ids='2498,2500')
# print(data_list)

View File

@@ -1,7 +1,7 @@
from influxdb_client import InfluxDBClient, BucketsApi, WriteApi, OrganizationsApi, Point, QueryApi from influxdb_client import InfluxDBClient, BucketsApi, WriteApi, OrganizationsApi, Point, QueryApi
from typing import List, Dict from typing import List, Dict
from datetime import datetime, timedelta, timezone from datetime import datetime, timedelta, timezone
from influxdb_client.client.write_api import SYNCHRONOUS from influxdb_client.client.write_api import SYNCHRONOUS, ASYNCHRONOUS
from dateutil import parser from dateutil import parser
import get_realValue import get_realValue
import get_data import get_data
@@ -363,7 +363,7 @@ def store_realtime_SCADA_data_to_influxdb(get_real_value_time: str, bucket: str
else: else:
print("{} -- Failed to connect to InfluxDB.".format( print("{} -- Failed to connect to InfluxDB.".format(
datetime.now().strftime('%Y-%m-%d %H:%M:%S'))) datetime.now().strftime('%Y-%m-%d %H:%M:%S')))
write_api = client.write_api(write_options=SYNCHRONOUS) write_api = client.write_api(write_options=ASYNCHRONOUS)
try_count = 0 try_count = 0
reservoir_liquid_level_realtime_data_list = [] reservoir_liquid_level_realtime_data_list = []
tank_liquid_level_realtime_data_list = [] tank_liquid_level_realtime_data_list = []
@@ -433,6 +433,7 @@ def store_realtime_SCADA_data_to_influxdb(get_real_value_time: str, bucket: str
.time(get_real_value_time_utc) .time(get_real_value_time_utc)
) )
write_api.write(bucket=bucket, org=org_name, record=point) write_api.write(bucket=bucket, org=org_name, record=point)
write_api.flush()
if tank_liquid_level_realtime_data_list: if tank_liquid_level_realtime_data_list:
for data in tank_liquid_level_realtime_data_list: for data in tank_liquid_level_realtime_data_list:
@@ -464,6 +465,7 @@ def store_realtime_SCADA_data_to_influxdb(get_real_value_time: str, bucket: str
.time(get_real_value_time_utc) .time(get_real_value_time_utc)
) )
write_api.write(bucket=bucket, org=org_name, record=point) write_api.write(bucket=bucket, org=org_name, record=point)
write_api.flush()
if fixed_pump_realtime_data_list: if fixed_pump_realtime_data_list:
for data in fixed_pump_realtime_data_list: for data in fixed_pump_realtime_data_list:
@@ -495,6 +497,7 @@ def store_realtime_SCADA_data_to_influxdb(get_real_value_time: str, bucket: str
.time(get_real_value_time_utc) .time(get_real_value_time_utc)
) )
write_api.write(bucket=bucket, org=org_name, record=point) write_api.write(bucket=bucket, org=org_name, record=point)
write_api.flush()
if variable_pump_realtime_data_list: if variable_pump_realtime_data_list:
for data in variable_pump_realtime_data_list: for data in variable_pump_realtime_data_list:
@@ -526,6 +529,7 @@ def store_realtime_SCADA_data_to_influxdb(get_real_value_time: str, bucket: str
.time(get_real_value_time_utc) .time(get_real_value_time_utc)
) )
write_api.write(bucket=bucket, org=org_name, record=point) write_api.write(bucket=bucket, org=org_name, record=point)
write_api.flush()
if source_outflow_realtime_data_list: if source_outflow_realtime_data_list:
for data in source_outflow_realtime_data_list: for data in source_outflow_realtime_data_list:
@@ -557,6 +561,7 @@ def store_realtime_SCADA_data_to_influxdb(get_real_value_time: str, bucket: str
.time(get_real_value_time_utc) .time(get_real_value_time_utc)
) )
write_api.write(bucket=bucket, org=org_name, record=point) write_api.write(bucket=bucket, org=org_name, record=point)
write_api.flush()
if pipe_flow_realtime_data_list: if pipe_flow_realtime_data_list:
for data in pipe_flow_realtime_data_list: for data in pipe_flow_realtime_data_list:
@@ -588,6 +593,7 @@ def store_realtime_SCADA_data_to_influxdb(get_real_value_time: str, bucket: str
.time(get_real_value_time_utc) .time(get_real_value_time_utc)
) )
write_api.write(bucket=bucket, org=org_name, record=point) write_api.write(bucket=bucket, org=org_name, record=point)
write_api.flush()
if pressure_realtime_data_list: if pressure_realtime_data_list:
for data in pressure_realtime_data_list: for data in pressure_realtime_data_list:
@@ -619,6 +625,7 @@ def store_realtime_SCADA_data_to_influxdb(get_real_value_time: str, bucket: str
.time(get_real_value_time_utc) .time(get_real_value_time_utc)
) )
write_api.write(bucket=bucket, org=org_name, record=point) write_api.write(bucket=bucket, org=org_name, record=point)
write_api.flush()
if demand_realtime_data_list: if demand_realtime_data_list:
for data in demand_realtime_data_list: for data in demand_realtime_data_list:
@@ -650,6 +657,7 @@ def store_realtime_SCADA_data_to_influxdb(get_real_value_time: str, bucket: str
.time(get_real_value_time_utc) .time(get_real_value_time_utc)
) )
write_api.write(bucket=bucket, org=org_name, record=point) write_api.write(bucket=bucket, org=org_name, record=point)
write_api.flush()
if quality_realtime_data_list: if quality_realtime_data_list:
for data in quality_realtime_data_list: for data in quality_realtime_data_list:
@@ -681,6 +689,7 @@ def store_realtime_SCADA_data_to_influxdb(get_real_value_time: str, bucket: str
.time(get_real_value_time_utc) .time(get_real_value_time_utc)
) )
write_api.write(bucket=bucket, org=org_name, record=point) write_api.write(bucket=bucket, org=org_name, record=point)
write_api.flush()
def convert_time_format(original_time: str) -> str: def convert_time_format(original_time: str) -> str:
@@ -726,6 +735,8 @@ def store_non_realtime_SCADA_data_to_influxdb(get_history_data_end_time: str, bu
transmission_frequency_dt = datetime.strptime(globals.transmission_frequency, '%H:%M:%S') - datetime(1900, 1, 1) transmission_frequency_dt = datetime.strptime(globals.transmission_frequency, '%H:%M:%S') - datetime(1900, 1, 1)
get_history_data_start_time = end_date_dt - transmission_frequency_dt get_history_data_start_time = end_date_dt - transmission_frequency_dt
begin_date = get_history_data_start_time.strftime('%Y-%m-%d %H:%M:%S') begin_date = get_history_data_start_time.strftime('%Y-%m-%d %H:%M:%S')
# print(begin_date)
# print(end_date)
reservoir_liquid_level_non_realtime_data_list = [] reservoir_liquid_level_non_realtime_data_list = []
tank_liquid_level_non_realtime_data_list = [] tank_liquid_level_non_realtime_data_list = []
@@ -786,6 +797,7 @@ def store_non_realtime_SCADA_data_to_influxdb(get_history_data_end_time: str, bu
ids=','.join(globals.pressure_non_realtime_ids), ids=','.join(globals.pressure_non_realtime_ids),
begin_date=begin_date, end_date=end_date, begin_date=begin_date, end_date=end_date,
downsample='1m') downsample='1m')
# print(pressure_non_realtime_data_list)
if globals.demand_non_realtime_ids: if globals.demand_non_realtime_ids:
demand_non_realtime_data_list = get_data.get_history_data( demand_non_realtime_data_list = get_data.get_history_data(
@@ -1049,6 +1061,7 @@ def store_realtime_simulation_result_to_influxdb(node_result_list: List[Dict[str
# 写入数据到 InfluxDB多个 field 在同一个 point 中 # 写入数据到 InfluxDB多个 field 在同一个 point 中
write_api.write(bucket=bucket, org=org_name, record=node_point) write_api.write(bucket=bucket, org=org_name, record=node_point)
write_api.flush()
print(f"成功将 {len(node_result_list)} 条node数据写入 InfluxDB。") print(f"成功将 {len(node_result_list)} 条node数据写入 InfluxDB。")
for result in link_result_list: for result in link_result_list:
link_id = result.get('link') link_id = result.get('link')
@@ -1068,6 +1081,7 @@ def store_realtime_simulation_result_to_influxdb(node_result_list: List[Dict[str
.field("friction", data.get('friction', 0.0)) \ .field("friction", data.get('friction', 0.0)) \
.time(time_beijing) .time(time_beijing)
write_api.write(bucket=bucket, org=org_name, record=link_point) write_api.write(bucket=bucket, org=org_name, record=link_point)
write_api.flush()
print(f"成功将 {len(link_result_list)} 条link数据写入 InfluxDB。") print(f"成功将 {len(link_result_list)} 条link数据写入 InfluxDB。")
except Exception as e: except Exception as e:
@@ -1305,12 +1319,12 @@ if __name__ == "__main__":
# client.close() # client.close()
# step2: 先查询pg数据库中scada_info的信息然后存储SCADA数据到SCADA_data这个bucket里 # step2: 先查询pg数据库中scada_info的信息然后存储SCADA数据到SCADA_data这个bucket里
# query_pg_scada_info_realtime('bb') query_pg_scada_info_realtime('bb')
# query_pg_scada_info_non_realtime('bb') query_pg_scada_info_non_realtime('bb')
# 手动执行 # 手动执行
# store_realtime_SCADA_data_to_influxdb(get_real_value_time='2025-02-07T16:52:00+08:00') # store_realtime_SCADA_data_to_influxdb(get_real_value_time='2025-02-07T16:52:00+08:00')
# store_non_realtime_SCADA_data_to_influxdb(get_history_data_end_time='2025-02-01T12:00:00+08:00') store_non_realtime_SCADA_data_to_influxdb(get_history_data_end_time='2025-02-08T12:00:00+08:00')
# step3: 查询测试示例 # step3: 查询测试示例
# with InfluxDBClient(url=url, token=token, org=org_name) as client: # with InfluxDBClient(url=url, token=token, org=org_name) as client:
@@ -1338,5 +1352,5 @@ if __name__ == "__main__":
# print(curve_result) # print(curve_result)
# 示例4query_SCADA_data_by_device_ID_and_time # 示例4query_SCADA_data_by_device_ID_and_time
# SCADA_result_dict = query_SCADA_data_by_device_ID_and_time(globals.reservoir_liquid_level_realtime_ids, query_time='2024-12-13T11:30:00+08:00') # SCADA_result_dict = query_SCADA_data_by_device_ID_and_time(globals.variable_pump_realtime_ids, query_time='2025-02-08T10:30:00+08:00')
# print(SCADA_result_dict) # print(SCADA_result_dict)

View File

@@ -791,7 +791,7 @@ def run_simulation(name: str, simulation_type: str, modify_pattern_start_time: s
fixed_pump_dict = {key: fixed_pump_SCADA_data_dict[value] for key, value in globals.fixed_pumps_id.items()} fixed_pump_dict = {key: fixed_pump_SCADA_data_dict[value] for key, value in globals.fixed_pumps_id.items()}
for fixed_pump_name, value in fixed_pump_dict.items(): for fixed_pump_name, value in fixed_pump_dict.items():
if value and float(value) != 0: if value:
pump_pattern = get_pattern(name_c, get_pump(name_c, fixed_pump_name)['pattern']) pump_pattern = get_pattern(name_c, get_pump(name_c, fixed_pump_name)['pattern'])
pump_pattern['factors'][modify_index] = float(value) pump_pattern['factors'][modify_index] = float(value)
cs = ChangeSet() cs = ChangeSet()
@@ -802,11 +802,13 @@ def run_simulation(name: str, simulation_type: str, modify_pattern_start_time: s
# 修改变频泵的pattern # 修改变频泵的pattern
variable_pump_SCADA_data_dict = influxdb_api.query_SCADA_data_by_device_ID_and_time( variable_pump_SCADA_data_dict = influxdb_api.query_SCADA_data_by_device_ID_and_time(
query_ids_list=list(globals.variable_pumps_id.values()), query_time=modify_pattern_start_time) query_ids_list=list(globals.variable_pumps_id.values()), query_time=modify_pattern_start_time)
# print(variable_pump_SCADA_data_dict)
variable_pump_dict = {key: variable_pump_SCADA_data_dict[value] for key, value in globals.variable_pumps_id.items()} variable_pump_dict = {key: variable_pump_SCADA_data_dict[value] for key, value in globals.variable_pumps_id.items()}
# print(variable_pump_dict)
for variable_pump_name, value in variable_pump_dict.items(): for variable_pump_name, value in variable_pump_dict.items():
if value and float(value) != 0: if value:
pump_pattern = get_pattern(name_c, get_pump(name_c, fixed_pump_name)['pattern']) pump_pattern = get_pattern(name_c, get_pump(name_c, fixed_pump_name)['pattern'])
pump_pattern['factors'][modify_index] = float(value) / 50 pump_pattern['factors'][modify_index] = float(value) / 50
cs = ChangeSet() cs = ChangeSet()
@@ -821,7 +823,7 @@ def run_simulation(name: str, simulation_type: str, modify_pattern_start_time: s
demand_dict = {key: demand_SCADA_data_dict[value] for key, value in globals.demand_id.items()} demand_dict = {key: demand_SCADA_data_dict[value] for key, value in globals.demand_id.items()}
for demand_name, value in demand_dict.items(): for demand_name, value in demand_dict.items():
if value and float(value) != 0: if value:
demand_pattern = get_pattern(name_c, get_demand(name_c, demand_name)['pattern']) demand_pattern = get_pattern(name_c, get_demand(name_c, demand_name)['pattern'])
if get_option(name_c)['UNITS'] == 'LPS': if get_option(name_c)['UNITS'] == 'LPS':
demand_pattern['factors'][modify_index] = float(value) / 3.6 # m3/h 转换为 L/s demand_pattern['factors'][modify_index] = float(value) / 3.6 # m3/h 转换为 L/s
@@ -838,25 +840,26 @@ def run_simulation(name: str, simulation_type: str, modify_pattern_start_time: s
# 基于实时的出厂流量计数据修改出厂流量计绑定的pattern # 基于实时的出厂流量计数据修改出厂流量计绑定的pattern
source_outflow_SCADA_data_dict = influxdb_api.query_SCADA_data_by_device_ID_and_time( source_outflow_SCADA_data_dict = influxdb_api.query_SCADA_data_by_device_ID_and_time(
query_ids_list=list(globals.source_outflow_pattern_id.values()), query_time=modify_pattern_start_time) query_ids_list=list(globals.source_outflow_pattern_id.values()), query_time=modify_pattern_start_time)
print(source_outflow_SCADA_data_dict) # print(source_outflow_SCADA_data_dict)
source_outflow_dict = {key: source_outflow_SCADA_data_dict[value] for key, value in globals.source_outflow_pattern_id.items()} source_outflow_dict = {key: source_outflow_SCADA_data_dict[value] for key, value in globals.source_outflow_pattern_id.items()}
print(source_outflow_dict) # print(source_outflow_dict)
for pattern_name in source_outflow_dict.keys(): for pattern_name in source_outflow_dict.keys():
print(pattern_name) # print(pattern_name)
history_source_outflow_list = get_history_pattern_info(name_c, pattern_name) history_source_outflow_list = get_history_pattern_info(name_c, pattern_name)
history_source_outflow = history_source_outflow_list[modify_index] history_source_outflow = history_source_outflow_list[modify_index]
print(source_outflow_dict[pattern_name]) # print(source_outflow_dict[pattern_name])
realtime_source_outflow = float(source_outflow_dict[pattern_name]) if source_outflow_dict[pattern_name]:
realtime_source_outflow = float(source_outflow_dict[pattern_name])
multiply_factor = realtime_source_outflow / history_source_outflow multiply_factor = realtime_source_outflow / history_source_outflow
pattern = get_pattern(name_c, pattern_name) pattern = get_pattern(name_c, pattern_name)
pattern['factors'][modify_index] *= multiply_factor pattern['factors'][modify_index] *= multiply_factor
cs = ChangeSet() cs = ChangeSet()
cs.append(pattern) cs.append(pattern)
set_pattern(name_c, cs) set_pattern(name_c, cs)
if globals.realtime_pipe_flow_pattern_id: if globals.realtime_pipe_flow_pattern_id:
# 基于实时的pipe_flow类数据修改pipe_flow类绑定的pattern # 基于实时的pipe_flow类数据修改pipe_flow类绑定的pattern
@@ -868,16 +871,16 @@ def run_simulation(name: str, simulation_type: str, modify_pattern_start_time: s
for pattern_name in realtime_pipe_flow_dict.keys(): for pattern_name in realtime_pipe_flow_dict.keys():
history_pipe_flow_list = get_history_pattern_info(name_c, pattern_name) history_pipe_flow_list = get_history_pattern_info(name_c, pattern_name)
history_pipe_flow = history_pipe_flow_list[modify_index] history_pipe_flow = history_pipe_flow_list[modify_index]
if realtime_pipe_flow_dict[pattern_name]:
realtime_pipe_flow = float(realtime_pipe_flow_dict[pattern_name])
realtime_pipe_flow = float(realtime_pipe_flow_dict[pattern_name]) multiply_factor = realtime_pipe_flow / history_pipe_flow
multiply_factor = realtime_pipe_flow / history_pipe_flow pattern = get_pattern(name_c, pattern_name)
pattern['factors'][modify_index] *= multiply_factor
pattern = get_pattern(name_c, pattern_name) cs = ChangeSet()
pattern['factors'][modify_index] *= multiply_factor cs.append(pattern)
cs = ChangeSet() set_pattern(name_c, cs)
cs.append(pattern)
set_pattern(name_c, cs)
if globals.pipe_flow_region_patterns: if globals.pipe_flow_region_patterns:
# 基于实时的pipe_flow类数据修改pipe_flow分区流量计范围内的non_realtime的demand绑定的pattern # 基于实时的pipe_flow类数据修改pipe_flow分区流量计范围内的non_realtime的demand绑定的pattern
@@ -896,18 +899,18 @@ def run_simulation(name: str, simulation_type: str, modify_pattern_start_time: s
for pattern_name in temp_realtime_pipe_flow_dict.keys(): for pattern_name in temp_realtime_pipe_flow_dict.keys():
temp_history_pipe_flow_list = get_history_pattern_info(name_c, pattern_name) temp_history_pipe_flow_list = get_history_pattern_info(name_c, pattern_name)
temp_history_pipe_flow = temp_history_pipe_flow_list[modify_index] temp_history_pipe_flow = temp_history_pipe_flow_list[modify_index]
if temp_realtime_pipe_flow_dict[pattern_name]:
temp_realtime_pipe_flow = float(temp_realtime_pipe_flow_dict[pattern_name])
temp_realtime_pipe_flow = float(temp_realtime_pipe_flow_dict[pattern_name]) temp_multiply_factor = temp_realtime_pipe_flow / temp_history_pipe_flow
temp_multiply_factor = temp_realtime_pipe_flow / temp_history_pipe_flow temp_non_realtime_demand_pattern_list = globals.pipe_flow_region_patterns[pattern_name]
for demand_pattern_name in temp_non_realtime_demand_pattern_list:
temp_non_realtime_demand_pattern_list = globals.pipe_flow_region_patterns[pattern_name] pattern = get_pattern(name_c, demand_pattern_name)
for demand_pattern_name in temp_non_realtime_demand_pattern_list: pattern['factors'][modify_index] *= temp_multiply_factor
pattern = get_pattern(name_c, demand_pattern_name) cs = ChangeSet()
pattern['factors'][modify_index] *= temp_multiply_factor cs.append(pattern)
cs = ChangeSet() set_pattern(name_c, cs)
cs.append(pattern)
set_pattern(name_c, cs)
if globals.source_outflow_region: if globals.source_outflow_region:
# 根据associated_source_outflow_id进行分区各分区用出厂的流量计 - 实时的pipe_flow和demand进行数据更新 # 根据associated_source_outflow_id进行分区各分区用出厂的流量计 - 实时的pipe_flow和demand进行数据更新
@@ -936,13 +939,14 @@ def run_simulation(name: str, simulation_type: str, modify_pattern_start_time: s
temp_history_pipe_flow_and_demand_list = get_history_pattern_info(name_c, pipe_flow_and_demand_pattern_name) temp_history_pipe_flow_and_demand_list = get_history_pattern_info(name_c, pipe_flow_and_demand_pattern_name)
history_region_total_realtime_region_pipe_flow_and_demand += temp_history_pipe_flow_and_demand_list[modify_index] history_region_total_realtime_region_pipe_flow_and_demand += temp_history_pipe_flow_and_demand_list[modify_index]
temp_multiply_factor = (region_total_source_outflow - region_total_realtime_region_pipe_flow_and_demand) / (history_region_total_source_outflow - history_region_total_realtime_region_pipe_flow_and_demand) if (region_total_source_outflow - region_total_realtime_region_pipe_flow_and_demand):
for non_realtime_region_pattern_name in temp_non_realtime_region_patterns: temp_multiply_factor = (region_total_source_outflow - region_total_realtime_region_pipe_flow_and_demand) / (history_region_total_source_outflow - history_region_total_realtime_region_pipe_flow_and_demand)
pattern = get_pattern(name_c, non_realtime_region_pattern_name) for non_realtime_region_pattern_name in temp_non_realtime_region_patterns:
pattern['factors'][modify_index] *= temp_multiply_factor pattern = get_pattern(name_c, non_realtime_region_pattern_name)
cs = ChangeSet() pattern['factors'][modify_index] *= temp_multiply_factor
cs.append(pattern) cs = ChangeSet()
set_pattern(name_c, cs) cs.append(pattern)
set_pattern(name_c, cs)
# 根据高压出厂流量,更改高压用水模式 # 根据高压出厂流量,更改高压用水模式
# hp_flow_SCADA_data_dict = influxdb_api.query_SCADA_data_by_device_ID_and_time( # hp_flow_SCADA_data_dict = influxdb_api.query_SCADA_data_by_device_ID_and_time(
@@ -1047,7 +1051,7 @@ if __name__ == "__main__":
# print("Realtime Region Pipe Flow And Demand ID:", globals.realtime_region_pipe_flow_and_demand_id) # print("Realtime Region Pipe Flow And Demand ID:", globals.realtime_region_pipe_flow_and_demand_id)
# print("Realtime Region Pipe Flow And Demand Patterns:", globals.realtime_region_pipe_flow_and_demand_patterns) # print("Realtime Region Pipe Flow And Demand Patterns:", globals.realtime_region_pipe_flow_and_demand_patterns)
run_simulation(name='bb', simulation_type="realtime", modify_pattern_start_time='2025-02-07T22:15:00+08:00') run_simulation(name='bb', simulation_type="realtime", modify_pattern_start_time='2025-02-08T10:30:00+08:00')