From e3605409890e74fdb74dbc3c85608a26b46b44e5 Mon Sep 17 00:00:00 2001 From: DingZQ Date: Sat, 8 Feb 2025 20:11:27 +0800 Subject: [PATCH] Add more code from WMH --- auto_realtime.py | 120 +++++++++++++++++++ auto_store_non_realtime_SCADA_data.py | 9 +- get_data.py | 165 ++++++++++++++++++++++++++ get_realValue.py | 39 ++++-- influxdb_api.py | 26 +++- simulation.py | 84 ++++++------- 6 files changed, 382 insertions(+), 61 deletions(-) create mode 100644 auto_realtime.py create mode 100644 get_data.py diff --git a/auto_realtime.py b/auto_realtime.py new file mode 100644 index 0000000..4ca2cb9 --- /dev/null +++ b/auto_realtime.py @@ -0,0 +1,120 @@ +import influxdb_api +import globals +from datetime import datetime, timedelta, timezone +import schedule +import time +from influxdb_client import InfluxDBClient, BucketsApi, WriteApi, OrganizationsApi, Point, QueryApi +import simulation + + +# 2025/02/01 +def get_next_time() -> str: + """ + 获取下一个1分钟时间点,返回格式为字符串'YYYY-MM-DDTHH:MM:00+08:00' + :return: 返回字符串格式的时间,表示下一个1分钟的时间点 + """ + # 获取当前时间,并设定为北京时间 + now = datetime.now() # now 类型为 datetime,表示当前本地时间 + # 获取当前的分钟,并且将秒和微秒置为零 + current_time = now.replace(second=0, microsecond=0) # current_time 类型为 datetime,时间的秒和微秒部分被清除 + return current_time.strftime('%Y-%m-%dT%H:%M:%S+08:00') + + +# 2025/02/06 +def store_realtime_SCADA_data_job() -> None: + """ + 定义的任务1,每分钟执行1次,每次执行时,更新get_real_value_time并调用store_realtime_SCADA_data_to_influxdb函数 + :return: None + """ + # 获取当前时间并更新get_real_value_time,转换为字符串格式 + get_real_value_time: str = get_next_time() # get_real_value_time 类型为 str,格式为'2025-02-01T18:45:00+08:00' + # 调用函数执行任务 + influxdb_api.store_realtime_SCADA_data_to_influxdb(get_real_value_time) + print('{} -- Successfully store realtime SCADA data.'.format( + datetime.now().strftime('%Y-%m-%d %H:%M:%S'))) + + +# 2025/02/06 +def get_next_15minute_time() -> str: + """ + 获取下一个15分钟的时间点,返回格式为字符串'YYYY-MM-DDTHH:MM:00+08:00' + :return: 返回字符串格式的时间,表示下一个15分钟执行时间点 + """ + now = datetime.now() + # 向上舍入到下一个15分钟 + next_15minute = (now.minute // 15 + 1) * 15 - 15 + if next_15minute == 60: + next_15minute = 0 + now = now + timedelta(hours=1) + + next_time = now.replace(minute=next_15minute, second=0, microsecond=0) + return next_time.strftime('%Y-%m-%dT%H:%M:%S+08:00') + + +# 2025/02/07 +def run_simulation_job() -> None: + """ + 定义的任务3,每15分钟执行一次在store_realtime_SCADA_data_to_influxdb之后执行run_simulation。 + :return: None + """ + # 获取当前时间,并检查是否是整点15分钟 + current_time = datetime.now() + if current_time.minute % 15 == 0: + print(f"{current_time.strftime('%Y-%m-%d %H:%M:%S')} -- Start simulation task.") + + # 计算前,获取scada_info中的信息,按照设定的方法修改pg数据库 + simulation.query_corresponding_element_id_and_query_id("bb") + simulation.query_corresponding_pattern_id_and_query_id('bb') + region_result = simulation.query_non_realtime_region('bb') + + globals.source_outflow_region_id = simulation.get_source_outflow_region_id('bb', region_result) + globals.realtime_region_pipe_flow_and_demand_id = simulation.query_realtime_region_pipe_flow_and_demand_id('bb', region_result) + globals.pipe_flow_region_patterns = simulation.query_pipe_flow_region_patterns('bb') + + globals.non_realtime_region_patterns = simulation.query_non_realtime_region_patterns('bb', region_result) + globals.source_outflow_region_patterns, realtime_region_pipe_flow_and_demand_patterns = simulation.get_realtime_region_patterns('bb', + globals.source_outflow_region_id, + globals.realtime_region_pipe_flow_and_demand_id) + modify_pattern_start_time: str = get_next_15minute_time() # 获取下一个15分钟时间点 + # print(modify_pattern_start_time) + simulation.run_simulation(name='bb', simulation_type="realtime", + modify_pattern_start_time=modify_pattern_start_time) + print('{} -- Successfully run simulation and store realtime simulation result.'.format( + datetime.now().strftime('%Y-%m-%d %H:%M:%S'))) + else: + print(f"{current_time.strftime('%Y-%m-%d %H:%M:%S')} -- Skipping the simulation task.") + + +# 2025/02/06 +def realtime_task() -> None: + """ + 定时执行任务1和,使用schedule库每1分钟执行一次store_realtime_SCADA_data_job函数。 + 该任务会一直运行,定期调用store_realtime_SCADA_data_job获取SCADA数据。 + :return: + """ + # 等待到整分对齐 + now = datetime.now() + wait_seconds = 60 - now.second + time.sleep(wait_seconds) + + # 使用 .at(":00") 指定在每分钟的第0秒执行 + schedule.every(1).minute.at(":00").do(store_realtime_SCADA_data_job) + # 每15分钟执行一次run_simulation_job + schedule.every(1).minute.at(":00").do(run_simulation_job) + # 持续执行任务,检查是否有待执行的任务 + while True: + schedule.run_pending() # 执行所有待处理的定时任务 + time.sleep(1) # 暂停1秒,避免过于频繁的任务检查 + + +if __name__ == "__main__": + url = "http://localhost:8086" # 替换为你的InfluxDB实例地址 + token = "Z4UZj9HuLwLlwoApywvT2nGVP3bwLy18y-sJQ7enzZlJd8YMzMWbBA6F-q4gBiZ-7-IqdxR5aR9LvicKiSNmnA==" # 替换为你的InfluxDB Token + org_name = "beibei" # 替换为你的Organization名称 + + client = InfluxDBClient(url=url, token=token) + + # step2: 先查询pg数据库中scada_info的信息,然后存储SCADA数据到SCADA_data这个bucket里 + influxdb_api.query_pg_scada_info_realtime('bb') + # 自动执行 + realtime_task() diff --git a/auto_store_non_realtime_SCADA_data.py b/auto_store_non_realtime_SCADA_data.py index 15ff023..403f274 100644 --- a/auto_store_non_realtime_SCADA_data.py +++ b/auto_store_non_realtime_SCADA_data.py @@ -15,7 +15,7 @@ def get_next_period_time() -> str: # 获取当前时间,并设定为北京时间 now = datetime.now() # now 类型为 datetime,表示当前本地时间 # 获取当前的小时数并计算下一个6小时时间点 - next_period_hour = (now.hour // 6 + 1) * 6 # next_period_hour 类型为 int,表示下一个6小时时间点的小时部分 + next_period_hour = (now.hour // 6 + 1) * 6 - 6 # next_period_hour 类型为 int,表示下一个6小时时间点的小时部分 # 如果计算的小时大于23,表示进入第二天,调整为00:00 if next_period_hour >= 24: next_period_hour = 0 @@ -37,6 +37,7 @@ def store_non_realtime_SCADA_data_job() -> None: if current_time.hour % 6 == 0 and current_time.minute == 0: # 获取下一个6小时的时间点,并更新get_history_data_end_time get_history_data_end_time: str = get_next_period_time() # get_history_data_end_time 类型为 str,格式为'2025-02-06T12:00:00+08:00' + print(get_history_data_end_time) # 调用函数执行任务 influxdb_api.store_non_realtime_SCADA_data_to_influxdb(get_history_data_end_time) print('{} -- Successfully store non realtime SCADA data.'.format( @@ -50,9 +51,13 @@ def store_non_realtime_SCADA_data_task() -> None: 该任务会一直运行,定期调用store_non_realtime_SCADA_data_job获取SCADA数据。 :return: """ + # 等待到整分对齐 + now = datetime.now() + wait_seconds = 60 - now.second + time.sleep(wait_seconds) try: # 每分钟检查一次,执行store_non_realtime_SCADA_data_job - schedule.every(1).minute.do(store_non_realtime_SCADA_data_job) + schedule.every(1).minute.at(":00").do(store_non_realtime_SCADA_data_job) # 持续执行任务,检查是否有待执行的任务 while True: diff --git a/get_data.py b/get_data.py new file mode 100644 index 0000000..33d93c8 --- /dev/null +++ b/get_data.py @@ -0,0 +1,165 @@ +import requests +from datetime import datetime +import pytz +from typing import List, Dict, Union, Optional +import csv + + +def convert_timestamp_to_beijing_time(timestamp: Union[int, float]) -> datetime: + # 将毫秒级时间戳转换为秒级时间戳 + timestamp_seconds = timestamp / 1000 + + # 将时间戳转换为datetime对象 + utc_time = datetime.utcfromtimestamp(timestamp_seconds) + + # 设定UTC时区 + utc_timezone = pytz.timezone('UTC') + + # 转换为北京时间 + beijing_timezone = pytz.timezone('Asia/Shanghai') + beijing_time = utc_time.replace(tzinfo=utc_timezone).astimezone(beijing_timezone) + + return beijing_time + + +def beijing_time_to_utc(beijing_time_str: str) -> str: + # 定义北京时区 + beijing_timezone = pytz.timezone('Asia/Shanghai') + + # 将字符串转换为datetime对象 + beijing_time = datetime.strptime(beijing_time_str, '%Y-%m-%d %H:%M:%S') + + # 本地化时间对象 + beijing_time = beijing_timezone.localize(beijing_time) + + # 转换为UTC时间 + utc_time = beijing_time.astimezone(pytz.utc) + + # 转换为ISO 8601格式的字符串 + return utc_time.strftime('%Y-%m-%dT%H:%M:%SZ') + + +def get_history_data(ids: str, begin_date: str, end_date: str, downsample: Optional[str]) -> List[Dict[str, Union[str, datetime, int, float]]]: +# def get_history_data(ids: str, begin_date: str, end_date: str, downsample: Optional[str]) -> None: + # 转换输入的北京时间为UTC时间 + begin_date_utc = beijing_time_to_utc(begin_date) + end_date_utc = beijing_time_to_utc(end_date) + + # 数据接口的地址 + url = 'http://183.64.62.100:9057/loong/api/curves/data' + # url = 'http://10.101.15.16:9000/loong/api/curves/data' + # url_path = 'http://10.101.15.16:9000/loong' # 内网 + + # 设置 GET 请求的参数 + params = { + 'ids': ids, + 'beginDate': begin_date_utc, + 'endDate': end_date_utc, + 'downsample': downsample + } + + history_data_list =[] + + try: + # 发送 GET 请求获取数据 + response = requests.get(url, params=params) + + # 检查响应状态码,200 表示请求成功 + if response.status_code == 200: + # 解析响应的 JSON 数据 + data = response.json() + # 这里可以对获取到的数据进行进一步处理 + + # 打印 'mpointId' 和 'mpointName' + for item in data['items']: + mpoint_id = str(item['mpointId']) + mpoint_name = item['mpointName'] + # print("mpointId:", item['mpointId']) + # print("mpointName:", item['mpointName']) + + # 打印 'dataDate' 和 'dataValue' + for item_data in item['data']: + # 将时间戳转换为北京时间 + beijing_time = convert_timestamp_to_beijing_time(item_data['dataDate']) + data_value = item_data['dataValue'] + # 创建一个字典存储每条数据 + data_dict = { + 'time': beijing_time, + 'device_ID': str(mpoint_id), + 'description': mpoint_name, + # 'dataDate (Beijing Time)': beijing_time.strftime('%Y-%m-%d %H:%M:%S'), + 'monitored_value': data_value # 保留原有类型 + } + + history_data_list.append(data_dict) + else: + # 如果请求不成功,打印错误信息 + print("请求失败,状态码:", response.status_code) + + except Exception as e: + # 捕获异常 + print("发生异常:", e) + + return history_data_list + + +# 使用示例 +# data_list = get_history_data(ids='9572', +# begin_date='2025-02-08 06:00:00', +# end_date='2025-02-08 12:00:00', +# downsample='1m') +# +# # 打印数据列表 +# for data in data_list: +# print(data) + +# # 定义 CSV 文件的路径 +# csv_file_path = './influxdb_data_4984.csv' +# # 将数据写入 CSV 文件 +# # with open(csv_file_path, mode='w', newline='') as file: +# # writer = csv.writer(file) +# # +# # # 写入表头 +# # writer.writerow(['measurement', 'mpointId', 'date', 'dataValue', 'datetime']) +# # +# # # 写入数据 +# # for data in data_list: +# # measurement = data['mpointName'] +# # mpointId = data['mpointId'] +# # date = data['datetime'].strftime('%Y-%m-%d') +# # dataValue = data['dataValue'] +# # datetime_str = data['datetime'] +# # +# # # 写入一行 +# # writer.writerow([measurement, mpointId, date, dataValue, datetime_str]) +# # +# # +# # print(f"数据已保存到 {csv_file_path}") +# +# filtered_csv_file_path = './filtered_influxdb_data_4984.csv' +# # +# # # # 读取并筛选数据 +# data_list1 = [] +# +# with open(csv_file_path, mode='r') as file: +# csv_reader = csv.DictReader(file) +# for row in csv_reader: +# # 将 datetime 列解析为 datetime 对象 +# datetime_value = datetime.strptime(row['datetime'], '%Y-%m-%d %H:%M:%S%z') +# +# # 只保留时间为 15 分钟倍数的行 +# if datetime_value.minute % 15 == 0: +# data_list1.append(row) +# +# # 将筛选后的数据写入新的 CSV 文件 +# with open(filtered_csv_file_path, mode='w', newline='') as file: +# writer = csv.writer(file) +# +# # 写入表头 +# writer.writerow(['measurement', 'mpointId', 'date', 'dataValue', 'datetime']) +# +# # 写入筛选后的数据 +# for data in data_list1: +# writer.writerow([data['measurement'], data['mpointId'], data['date'], data['dataValue'], data['datetime']]) +# +# print(f"筛选后的数据已保存到 {filtered_csv_file_path}") \ No newline at end of file diff --git a/get_realValue.py b/get_realValue.py index 31788e2..410d49f 100644 --- a/get_realValue.py +++ b/get_realValue.py @@ -1,6 +1,7 @@ import requests from datetime import datetime import pytz +from typing import List, Dict, Union, Tuple def convert_to_beijing_time(utc_time_str): @@ -17,15 +18,18 @@ def convert_to_beijing_time(utc_time_str): return beijing_time -def get_realValue(ids)->dict[str,float]: +def get_realValue(ids) -> List[Dict[str, Union[str, datetime, int, float]]]: # 数据接口的地址 url = 'http://183.64.62.100:9057/loong/api/mpoints/realValue' + # url = 'http://10.101.15.16:9000/loong/api/mpoints/realValue' # 内网 # 设置GET请求的参数 params = { 'ids': ids } - lst_data={} + # 创建一个字典来存储数据 + data_list = [] + try: # 发送GET请求获取数据 response = requests.get(url, params=params) @@ -34,27 +38,36 @@ def get_realValue(ids)->dict[str,float]: if response.status_code == 200: # 解析响应的JSON数据 data = response.json() - + # 只打印'id'、'datadt'和'realValue'数据 for realValue in data: - #print("id:", realValue['id']) - #print("mpointName:",realValue['mpointName']) + # print("id:", realValue['id']) + # print("mpointName:",realValue['mpointName']) # print("datadt:", realValue['datadt']) # 转换datadt字段为北京时间 - #beijing_time = convert_to_beijing_time(realValue['datadt']) - #print("datadt (Beijing Time):", beijing_time.strftime('%Y-%m-%d %H:%M:%S')) - #print("realValue:", realValue['realValue']) - #print() # 打印空行分隔不同条目 - r=float(realValue['realValue']) - lst_data[str(realValue['id'])]=r + beijing_time = convert_to_beijing_time(realValue['datadt']) + # print("datadt (Beijing Time):", beijing_time.strftime('%Y-%m-%d %H:%M:%S')) + # print("realValue:", realValue['realValue']) + # print() # 打印空行分隔不同条目 + # 将数据添加到字典中,值为一个字典,包含其他需要的字段 + data_list.append({ + 'device_ID': realValue['id'], + 'description': realValue['mpointName'], + 'time': beijing_time.strftime('%Y-%m-%d %H:%M:%S'), + 'monitored_value': realValue['realValue'] + }) + else: # 如果请求不成功,打印错误信息 print("请求失败,状态码:", response.status_code) - return lst_data + except Exception as e: # 捕获异常 print("发生异常:", e) + return data_list + # 使用示例 -#get_realValue(ids='2498,2500') \ No newline at end of file +# data_list = get_realValue(ids='2498,2500') +# print(data_list) diff --git a/influxdb_api.py b/influxdb_api.py index 0be8ea3..1dca474 100644 --- a/influxdb_api.py +++ b/influxdb_api.py @@ -1,7 +1,7 @@ from influxdb_client import InfluxDBClient, BucketsApi, WriteApi, OrganizationsApi, Point, QueryApi from typing import List, Dict from datetime import datetime, timedelta, timezone -from influxdb_client.client.write_api import SYNCHRONOUS +from influxdb_client.client.write_api import SYNCHRONOUS, ASYNCHRONOUS from dateutil import parser import get_realValue import get_data @@ -363,7 +363,7 @@ def store_realtime_SCADA_data_to_influxdb(get_real_value_time: str, bucket: str else: print("{} -- Failed to connect to InfluxDB.".format( datetime.now().strftime('%Y-%m-%d %H:%M:%S'))) - write_api = client.write_api(write_options=SYNCHRONOUS) + write_api = client.write_api(write_options=ASYNCHRONOUS) try_count = 0 reservoir_liquid_level_realtime_data_list = [] tank_liquid_level_realtime_data_list = [] @@ -433,6 +433,7 @@ def store_realtime_SCADA_data_to_influxdb(get_real_value_time: str, bucket: str .time(get_real_value_time_utc) ) write_api.write(bucket=bucket, org=org_name, record=point) + write_api.flush() if tank_liquid_level_realtime_data_list: for data in tank_liquid_level_realtime_data_list: @@ -464,6 +465,7 @@ def store_realtime_SCADA_data_to_influxdb(get_real_value_time: str, bucket: str .time(get_real_value_time_utc) ) write_api.write(bucket=bucket, org=org_name, record=point) + write_api.flush() if fixed_pump_realtime_data_list: for data in fixed_pump_realtime_data_list: @@ -495,6 +497,7 @@ def store_realtime_SCADA_data_to_influxdb(get_real_value_time: str, bucket: str .time(get_real_value_time_utc) ) write_api.write(bucket=bucket, org=org_name, record=point) + write_api.flush() if variable_pump_realtime_data_list: for data in variable_pump_realtime_data_list: @@ -526,6 +529,7 @@ def store_realtime_SCADA_data_to_influxdb(get_real_value_time: str, bucket: str .time(get_real_value_time_utc) ) write_api.write(bucket=bucket, org=org_name, record=point) + write_api.flush() if source_outflow_realtime_data_list: for data in source_outflow_realtime_data_list: @@ -557,6 +561,7 @@ def store_realtime_SCADA_data_to_influxdb(get_real_value_time: str, bucket: str .time(get_real_value_time_utc) ) write_api.write(bucket=bucket, org=org_name, record=point) + write_api.flush() if pipe_flow_realtime_data_list: for data in pipe_flow_realtime_data_list: @@ -588,6 +593,7 @@ def store_realtime_SCADA_data_to_influxdb(get_real_value_time: str, bucket: str .time(get_real_value_time_utc) ) write_api.write(bucket=bucket, org=org_name, record=point) + write_api.flush() if pressure_realtime_data_list: for data in pressure_realtime_data_list: @@ -619,6 +625,7 @@ def store_realtime_SCADA_data_to_influxdb(get_real_value_time: str, bucket: str .time(get_real_value_time_utc) ) write_api.write(bucket=bucket, org=org_name, record=point) + write_api.flush() if demand_realtime_data_list: for data in demand_realtime_data_list: @@ -650,6 +657,7 @@ def store_realtime_SCADA_data_to_influxdb(get_real_value_time: str, bucket: str .time(get_real_value_time_utc) ) write_api.write(bucket=bucket, org=org_name, record=point) + write_api.flush() if quality_realtime_data_list: for data in quality_realtime_data_list: @@ -681,6 +689,7 @@ def store_realtime_SCADA_data_to_influxdb(get_real_value_time: str, bucket: str .time(get_real_value_time_utc) ) write_api.write(bucket=bucket, org=org_name, record=point) + write_api.flush() def convert_time_format(original_time: str) -> str: @@ -726,6 +735,8 @@ def store_non_realtime_SCADA_data_to_influxdb(get_history_data_end_time: str, bu transmission_frequency_dt = datetime.strptime(globals.transmission_frequency, '%H:%M:%S') - datetime(1900, 1, 1) get_history_data_start_time = end_date_dt - transmission_frequency_dt begin_date = get_history_data_start_time.strftime('%Y-%m-%d %H:%M:%S') + # print(begin_date) + # print(end_date) reservoir_liquid_level_non_realtime_data_list = [] tank_liquid_level_non_realtime_data_list = [] @@ -786,6 +797,7 @@ def store_non_realtime_SCADA_data_to_influxdb(get_history_data_end_time: str, bu ids=','.join(globals.pressure_non_realtime_ids), begin_date=begin_date, end_date=end_date, downsample='1m') + # print(pressure_non_realtime_data_list) if globals.demand_non_realtime_ids: demand_non_realtime_data_list = get_data.get_history_data( @@ -1049,6 +1061,7 @@ def store_realtime_simulation_result_to_influxdb(node_result_list: List[Dict[str # 写入数据到 InfluxDB,多个 field 在同一个 point 中 write_api.write(bucket=bucket, org=org_name, record=node_point) + write_api.flush() print(f"成功将 {len(node_result_list)} 条node数据写入 InfluxDB。") for result in link_result_list: link_id = result.get('link') @@ -1068,6 +1081,7 @@ def store_realtime_simulation_result_to_influxdb(node_result_list: List[Dict[str .field("friction", data.get('friction', 0.0)) \ .time(time_beijing) write_api.write(bucket=bucket, org=org_name, record=link_point) + write_api.flush() print(f"成功将 {len(link_result_list)} 条link数据写入 InfluxDB。") except Exception as e: @@ -1305,12 +1319,12 @@ if __name__ == "__main__": # client.close() # step2: 先查询pg数据库中scada_info的信息,然后存储SCADA数据到SCADA_data这个bucket里 - # query_pg_scada_info_realtime('bb') - # query_pg_scada_info_non_realtime('bb') + query_pg_scada_info_realtime('bb') + query_pg_scada_info_non_realtime('bb') # 手动执行 # store_realtime_SCADA_data_to_influxdb(get_real_value_time='2025-02-07T16:52:00+08:00') - # store_non_realtime_SCADA_data_to_influxdb(get_history_data_end_time='2025-02-01T12:00:00+08:00') + store_non_realtime_SCADA_data_to_influxdb(get_history_data_end_time='2025-02-08T12:00:00+08:00') # step3: 查询测试示例 # with InfluxDBClient(url=url, token=token, org=org_name) as client: @@ -1338,5 +1352,5 @@ if __name__ == "__main__": # print(curve_result) # 示例4:query_SCADA_data_by_device_ID_and_time - # SCADA_result_dict = query_SCADA_data_by_device_ID_and_time(globals.reservoir_liquid_level_realtime_ids, query_time='2024-12-13T11:30:00+08:00') + # SCADA_result_dict = query_SCADA_data_by_device_ID_and_time(globals.variable_pump_realtime_ids, query_time='2025-02-08T10:30:00+08:00') # print(SCADA_result_dict) diff --git a/simulation.py b/simulation.py index db5e935..7c87eed 100644 --- a/simulation.py +++ b/simulation.py @@ -791,7 +791,7 @@ def run_simulation(name: str, simulation_type: str, modify_pattern_start_time: s fixed_pump_dict = {key: fixed_pump_SCADA_data_dict[value] for key, value in globals.fixed_pumps_id.items()} for fixed_pump_name, value in fixed_pump_dict.items(): - if value and float(value) != 0: + if value: pump_pattern = get_pattern(name_c, get_pump(name_c, fixed_pump_name)['pattern']) pump_pattern['factors'][modify_index] = float(value) cs = ChangeSet() @@ -802,11 +802,13 @@ def run_simulation(name: str, simulation_type: str, modify_pattern_start_time: s # 修改变频泵的pattern variable_pump_SCADA_data_dict = influxdb_api.query_SCADA_data_by_device_ID_and_time( query_ids_list=list(globals.variable_pumps_id.values()), query_time=modify_pattern_start_time) + # print(variable_pump_SCADA_data_dict) variable_pump_dict = {key: variable_pump_SCADA_data_dict[value] for key, value in globals.variable_pumps_id.items()} + # print(variable_pump_dict) for variable_pump_name, value in variable_pump_dict.items(): - if value and float(value) != 0: + if value: pump_pattern = get_pattern(name_c, get_pump(name_c, fixed_pump_name)['pattern']) pump_pattern['factors'][modify_index] = float(value) / 50 cs = ChangeSet() @@ -821,7 +823,7 @@ def run_simulation(name: str, simulation_type: str, modify_pattern_start_time: s demand_dict = {key: demand_SCADA_data_dict[value] for key, value in globals.demand_id.items()} for demand_name, value in demand_dict.items(): - if value and float(value) != 0: + if value: demand_pattern = get_pattern(name_c, get_demand(name_c, demand_name)['pattern']) if get_option(name_c)['UNITS'] == 'LPS': demand_pattern['factors'][modify_index] = float(value) / 3.6 # m3/h 转换为 L/s @@ -838,25 +840,26 @@ def run_simulation(name: str, simulation_type: str, modify_pattern_start_time: s # 基于实时的出厂流量计数据,修改出厂流量计绑定的pattern source_outflow_SCADA_data_dict = influxdb_api.query_SCADA_data_by_device_ID_and_time( query_ids_list=list(globals.source_outflow_pattern_id.values()), query_time=modify_pattern_start_time) - print(source_outflow_SCADA_data_dict) + # print(source_outflow_SCADA_data_dict) source_outflow_dict = {key: source_outflow_SCADA_data_dict[value] for key, value in globals.source_outflow_pattern_id.items()} - print(source_outflow_dict) + # print(source_outflow_dict) for pattern_name in source_outflow_dict.keys(): - print(pattern_name) + # print(pattern_name) history_source_outflow_list = get_history_pattern_info(name_c, pattern_name) history_source_outflow = history_source_outflow_list[modify_index] - print(source_outflow_dict[pattern_name]) - realtime_source_outflow = float(source_outflow_dict[pattern_name]) + # print(source_outflow_dict[pattern_name]) + if source_outflow_dict[pattern_name]: + realtime_source_outflow = float(source_outflow_dict[pattern_name]) - multiply_factor = realtime_source_outflow / history_source_outflow + multiply_factor = realtime_source_outflow / history_source_outflow - pattern = get_pattern(name_c, pattern_name) - pattern['factors'][modify_index] *= multiply_factor - cs = ChangeSet() - cs.append(pattern) - set_pattern(name_c, cs) + pattern = get_pattern(name_c, pattern_name) + pattern['factors'][modify_index] *= multiply_factor + cs = ChangeSet() + cs.append(pattern) + set_pattern(name_c, cs) if globals.realtime_pipe_flow_pattern_id: # 基于实时的pipe_flow类数据,修改pipe_flow类绑定的pattern @@ -868,16 +871,16 @@ def run_simulation(name: str, simulation_type: str, modify_pattern_start_time: s for pattern_name in realtime_pipe_flow_dict.keys(): history_pipe_flow_list = get_history_pattern_info(name_c, pattern_name) history_pipe_flow = history_pipe_flow_list[modify_index] + if realtime_pipe_flow_dict[pattern_name]: + realtime_pipe_flow = float(realtime_pipe_flow_dict[pattern_name]) - realtime_pipe_flow = float(realtime_pipe_flow_dict[pattern_name]) + multiply_factor = realtime_pipe_flow / history_pipe_flow - multiply_factor = realtime_pipe_flow / history_pipe_flow - - pattern = get_pattern(name_c, pattern_name) - pattern['factors'][modify_index] *= multiply_factor - cs = ChangeSet() - cs.append(pattern) - set_pattern(name_c, cs) + pattern = get_pattern(name_c, pattern_name) + pattern['factors'][modify_index] *= multiply_factor + cs = ChangeSet() + cs.append(pattern) + set_pattern(name_c, cs) if globals.pipe_flow_region_patterns: # 基于实时的pipe_flow类数据,修改pipe_flow分区流量计范围内的non_realtime的demand绑定的pattern @@ -896,18 +899,18 @@ def run_simulation(name: str, simulation_type: str, modify_pattern_start_time: s for pattern_name in temp_realtime_pipe_flow_dict.keys(): temp_history_pipe_flow_list = get_history_pattern_info(name_c, pattern_name) temp_history_pipe_flow = temp_history_pipe_flow_list[modify_index] + if temp_realtime_pipe_flow_dict[pattern_name]: + temp_realtime_pipe_flow = float(temp_realtime_pipe_flow_dict[pattern_name]) - temp_realtime_pipe_flow = float(temp_realtime_pipe_flow_dict[pattern_name]) + temp_multiply_factor = temp_realtime_pipe_flow / temp_history_pipe_flow - temp_multiply_factor = temp_realtime_pipe_flow / temp_history_pipe_flow - - temp_non_realtime_demand_pattern_list = globals.pipe_flow_region_patterns[pattern_name] - for demand_pattern_name in temp_non_realtime_demand_pattern_list: - pattern = get_pattern(name_c, demand_pattern_name) - pattern['factors'][modify_index] *= temp_multiply_factor - cs = ChangeSet() - cs.append(pattern) - set_pattern(name_c, cs) + temp_non_realtime_demand_pattern_list = globals.pipe_flow_region_patterns[pattern_name] + for demand_pattern_name in temp_non_realtime_demand_pattern_list: + pattern = get_pattern(name_c, demand_pattern_name) + pattern['factors'][modify_index] *= temp_multiply_factor + cs = ChangeSet() + cs.append(pattern) + set_pattern(name_c, cs) if globals.source_outflow_region: # 根据associated_source_outflow_id进行分区,各分区用(出厂的流量计 - 实时的pipe_flow和demand)进行数据更新 @@ -936,13 +939,14 @@ def run_simulation(name: str, simulation_type: str, modify_pattern_start_time: s temp_history_pipe_flow_and_demand_list = get_history_pattern_info(name_c, pipe_flow_and_demand_pattern_name) history_region_total_realtime_region_pipe_flow_and_demand += temp_history_pipe_flow_and_demand_list[modify_index] - temp_multiply_factor = (region_total_source_outflow - region_total_realtime_region_pipe_flow_and_demand) / (history_region_total_source_outflow - history_region_total_realtime_region_pipe_flow_and_demand) - for non_realtime_region_pattern_name in temp_non_realtime_region_patterns: - pattern = get_pattern(name_c, non_realtime_region_pattern_name) - pattern['factors'][modify_index] *= temp_multiply_factor - cs = ChangeSet() - cs.append(pattern) - set_pattern(name_c, cs) + if (region_total_source_outflow - region_total_realtime_region_pipe_flow_and_demand): + temp_multiply_factor = (region_total_source_outflow - region_total_realtime_region_pipe_flow_and_demand) / (history_region_total_source_outflow - history_region_total_realtime_region_pipe_flow_and_demand) + for non_realtime_region_pattern_name in temp_non_realtime_region_patterns: + pattern = get_pattern(name_c, non_realtime_region_pattern_name) + pattern['factors'][modify_index] *= temp_multiply_factor + cs = ChangeSet() + cs.append(pattern) + set_pattern(name_c, cs) # 根据高压出厂流量,更改高压用水模式 # hp_flow_SCADA_data_dict = influxdb_api.query_SCADA_data_by_device_ID_and_time( @@ -1047,7 +1051,7 @@ if __name__ == "__main__": # print("Realtime Region Pipe Flow And Demand ID:", globals.realtime_region_pipe_flow_and_demand_id) # print("Realtime Region Pipe Flow And Demand Patterns:", globals.realtime_region_pipe_flow_and_demand_patterns) - run_simulation(name='bb', simulation_type="realtime", modify_pattern_start_time='2025-02-07T22:15:00+08:00') + run_simulation(name='bb', simulation_type="realtime", modify_pattern_start_time='2025-02-08T10:30:00+08:00')