Files
TJWaterServerBinary/app/services/simulation.py

1353 lines
62 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
import numpy as np
from app.services.tjnetwork import *
from app.native.api.s36_wda_cal import *
# from get_real_status import *
from datetime import datetime, timedelta
from math import modf
import os
import json
import pytz
import requests
import time
import shutil
from app.services.epanet.epanet import Output
from typing import Optional, Tuple
import app.infra.db.influxdb.api as influxdb_api
import typing
import psycopg
import logging
import app.services.globals as globals
import uuid
import app.services.project_info as project_info
from app.native.api.postgresql_info import get_pgconn_string
from app.infra.db.timescaledb.internal_queries import (
InternalQueries as TimescaleInternalQueries,
)
from app.infra.db.timescaledb.internal_queries import (
InternalStorage as TimescaleInternalStorage,
)
# Configure root logging once at import time so the module-level query
# helpers below emit timestamped INFO/ERROR records.
logging.basicConfig(
    level=logging.INFO, format="%(asctime)s - %(levelname)s - %(message)s"
)
def query_corresponding_element_id_and_query_id(name: str) -> None:
    """Load the realtime element-id -> api-query-id mappings into globals.

    Reads every ``scada_info`` row whose ``transmission_mode`` is
    ``'realtime'`` and, keyed on the row's ``type``, stores its
    ``associated_element_id`` -> ``api_query_id`` pair into the matching
    module-level dictionary in ``app.services.globals``.

    :param name: database name used to build the connection string
    :return: None (results are written into the ``globals`` dictionaries)
    """
    # type value -> the globals dict that should receive this mapping;
    # replaces the previous if/elif chain.
    target_by_type = {
        "reservoir_liquid_level": globals.reservoirs_id,
        "tank_liquid_level": globals.tanks_id,
        "fixed_pump": globals.fixed_pumps_id,
        "variable_pump": globals.variable_pumps_id,
        "pressure": globals.pressure_id,
        "demand": globals.demand_id,
        "quality": globals.quality_id,
    }
    conn_string = get_pgconn_string(db_name=name)
    try:
        with psycopg.connect(conn_string) as conn:
            with conn.cursor() as cur:
                cur.execute(
                    """
                    SELECT type, associated_element_id, api_query_id
                    FROM scada_info
                    WHERE transmission_mode = 'realtime';
                    """
                )
                for type_, associated_element_id, api_query_id in cur.fetchall():
                    target = target_by_type.get(type_)
                    if target is not None:
                        target[associated_element_id] = api_query_id
                    else:
                        # Unknown type: warn via logging (was print) so it
                        # shows up with the rest of the module's output.
                        logging.warning(f"未处理的类型: {type_}")
    except psycopg.Error as e:
        # Consistent with the rest of the module: log instead of print.
        logging.error(f"数据库连接或查询出错: {e}")
def query_corresponding_pattern_id_and_query_id(name: str) -> None:
    """Load realtime pattern-id -> api-query-id mappings into globals.

    Reads every ``scada_info`` row whose ``transmission_mode`` is
    ``'realtime'`` and whose ``type`` is ``'source_outflow'`` or
    ``'pipe_flow'``, and stores each ``associated_pattern`` ->
    ``api_query_id`` pair into the matching dictionary in
    ``app.services.globals``.

    :param name: database name used to build the connection string
    :return: None (results are written into the ``globals`` dictionaries)
    """
    # type value -> receiving globals dict (replaces the if/elif chain)
    target_by_type = {
        "source_outflow": globals.source_outflow_pattern_id,
        "pipe_flow": globals.realtime_pipe_flow_pattern_id,
    }
    conn_string = get_pgconn_string(db_name=name)
    try:
        with psycopg.connect(conn_string) as conn:
            with conn.cursor() as cur:
                cur.execute(
                    """
                    SELECT type, associated_pattern, api_query_id
                    FROM scada_info
                    WHERE transmission_mode = 'realtime'
                      AND type IN ('source_outflow', 'pipe_flow');
                    """
                )
                for type_, associated_pattern, api_query_id in cur.fetchall():
                    target = target_by_type.get(type_)
                    if target is not None:
                        target[associated_pattern] = api_query_id
    except psycopg.Error as e:
        # Consistent with the rest of the module: log instead of print.
        logging.error(f"数据库连接或查询出错: {e}")
# 2025/01/11
def query_non_realtime_region(name: str) -> dict:
    """Discover distinct source-outflow regions from non-realtime pipe_flow rows.

    Each ``scada_info`` row with ``transmission_mode = 'non_realtime'`` and
    ``type = 'pipe_flow'`` contributes the sorted tuple of its non-NULL
    ``associated_source_outflow_id*`` column values as a candidate region.
    Duplicates are removed (first-seen order kept) and the unique regions
    are stored in ``globals.source_outflow_region`` as region1, region2, ...

    :param name: database name used to build the connection string
    :return: region key -> list of associated_source_outflow_id values
    """
    candidate_regions = []  # every region tuple, duplicates included
    conn_string = get_pgconn_string(db_name=name)
    try:
        with psycopg.connect(conn_string) as conn:
            with conn.cursor() as cur:
                cur.execute(
                    """
                    SELECT *
                    FROM scada_info
                    WHERE transmission_mode = 'non_realtime'
                      AND type = 'pipe_flow';
                    """
                )
                records = cur.fetchall()
                col_names = [desc.name for desc in cur.description]
                # Precompute the indices of the associated_source_outflow_id*
                # columns once, instead of calling col_names.index() for
                # every column of every record.
                source_outflow_idx = [
                    i
                    for i, col in enumerate(col_names)
                    if col.startswith("associated_source_outflow_id")
                ]
                logging.info(
                    f"Identified source_outflow columns: {[col_names[i] for i in source_outflow_idx]}"
                )
                for record in records:
                    values = [
                        record[i] for i in source_outflow_idx if record[i] is not None
                    ]
                    if values:
                        # Sort so identical id sets compare equal regardless
                        # of which columns held them.
                        candidate_regions.append(tuple(sorted(values)))
        # dict.fromkeys dedupes while preserving first-seen order.
        for idx, region in enumerate(dict.fromkeys(candidate_regions), 1):
            globals.source_outflow_region[f"region{idx}"] = list(region)
        logging.info("查询并处理数据成功。")
    except psycopg.Error as e:
        logging.error(f"数据库连接或查询出错: {e}")
    except Exception as ex:
        logging.error(f"处理数据时出错: {ex}")
    return globals.source_outflow_region
# 2025/01/18
def query_non_realtime_region_patterns(
    name: str,
    source_outflow_region: dict,
    column_prefix: str = "associated_source_outflow_id",
) -> dict:
    """Group non-realtime scada_info patterns by source-outflow region.

    Rows with ``transmission_mode = 'non_realtime'`` are matched to a region
    by the set of non-NULL values in their ``column_prefix*`` columns; each
    match contributes its ``associated_pattern`` to that region's list.
    Patterns already corrected through realtime pipe_flow meters (listed in
    ``globals.pipe_flow_region_patterns``) are removed afterwards.

    NOTE(review): the ``source_outflow_region`` argument is accepted but the
    implementation reads ``globals.source_outflow_region`` instead — confirm
    callers always pass the same object.

    :param name: database name used to build the connection string
    :param source_outflow_region: region -> associated_source_outflow_id list
    :param column_prefix: prefix of the columns holding source-outflow ids
    :return: region key -> list of associated_pattern values
    """
    globals.non_realtime_region_patterns = {
        region: [] for region in globals.source_outflow_region.keys()
    }
    # frozenset of ids -> region key, for order-insensitive matching
    region_tuple_to_key = {
        frozenset(ids): region for region, ids in globals.source_outflow_region.items()
    }
    conn_string = get_pgconn_string(db_name=name)
    try:
        with psycopg.connect(conn_string) as conn:
            with conn.cursor() as cur:
                cur.execute(
                    """
                    SELECT *
                    FROM scada_info
                    WHERE transmission_mode = 'non_realtime'
                    """
                )
                records = cur.fetchall()
                col_names = [desc.name for desc in cur.description]
                # Precompute column indices once instead of calling
                # col_names.index() per column per record.
                source_outflow_idx = [
                    i for i, col in enumerate(col_names) if col.startswith(column_prefix)
                ]
                logging.info(
                    f"Identified source_outflow columns: {[col_names[i] for i in source_outflow_idx]}"
                )
                if "associated_pattern" not in col_names:
                    logging.error(
                        "'associated_pattern' column not found in scada_info table."
                    )
                    return globals.non_realtime_region_patterns
                pattern_idx = col_names.index("associated_pattern")
                for record in records:
                    values = [
                        record[i] for i in source_outflow_idx if record[i] is not None
                    ]
                    if not values:
                        continue
                    region_key = region_tuple_to_key.get(frozenset(values))
                    if region_key and record[pattern_idx] is not None:
                        globals.non_realtime_region_patterns[region_key].append(
                            record[pattern_idx]
                        )
        logging.info("生成 regions_patterns 成功。")
    except psycopg.Error as e:
        logging.error(f"数据库连接或查询出错: {e}")
    except Exception as ex:
        logging.error(f"处理数据时出错: {ex}")
    # Drop demand patterns already corrected via realtime pipe_flow meters.
    exclude_regions = {
        region
        for regions in globals.pipe_flow_region_patterns.values()
        for region in regions
    }
    for region_key, regions in globals.non_realtime_region_patterns.items():
        globals.non_realtime_region_patterns[region_key] = [
            region for region in regions if region not in exclude_regions
        ]
    return globals.non_realtime_region_patterns
# 2025/01/18
def query_realtime_region_pipe_flow_and_demand_id(
    name: str,
    source_outflow_region: dict,
    column_prefix: str = "associated_source_outflow_id",
) -> dict:
    """Group realtime pipe_flow/demand api_query_ids by source-outflow region.

    Rows with ``transmission_mode = 'realtime'`` and ``type`` in
    ('pipe_flow', 'demand') are matched to a region by the set of non-NULL
    values in their ``column_prefix*`` columns; each match contributes its
    ``api_query_id`` to that region's list in
    ``globals.realtime_region_pipe_flow_and_demand_id``.

    NOTE(review): the ``source_outflow_region`` argument is accepted but the
    implementation reads ``globals.source_outflow_region`` instead — confirm
    callers always pass the same object.

    :param name: database name used to build the connection string
    :param source_outflow_region: region -> associated_source_outflow_id list
    :param column_prefix: prefix of the columns holding source-outflow ids
    :return: region key -> list of api_query_id values
    """
    globals.realtime_region_pipe_flow_and_demand_id = {
        region: [] for region in globals.source_outflow_region.keys()
    }
    # frozenset of ids -> region key, for order-insensitive matching
    region_tuple_to_key = {
        frozenset(ids): region for region, ids in globals.source_outflow_region.items()
    }
    conn_string = get_pgconn_string(db_name=name)
    try:
        with psycopg.connect(conn_string) as conn:
            with conn.cursor() as cur:
                cur.execute(
                    """
                    SELECT *
                    FROM scada_info
                    WHERE transmission_mode = 'realtime'
                      AND type IN ('pipe_flow', 'demand');
                    """
                )
                records = cur.fetchall()
                col_names = [desc.name for desc in cur.description]
                # Precompute column indices once instead of calling
                # col_names.index() per column per record.
                source_outflow_idx = [
                    i for i, col in enumerate(col_names) if col.startswith(column_prefix)
                ]
                logging.info(
                    f"Identified source_outflow columns: {[col_names[i] for i in source_outflow_idx]}"
                )
                if "api_query_id" not in col_names:
                    logging.error(
                        "'api_query_id' column not found in scada_info table."
                    )
                    return globals.realtime_region_pipe_flow_and_demand_id
                api_query_id_idx = col_names.index("api_query_id")
                for record in records:
                    values = [
                        record[i] for i in source_outflow_idx if record[i] is not None
                    ]
                    if not values:
                        continue
                    region_key = region_tuple_to_key.get(frozenset(values))
                    if region_key and record[api_query_id_idx] is not None:
                        globals.realtime_region_pipe_flow_and_demand_id[
                            region_key
                        ].append(record[api_query_id_idx])
        logging.info("生成 realtime_region_pipe_flow_and_demand_id 成功。")
    except psycopg.Error as e:
        logging.error(f"数据库连接或查询出错: {e}")
    except Exception as ex:
        logging.error(f"处理数据时出错: {ex}")
    return globals.realtime_region_pipe_flow_and_demand_id
# 2025/01/17
def query_pipe_flow_region_patterns(
    name: str, column_prefix: str = "associated_pipe_flow_id"
) -> dict:
    """Map realtime pipe_flow patterns to the demand patterns they cover.

    For every ``scada_info`` row with ``type = 'demand'`` and
    ``transmission_mode = 'non_realtime'`` that names an
    ``associated_pipe_flow_id``, look that element up; if the element is
    transmitted in realtime, record the demand's ``associated_pattern``
    under the pipe_flow element's ``associated_pattern`` key in
    ``globals.pipe_flow_region_patterns``.

    Example result: {'region1': ['P17021', 'ZBBGXSZW000377'], 'region2': ['P16504']}

    NOTE(review): ``column_prefix`` is accepted but never used — confirm
    whether it can be dropped or should drive the lookup column.

    :param name: database name used to build the connection string
    :param column_prefix: unused (kept for interface compatibility)
    :return: the updated ``globals.pipe_flow_region_patterns`` dict
    """
    conn_string = get_pgconn_string(db_name=name)
    try:
        with psycopg.connect(conn_string) as conn:
            with conn.cursor() as cur:
                cur.execute(
                    """
                    SELECT associated_pattern, associated_pipe_flow_id
                    FROM scada_info
                    WHERE type = 'demand'
                      AND transmission_mode = 'non_realtime';
                    """
                )
                # fetchall() materializes the rows, so the cursor can be
                # reused for the per-row lookup below.
                for associated_pattern, associated_pipe_flow_id in cur.fetchall():
                    if not associated_pipe_flow_id:
                        continue
                    # Is the referenced pipe_flow element realtime?
                    cur.execute(
                        """
                        SELECT associated_pattern, transmission_mode
                        FROM scada_info
                        WHERE associated_element_id = %s;
                        """,
                        (associated_pipe_flow_id,),
                    )
                    pipe_flow_record = cur.fetchone()
                    if pipe_flow_record and pipe_flow_record[1] == "realtime":
                        # setdefault replaces the manual "if key not in
                        # dict" initialization.
                        globals.pipe_flow_region_patterns.setdefault(
                            pipe_flow_record[0], []
                        ).append(associated_pattern)
        logging.info("生成 pipe_flow_region_patterns 成功。")
    except psycopg.Error as e:
        logging.error(f"数据库连接或查询出错: {e}")
    except Exception as ex:
        logging.error(f"处理数据时出错: {ex}")
    return globals.pipe_flow_region_patterns
# 2025/02/15
def query_SCADA_ID_corresponding_info(name: str, SCADA_ID: str) -> Optional[dict]:
    """Resolve a picked SCADA element to its bound model element and query id.

    :param name: postgres database name
    :param SCADA_ID: primary key of the scada_info row
    :return: {"associated_element_id": ..., "API_query_id": ...}, or None
        when the id is unknown or the query fails (annotation fixed from
        ``dict`` to ``Optional[dict]`` to match this behavior)
    """
    conn_string = get_pgconn_string(db_name=name)
    try:
        with psycopg.connect(conn_string) as conn:
            with conn.cursor() as cur:
                cur.execute(
                    """
                    SELECT associated_element_id, API_query_id
                    FROM scada_info
                    WHERE id = %s
                    """,
                    (SCADA_ID,),
                )
                row = cur.fetchone()
                if row is None:
                    return None
                return {
                    "associated_element_id": row[0],
                    "API_query_id": row[1],
                }
    except Exception as e:
        # Broad catch kept deliberately: any failure degrades to "not found".
        # Logged (was print) for consistency with the rest of the module.
        logging.error(f"An error occurred: {e}")
        return None
# 2025/01/11
def get_source_outflow_region_id(
    name: str,
    source_outflow_region: dict,
    column_prefix: str = "associated_source_outflow_id",
) -> dict:
    """Translate each region's element ids into their api_query_ids.

    Builds ``globals.source_outflow_region_id`` from
    ``globals.source_outflow_region`` by replacing every
    associated_source_outflow_id with the api_query_id that ``scada_info``
    records for it.

    NOTE(review): ``source_outflow_region`` and ``column_prefix`` are
    accepted but the implementation reads ``globals.source_outflow_region``
    directly — confirm callers always pass the same mapping.

    :param name: database name used to build the connection string
    :param source_outflow_region: region -> associated_source_outflow_id list
    :param column_prefix: unused (kept for interface compatibility)
    :return: region key -> list of api_query_id strings
    """
    globals.source_outflow_region_id = {
        region: [] for region in globals.source_outflow_region.keys()
    }
    # All distinct element ids across every region, resolved in one query.
    all_ids = set()
    for ids in globals.source_outflow_region.values():
        all_ids.update(ids)
    if not all_ids:
        logging.warning(
            "No associated_source_outflow_id found in source_outflow_region."
        )
        return globals.source_outflow_region_id
    conn_string = get_pgconn_string(db_name=name)
    try:
        with psycopg.connect(conn_string) as conn:
            with conn.cursor() as cur:
                # Plain string (the previous f""" prefix had no placeholders);
                # the id list is bound as a parameter.
                cur.execute(
                    """
                    SELECT associated_element_id, api_query_id
                    FROM scada_info
                    WHERE associated_element_id = ANY(%s)
                    """,
                    (list(all_ids),),
                )
                id_to_api_query_id = {
                    element_id: str(api_query_id)
                    for element_id, api_query_id in cur.fetchall()
                    if element_id in all_ids and api_query_id is not None
                }
        for region, ids in globals.source_outflow_region.items():
            for element_id in ids:
                api_id = id_to_api_query_id.get(element_id)
                if api_id:
                    globals.source_outflow_region_id[region].append(api_id)
                else:
                    logging.warning(
                        f"No api_query_id found for associated_source_outflow_id: {element_id}"
                    )
    except psycopg.Error as e:
        logging.error(f"数据库连接或查询出错: {e}")
    except Exception as ex:
        logging.error(f"处理数据时出错: {ex}")
    return globals.source_outflow_region_id
# 2025/01/18
def get_realtime_region_patterns(
    name: str,
    source_outflow_region_id: dict,
    realtime_region_pipe_flow_and_demand_id: dict,
) -> Tuple[dict, dict]:
    """Resolve api_query_ids to associated_patterns for every region.

    Fills ``globals.source_outflow_region_patterns`` and
    ``globals.realtime_region_pipe_flow_and_demand_patterns`` by querying
    ``scada_info`` for the ``associated_pattern`` of each region's
    api_query_ids.

    NOTE(review): both dict arguments are accepted but the implementation
    reads the corresponding ``globals`` attributes — confirm callers always
    pass the same objects.

    :param name: database name used to build the connection string
    :param source_outflow_region_id: region -> source-outflow api_query_ids
    :param realtime_region_pipe_flow_and_demand_id: region -> api_query_ids
    :return: (source_outflow_region_patterns,
        realtime_region_pipe_flow_and_demand_patterns)
    """

    def _patterns_for(cur, api_ids):
        # Parameterized ANY(...) replaces the previous f-string "IN (...)"
        # interpolation, which was SQL-injection-prone and inconsistent with
        # the parameterized queries used elsewhere in this module.
        cur.execute(
            """
            SELECT api_query_id, associated_pattern
            FROM scada_info
            WHERE api_query_id = ANY(%s);
            """,
            (list(api_ids),),
        )
        return [pattern for _, pattern in cur.fetchall() if pattern]

    globals.source_outflow_region_patterns = {
        region: [] for region in globals.source_outflow_region_id.keys()
    }
    globals.realtime_region_pipe_flow_and_demand_patterns = {
        region: [] for region in globals.realtime_region_pipe_flow_and_demand_id.keys()
    }
    conn_string = get_pgconn_string(db_name=name)
    try:
        with psycopg.connect(conn_string) as conn:
            with conn.cursor() as cur:
                for region in globals.source_outflow_region_id.keys():
                    source_ids = globals.source_outflow_region_id[region]
                    if source_ids:
                        globals.source_outflow_region_patterns[region] = (
                            _patterns_for(cur, source_ids)
                        )
                    realtime_ids = globals.realtime_region_pipe_flow_and_demand_id[
                        region
                    ]
                    if realtime_ids:
                        globals.realtime_region_pipe_flow_and_demand_patterns[
                            region
                        ] = _patterns_for(cur, realtime_ids)
        logging.info(
            "生成 source_outflow_region_patterns 和 realtime_region_pipe_flow_and_demand_patterns 成功。"
        )
    except psycopg.Error as e:
        logging.error(f"数据库连接或查询出错: {e}")
    except Exception as ex:
        logging.error(f"处理数据时出错: {ex}")
    return (
        globals.source_outflow_region_patterns,
        globals.realtime_region_pipe_flow_and_demand_patterns,
    )
def get_pattern_index(cur_datetime: str) -> int:
    """Return the pattern slot index for the given timestamp.

    :param cur_datetime: timestamp formatted as "YYYY-MM-DD HH:MM:SS"
    :return: index of the PATTERN_TIME_STEP-minute slot within the day
    """
    parsed = datetime.strptime(cur_datetime, "%Y-%m-%d %H:%M:%S")
    minutes_since_midnight = parsed.hour * 60 + parsed.minute
    return int(minutes_since_midnight / globals.PATTERN_TIME_STEP)
def get_pattern_index_str(current_time: str) -> str:
    """Format the start of *current_time*'s pattern slot as "HH:MM:00".

    :param current_time: timestamp formatted as "YYYY-MM-DD HH:MM:SS"
    :return: clock string "HH:MM:00" marking the slot start
    """
    slot = get_pattern_index(current_time)
    # modf splits slot-start hours into fractional and whole parts;
    # the fraction becomes the minutes component.
    frac_hours, whole_hours = modf(slot * globals.PATTERN_TIME_STEP / 60)
    return f"{int(whole_hours):02d}:{int(frac_hours * 60):02d}:00"
def from_seconds_to_clock(secs: int) -> str:
    """Convert a duration in seconds to a zero-padded "HH:MM:SS" string.

    :param secs: duration in whole seconds (assumed non-negative)
    :return: "HH:MM:SS" clock string
    """
    hours, remainder = divmod(secs, 3600)
    minutes, seconds = divmod(remainder, 60)
    return f"{hours:02d}:{minutes:02d}:{seconds:02d}"
def convert_time_format(original_time: str) -> str:
    """Turn "2024-04-13T08:00:00+08:00" into "2024-04-13 08:00:00".

    Only the literal "T" separator and the "+08:00" offset suffix are
    rewritten; every other character is passed through unchanged.

    :param original_time: ISO-like timestamp carrying a +08:00 offset
    :return: "YYYY-MM-DD HH:MM:SS" style timestamp
    """
    return original_time.replace("T", " ").replace("+08:00", "")
def get_history_pattern_info(project_name, pattern_name):
    """Read one pattern's stored history: its flow and factor series.

    NOTE(review): pattern_name is interpolated straight into the SQL text;
    confirm it never carries untrusted input.

    :param project_name: project/database name passed through to read_all
    :param pattern_name: id of the pattern in history_patterns_flows
    :return: (flow_list, factor_list), both ordered by _order
    """
    rows = read_all(
        project_name,
        f"select * from history_patterns_flows where id = '{pattern_name}' order by _order",
    )
    flows = [float(row["flow"]) for row in rows]
    factors = [float(row["factor"]) for row in rows]
    return flows, factors
# 2025/01/11
def run_simulation(
name: str,
simulation_type: str,
modify_pattern_start_time: str,
modify_total_duration: int = 0,
modify_reservoir_head_pattern: dict[str, list] = None,
modify_tank_initial_level: dict[str, float] = None,
modify_junction_base_demand: dict[str, float] = None,
modify_junction_damand_pattern: dict[str, list] = None,
modify_fixed_pump_pattern: dict[str, list] = None,
modify_variable_pump_pattern: dict[str, list] = None,
modify_valve_opening: dict[str, float] = None,
scheme_type: str = None,
scheme_name: str = None,
) -> None:
"""
传入需要修改的参数,改变数据库中对应位置的值,然后计算,返回结果
:param name: 模型名称,数据库中对应的名字
:param simulation_type: 模拟的类型realtime为实时模拟修改原数据库extended为多步长模拟需要复制数据库
:param modify_pattern_start_time: 模拟开始时间,格式为'2024-11-25T09:00:00+08:00'
:param modify_total_duration: 模拟总历时,秒
:param modify_reservoir_head_pattern: dict中包含多个水库模式str为水库head_pattern的idlist为修改后的head_pattern
:param modify_tank_initial_level: dict中包含多个水塔str为水塔的idfloat为修改后的initial_level
:param modify_junction_base_demand: dict中包含多个节点str为节点的idfloat为修改后的base_demand
:param modify_junction_damand_pattern: dict中包含多个节点模式str为节点demand_pattern的idlist为修改后的demand_pattern
:param modify_fixed_pump_pattern: dict中包含多个水泵模式str为工频水泵的idlist为修改后的pattern
:param modify_variable_pump_pattern: dict中包含多个水泵模式str为变频水泵的idlist为修改后的pattern
:param modify_valve_opening: dict中包含多个阀门开启度str为阀门的idfloat为修改后的阀门开启度
:param scheme_type: 模拟方案类型
:param scheme_name模拟方案名称
:return:
"""
# 记录开始时间
time_cost_start = time.perf_counter()
print("name", name)
print("simulation_type", simulation_type)
print("modify_pattern_start_time", modify_pattern_start_time)
print("modify_total_duration", modify_total_duration)
print("modify_reservoir_head_pattern", modify_reservoir_head_pattern)
print("modify_tank_initial_level", modify_tank_initial_level)
print("modify_junction_base_demand", modify_junction_base_demand)
print(
"{} -- Hydraulic simulation started.".format(
datetime.now(pytz.timezone("Asia/Shanghai")).strftime("%Y-%m-%d %H:%M:%S")
)
)
# 判断是实时模拟还是多步长模拟
# if simulation_type.upper() == 'REALTIME': # 实时模拟(修改原数据库)
# name_c = name
# elif simulation_type.upper() == 'EXTENDED': # 扩展模拟(复制数据库)
# name_c = '_'.join([name, 'c'])
# if have_project(name_c):
# if is_project_open(name_c):
# close_project(name_c)
# delete_project(name_c)
# copy_project(name, name_c) # 备份项目
# else:
# raise Exception('Incorrect simulation type, choose in (realtime, extended)')
name_c = name
# 打开数据库
open_project(name_c)
dic_time = get_time(name_c)
print(dic_time)
# 获取水力模拟步长0:15:00
globals.hydraulic_timestep = dic_time["HYDRAULIC TIMESTEP"]
# 将时间字符串转换为 timedelta 对象
time_obj = datetime.strptime(globals.hydraulic_timestep, "%H:%M:%S")
# 转换为分钟浮点数
globals.PATTERN_TIME_STEP = float(
time_obj.hour * 60 + time_obj.minute + time_obj.second / 60
)
# 对输入的时间参数进行处理
pattern_start_time = convert_time_format(modify_pattern_start_time)
# 获取模拟开始时间是对应pattern的第几个数
modify_index = get_pattern_index(pattern_start_time)
# 遍历水泵的pattern_id并根据输入的pump_pattern修改pattern的值
# for pump_pattern_id in pump_pattern_ids:
# # 检查pump_pattern中pump_pattern_id对应的第一个频率值是否为有效数字非空、非NaN。如果该值有效则继续执行代码块。
# if not np.isnan(modify_pump_pattern[pump_pattern_id][0]):
# # 取出数据库中的pattern
# pump_pattern = get_pattern(name_c, get_pump(name_c, pump_pattern_id)['pattern'])
# # 替换数据库中的pattern为modify_pump_pattern
# pump_pattern['factors'][modify_index: modify_index + len(modify_pump_pattern[pump_pattern_id])] \
# = modify_pump_pattern[pump_pattern_id]
# cs = ChangeSet()
# cs.append(pump_pattern)
# set_pattern(name_c, cs)
# 修改模拟开始的时间
str_pattern_start = get_pattern_index_str(
convert_time_format(modify_pattern_start_time)
)
dic_time = get_time(name_c)
dic_time["PATTERN START"] = str_pattern_start
dic_time["DURATION"] = from_seconds_to_clock(modify_total_duration)
if simulation_type.upper() == "REALTIME":
dic_time["DURATION"] = 0
cs = ChangeSet()
cs.operations.append(dic_time)
set_time(name_c, cs)
# 根据SCADA实时数据进行修改如果没有对应的SCADA数据如未来的时间点则不改变pg数据库的数据
if globals.reservoirs_id:
# reservoirs_id = {'ZBBDJSCP000002': '2497', 'R00003': '2571'}
# 1.获取reservoir的SCADA数据,形式如{'2497': '3.1231', '2571': '2.7387'}
reservoir_SCADA_data_dict = TimescaleInternalQueries.query_scada_by_ids_time(
device_ids=list(globals.reservoirs_id.values()),
query_time=modify_pattern_start_time,
)
# 2.构建出新字典,形式如{'ZBBDJSCP000002': '3.1231', 'R00003': '2.7387'}
reservoir_dict = {
key: reservoir_SCADA_data_dict[value]
for key, value in globals.reservoirs_id.items()
}
# 3.修改reservoir液位模式
for reservoir_name, value in reservoir_dict.items():
if value and float(value) != 0:
# 先根据reservoir获取对应的pattern再对pattern进行修改
reservoir_pattern = get_pattern(
name_c, get_reservoir(name_c, reservoir_name)["pattern"]
)
reservoir_pattern["factors"][modify_index] = (
float(value) + globals.RESERVOIR_BASIC_HEIGHT
)
cs = ChangeSet()
cs.append(reservoir_pattern)
set_pattern(name_c, cs)
if globals.tanks_id:
# 修改tank初始液位
tank_SCADA_data_dict = TimescaleInternalQueries.query_scada_by_ids_time(
device_ids=list(globals.tanks_id.values()),
query_time=modify_pattern_start_time,
)
tank_dict = {
key: tank_SCADA_data_dict[value] for key, value in globals.tanks_id.items()
}
for tank_name, value in tank_dict.items():
if value and float(value) != 0:
tank = get_tank(name_c, tank_name)
tank["init_level"] = float(value)
cs = ChangeSet()
cs.append(tank)
set_tank(name_c, cs)
if globals.fixed_pumps_id:
# 修改工频泵的pattern
fixed_pump_SCADA_data_dict = TimescaleInternalQueries.query_scada_by_ids_time(
device_ids=list(globals.fixed_pumps_id.values()),
query_time=modify_pattern_start_time,
)
# print(fixed_pump_SCADA_data_dict)
fixed_pump_dict = {
key: fixed_pump_SCADA_data_dict[value]
for key, value in globals.fixed_pumps_id.items()
}
# print(fixed_pump_dict)
for fixed_pump_name, value in fixed_pump_dict.items():
if value:
pump_pattern = get_pattern(
name_c, get_pump(name_c, fixed_pump_name)["pattern"]
)
# print(pump_pattern)
pump_pattern["factors"][modify_index] = float(value)
# print(pump_pattern['factors'][modify_index])
cs = ChangeSet()
cs.append(pump_pattern)
set_pattern(name_c, cs)
if globals.variable_pumps_id:
# 修改变频泵的pattern
variable_pump_SCADA_data_dict = (
TimescaleInternalQueries.query_scada_by_ids_time(
device_ids=list(globals.variable_pumps_id.values()),
query_time=modify_pattern_start_time,
)
)
variable_pump_dict = {
key: variable_pump_SCADA_data_dict[value]
for key, value in globals.variable_pumps_id.items()
}
for variable_pump_name, value in variable_pump_dict.items():
if value:
pump_pattern = get_pattern(
name_c, get_pump(name_c, variable_pump_name)["pattern"]
)
pump_pattern["factors"][modify_index] = float(value) / 50
cs = ChangeSet()
cs.append(pump_pattern)
set_pattern(name_c, cs)
if globals.demand_id:
# 基于实时数据修改大用户节点的pattern
demand_SCADA_data_dict = TimescaleInternalQueries.query_scada_by_ids_time(
device_ids=list(globals.demand_id.values()),
query_time=modify_pattern_start_time,
)
demand_dict = {
key: demand_SCADA_data_dict[value]
for key, value in globals.demand_id.items()
}
for demand_name, value in demand_dict.items():
if value:
demand_pattern = get_pattern(
name_c, get_demand(name_c, demand_name)["pattern"]
)
if get_option(name_c)["UNITS"] == "LPS":
demand_pattern["factors"][modify_index] = (
float(value) / 3.6
) # 默认SCADA数据获取的是流量单位是m3/h 转换为 L/s
elif get_option(name_c)["UNITS"] == "CMH":
demand_pattern["factors"][modify_index] = float(value)
cs = ChangeSet()
cs.append(demand_pattern)
set_pattern(name_c, cs)
# 水质、压力实时数据使用方法待补充
#############################
if globals.source_outflow_pattern_id:
# 基于实时的出厂流量计数据修改出厂流量计绑定的pattern
source_outflow_SCADA_data_dict = (
TimescaleInternalQueries.query_scada_by_ids_time(
device_ids=list(globals.source_outflow_pattern_id.values()),
query_time=modify_pattern_start_time,
)
)
# print(source_outflow_SCADA_data_dict)
source_outflow_dict = {
key: source_outflow_SCADA_data_dict[value]
for key, value in globals.source_outflow_pattern_id.items()
}
# print(source_outflow_dict)
for pattern_name in source_outflow_dict.keys():
# print(pattern_name)
history_source_outflow_flow_list, history_source_outflow_factor_list = (
get_history_pattern_info(name_c, pattern_name)
)
history_source_outflow_flow = history_source_outflow_flow_list[modify_index]
history_source_outflow_factor = history_source_outflow_factor_list[
modify_index
]
# print(source_outflow_dict[pattern_name])
# print(history_source_outflow_flow)
# print(history_source_outflow_factor)
if source_outflow_dict[pattern_name]:
realtime_source_outflow = float(source_outflow_dict[pattern_name])
multiply_factor = realtime_source_outflow / history_source_outflow_flow
# print(multiply_factor)
pattern = get_pattern(name_c, pattern_name)
pattern["factors"][modify_index] = (
multiply_factor * history_source_outflow_factor
)
# print(pattern['factors'][modify_index])
cs = ChangeSet()
cs.append(pattern)
set_pattern(name_c, cs)
if globals.realtime_pipe_flow_pattern_id:
# 基于实时的pipe_flow类数据修改pipe_flow类绑定的pattern
realtime_pipe_flow_SCADA_data_dict = (
TimescaleInternalQueries.query_scada_by_ids_time(
device_ids=list(globals.realtime_pipe_flow_pattern_id.values()),
query_time=modify_pattern_start_time,
)
)
realtime_pipe_flow_dict = {
key: realtime_pipe_flow_SCADA_data_dict[value]
for key, value in globals.realtime_pipe_flow_pattern_id.items()
}
for pattern_name in realtime_pipe_flow_dict.keys():
history_pipe_flow_flow_list, history_pipe_flow_factor_list = (
get_history_pattern_info(name_c, pattern_name)
)
history_pipe_flow_flow = history_pipe_flow_flow_list[modify_index]
history_pipe_flow_factor = history_pipe_flow_factor_list[modify_index]
if realtime_pipe_flow_dict[pattern_name]:
realtime_pipe_flow = float(realtime_pipe_flow_dict[pattern_name])
multiply_factor = realtime_pipe_flow / history_pipe_flow_flow
pattern = get_pattern(name_c, pattern_name)
pattern["factors"][modify_index] = (
multiply_factor * history_pipe_flow_factor
)
cs = ChangeSet()
cs.append(pattern)
set_pattern(name_c, cs)
if globals.pipe_flow_region_patterns:
# 基于实时的pipe_flow类数据修改pipe_flow分区流量计范围内的non_realtime的demand绑定的pattern
temp_realtime_pipe_flow_pattern_id = {}
# 遍历 pipe_flow_region_patterns 字典的 key
for (
pipe_flow_region,
demand_patterns,
) in globals.pipe_flow_region_patterns.items():
# 获取对应的实时值
query_api_id = globals.realtime_pipe_flow_pattern_id.get(pipe_flow_region)
temp_realtime_pipe_flow_pattern_id[pipe_flow_region] = query_api_id
temp_realtime_pipe_flow_SCADA_data_dict = (
TimescaleInternalQueries.query_scada_by_ids_time(
device_ids=list(temp_realtime_pipe_flow_pattern_id.values()),
query_time=modify_pattern_start_time,
)
)
temp_realtime_pipe_flow_dict = {
key: temp_realtime_pipe_flow_SCADA_data_dict[value]
for key, value in temp_realtime_pipe_flow_pattern_id.items()
}
for pattern_name in temp_realtime_pipe_flow_dict.keys():
temp_history_pipe_flow_flow_list, temp_history_pipe_flow_factor_list = (
get_history_pattern_info(name_c, pattern_name)
)
temp_history_pipe_flow_flow = temp_history_pipe_flow_flow_list[modify_index]
if temp_realtime_pipe_flow_dict[pattern_name]:
temp_realtime_pipe_flow = float(
temp_realtime_pipe_flow_dict[pattern_name]
)
temp_multiply_factor = (
temp_realtime_pipe_flow / temp_history_pipe_flow_flow
)
temp_non_realtime_demand_pattern_list = (
globals.pipe_flow_region_patterns[pattern_name]
)
for demand_pattern_name in temp_non_realtime_demand_pattern_list:
(
history_non_realtime_demand_flow_list,
history_non_realtime_demand_factor_list,
) = get_history_pattern_info(name_c, demand_pattern_name)
history_non_realtime_demand_factor = (
history_non_realtime_demand_factor_list[modify_index]
)
pattern = get_pattern(name_c, demand_pattern_name)
pattern["factors"][modify_index] = (
temp_multiply_factor * history_non_realtime_demand_factor
)
cs = ChangeSet()
cs.append(pattern)
set_pattern(name_c, cs)
    if globals.source_outflow_region:
        # Region-level rebalancing keyed by associated_source_outflow_id:
        # for each region, scale its non-realtime demand patterns so that
        # (plant outflow meters) - (realtime pipe flow + demand) matches the
        # realtime measurements instead of the historical values.
        for region in globals.source_outflow_region.keys():
            # Per-region device-id and pattern-name lists (empty when absent).
            temp_source_outflow_region_id = globals.source_outflow_region_id.get(
                region, []
            )
            temp_realtime_region_pipe_flow_and_demand_id = (
                globals.realtime_region_pipe_flow_and_demand_id.get(region, [])
            )
            temp_source_outflow_region_patterns = (
                globals.source_outflow_region_patterns.get(region, [])
            )
            temp_realtime_region_pipe_flow_and_demand_patterns = (
                globals.realtime_region_pipe_flow_and_demand_patterns.get(region, [])
            )
            temp_non_realtime_region_patterns = (
                globals.non_realtime_region_patterns.get(region, [])
            )
            # Realtime SCADA readings at the pattern start time.
            region_source_outflow_data_dict = (
                TimescaleInternalQueries.query_scada_by_ids_time(
                    device_ids=temp_source_outflow_region_id,
                    query_time=modify_pattern_start_time,
                )
            )
            region_realtime_region_pipe_flow_and_demand_data_dict = (
                TimescaleInternalQueries.query_scada_by_ids_time(
                    device_ids=temp_realtime_region_pipe_flow_and_demand_id,
                    query_time=modify_pattern_start_time,
                )
            )
            # 2025/02/12: ensure every value in region_source_outflow_data_dict
            # and region_realtime_region_pipe_flow_and_demand_data_dict is
            # neither None nor 0 before it is used.
            region_source_outflow_valid_values = [
                float(value)
                for value in region_source_outflow_data_dict.values()
                if value not in [None, 0]
            ]
            valid_values = [
                float(value)
                for value in region_realtime_region_pipe_flow_and_demand_data_dict.values()
                if value not in [None, 0]
            ]
            # Only rebalance when both reading sets are non-empty.
            if region_source_outflow_valid_values and valid_values:
                region_total_source_outflow = sum(region_source_outflow_valid_values)
                # Historical total source outflow for the same time slot.
                history_region_total_source_outflow = 0
                for source_outflow_pattern_name in temp_source_outflow_region_patterns:
                    (
                        temp_history_source_outflow_flow_list,
                        temp_history_source_outflow_factor_list,
                    ) = get_history_pattern_info(name_c, source_outflow_pattern_name)
                    history_region_total_source_outflow += (
                        temp_history_source_outflow_flow_list[modify_index]
                    )
                region_total_realtime_region_pipe_flow_and_demand = sum(valid_values)
                # Historical total of realtime-metered pipe flow + demand.
                history_region_total_realtime_region_pipe_flow_and_demand = 0
                for (
                    pipe_flow_and_demand_pattern_name
                ) in temp_realtime_region_pipe_flow_and_demand_patterns:
                    (
                        temp_history_pipe_flow_and_demand_flow_list,
                        temp_history_pipe_flow_and_demand_factor_list,
                    ) = get_history_pattern_info(
                        name_c, pipe_flow_and_demand_pattern_name
                    )
                    history_region_total_realtime_region_pipe_flow_and_demand += (
                        temp_history_pipe_flow_and_demand_flow_list[modify_index]
                    )
                # Ratio of the realtime residual (outflow minus metered usage)
                # to the historical residual.
                # NOTE(review): raises ZeroDivisionError if the historical
                # residual is exactly 0 — confirm the data rules this out.
                temp_multiply_factor = (
                    region_total_source_outflow
                    - region_total_realtime_region_pipe_flow_and_demand
                ) / (
                    history_region_total_source_outflow
                    - history_region_total_realtime_region_pipe_flow_and_demand
                )
                # Scale each non-realtime pattern's factor at this slot and
                # persist it through a ChangeSet.
                for (
                    non_realtime_region_pattern_name
                ) in temp_non_realtime_region_patterns:
                    (
                        history_non_realtime_region_pattern_flow_list,
                        history_non_realtime_region_pattern_factor_list,
                    ) = get_history_pattern_info(
                        name_c, non_realtime_region_pattern_name
                    )
                    history_non_realtime_region_pattern_factor = (
                        history_non_realtime_region_pattern_factor_list[modify_index]
                    )
                    pattern = get_pattern(name_c, non_realtime_region_pattern_name)
                    pattern["factors"][modify_index] = (
                        temp_multiply_factor
                        * history_non_realtime_region_pattern_factor
                    )
                    cs = ChangeSet()
                    cs.append(pattern)
                    set_pattern(name_c, cs)
    # Apply the caller-supplied overrides. Overrides applied later can overwrite
    # values set earlier; used for EXTENDED (scheme) simulations.
    # Override the reservoir (clear-water basin) level pattern.
    if modify_reservoir_head_pattern:
        for reservoir_name in modify_reservoir_head_pattern.keys():
            # Only apply when the first value is a real number; a NaN sentinel
            # means "no override for this reservoir".
            if not np.isnan(modify_reservoir_head_pattern[reservoir_name][0]):
                # Add RESERVOIR_BASIC_HEIGHT to every level to turn measured
                # levels into heads.
                modified_values = [
                    value + globals.RESERVOIR_BASIC_HEIGHT
                    for value in modify_reservoir_head_pattern[reservoir_name]
                ]
                reservoir_pattern = get_pattern(
                    name_c, get_reservoir(name_c, reservoir_name)["pattern"]
                )
                # Splice the override into the factors starting at modify_index.
                reservoir_pattern["factors"][
                    modify_index : modify_index + len(modified_values)
                ] = modified_values
                cs = ChangeSet()
                cs.append(reservoir_pattern)
                set_pattern(name_c, cs)
    # Override the tank initial level.
    if modify_tank_initial_level:
        for tank_name in modify_tank_initial_level.keys():
            # NaN and 0 both mean "no override".
            if (not np.isnan(modify_tank_initial_level[tank_name])) and (
                modify_tank_initial_level[tank_name] != 0
            ):
                tank = get_tank(name_c, tank_name)
                tank["init_level"] = modify_tank_initial_level[tank_name]
                cs = ChangeSet()
                cs.append(tank)
                set_tank(name_c, cs)
    # Override the junction base demand.
    if modify_junction_base_demand:
        for junction_name in modify_junction_base_demand.keys():
            if not np.isnan(modify_junction_base_demand[junction_name]):
                junction = get_demand(name_c, junction_name)
                junction["demand"] = modify_junction_base_demand[junction_name]
                cs = ChangeSet()
                cs.append(junction)
                set_demand(name_c, cs)
    # Override junction demand patterns (keyed directly by pattern name).
    if modify_junction_damand_pattern:
        for pattern_name in modify_junction_damand_pattern.keys():
            if not np.isnan(modify_junction_damand_pattern[pattern_name][0]):
                junction_pattern = get_pattern(name_c, pattern_name)
                junction_pattern["factors"][
                    modify_index : modify_index
                    + len(modify_junction_damand_pattern[pattern_name])
                ] = modify_junction_damand_pattern[pattern_name]
                cs = ChangeSet()
                cs.append(junction_pattern)
                set_pattern(name_c, cs)
    # Override fixed-speed (power-frequency) pump patterns.
    if modify_fixed_pump_pattern:
        for pump_name in modify_fixed_pump_pattern.keys():
            if not np.isnan(modify_fixed_pump_pattern[pump_name][0]):
                # NOTE(review): `pattern_name` below is a stale variable left
                # over from an earlier loop; this looks like it should be
                # `pump_name` — confirm before relying on this branch.
                pump_pattern = get_pattern(
                    name_c, get_pump(name_c, pattern_name)["pattern"]
                )
                # NOTE(review): len(modify_fixed_pump_pattern) is the number of
                # pumps, not the number of values; the variable-speed branch
                # below uses len(values) — confirm which is intended.
                pump_pattern["factors"][
                    modify_index : modify_index + len(modify_fixed_pump_pattern)
                ] = modify_fixed_pump_pattern[pump_name]
                cs = ChangeSet()
                cs.append(pump_pattern)
                set_pattern(name_c, cs)
    # Override variable-speed pump patterns.
    if modify_variable_pump_pattern:
        for pump_name in modify_variable_pump_pattern.keys():
            if not np.isnan(modify_variable_pump_pattern[pump_name][0]):
                # Divide every frequency by 50 Hz to obtain relative speed.
                modified_values = [
                    value / 50 for value in modify_variable_pump_pattern[pump_name]
                ]
                # NOTE(review): same stale `pattern_name` issue as in the
                # fixed-pump branch above — likely should be `pump_name`.
                pump_pattern = get_pattern(
                    name_c, get_pump(name_c, pattern_name)["pattern"]
                )
                pump_pattern["factors"][
                    modify_index : modify_index + len(modified_values)
                ] = modified_values
                cs = ChangeSet()
                cs.append(pump_pattern)
                set_pattern(name_c, cs)
    # Override valve openings: sets both `setting` and `status`.
    if modify_valve_opening:
        for valve_name in modify_valve_opening.keys():
            if not np.isnan(modify_valve_opening[valve_name]):
                valve_status = get_status(name_c, valve_name)
                if modify_valve_opening[valve_name] == 0:
                    # Fully closed.
                    valve_status["status"] = "CLOSED"
                    valve_status["setting"] = 0
                elif modify_valve_opening[valve_name] < 1:
                    # Partially open: setting from the opening fraction via a
                    # fitted power curve (0.1036 * opening^-3.105) —
                    # presumably a minor-loss coefficient; verify calibration.
                    valve_status["status"] = "OPEN"
                    valve_status["setting"] = 0.1036 * pow(
                        modify_valve_opening[valve_name], -3.105
                    )
                elif modify_valve_opening[valve_name] == 1:
                    # Fully open: no extra loss.
                    valve_status["status"] = "OPEN"
                    valve_status["setting"] = 0
                # NOTE(review): openings > 1 match no branch above, yet the
                # (unchanged) status is still committed here — confirm intended.
                cs = ChangeSet()
                cs.append(valve_status)
                set_status(name_c, cs)
    # Run the hydraulic simulation and collect the results.
    run_project(name_c)
    time_cost_end = time.perf_counter()
    print(
        "{} -- Hydraulic simulation finished, cost time: {:.2f} s.".format(
            datetime.now(pytz.timezone("Asia/Shanghai")).strftime("%Y-%m-%d %H:%M:%S"),
            time_cost_end - time_cost_start,
        )
    )
    # DingZQ: the next few lines must be done exactly this way, otherwise the
    # result file cannot be read.
    # time.sleep(5) # wait 5 seconds
    # TODO: 2025/03/24
    # DingZQ: the copy gets a random (uuid) name so concurrent runs do not
    # clobber each other's result file.
    tmp_file = f"./temp/simulation_{uuid.uuid4()}.result.out"
    shutil.copy(f"./temp/{name_c}.db.opt", tmp_file)
    # Parse the EPANET binary output: per-node and per-link result series plus
    # the number of reporting periods.
    output = Output(tmp_file)
    node_result = output.node_results()
    link_result = output.link_results()
    # link_flow = []
    # for link in link_result:
    #     link_flow.append(link['result'][-1]['flow'])
    # print(link_flow)
    num_periods_result = output.times()["num_periods"]
    print("simulation_type", simulation_type)
    print("before store result")
    # print(num_periods_result)
    # print(node_result)
    # Persist results to TimescaleDB; the table depends on the simulation type.
    starttime = time.time()
    if simulation_type.upper() == "REALTIME":
        TimescaleInternalStorage.store_realtime_simulation(
            node_result, link_result, modify_pattern_start_time
        )
    elif simulation_type.upper() == "EXTENDED":
        TimescaleInternalStorage.store_scheme_simulation(
            scheme_type,
            scheme_name,
            node_result,
            link_result,
            modify_pattern_start_time,
            num_periods_result,
        )
    endtime = time.time()
    logging.info("store time: %f", endtime - starttime)
    # Storing the SCADA simulation info again is not needed for now.
    # TimescaleInternalQueries.fill_scheme_simulation_result_to_SCADA(scheme_type=scheme_type, scheme_name=scheme_name)
    # if simulation_type.upper() == "REALTIME":
    #     influxdb_api.store_realtime_simulation_result_to_influxdb(
    #         node_result, link_result, modify_pattern_start_time
    #     )
    # elif simulation_type.upper() == "EXTENDED":
    #     influxdb_api.store_scheme_simulation_result_to_influxdb(
    #         node_result,
    #         link_result,
    #         modify_pattern_start_time,
    #         num_periods_result,
    #         scheme_type,
    #         scheme_name,
    #     )
    # Storing the SCADA simulation info again is not needed for now.
    # influxdb_api.fill_scheme_simulation_result_to_SCADA(scheme_type=scheme_type, scheme_name=scheme_name)
    print("after store result")
    # Release the parser's handle on the file before deleting the temp copy.
    del output
    os.remove(tmp_file)
if __name__ == "__main__":
    # Bootstrap: read scada_info from PostgreSQL and populate the lookup
    # tables in `globals` before running any simulation.
    proj = project_info.name
    query_corresponding_element_id_and_query_id(proj)
    query_corresponding_pattern_id_and_query_id(proj)
    region_result = query_non_realtime_region(proj)
    globals.source_outflow_region_id = get_source_outflow_region_id(
        proj, region_result
    )
    globals.realtime_region_pipe_flow_and_demand_id = (
        query_realtime_region_pipe_flow_and_demand_id(proj, region_result)
    )
    globals.pipe_flow_region_patterns = query_pipe_flow_region_patterns(proj)
    globals.non_realtime_region_patterns = query_non_realtime_region_patterns(
        proj, region_result
    )
    (
        globals.source_outflow_region_patterns,
        globals.realtime_region_pipe_flow_and_demand_patterns,
    ) = get_realtime_region_patterns(
        proj,
        globals.source_outflow_region_id,
        globals.realtime_region_pipe_flow_and_demand_id,
    )
    # Run a realtime simulation for every 15-minute slot of the base day
    # (96 slots), formatting each timestamp as ISO 8601 with a +08:00 offset.
    day_start = datetime(2025, 5, 4)
    for slot in range(96):
        slot_time = day_start + timedelta(minutes=15 * slot)
        iso_stamp = slot_time.strftime("%Y-%m-%dT%H:%M:%S") + "+08:00"
        run_simulation(
            name=proj,
            simulation_type="realtime",
            modify_pattern_start_time=iso_stamp,
        )
    # Example of an extended (scheme) simulation:
    # run_simulation(name='bb', simulation_type="extended",
    #                modify_pattern_start_time='2025-03-10T12:00:00+08:00',
    #                modify_total_duration=1800, scheme_type="burst_Analysis",
    #                scheme_name="scheme1")
    # Example lookup: query_SCADA_ID_corresponding_info(name='bb', SCADA_ID='P10755')