Update influxdb_api

This commit is contained in:
DingZQ
2025-03-04 20:59:45 +08:00
parent de0b4403d4
commit 05569f8859
2 changed files with 677 additions and 334 deletions

File diff suppressed because it is too large. [Load Diff]

View File

@@ -38,11 +38,9 @@ def query_corresponding_element_id_and_query_id(name: str) -> None:
WHERE transmission_mode = 'realtime';
""")
records = cur.fetchall()
# 遍历查询结果,并根据 type 将数据存储到相应的字典中
for record in records:
type_, associated_element_id, api_query_id = record
if type_ == 'reservoir_liquid_level':
globals.reservoirs_id[associated_element_id] = api_query_id
elif type_ == 'tank_liquid_level':
@@ -84,11 +82,9 @@ def query_corresponding_pattern_id_and_query_id(name: str) -> None:
AND type IN ('source_outflow', 'pipe_flow');
""")
records = cur.fetchall()
# 遍历查询结果,并根据 type 将数据存储到相应的字典中
for record in records:
type_, associated_pattern, api_query_id = record
if type_ == 'source_outflow':
globals.source_outflow_pattern_id[associated_pattern] = api_query_id
elif type_ == 'pipe_flow':
@@ -109,7 +105,6 @@ def query_non_realtime_region(name: str) -> dict:
source_outflow_regions = [] # 用于存储所有 region包含重复的
# 构建连接字符串
conn_string = f"dbname={name} host=127.0.0.1"
try:
# 连接到数据库
with psycopg.connect(conn_string) as conn:
@@ -121,26 +116,21 @@ def query_non_realtime_region(name: str) -> dict:
WHERE transmission_mode = 'non_realtime'
AND type = 'pipe_flow';
""")
records = cur.fetchall()
col_names = [desc.name for desc in cur.description]
# 找出所有以 'associated_source_outflow_id' 开头的列
source_outflow_cols = [col for col in col_names if col.startswith('associated_source_outflow_id')]
logging.info(f"Identified source_outflow columns: {source_outflow_cols}")
for record in records:
# 提取所有以 'associated_source_outflow_id' 开头的列的值,排除 None
values = [record[col_names.index(col)] for col in source_outflow_cols if
record[col_names.index(col)] is not None]
# 如果该记录有相关的值,则将其作为一个 region
if values:
# 将值排序以确保相同的组合顺序一致(如果顺序不重要)
# 如果顺序重要,请删除排序步骤
region_tuple = tuple(sorted(values))
source_outflow_regions.append(region_tuple)
# 移除重复的 regions
unique_regions = []
seen = set()
@@ -148,18 +138,15 @@ def query_non_realtime_region(name: str) -> dict:
if region not in seen:
seen.add(region)
unique_regions.append(region)
# 为每个唯一的 region 分配一个 region 键
for idx, region in enumerate(unique_regions, 1):
region_key = f"region{idx}"
globals.source_outflow_region[region_key] = list(region)
logging.info("查询并处理数据成功。")
except psycopg.Error as e:
logging.error(f"数据库连接或查询出错: {e}")
except Exception as ex:
logging.error(f"处理数据时出错: {ex}")
return globals.source_outflow_region
@@ -176,7 +163,6 @@ def query_non_realtime_region_patterns(name: str, source_outflow_region: dict, c
globals.non_realtime_region_patterns = {region: [] for region in globals.source_outflow_region.keys()}
region_tuple_to_key = {frozenset(ids): region for region, ids in globals.source_outflow_region.items()}
conn_string = f"dbname={name} host=127.0.0.1"
try:
with psycopg.connect(conn_string) as conn:
with conn.cursor() as cur:
@@ -186,31 +172,24 @@ def query_non_realtime_region_patterns(name: str, source_outflow_region: dict, c
FROM scada_info
WHERE transmission_mode = 'non_realtime'
""")
records = cur.fetchall()
col_names = [desc.name for desc in cur.description]
# 找出所有以指定前缀开头的列
source_outflow_cols = [col for col in col_names if col.startswith(column_prefix)]
logging.info(f"Identified source_outflow columns: {source_outflow_cols}")
# 确保 'associated_pattern' 列存在
if 'associated_pattern' not in col_names:
logging.error("'associated_pattern' column not found in scada_info table.")
return globals.non_realtime_region_patterns
# 获取 'associated_pattern' 列的索引
pattern_idx = col_names.index('associated_pattern')
for record in records:
# 提取所有以 'associated_source_outflow_id' 开头的列的值,排除 None
values = [record[col_names.index(col)] for col in source_outflow_cols if
record[col_names.index(col)] is not None]
if values:
# 将值转换为 frozenset 以便与 region_tuple_to_key 进行匹配
region_frozenset = frozenset(values)
# 检查是否存在匹配的 region
region_key = region_tuple_to_key.get(region_frozenset)
if region_key:
@@ -218,20 +197,16 @@ def query_non_realtime_region_patterns(name: str, source_outflow_region: dict, c
associated_pattern = record[pattern_idx]
if associated_pattern is not None:
globals.non_realtime_region_patterns[region_key].append(associated_pattern)
logging.info("生成 regions_patterns 成功。")
except psycopg.Error as e:
logging.error(f"数据库连接或查询出错: {e}")
except Exception as ex:
logging.error(f"处理数据时出错: {ex}")
# 获取pipe_flow_region_patterns中的所有区域
exclude_regions = set(region for regions in globals.pipe_flow_region_patterns.values() for region in regions)
# 从non_realtime_region_patterns中去除这些区域
for region_key, regions in globals.non_realtime_region_patterns.items():
globals.non_realtime_region_patterns[region_key] = [region for region in regions if region not in exclude_regions]
return globals.non_realtime_region_patterns
@@ -249,7 +224,6 @@ def query_realtime_region_pipe_flow_and_demand_id(name: str, source_outflow_regi
# 创建一个映射,从 frozenset(ids) 到 region_key
region_tuple_to_key = {frozenset(ids): region for region, ids in globals.source_outflow_region.items()}
conn_string = f"dbname={name} host=127.0.0.1"
try:
with psycopg.connect(conn_string) as conn:
with conn.cursor() as cur:
@@ -260,31 +234,24 @@ def query_realtime_region_pipe_flow_and_demand_id(name: str, source_outflow_regi
WHERE transmission_mode = 'realtime'
AND type IN ('pipe_flow', 'demand');
""")
records = cur.fetchall()
col_names = [desc.name for desc in cur.description]
# 找出所有以指定前缀开头的列
source_outflow_cols = [col for col in col_names if col.startswith(column_prefix)]
logging.info(f"Identified source_outflow columns: {source_outflow_cols}")
# 确保 'api_query_id' 列存在
if 'api_query_id' not in col_names:
logging.error("'api_query_id' column not found in scada_info table.")
return globals.realtime_region_pipe_flow_and_demand_id
# 获取 'api_query_id' 列的索引
api_query_id_idx = col_names.index('api_query_id')
for record in records:
# 提取所有以 'associated_source_outflow_id' 开头的列的值,排除 None
values = [record[col_names.index(col)] for col in source_outflow_cols if
record[col_names.index(col)] is not None]
if values:
# 将值转换为 frozenset 以便与 region_tuple_to_key 进行匹配
region_frozenset = frozenset(values)
# 检查是否存在匹配的 region
region_key = region_tuple_to_key.get(region_frozenset)
if region_key:
@@ -292,13 +259,11 @@ def query_realtime_region_pipe_flow_and_demand_id(name: str, source_outflow_regi
api_query_id = record[api_query_id_idx]
if api_query_id is not None:
globals.realtime_region_pipe_flow_and_demand_id[region_key].append(api_query_id)
logging.info("生成 realtime_region_pipe_flow_and_demand_id 成功。")
except psycopg.Error as e:
logging.error(f"数据库连接或查询出错: {e}")
except Exception as ex:
logging.error(f"处理数据时出错: {ex}")
return globals.realtime_region_pipe_flow_and_demand_id
@@ -316,7 +281,6 @@ def query_pipe_flow_region_patterns(name: str, column_prefix: str = 'associated_
:return: pipe_flow_region_patterns 字典
"""
conn_string = f"dbname={name} host=127.0.0.1"
try:
with psycopg.connect(conn_string) as conn:
with conn.cursor() as cur:
@@ -327,18 +291,14 @@ def query_pipe_flow_region_patterns(name: str, column_prefix: str = 'associated_
WHERE type = 'demand'
AND transmission_mode = 'non_realtime';
""")
records = cur.fetchall()
col_names = [desc.name for desc in cur.description]
# 获取列索引
pattern_idx = col_names.index('associated_pattern')
pipe_flow_id_idx = col_names.index('associated_pipe_flow_id')
for record in records:
associated_pattern = record[pattern_idx]
associated_pipe_flow_id = record[pipe_flow_id_idx]
if associated_pipe_flow_id:
# 根据 associated_pipe_flow_id 查询对应的记录
cur.execute("""
@@ -346,26 +306,20 @@ def query_pipe_flow_region_patterns(name: str, column_prefix: str = 'associated_
FROM scada_info
WHERE associated_element_id = %s;
""", (associated_pipe_flow_id,))
pipe_flow_record = cur.fetchone()
if pipe_flow_record:
pipe_flow_associated_pattern = pipe_flow_record[0]
transmission_mode = pipe_flow_record[1]
if transmission_mode == 'realtime':
# 将 associated_pattern 记录到字典中
if pipe_flow_associated_pattern not in globals.pipe_flow_region_patterns:
globals.pipe_flow_region_patterns[pipe_flow_associated_pattern] = []
globals.pipe_flow_region_patterns[pipe_flow_associated_pattern].append(associated_pattern)
logging.info("生成 pipe_flow_region_patterns 成功。")
except psycopg.Error as e:
logging.error(f"数据库连接或查询出错: {e}")
except Exception as ex:
logging.error(f"处理数据时出错: {ex}")
return globals.pipe_flow_region_patterns
@@ -389,10 +343,8 @@ def query_SCADA_ID_corresponding_info(name: str, SCADA_ID: str) -> dict:
WHERE id = %s
"""
cur.execute(query, (SCADA_ID,)) # 执行查询并传递参数
# 获取查询结果
result = cur.fetchone()
if result:
# 将结果转换为字典
associated_info = {
@@ -425,13 +377,10 @@ def get_source_outflow_region_id(name: str, source_outflow_region: dict,
all_ids = set()
for ids in globals.source_outflow_region.values():
all_ids.update(ids)
if not all_ids:
logging.warning("No associated_source_outflow_id found in source_outflow_region.")
return globals.source_outflow_region_id
conn_string = f"dbname={name} host=127.0.0.1"
try:
with psycopg.connect(conn_string) as conn:
with conn.cursor() as cur:
@@ -443,7 +392,6 @@ def get_source_outflow_region_id(name: str, source_outflow_region: dict,
"""
cur.execute(query, (list(all_ids),))
rows = cur.fetchall()
# 构建 associated_source_outflow_id 到 api_query_id 的映射
id_to_api_query_id = {}
for row in rows:
@@ -451,7 +399,6 @@ def get_source_outflow_region_id(name: str, source_outflow_region: dict,
api_query_id = row[1]
if associated_id in all_ids and api_query_id is not None:
id_to_api_query_id[associated_id] = str(api_query_id)
# 替换 source_outflow_region 中的 associated_source_outflow_id 为 api_query_id
for region, ids in globals.source_outflow_region.items():
for id_ in ids:
@@ -460,12 +407,10 @@ def get_source_outflow_region_id(name: str, source_outflow_region: dict,
globals.source_outflow_region_id[region].append(api_id)
else:
logging.warning(f"No api_query_id found for associated_source_outflow_id: {id_}")
except psycopg.Error as e:
logging.error(f"数据库连接或查询出错: {e}")
except Exception as ex:
logging.error(f"处理数据时出错: {ex}")
return globals.source_outflow_region_id
@@ -483,7 +428,6 @@ def get_realtime_region_patterns(name: str, source_outflow_region_id: dict, real
globals.source_outflow_region_patterns = {region: [] for region in globals.source_outflow_region_id.keys()}
globals.realtime_region_pipe_flow_and_demand_patterns = {region: [] for region in
globals.realtime_region_pipe_flow_and_demand_id.keys()}
conn_string = f"dbname={name} host=127.0.0.1"
try:
with psycopg.connect(conn_string) as conn:
@@ -503,7 +447,6 @@ def get_realtime_region_patterns(name: str, source_outflow_region_id: dict, real
globals.source_outflow_region_patterns[region] = [
associated_pattern for _, associated_pattern in results if associated_pattern
]
# 获取 realtime_region_pipe_flow_and_demand_id 的 api_query_id 并查询 associated_pattern
realtime_api_ids = globals.realtime_region_pipe_flow_and_demand_id[region]
if realtime_api_ids:
@@ -517,13 +460,11 @@ def get_realtime_region_patterns(name: str, source_outflow_region_id: dict, real
globals.realtime_region_pipe_flow_and_demand_patterns[region] = [
associated_pattern for _, associated_pattern in results if associated_pattern
]
logging.info("生成 source_outflow_region_patterns 和 realtime_region_pipe_flow_and_demand_patterns 成功。")
except psycopg.Error as e:
logging.error(f"数据库连接或查询出错: {e}")
except Exception as ex:
logging.error(f"处理数据时出错: {ex}")
return globals.source_outflow_region_patterns, globals.realtime_region_pipe_flow_and_demand_patterns
@@ -556,6 +497,7 @@ def get_pattern_index_str(current_time: str) -> str:
str_i = '{}:{}:00'.format(hrN_str, minN_str)
return str_i
def from_seconds_to_clock (secs: int)->str:
"""
从秒格式化为“HH:MM:00”字符串
@@ -620,7 +562,6 @@ def run_simulation(name: str, simulation_type: str, modify_pattern_start_time: s
time_cost_start = time.perf_counter()
print('{} -- Hydraulic simulation started.'.format(
datetime.now(pytz.timezone('Asia/Shanghai')).strftime('%Y-%m-%d %H:%M:%S')))
# 重新打开数据库
if is_project_open(name):
close_project(name)
@@ -646,12 +587,10 @@ def run_simulation(name: str, simulation_type: str, modify_pattern_start_time: s
time_obj = datetime.strptime(globals.hydraulic_timestep, '%H:%M:%S')
# 转换为分钟浮点数
globals.PATTERN_TIME_STEP = float(time_obj.hour * 60 + time_obj.minute + time_obj.second / 60)
# 对输入的时间参数进行处理
pattern_start_time = convert_time_format(modify_pattern_start_time)
# 获取模拟开始时间是对应pattern的第几个数
modify_index = get_pattern_index(pattern_start_time)
# 遍历水泵的pattern_id并根据输入的pump_pattern修改pattern的值
# for pump_pattern_id in pump_pattern_ids:
# # 检查pump_pattern中pump_pattern_id对应的第一个频率值是否为有效数字非空、非NaN。如果该值有效则继续执行代码块。
@@ -664,7 +603,6 @@ def run_simulation(name: str, simulation_type: str, modify_pattern_start_time: s
# cs = ChangeSet()
# cs.append(pump_pattern)
# set_pattern(name_c, cs)
# 修改模拟开始的时间
str_pattern_start = get_pattern_index_str(convert_time_format(modify_pattern_start_time))
dic_time = get_time(name_c)
@@ -675,17 +613,14 @@ def run_simulation(name: str, simulation_type: str, modify_pattern_start_time: s
cs = ChangeSet()
cs.operations.append(dic_time)
set_time(name_c, cs)
# 根据SCADA实时数据进行修改如果没有对应的SCADA数据如未来的时间点则不改变pg数据库的数据
if globals.reservoirs_id:
# reservoirs_id = {'ZBBDJSCP000002': '2497', 'R00003': '2571'}
# 1.获取reservoir的SCADA数据,形式如{'2497': '3.1231', '2571': '2.7387'}
reservoir_SCADA_data_dict = influxdb_api.query_SCADA_data_by_device_ID_and_time(
query_ids_list=list(globals.reservoirs_id.values()), query_time=modify_pattern_start_time)
# 2.构建出新字典,形式如{'ZBBDJSCP000002': '3.1231', 'R00003': '2.7387'}
reservoir_dict = {key: reservoir_SCADA_data_dict[value] for key, value in globals.reservoirs_id.items()}
# 3.修改reservoir液位模式
for reservoir_name, value in reservoir_dict.items():
if value and float(value) != 0:
@@ -695,14 +630,11 @@ def run_simulation(name: str, simulation_type: str, modify_pattern_start_time: s
cs = ChangeSet()
cs.append(reservoir_pattern)
set_pattern(name_c, cs)
if globals.tanks_id:
# 修改tank初始液位
tank_SCADA_data_dict = influxdb_api.query_SCADA_data_by_device_ID_and_time(
query_ids_list=list(globals.tanks_id.values()), query_time=modify_pattern_start_time)
tank_dict = {key: tank_SCADA_data_dict[value] for key, value in globals.tanks_id.items()}
for tank_name, value in tank_dict.items():
if value and float(value) != 0:
tank = get_tank(name_c, tank_name)
@@ -710,14 +642,11 @@ def run_simulation(name: str, simulation_type: str, modify_pattern_start_time: s
cs = ChangeSet()
cs.append(tank)
set_tank(name_c, cs)
if globals.fixed_pumps_id:
# 修改工频泵的pattern
fixed_pump_SCADA_data_dict = influxdb_api.query_SCADA_data_by_device_ID_and_time(
query_ids_list=list(globals.fixed_pumps_id.values()), query_time=modify_pattern_start_time)
fixed_pump_dict = {key: fixed_pump_SCADA_data_dict[value] for key, value in globals.fixed_pumps_id.items()}
for fixed_pump_name, value in fixed_pump_dict.items():
if value:
pump_pattern = get_pattern(name_c, get_pump(name_c, fixed_pump_name)['pattern'])
@@ -725,31 +654,23 @@ def run_simulation(name: str, simulation_type: str, modify_pattern_start_time: s
cs = ChangeSet()
cs.append(pump_pattern)
set_pattern(name_c, cs)
if globals.variable_pumps_id:
# 修改变频泵的pattern
variable_pump_SCADA_data_dict = influxdb_api.query_SCADA_data_by_device_ID_and_time(
query_ids_list=list(globals.variable_pumps_id.values()), query_time=modify_pattern_start_time)
# print(variable_pump_SCADA_data_dict)
variable_pump_dict = {key: variable_pump_SCADA_data_dict[value] for key, value in globals.variable_pumps_id.items()}
# print(variable_pump_dict)
for variable_pump_name, value in variable_pump_dict.items():
if value:
pump_pattern = get_pattern(name_c, get_pump(name_c, fixed_pump_name)['pattern'])
pump_pattern = get_pattern(name_c, get_pump(name_c, variable_pump_name)['pattern'])
pump_pattern['factors'][modify_index] = float(value) / 50
cs = ChangeSet()
cs.append(pump_pattern)
set_pattern(name_c, cs)
if globals.demand_id:
# 基于实时数据修改大用户节点的pattern
demand_SCADA_data_dict = influxdb_api.query_SCADA_data_by_device_ID_and_time(
query_ids_list=list(globals.demand_id.values()), query_time=modify_pattern_start_time)
demand_dict = {key: demand_SCADA_data_dict[value] for key, value in globals.demand_id.items()}
for demand_name, value in demand_dict.items():
if value:
demand_pattern = get_pattern(name_c, get_demand(name_c, demand_name)['pattern'])
@@ -760,19 +681,15 @@ def run_simulation(name: str, simulation_type: str, modify_pattern_start_time: s
cs = ChangeSet()
cs.append(demand_pattern)
set_pattern(name_c, cs)
# 水质、压力实时数据使用方法待补充
#############################
if globals.source_outflow_pattern_id:
# 基于实时的出厂流量计数据修改出厂流量计绑定的pattern
source_outflow_SCADA_data_dict = influxdb_api.query_SCADA_data_by_device_ID_and_time(
query_ids_list=list(globals.source_outflow_pattern_id.values()), query_time=modify_pattern_start_time)
# print(source_outflow_SCADA_data_dict)
source_outflow_dict = {key: source_outflow_SCADA_data_dict[value] for key, value in globals.source_outflow_pattern_id.items()}
# print(source_outflow_dict)
for pattern_name in source_outflow_dict.keys():
# print(pattern_name)
history_source_outflow_list = get_history_pattern_info(name_c, pattern_name)
@@ -780,36 +697,28 @@ def run_simulation(name: str, simulation_type: str, modify_pattern_start_time: s
# print(source_outflow_dict[pattern_name])
if source_outflow_dict[pattern_name]:
realtime_source_outflow = float(source_outflow_dict[pattern_name])
multiply_factor = realtime_source_outflow / history_source_outflow
pattern = get_pattern(name_c, pattern_name)
pattern['factors'][modify_index] *= multiply_factor
cs = ChangeSet()
cs.append(pattern)
set_pattern(name_c, cs)
if globals.realtime_pipe_flow_pattern_id:
# 基于实时的pipe_flow类数据修改pipe_flow类绑定的pattern
realtime_pipe_flow_SCADA_data_dict = influxdb_api.query_SCADA_data_by_device_ID_and_time(
query_ids_list=list(globals.realtime_pipe_flow_pattern_id.values()), query_time=modify_pattern_start_time)
realtime_pipe_flow_dict = {key: realtime_pipe_flow_SCADA_data_dict[value] for key, value in globals.realtime_pipe_flow_pattern_id.items()}
for pattern_name in realtime_pipe_flow_dict.keys():
history_pipe_flow_list = get_history_pattern_info(name_c, pattern_name)
history_pipe_flow = history_pipe_flow_list[modify_index]
if realtime_pipe_flow_dict[pattern_name]:
realtime_pipe_flow = float(realtime_pipe_flow_dict[pattern_name])
multiply_factor = realtime_pipe_flow / history_pipe_flow
pattern = get_pattern(name_c, pattern_name)
pattern['factors'][modify_index] *= multiply_factor
cs = ChangeSet()
cs.append(pattern)
set_pattern(name_c, cs)
if globals.pipe_flow_region_patterns:
# 基于实时的pipe_flow类数据修改pipe_flow分区流量计范围内的non_realtime的demand绑定的pattern
temp_realtime_pipe_flow_pattern_id = {}
@@ -818,20 +727,15 @@ def run_simulation(name: str, simulation_type: str, modify_pattern_start_time: s
# 获取对应的实时值
query_api_id = globals.realtime_pipe_flow_pattern_id.get(pipe_flow_region)
temp_realtime_pipe_flow_pattern_id[pipe_flow_region] = query_api_id
temp_realtime_pipe_flow_SCADA_data_dict = influxdb_api.query_SCADA_data_by_device_ID_and_time(
query_ids_list=list(temp_realtime_pipe_flow_pattern_id.values()), query_time=modify_pattern_start_time)
temp_realtime_pipe_flow_dict = {key: temp_realtime_pipe_flow_SCADA_data_dict[value] for key, value in temp_realtime_pipe_flow_pattern_id.items()}
for pattern_name in temp_realtime_pipe_flow_dict.keys():
temp_history_pipe_flow_list = get_history_pattern_info(name_c, pattern_name)
temp_history_pipe_flow = temp_history_pipe_flow_list[modify_index]
if temp_realtime_pipe_flow_dict[pattern_name]:
temp_realtime_pipe_flow = float(temp_realtime_pipe_flow_dict[pattern_name])
temp_multiply_factor = temp_realtime_pipe_flow / temp_history_pipe_flow
temp_non_realtime_demand_pattern_list = globals.pipe_flow_region_patterns[pattern_name]
for demand_pattern_name in temp_non_realtime_demand_pattern_list:
pattern = get_pattern(name_c, demand_pattern_name)
@@ -839,7 +743,6 @@ def run_simulation(name: str, simulation_type: str, modify_pattern_start_time: s
cs = ChangeSet()
cs.append(pattern)
set_pattern(name_c, cs)
if globals.source_outflow_region:
# 根据associated_source_outflow_id进行分区各分区用出厂的流量计 - 实时的pipe_flow和demand进行数据更新
for region in globals.source_outflow_region.keys():
@@ -848,13 +751,10 @@ def run_simulation(name: str, simulation_type: str, modify_pattern_start_time: s
temp_source_outflow_region_patterns = globals.source_outflow_region_patterns.get(region, [])
temp_realtime_region_pipe_flow_and_demand_patterns = globals.realtime_region_pipe_flow_and_demand_patterns.get(region, [])
temp_non_realtime_region_patterns = globals.non_realtime_region_patterns.get(region, [])
region_source_outflow_data_dict = influxdb_api.query_SCADA_data_by_device_ID_and_time(
query_ids_list=temp_source_outflow_region_id, query_time=modify_pattern_start_time)
region_realtime_region_pipe_flow_and_demand_data_dict = influxdb_api.query_SCADA_data_by_device_ID_and_time(
query_ids_list=temp_realtime_region_pipe_flow_and_demand_id, query_time=modify_pattern_start_time)
# 2025/02/12 确保 region_source_outflow_data_dict 和
# region_realtime_region_pipe_flow_and_demand_data_dict中的每个值都不是 None 且不为 0
region_source_outflow_valid_values = [float(value) for value in region_source_outflow_data_dict.values() if value not in [None, 0]]
@@ -866,13 +766,11 @@ def run_simulation(name: str, simulation_type: str, modify_pattern_start_time: s
for source_outflow_pattern_name in temp_source_outflow_region_patterns:
temp_history_source_outflow_list = get_history_pattern_info(name_c, source_outflow_pattern_name)
history_region_total_source_outflow += temp_history_source_outflow_list[modify_index]
region_total_realtime_region_pipe_flow_and_demand = sum(valid_values)
history_region_total_realtime_region_pipe_flow_and_demand = 0
for pipe_flow_and_demand_pattern_name in temp_realtime_region_pipe_flow_and_demand_patterns:
temp_history_pipe_flow_and_demand_list = get_history_pattern_info(name_c, pipe_flow_and_demand_pattern_name)
history_region_total_realtime_region_pipe_flow_and_demand += temp_history_pipe_flow_and_demand_list[modify_index]
temp_multiply_factor = (region_total_source_outflow - region_total_realtime_region_pipe_flow_and_demand) / (history_region_total_source_outflow - history_region_total_realtime_region_pipe_flow_and_demand)
for non_realtime_region_pattern_name in temp_non_realtime_region_patterns:
pattern = get_pattern(name_c, non_realtime_region_pattern_name)
@@ -880,7 +778,6 @@ def run_simulation(name: str, simulation_type: str, modify_pattern_start_time: s
cs = ChangeSet()
cs.append(pattern)
set_pattern(name_c, cs)
# 根据输入的参数进行数据修改后面修改的可以覆盖前面的用于EXTENDED类的方案模拟
# 修改清水池(reservoir)液位的pattern
if modify_reservoir_head_pattern:
@@ -897,7 +794,6 @@ def run_simulation(name: str, simulation_type: str, modify_pattern_start_time: s
cs = ChangeSet()
cs.append(reservoir_pattern)
set_pattern(name_c, cs)
# 修改调节池(tank)初始液位
if modify_tank_initial_level:
for tank_name in modify_tank_initial_level.keys():
@@ -907,7 +803,6 @@ def run_simulation(name: str, simulation_type: str, modify_pattern_start_time: s
cs = ChangeSet()
cs.append(tank)
set_tank(name_c, cs)
# 修改节点junction基础水量demand
if modify_junction_base_demand:
for junction_name in modify_junction_base_demand.keys():
@@ -917,7 +812,6 @@ def run_simulation(name: str, simulation_type: str, modify_pattern_start_time: s
cs = ChangeSet()
cs.append(junction)
set_demand(name_c, cs)
# 修改节点junction的水量模式pattern
if modify_junction_damand_pattern:
for pattern_name in modify_junction_damand_pattern.keys():
@@ -929,7 +823,6 @@ def run_simulation(name: str, simulation_type: str, modify_pattern_start_time: s
cs = ChangeSet()
cs.append(junction_pattern)
set_pattern(name_c, cs)
# 修改工频水泵fixed_pump的pattern
if modify_fixed_pump_pattern:
for pump_name in modify_fixed_pump_pattern.keys():
@@ -941,7 +834,6 @@ def run_simulation(name: str, simulation_type: str, modify_pattern_start_time: s
cs = ChangeSet()
cs.append(pump_pattern)
set_pattern(name_c, cs)
# 修改变频水泵variable_pump的pattern
if modify_variable_pump_pattern:
for pump_name in modify_variable_pump_pattern.keys():
@@ -954,7 +846,6 @@ def run_simulation(name: str, simulation_type: str, modify_pattern_start_time: s
cs = ChangeSet()
cs.append(pump_pattern)
set_pattern(name_c, cs)
# 修改阀门valve的状态setting和status
if modify_valve_opening:
for valve_name in modify_valve_opening.keys():
@@ -972,62 +863,12 @@ def run_simulation(name: str, simulation_type: str, modify_pattern_start_time: s
cs = ChangeSet()
cs.append(valve_status)
set_status(name_c, cs)
# 根据高压出厂流量,更改高压用水模式
# hp_flow_SCADA_data_dict = influxdb_api.query_SCADA_data_by_device_ID_and_time(
# query_ids_list=list(hp_flow_pattern_id.values()), query_time=modify_pattern_start_time)
#
# hp_flow_dict = {key: hp_flow_SCADA_data_dict[value] for key, value in hp_flow_pattern_id.items()}
#
# all_valid = all(value and float(value) != 0 for value in hp_flow_dict.values())
#
# if all_valid:
# hp_total_SCADA_flow = sum(float(value) for value in hp_flow_dict.values())
# hp_total_history_flow = 0
# for pattern_name in hp_flow_dict.keys():
# history_flow_list = get_history_pattern_info(name_c, pattern_name)
# hp_total_history_flow += history_flow_list[modify_index]
#
# multiply_factor1 = hp_total_SCADA_flow / hp_total_history_flow
# hp_pattern_list = regions_patterns['hp']
# for pattern_name in hp_pattern_list:
# pattern = get_pattern(name_c, pattern_name)
# pattern['factors'][modify_index] *= multiply_factor1
# cs = ChangeSet()
# cs.append(pattern)
# set_pattern(name_c, cs)
#
# # 根据低压出厂流量,更改低压用水模式
# lp_flow_SCADA_data_dict = influxdb_api.query_SCADA_data_by_device_ID_and_time(
# query_ids_list=list(lp_flow_pattern_id.values()), query_time=modify_pattern_start_time)
#
# lp_flow_dict = {key: lp_flow_SCADA_data_dict[value] for key, value in lp_flow_pattern_id.items()}
#
# all_valid2 = all(value and float(value) != 0 for value in lp_flow_dict.values())
#
# if all_valid2:
# lp_total_SCADA_flow = sum(float(value) for value in lp_flow_dict.values())
# lp_total_history_flow = 0
# for pattern_name in lp_flow_dict.keys():
# history_flow_list = get_history_pattern_info(name_c, pattern_name)
# lp_total_history_flow += history_flow_list[modify_index]
#
# multiply_factor2 = lp_total_SCADA_flow / lp_total_history_flow
# lp_pattern_list = regions_patterns['lp']
# for pattern_name in lp_pattern_list:
# pattern = get_pattern(name_c, pattern_name)
# pattern['factors'][modify_index] *= multiply_factor2
# cs = ChangeSet()
# cs.append(pattern)
# set_pattern(name_c, cs)
# 运行并返回结果
result = run_project(name_c)
time_cost_end = time.perf_counter()
print('{} -- Hydraulic simulation finished, cost time: {:.2f} s.'.format(
datetime.now(pytz.timezone('Asia/Shanghai')).strftime('%Y-%m-%d %H:%M:%S'),
time_cost_end - time_cost_start))
close_project(name_c)
# DingZQ 下面这几句一定要这样,不然读取不了
@@ -1040,7 +881,6 @@ def run_simulation(name: str, simulation_type: str, modify_pattern_start_time: s
node_result = output.node_results()
link_result = output.link_results()
num_periods_result = output.times()['num_periods']
# print(num_periods_result)
# print(node_result)
# 存储
@@ -1065,25 +905,25 @@ if __name__ == "__main__":
globals.source_outflow_region_patterns, globals.realtime_region_pipe_flow_and_demand_patterns = get_realtime_region_patterns('bb', globals.source_outflow_region_id, globals.realtime_region_pipe_flow_and_demand_id)
# 打印字典内容以验证
# print("Reservoirs ID:", globals.reservoirs_id)
# print("Tanks ID:", globals.tanks_id)
# print("Fixed Pumps ID:", globals.fixed_pumps_id)
# print("Variable Pumps ID:", globals.variable_pumps_id)
# print("Pressure ID:", globals.pressure_id)
# print("Demand ID:", globals.demand_id)
# print("Quality ID:", globals.quality_id)
# print("Source Outflow Pattern ID:", globals.source_outflow_pattern_id)
# print("Realtime Pipe Flow Pattern ID:", globals.realtime_pipe_flow_pattern_id)
# print("Pipe Flow Region Patterns:", globals.pipe_flow_region_patterns)
# print("Source Outflow Region:", region_result)
# print('Source Outflow Region ID:', globals.source_outflow_region_id)
# print('Source Outflow Region Patterns:', globals.source_outflow_region_patterns)
# print("Non Realtime Region Patterns:", globals.non_realtime_region_patterns)
# print("Realtime Region Pipe Flow And Demand ID:", globals.realtime_region_pipe_flow_and_demand_id)
# print("Realtime Region Pipe Flow And Demand Patterns:", globals.realtime_region_pipe_flow_and_demand_patterns)
print("Reservoirs ID:", globals.reservoirs_id)
print("Tanks ID:", globals.tanks_id)
print("Fixed Pumps ID:", globals.fixed_pumps_id)
print("Variable Pumps ID:", globals.variable_pumps_id)
print("Pressure ID:", globals.pressure_id)
print("Demand ID:", globals.demand_id)
print("Quality ID:", globals.quality_id)
print("Source Outflow Pattern ID:", globals.source_outflow_pattern_id)
print("Realtime Pipe Flow Pattern ID:", globals.realtime_pipe_flow_pattern_id)
print("Pipe Flow Region Patterns:", globals.pipe_flow_region_patterns)
print("Source Outflow Region:", region_result)
print('Source Outflow Region ID:', globals.source_outflow_region_id)
print('Source Outflow Region Patterns:', globals.source_outflow_region_patterns)
print("Non Realtime Region Patterns:", globals.non_realtime_region_patterns)
print("Realtime Region Pipe Flow And Demand ID:", globals.realtime_region_pipe_flow_and_demand_id)
print("Realtime Region Pipe Flow And Demand Patterns:", globals.realtime_region_pipe_flow_and_demand_patterns)
# 模拟示例1
# run_simulation(name='bb', simulation_type="realtime", modify_pattern_start_time='2025-02-14T10:30:00+08:00')
run_simulation(name='bb', simulation_type="realtime", modify_pattern_start_time='2025-02-25T23:45:00+08:00')
# 模拟示例2
# run_simulation(name='bb', simulation_type="extended", modify_pattern_start_time='2025-02-14T10:30:00+08:00', modify_total_duration=900,
# scheme_Type="burst_Analysis", scheme_Name="scheme1")
@@ -1104,4 +944,3 @@ if __name__ == "__main__":