1353 lines
62 KiB
Python
1353 lines
62 KiB
Python
import numpy as np
|
||
from app.services.tjnetwork import *
|
||
from app.native.api.s36_wda_cal import *
|
||
|
||
# from get_real_status import *
|
||
from datetime import datetime, timedelta
|
||
from math import modf
|
||
import os
|
||
import json
|
||
import pytz
|
||
import requests
|
||
import time
|
||
import shutil
|
||
from app.services.epanet.epanet import Output
|
||
from typing import Optional, Tuple
|
||
import app.infra.db.influxdb.api as influxdb_api
|
||
import typing
|
||
import psycopg
|
||
import logging
|
||
import app.services.globals as globals
|
||
import uuid
|
||
import app.services.project_info as project_info
|
||
from app.native.api.postgresql_info import get_pgconn_string
|
||
from app.infra.db.timescaledb.internal_queries import (
|
||
InternalQueries as TimescaleInternalQueries,
|
||
)
|
||
from app.infra.db.timescaledb.internal_queries import (
|
||
InternalStorage as TimescaleInternalStorage,
|
||
)
|
||
|
||
logging.basicConfig(
|
||
level=logging.INFO, format="%(asctime)s - %(levelname)s - %(message)s"
|
||
)
|
||
|
||
|
||
def query_corresponding_element_id_and_query_id(name: str) -> None:
|
||
"""
|
||
查询scada_info这张表中,realtime类型的记录中,associated_element_id与api_query_id的对应关系
|
||
:param name: 数据库名称
|
||
:return:
|
||
"""
|
||
# 连接数据库
|
||
conn_string = get_pgconn_string(db_name=name)
|
||
try:
|
||
with psycopg.connect(conn_string) as conn:
|
||
with conn.cursor() as cur:
|
||
# 查询 transmission_mode 为 'realtime' 的记录
|
||
cur.execute(
|
||
"""
|
||
SELECT type, associated_element_id, api_query_id
|
||
FROM scada_info
|
||
WHERE transmission_mode = 'realtime';
|
||
"""
|
||
)
|
||
records = cur.fetchall()
|
||
# 遍历查询结果,并根据 type 将数据存储到相应的字典中
|
||
for record in records:
|
||
type_, associated_element_id, api_query_id = record
|
||
if type_ == "reservoir_liquid_level":
|
||
globals.reservoirs_id[associated_element_id] = api_query_id
|
||
elif type_ == "tank_liquid_level":
|
||
globals.tanks_id[associated_element_id] = api_query_id
|
||
elif type_ == "fixed_pump":
|
||
globals.fixed_pumps_id[associated_element_id] = api_query_id
|
||
elif type_ == "variable_pump":
|
||
globals.variable_pumps_id[associated_element_id] = api_query_id
|
||
elif type_ == "pressure":
|
||
globals.pressure_id[associated_element_id] = api_query_id
|
||
elif type_ == "demand":
|
||
globals.demand_id[associated_element_id] = api_query_id
|
||
elif type_ == "quality":
|
||
globals.quality_id[associated_element_id] = api_query_id
|
||
else:
|
||
# 如果遇到未定义的类型,可以选择记录日志或忽略
|
||
print(f"未处理的类型: {type_}")
|
||
except psycopg.Error as e:
|
||
print(f"数据库连接或查询出错: {e}")
|
||
|
||
|
||
def query_corresponding_pattern_id_and_query_id(name: str) -> None:
|
||
"""
|
||
查询 scada_info 表中 transmission_mode 为 'realtime',且 type 为 'source_outflow' 或 'pipe_flow' 的记录,
|
||
提取 associated_pattern 和 api_query_id 的对应关系,并分别存储到对应的字典中。
|
||
:param name: 数据库名称
|
||
:return:
|
||
"""
|
||
# 连接数据库
|
||
conn_string = get_pgconn_string(db_name=name)
|
||
try:
|
||
with psycopg.connect(conn_string) as conn:
|
||
with conn.cursor() as cur:
|
||
# 查询 transmission_mode 为 'realtime' 且 type 为 'source_outflow' 或 'pipe_flow' 的记录
|
||
cur.execute(
|
||
"""
|
||
SELECT type, associated_pattern, api_query_id
|
||
FROM scada_info
|
||
WHERE transmission_mode = 'realtime'
|
||
AND type IN ('source_outflow', 'pipe_flow');
|
||
"""
|
||
)
|
||
records = cur.fetchall()
|
||
# 遍历查询结果,并根据 type 将数据存储到相应的字典中
|
||
for record in records:
|
||
type_, associated_pattern, api_query_id = record
|
||
if type_ == "source_outflow":
|
||
globals.source_outflow_pattern_id[associated_pattern] = (
|
||
api_query_id
|
||
)
|
||
elif type_ == "pipe_flow":
|
||
globals.realtime_pipe_flow_pattern_id[associated_pattern] = (
|
||
api_query_id
|
||
)
|
||
except psycopg.Error as e:
|
||
print(f"数据库连接或查询出错: {e}")
|
||
|
||
|
||
# 2025/01/11
|
||
def query_non_realtime_region(name: str) -> dict:
|
||
"""
|
||
查询 scada_info 表中 transmission_mode 为 'non_realtime',且 type 为 'pipe_flow' 的记录,
|
||
提取所有以 'associated_source_outflow_id' 开头的列的值,并将每条记录的这些值作为一个 region(region1, region2, ...),
|
||
最后去掉重复的 region,并存储到 source_outflow_region 字典中。
|
||
:param name: 数据库名字
|
||
:return: 包含区域与对应 associated_source_outflow_id 的字典
|
||
"""
|
||
source_outflow_regions = [] # 用于存储所有 region(包含重复的)
|
||
# 构建连接字符串
|
||
conn_string = get_pgconn_string(db_name=name)
|
||
try:
|
||
# 连接到数据库
|
||
with psycopg.connect(conn_string) as conn:
|
||
with conn.cursor() as cur:
|
||
# 执行查询,筛选出 transmission_mode 为 'non_realtime' 且 type 为 'pipe_flow' 的记录
|
||
cur.execute(
|
||
"""
|
||
SELECT *
|
||
FROM scada_info
|
||
WHERE transmission_mode = 'non_realtime'
|
||
AND type = 'pipe_flow';
|
||
"""
|
||
)
|
||
records = cur.fetchall()
|
||
col_names = [desc.name for desc in cur.description]
|
||
# 找出所有以 'associated_source_outflow_id' 开头的列
|
||
source_outflow_cols = [
|
||
col
|
||
for col in col_names
|
||
if col.startswith("associated_source_outflow_id")
|
||
]
|
||
logging.info(
|
||
f"Identified source_outflow columns: {source_outflow_cols}"
|
||
)
|
||
for record in records:
|
||
# 提取所有以 'associated_source_outflow_id' 开头的列的值,排除 None
|
||
values = [
|
||
record[col_names.index(col)]
|
||
for col in source_outflow_cols
|
||
if record[col_names.index(col)] is not None
|
||
]
|
||
# 如果该记录有相关的值,则将其作为一个 region
|
||
if values:
|
||
# 将值排序以确保相同的组合顺序一致(如果顺序不重要)
|
||
# 如果顺序重要,请删除排序步骤
|
||
region_tuple = tuple(sorted(values))
|
||
source_outflow_regions.append(region_tuple)
|
||
# 移除重复的 regions
|
||
unique_regions = []
|
||
seen = set()
|
||
for region in source_outflow_regions:
|
||
if region not in seen:
|
||
seen.add(region)
|
||
unique_regions.append(region)
|
||
# 为每个唯一的 region 分配一个 region 键
|
||
for idx, region in enumerate(unique_regions, 1):
|
||
region_key = f"region{idx}"
|
||
globals.source_outflow_region[region_key] = list(region)
|
||
logging.info("查询并处理数据成功。")
|
||
except psycopg.Error as e:
|
||
logging.error(f"数据库连接或查询出错: {e}")
|
||
except Exception as ex:
|
||
logging.error(f"处理数据时出错: {ex}")
|
||
return globals.source_outflow_region
|
||
|
||
|
||
# 2025/01/18
|
||
def query_non_realtime_region_patterns(
|
||
name: str,
|
||
source_outflow_region: dict,
|
||
column_prefix: str = "associated_source_outflow_id",
|
||
) -> dict:
|
||
"""
|
||
根据 source_outflow_region,对 scada_info 表中 transmission_mode 为 'non_realtime'的记录进行分组,
|
||
将匹配的记录的 associated_pattern 存入 non_realtime_region_patterns 字典中,同时把用 realtime pipe_flow修正的 non_realtime demand 去掉
|
||
:param name: 数据库名称
|
||
:param source_outflow_region: 包含区域与对应 associated_source_outflow_id 的字典
|
||
:param column_prefix: 需要提取的列的前缀
|
||
:return: 包含区域与对应 associated_pattern 的字典
|
||
"""
|
||
globals.non_realtime_region_patterns = {
|
||
region: [] for region in globals.source_outflow_region.keys()
|
||
}
|
||
region_tuple_to_key = {
|
||
frozenset(ids): region for region, ids in globals.source_outflow_region.items()
|
||
}
|
||
conn_string = get_pgconn_string(db_name=name)
|
||
try:
|
||
with psycopg.connect(conn_string) as conn:
|
||
with conn.cursor() as cur:
|
||
# 执行查询,筛选出 transmission_mode 为 'non_realtime'
|
||
cur.execute(
|
||
"""
|
||
SELECT *
|
||
FROM scada_info
|
||
WHERE transmission_mode = 'non_realtime'
|
||
"""
|
||
)
|
||
records = cur.fetchall()
|
||
col_names = [desc.name for desc in cur.description]
|
||
# 找出所有以指定前缀开头的列
|
||
source_outflow_cols = [
|
||
col for col in col_names if col.startswith(column_prefix)
|
||
]
|
||
logging.info(
|
||
f"Identified source_outflow columns: {source_outflow_cols}"
|
||
)
|
||
# 确保 'associated_pattern' 列存在
|
||
if "associated_pattern" not in col_names:
|
||
logging.error(
|
||
"'associated_pattern' column not found in scada_info table."
|
||
)
|
||
return globals.non_realtime_region_patterns
|
||
# 获取 'associated_pattern' 列的索引
|
||
pattern_idx = col_names.index("associated_pattern")
|
||
for record in records:
|
||
# 提取所有以 'associated_source_outflow_id' 开头的列的值,排除 None
|
||
values = [
|
||
record[col_names.index(col)]
|
||
for col in source_outflow_cols
|
||
if record[col_names.index(col)] is not None
|
||
]
|
||
if values:
|
||
# 将值转换为 frozenset 以便与 region_tuple_to_key 进行匹配
|
||
region_frozenset = frozenset(values)
|
||
# 检查是否存在匹配的 region
|
||
region_key = region_tuple_to_key.get(region_frozenset)
|
||
if region_key:
|
||
# 获取 'associated_pattern' 的值
|
||
associated_pattern = record[pattern_idx]
|
||
if associated_pattern is not None:
|
||
globals.non_realtime_region_patterns[region_key].append(
|
||
associated_pattern
|
||
)
|
||
logging.info("生成 regions_patterns 成功。")
|
||
except psycopg.Error as e:
|
||
logging.error(f"数据库连接或查询出错: {e}")
|
||
except Exception as ex:
|
||
logging.error(f"处理数据时出错: {ex}")
|
||
# 获取pipe_flow_region_patterns中的所有区域
|
||
exclude_regions = set(
|
||
region
|
||
for regions in globals.pipe_flow_region_patterns.values()
|
||
for region in regions
|
||
)
|
||
# 从non_realtime_region_patterns中去除这些区域
|
||
for region_key, regions in globals.non_realtime_region_patterns.items():
|
||
globals.non_realtime_region_patterns[region_key] = [
|
||
region for region in regions if region not in exclude_regions
|
||
]
|
||
return globals.non_realtime_region_patterns
|
||
|
||
|
||
# 2025/01/18
|
||
def query_realtime_region_pipe_flow_and_demand_id(
|
||
name: str,
|
||
source_outflow_region: dict,
|
||
column_prefix: str = "associated_source_outflow_id",
|
||
) -> dict:
|
||
"""
|
||
根据 source_outflow_region,对 scada_info 表中 transmission_mode 为 'realtime',
|
||
且 type 为 'pipe_flow' 或 ‘demand’ 的记录进行分组,将匹配的记录的 api_query_id 存入 realtime_region_pipe_flow_and_demand_id 字典中。
|
||
:param name: 数据库名称
|
||
:param source_outflow_region: 包含区域与对应 associated_source_outflow_id 的字典
|
||
:param column_prefix: 需要提取的列的前缀
|
||
:return: 包含区域与对应 api_query_id 的字典
|
||
"""
|
||
globals.realtime_region_pipe_flow_and_demand_id = {
|
||
region: [] for region in globals.source_outflow_region.keys()
|
||
}
|
||
# 创建一个映射,从 frozenset(ids) 到 region_key
|
||
region_tuple_to_key = {
|
||
frozenset(ids): region for region, ids in globals.source_outflow_region.items()
|
||
}
|
||
conn_string = get_pgconn_string(db_name=name)
|
||
try:
|
||
with psycopg.connect(conn_string) as conn:
|
||
with conn.cursor() as cur:
|
||
# 执行查询,筛选出 transmission_mode 为 'realtime' 且 type 为 'pipe_flow' 或 'demand' 的记录
|
||
cur.execute(
|
||
"""
|
||
SELECT *
|
||
FROM scada_info
|
||
WHERE transmission_mode = 'realtime'
|
||
AND type IN ('pipe_flow', 'demand');
|
||
"""
|
||
)
|
||
records = cur.fetchall()
|
||
col_names = [desc.name for desc in cur.description]
|
||
# 找出所有以指定前缀开头的列
|
||
source_outflow_cols = [
|
||
col for col in col_names if col.startswith(column_prefix)
|
||
]
|
||
logging.info(
|
||
f"Identified source_outflow columns: {source_outflow_cols}"
|
||
)
|
||
# 确保 'api_query_id' 列存在
|
||
if "api_query_id" not in col_names:
|
||
logging.error(
|
||
"'api_query_id' column not found in scada_info table."
|
||
)
|
||
return globals.realtime_region_pipe_flow_and_demand_id
|
||
# 获取 'api_query_id' 列的索引
|
||
api_query_id_idx = col_names.index("api_query_id")
|
||
for record in records:
|
||
# 提取所有以 'associated_source_outflow_id' 开头的列的值,排除 None
|
||
values = [
|
||
record[col_names.index(col)]
|
||
for col in source_outflow_cols
|
||
if record[col_names.index(col)] is not None
|
||
]
|
||
if values:
|
||
# 将值转换为 frozenset 以便与 region_tuple_to_key 进行匹配
|
||
region_frozenset = frozenset(values)
|
||
# 检查是否存在匹配的 region
|
||
region_key = region_tuple_to_key.get(region_frozenset)
|
||
if region_key:
|
||
# 获取 'api_query_id' 的值
|
||
api_query_id = record[api_query_id_idx]
|
||
if api_query_id is not None:
|
||
globals.realtime_region_pipe_flow_and_demand_id[
|
||
region_key
|
||
].append(api_query_id)
|
||
logging.info("生成 realtime_region_pipe_flow_and_demand_id 成功。")
|
||
except psycopg.Error as e:
|
||
logging.error(f"数据库连接或查询出错: {e}")
|
||
except Exception as ex:
|
||
logging.error(f"处理数据时出错: {ex}")
|
||
return globals.realtime_region_pipe_flow_and_demand_id
|
||
|
||
|
||
# 2025/01/17
|
||
def query_pipe_flow_region_patterns(
|
||
name: str, column_prefix: str = "associated_pipe_flow_id"
|
||
) -> dict:
|
||
"""
|
||
查询 scada_info 表中 type 为 'demand' 且 transmission_mode 为 'non_realtime' 的记录,
|
||
记录该记录的 associated_pattern。
|
||
如果该记录的 associated_pipe_flow_id 存在,
|
||
且根据 associated_pipe_flow_id 查询的 associated_element_id 对应的记录的 transmission_mode 为 'realtime',
|
||
则将该记录的 associated_pattern 作为值记录到字典中,字典的 key 为 pipe_flow 类的 associated_pattern。
|
||
字典样式为:{'region1': ['P17021', 'ZBBGXSZW000377'], 'region2': ['P16504']}
|
||
:param name: 数据库名称
|
||
:param column_prefix: 需要提取的列的前缀
|
||
:return: pipe_flow_region_patterns 字典
|
||
"""
|
||
conn_string = get_pgconn_string(db_name=name)
|
||
try:
|
||
with psycopg.connect(conn_string) as conn:
|
||
with conn.cursor() as cur:
|
||
# 查询 type 为 'demand' 且 transmission_mode 为 'non_realtime' 的记录
|
||
cur.execute(
|
||
"""
|
||
SELECT associated_pattern, associated_pipe_flow_id
|
||
FROM scada_info
|
||
WHERE type = 'demand'
|
||
AND transmission_mode = 'non_realtime';
|
||
"""
|
||
)
|
||
records = cur.fetchall()
|
||
col_names = [desc.name for desc in cur.description]
|
||
# 获取列索引
|
||
pattern_idx = col_names.index("associated_pattern")
|
||
pipe_flow_id_idx = col_names.index("associated_pipe_flow_id")
|
||
for record in records:
|
||
associated_pattern = record[pattern_idx]
|
||
associated_pipe_flow_id = record[pipe_flow_id_idx]
|
||
if associated_pipe_flow_id:
|
||
# 根据 associated_pipe_flow_id 查询对应的记录
|
||
cur.execute(
|
||
"""
|
||
SELECT associated_pattern, transmission_mode
|
||
FROM scada_info
|
||
WHERE associated_element_id = %s;
|
||
""",
|
||
(associated_pipe_flow_id,),
|
||
)
|
||
pipe_flow_record = cur.fetchone()
|
||
if pipe_flow_record:
|
||
pipe_flow_associated_pattern = pipe_flow_record[0]
|
||
transmission_mode = pipe_flow_record[1]
|
||
if transmission_mode == "realtime":
|
||
# 将 associated_pattern 记录到字典中
|
||
if (
|
||
pipe_flow_associated_pattern
|
||
not in globals.pipe_flow_region_patterns
|
||
):
|
||
globals.pipe_flow_region_patterns[
|
||
pipe_flow_associated_pattern
|
||
] = []
|
||
globals.pipe_flow_region_patterns[
|
||
pipe_flow_associated_pattern
|
||
].append(associated_pattern)
|
||
logging.info("生成 pipe_flow_region_patterns 成功。")
|
||
except psycopg.Error as e:
|
||
logging.error(f"数据库连接或查询出错: {e}")
|
||
except Exception as ex:
|
||
logging.error(f"处理数据时出错: {ex}")
|
||
return globals.pipe_flow_region_patterns
|
||
|
||
|
||
# 2025/02/15
|
||
def query_SCADA_ID_corresponding_info(name: str, SCADA_ID: str) -> dict:
|
||
"""
|
||
在地图上拾取SCADA元素后,获取SCADA_ID,在pg数据库中查询该SCADA设备对应的associated_element_id和api_query_id
|
||
:param name: pg数据库的名称
|
||
:param SCADA_ID: SCADA设备的ID
|
||
:return: 包含associated_element_id和api_query_id的字典
|
||
"""
|
||
conn_string = get_pgconn_string(db_name=name)
|
||
try:
|
||
# 使用 psycopg.connect 创建连接
|
||
with psycopg.connect(conn_string) as conn:
|
||
with conn.cursor() as cur:
|
||
# 执行查询
|
||
query = """
|
||
SELECT associated_element_id, API_query_id
|
||
FROM scada_info
|
||
WHERE id = %s
|
||
"""
|
||
cur.execute(query, (SCADA_ID,)) # 执行查询并传递参数
|
||
# 获取查询结果
|
||
result = cur.fetchone()
|
||
if result:
|
||
# 将结果转换为字典
|
||
associated_info = {
|
||
"associated_element_id": result[0],
|
||
"API_query_id": result[1],
|
||
}
|
||
return associated_info
|
||
else:
|
||
# 如果没有找到记录
|
||
return None
|
||
except Exception as e:
|
||
print(f"An error occurred: {e}")
|
||
return None
|
||
|
||
|
||
# 2025/01/11
|
||
def get_source_outflow_region_id(
|
||
name: str,
|
||
source_outflow_region: dict,
|
||
column_prefix: str = "associated_source_outflow_id",
|
||
) -> dict:
|
||
"""
|
||
基于 source_outflow_region,将其中的 associated_source_outflow_id 替换为对应的 api_query_id,
|
||
生成新的字典 source_outflow_region_id。
|
||
|
||
:param name: 数据库名称
|
||
:param source_outflow_region: 包含区域与对应 associated_source_outflow_id 的字典
|
||
:param column_prefix: 需要提取的列的前缀
|
||
:return: 包含区域与对应 api_query_id 的字典
|
||
"""
|
||
globals.source_outflow_region_id = {
|
||
region: [] for region in globals.source_outflow_region.keys()
|
||
}
|
||
# 提取所有唯一的 associated_source_outflow_id
|
||
all_ids = set()
|
||
for ids in globals.source_outflow_region.values():
|
||
all_ids.update(ids)
|
||
if not all_ids:
|
||
logging.warning(
|
||
"No associated_source_outflow_id found in source_outflow_region."
|
||
)
|
||
return globals.source_outflow_region_id
|
||
conn_string = get_pgconn_string(db_name=name)
|
||
try:
|
||
with psycopg.connect(conn_string) as conn:
|
||
with conn.cursor() as cur:
|
||
# 查询 associated_element_id 和 api_query_id
|
||
query = f"""
|
||
SELECT associated_element_id, api_query_id
|
||
FROM scada_info
|
||
WHERE associated_element_id = ANY(%s)
|
||
"""
|
||
cur.execute(query, (list(all_ids),))
|
||
rows = cur.fetchall()
|
||
# 构建 associated_source_outflow_id 到 api_query_id 的映射
|
||
id_to_api_query_id = {}
|
||
for row in rows:
|
||
associated_id = row[0]
|
||
api_query_id = row[1]
|
||
if associated_id in all_ids and api_query_id is not None:
|
||
id_to_api_query_id[associated_id] = str(api_query_id)
|
||
# 替换 source_outflow_region 中的 associated_source_outflow_id 为 api_query_id
|
||
for region, ids in globals.source_outflow_region.items():
|
||
for id_ in ids:
|
||
api_id = id_to_api_query_id.get(id_)
|
||
if api_id:
|
||
globals.source_outflow_region_id[region].append(api_id)
|
||
else:
|
||
logging.warning(
|
||
f"No api_query_id found for associated_source_outflow_id: {id_}"
|
||
)
|
||
except psycopg.Error as e:
|
||
logging.error(f"数据库连接或查询出错: {e}")
|
||
except Exception as ex:
|
||
logging.error(f"处理数据时出错: {ex}")
|
||
return globals.source_outflow_region_id
|
||
|
||
|
||
# 2025/01/18
|
||
def get_realtime_region_patterns(
|
||
name: str,
|
||
source_outflow_region_id: dict,
|
||
realtime_region_pipe_flow_and_demand_id: dict,
|
||
) -> Tuple[dict, dict]:
|
||
"""
|
||
根据每个 region,从 scada_info 表中查询 api_query_id 对应的 associated_pattern。
|
||
将结果分别存储到 source_outflow_region_patterns 和 realtime_region_pipe_flow_and_demand_patterns 两个字典中。
|
||
:param name: 数据库名称
|
||
:param source_outflow_region_id: 包含 region 与对应 api_query_id 的字典
|
||
:param realtime_region_pipe_flow_and_demand_id: 包含 region 与对应 api_query_id 的字典
|
||
:return: source_outflow_region_patterns 和 realtime_region_pipe_flow_and_demand_patterns 两个字典
|
||
"""
|
||
# 初始化返回的字典
|
||
globals.source_outflow_region_patterns = {
|
||
region: [] for region in globals.source_outflow_region_id.keys()
|
||
}
|
||
globals.realtime_region_pipe_flow_and_demand_patterns = {
|
||
region: [] for region in globals.realtime_region_pipe_flow_and_demand_id.keys()
|
||
}
|
||
conn_string = get_pgconn_string(db_name=name)
|
||
try:
|
||
with psycopg.connect(conn_string) as conn:
|
||
with conn.cursor() as cur:
|
||
# 遍历每个 region
|
||
for region in globals.source_outflow_region_id.keys():
|
||
# 获取 source_outflow_region_id 的 api_query_id 并查询 associated_pattern
|
||
source_outflow_api_ids = globals.source_outflow_region_id[region]
|
||
if source_outflow_api_ids:
|
||
api_query_ids_str = ", ".join(
|
||
[f"'{api_id}'" for api_id in source_outflow_api_ids]
|
||
)
|
||
cur.execute(
|
||
f"""
|
||
SELECT api_query_id, associated_pattern
|
||
FROM scada_info
|
||
WHERE api_query_id IN ({api_query_ids_str});
|
||
"""
|
||
)
|
||
results = cur.fetchall()
|
||
globals.source_outflow_region_patterns[region] = [
|
||
associated_pattern
|
||
for _, associated_pattern in results
|
||
if associated_pattern
|
||
]
|
||
# 获取 realtime_region_pipe_flow_and_demand_id 的 api_query_id 并查询 associated_pattern
|
||
realtime_api_ids = globals.realtime_region_pipe_flow_and_demand_id[
|
||
region
|
||
]
|
||
if realtime_api_ids:
|
||
api_query_ids_str = ", ".join(
|
||
[f"'{api_id}'" for api_id in realtime_api_ids]
|
||
)
|
||
cur.execute(
|
||
f"""
|
||
SELECT api_query_id, associated_pattern
|
||
FROM scada_info
|
||
WHERE api_query_id IN ({api_query_ids_str});
|
||
"""
|
||
)
|
||
results = cur.fetchall()
|
||
globals.realtime_region_pipe_flow_and_demand_patterns[
|
||
region
|
||
] = [
|
||
associated_pattern
|
||
for _, associated_pattern in results
|
||
if associated_pattern
|
||
]
|
||
logging.info(
|
||
"生成 source_outflow_region_patterns 和 realtime_region_pipe_flow_and_demand_patterns 成功。"
|
||
)
|
||
except psycopg.Error as e:
|
||
logging.error(f"数据库连接或查询出错: {e}")
|
||
except Exception as ex:
|
||
logging.error(f"处理数据时出错: {ex}")
|
||
return (
|
||
globals.source_outflow_region_patterns,
|
||
globals.realtime_region_pipe_flow_and_demand_patterns,
|
||
)
|
||
|
||
|
||
def get_pattern_index(cur_datetime: str) -> int:
|
||
"""
|
||
根据给定的日期时间字符串,计算并返回对应的模式索引。
|
||
:param cur_datetime: str, 当前的日期时间字符串,格式为“YYYY-MM-DD HH:MM:SS”。
|
||
:return: int, 基于预定义的时间步长 PATTERN_TIME_STEP。
|
||
"""
|
||
str_format = "%Y-%m-%d %H:%M:%S"
|
||
dt = datetime.strptime(cur_datetime, str_format)
|
||
hr = dt.hour
|
||
mnt = dt.minute
|
||
i = int((hr * 60 + mnt) / globals.PATTERN_TIME_STEP)
|
||
return i
|
||
|
||
|
||
def get_pattern_index_str(current_time: str) -> str:
|
||
"""
|
||
根据当前时间获取时间步长的模式索引,并将其格式化为“HH:MM:00”字符串。
|
||
:param current_time: str, 当前时间,格式为"YYYY-MM-DD HH:MM:SS"
|
||
:return: str, 以“HH:MM:00”格式返回
|
||
"""
|
||
i = get_pattern_index(current_time)
|
||
[minN, hrN] = modf(i * globals.PATTERN_TIME_STEP / 60)
|
||
minN_str = str(int(minN * 60))
|
||
minN_str = minN_str.zfill(2)
|
||
hrN_str = str(int(hrN))
|
||
hrN_str = hrN_str.zfill(2)
|
||
str_i = "{}:{}:00".format(hrN_str, minN_str)
|
||
return str_i
|
||
|
||
|
||
def from_seconds_to_clock(secs: int) -> str:
|
||
"""
|
||
从秒格式化为“HH:MM:00”字符串
|
||
:param secs: int,秒
|
||
:return: str, 以“HH:MM:00”格式返回
|
||
"""
|
||
hrs = int(secs / 3600)
|
||
minutes = int((secs - hrs * 3600) / 60)
|
||
seconds = secs - hrs * 3600 - minutes * 60
|
||
hrs_str = str(hrs).zfill(2)
|
||
minutes_str = str(minutes).zfill(2)
|
||
seconds_str = str(seconds).zfill(2)
|
||
str_clock = "{}:{}:{}".format(hrs_str, minutes_str, seconds_str)
|
||
return str_clock
|
||
|
||
|
||
def convert_time_format(original_time: str) -> str:
|
||
"""
|
||
格式转换,将“2024-04-13T08:00:00+08:00"转为“2024-04-13 08:00:00”
|
||
:param original_time: str, “2024-04-13T08:00:00+08:00"格式的时间
|
||
:return: str,“2024-04-13 08:00:00”格式的时间
|
||
"""
|
||
new_time = original_time.replace("T", " ")
|
||
new_time = new_time.replace("+08:00", "")
|
||
return new_time
|
||
|
||
|
||
def get_history_pattern_info(project_name, pattern_name):
|
||
"""读取选定pattern的保存的历史pattern信息flow和factor"""
|
||
flow_list = []
|
||
factor_list = []
|
||
patterns_info = read_all(
|
||
project_name,
|
||
f"select * from history_patterns_flows where id = '{pattern_name}' order by _order",
|
||
)
|
||
for item in patterns_info:
|
||
flow_list.append(float(item["flow"]))
|
||
factor_list.append(float(item["factor"]))
|
||
return flow_list, factor_list
|
||
|
||
|
||
# 2025/01/11
|
||
def run_simulation(
|
||
name: str,
|
||
simulation_type: str,
|
||
modify_pattern_start_time: str,
|
||
modify_total_duration: int = 0,
|
||
modify_reservoir_head_pattern: dict[str, list] = None,
|
||
modify_tank_initial_level: dict[str, float] = None,
|
||
modify_junction_base_demand: dict[str, float] = None,
|
||
modify_junction_damand_pattern: dict[str, list] = None,
|
||
modify_fixed_pump_pattern: dict[str, list] = None,
|
||
modify_variable_pump_pattern: dict[str, list] = None,
|
||
modify_valve_opening: dict[str, float] = None,
|
||
scheme_type: str = None,
|
||
scheme_name: str = None,
|
||
) -> None:
|
||
"""
|
||
传入需要修改的参数,改变数据库中对应位置的值,然后计算,返回结果
|
||
:param name: 模型名称,数据库中对应的名字
|
||
:param simulation_type: 模拟的类型,realtime为实时模拟,修改原数据库;extended为多步长模拟,需要复制数据库
|
||
:param modify_pattern_start_time: 模拟开始时间,格式为'2024-11-25T09:00:00+08:00'
|
||
:param modify_total_duration: 模拟总历时,秒
|
||
:param modify_reservoir_head_pattern: dict中包含多个水库模式,str为水库head_pattern的id,list为修改后的head_pattern
|
||
:param modify_tank_initial_level: dict中包含多个水塔,str为水塔的id,float为修改后的initial_level
|
||
:param modify_junction_base_demand: dict中包含多个节点,str为节点的id,float为修改后的base_demand
|
||
:param modify_junction_damand_pattern: dict中包含多个节点模式,str为节点demand_pattern的id,list为修改后的demand_pattern
|
||
:param modify_fixed_pump_pattern: dict中包含多个水泵模式,str为工频水泵的id,list为修改后的pattern
|
||
:param modify_variable_pump_pattern: dict中包含多个水泵模式,str为变频水泵的id,list为修改后的pattern
|
||
:param modify_valve_opening: dict中包含多个阀门开启度,str为阀门的id,float为修改后的阀门开启度
|
||
:param scheme_type: 模拟方案类型
|
||
:param scheme_name:模拟方案名称
|
||
:return:
|
||
"""
|
||
# 记录开始时间
|
||
time_cost_start = time.perf_counter()
|
||
|
||
print("name", name)
|
||
print("simulation_type", simulation_type)
|
||
print("modify_pattern_start_time", modify_pattern_start_time)
|
||
print("modify_total_duration", modify_total_duration)
|
||
print("modify_reservoir_head_pattern", modify_reservoir_head_pattern)
|
||
print("modify_tank_initial_level", modify_tank_initial_level)
|
||
print("modify_junction_base_demand", modify_junction_base_demand)
|
||
|
||
print(
|
||
"{} -- Hydraulic simulation started.".format(
|
||
datetime.now(pytz.timezone("Asia/Shanghai")).strftime("%Y-%m-%d %H:%M:%S")
|
||
)
|
||
)
|
||
|
||
# 判断是实时模拟还是多步长模拟
|
||
# if simulation_type.upper() == 'REALTIME': # 实时模拟(修改原数据库)
|
||
# name_c = name
|
||
# elif simulation_type.upper() == 'EXTENDED': # 扩展模拟(复制数据库)
|
||
# name_c = '_'.join([name, 'c'])
|
||
# if have_project(name_c):
|
||
# if is_project_open(name_c):
|
||
# close_project(name_c)
|
||
# delete_project(name_c)
|
||
# copy_project(name, name_c) # 备份项目
|
||
# else:
|
||
# raise Exception('Incorrect simulation type, choose in (realtime, extended)')
|
||
name_c = name
|
||
# 打开数据库
|
||
open_project(name_c)
|
||
dic_time = get_time(name_c)
|
||
|
||
print(dic_time)
|
||
|
||
# 获取水力模拟步长,如’0:15:00‘
|
||
globals.hydraulic_timestep = dic_time["HYDRAULIC TIMESTEP"]
|
||
# 将时间字符串转换为 timedelta 对象
|
||
time_obj = datetime.strptime(globals.hydraulic_timestep, "%H:%M:%S")
|
||
# 转换为分钟浮点数
|
||
globals.PATTERN_TIME_STEP = float(
|
||
time_obj.hour * 60 + time_obj.minute + time_obj.second / 60
|
||
)
|
||
# 对输入的时间参数进行处理
|
||
pattern_start_time = convert_time_format(modify_pattern_start_time)
|
||
# 获取模拟开始时间是对应pattern的第几个数
|
||
modify_index = get_pattern_index(pattern_start_time)
|
||
# 遍历水泵的pattern_id,并根据输入的pump_pattern修改pattern的值
|
||
# for pump_pattern_id in pump_pattern_ids:
|
||
# # 检查pump_pattern中pump_pattern_id对应的第一个频率值是否为有效数字(非空、非NaN)。如果该值有效,则继续执行代码块。
|
||
# if not np.isnan(modify_pump_pattern[pump_pattern_id][0]):
|
||
# # 取出数据库中的pattern
|
||
# pump_pattern = get_pattern(name_c, get_pump(name_c, pump_pattern_id)['pattern'])
|
||
# # 替换数据库中的pattern为modify_pump_pattern
|
||
# pump_pattern['factors'][modify_index: modify_index + len(modify_pump_pattern[pump_pattern_id])] \
|
||
# = modify_pump_pattern[pump_pattern_id]
|
||
# cs = ChangeSet()
|
||
# cs.append(pump_pattern)
|
||
# set_pattern(name_c, cs)
|
||
# 修改模拟开始的时间
|
||
str_pattern_start = get_pattern_index_str(
|
||
convert_time_format(modify_pattern_start_time)
|
||
)
|
||
dic_time = get_time(name_c)
|
||
dic_time["PATTERN START"] = str_pattern_start
|
||
dic_time["DURATION"] = from_seconds_to_clock(modify_total_duration)
|
||
if simulation_type.upper() == "REALTIME":
|
||
dic_time["DURATION"] = 0
|
||
cs = ChangeSet()
|
||
cs.operations.append(dic_time)
|
||
set_time(name_c, cs)
|
||
# 根据SCADA实时数据进行修改,如果没有对应的SCADA数据,如未来的时间点,则不改变pg数据库的数据
|
||
if globals.reservoirs_id:
|
||
# reservoirs_id = {'ZBBDJSCP000002': '2497', 'R00003': '2571'}
|
||
# 1.获取reservoir的SCADA数据,形式如{'2497': '3.1231', '2571': '2.7387'}
|
||
reservoir_SCADA_data_dict = TimescaleInternalQueries.query_scada_by_ids_time(
|
||
device_ids=list(globals.reservoirs_id.values()),
|
||
query_time=modify_pattern_start_time,
|
||
)
|
||
# 2.构建出新字典,形式如{'ZBBDJSCP000002': '3.1231', 'R00003': '2.7387'}
|
||
reservoir_dict = {
|
||
key: reservoir_SCADA_data_dict[value]
|
||
for key, value in globals.reservoirs_id.items()
|
||
}
|
||
# 3.修改reservoir液位模式
|
||
for reservoir_name, value in reservoir_dict.items():
|
||
if value and float(value) != 0:
|
||
# 先根据reservoir获取对应的pattern,再对pattern进行修改
|
||
reservoir_pattern = get_pattern(
|
||
name_c, get_reservoir(name_c, reservoir_name)["pattern"]
|
||
)
|
||
reservoir_pattern["factors"][modify_index] = (
|
||
float(value) + globals.RESERVOIR_BASIC_HEIGHT
|
||
)
|
||
cs = ChangeSet()
|
||
cs.append(reservoir_pattern)
|
||
set_pattern(name_c, cs)
|
||
if globals.tanks_id:
|
||
# 修改tank初始液位
|
||
tank_SCADA_data_dict = TimescaleInternalQueries.query_scada_by_ids_time(
|
||
device_ids=list(globals.tanks_id.values()),
|
||
query_time=modify_pattern_start_time,
|
||
)
|
||
tank_dict = {
|
||
key: tank_SCADA_data_dict[value] for key, value in globals.tanks_id.items()
|
||
}
|
||
for tank_name, value in tank_dict.items():
|
||
if value and float(value) != 0:
|
||
tank = get_tank(name_c, tank_name)
|
||
tank["init_level"] = float(value)
|
||
cs = ChangeSet()
|
||
cs.append(tank)
|
||
set_tank(name_c, cs)
|
||
if globals.fixed_pumps_id:
|
||
# 修改工频泵的pattern
|
||
fixed_pump_SCADA_data_dict = TimescaleInternalQueries.query_scada_by_ids_time(
|
||
device_ids=list(globals.fixed_pumps_id.values()),
|
||
query_time=modify_pattern_start_time,
|
||
)
|
||
# print(fixed_pump_SCADA_data_dict)
|
||
fixed_pump_dict = {
|
||
key: fixed_pump_SCADA_data_dict[value]
|
||
for key, value in globals.fixed_pumps_id.items()
|
||
}
|
||
# print(fixed_pump_dict)
|
||
for fixed_pump_name, value in fixed_pump_dict.items():
|
||
if value:
|
||
pump_pattern = get_pattern(
|
||
name_c, get_pump(name_c, fixed_pump_name)["pattern"]
|
||
)
|
||
# print(pump_pattern)
|
||
pump_pattern["factors"][modify_index] = float(value)
|
||
# print(pump_pattern['factors'][modify_index])
|
||
cs = ChangeSet()
|
||
cs.append(pump_pattern)
|
||
set_pattern(name_c, cs)
|
||
if globals.variable_pumps_id:
|
||
# 修改变频泵的pattern
|
||
variable_pump_SCADA_data_dict = (
|
||
TimescaleInternalQueries.query_scada_by_ids_time(
|
||
device_ids=list(globals.variable_pumps_id.values()),
|
||
query_time=modify_pattern_start_time,
|
||
)
|
||
)
|
||
variable_pump_dict = {
|
||
key: variable_pump_SCADA_data_dict[value]
|
||
for key, value in globals.variable_pumps_id.items()
|
||
}
|
||
for variable_pump_name, value in variable_pump_dict.items():
|
||
if value:
|
||
pump_pattern = get_pattern(
|
||
name_c, get_pump(name_c, variable_pump_name)["pattern"]
|
||
)
|
||
pump_pattern["factors"][modify_index] = float(value) / 50
|
||
cs = ChangeSet()
|
||
cs.append(pump_pattern)
|
||
set_pattern(name_c, cs)
|
||
if globals.demand_id:
|
||
# 基于实时数据,修改大用户节点的pattern
|
||
demand_SCADA_data_dict = TimescaleInternalQueries.query_scada_by_ids_time(
|
||
device_ids=list(globals.demand_id.values()),
|
||
query_time=modify_pattern_start_time,
|
||
)
|
||
demand_dict = {
|
||
key: demand_SCADA_data_dict[value]
|
||
for key, value in globals.demand_id.items()
|
||
}
|
||
for demand_name, value in demand_dict.items():
|
||
if value:
|
||
demand_pattern = get_pattern(
|
||
name_c, get_demand(name_c, demand_name)["pattern"]
|
||
)
|
||
if get_option(name_c)["UNITS"] == "LPS":
|
||
demand_pattern["factors"][modify_index] = (
|
||
float(value) / 3.6
|
||
) # 默认SCADA数据获取的是流量单位是m3/h, 转换为 L/s
|
||
elif get_option(name_c)["UNITS"] == "CMH":
|
||
demand_pattern["factors"][modify_index] = float(value)
|
||
cs = ChangeSet()
|
||
cs.append(demand_pattern)
|
||
set_pattern(name_c, cs)
|
||
# 水质、压力实时数据使用方法待补充
|
||
#############################
|
||
if globals.source_outflow_pattern_id:
|
||
# 基于实时的出厂流量计数据,修改出厂流量计绑定的pattern
|
||
source_outflow_SCADA_data_dict = (
|
||
TimescaleInternalQueries.query_scada_by_ids_time(
|
||
device_ids=list(globals.source_outflow_pattern_id.values()),
|
||
query_time=modify_pattern_start_time,
|
||
)
|
||
)
|
||
# print(source_outflow_SCADA_data_dict)
|
||
source_outflow_dict = {
|
||
key: source_outflow_SCADA_data_dict[value]
|
||
for key, value in globals.source_outflow_pattern_id.items()
|
||
}
|
||
# print(source_outflow_dict)
|
||
for pattern_name in source_outflow_dict.keys():
|
||
# print(pattern_name)
|
||
history_source_outflow_flow_list, history_source_outflow_factor_list = (
|
||
get_history_pattern_info(name_c, pattern_name)
|
||
)
|
||
history_source_outflow_flow = history_source_outflow_flow_list[modify_index]
|
||
history_source_outflow_factor = history_source_outflow_factor_list[
|
||
modify_index
|
||
]
|
||
# print(source_outflow_dict[pattern_name])
|
||
# print(history_source_outflow_flow)
|
||
# print(history_source_outflow_factor)
|
||
if source_outflow_dict[pattern_name]:
|
||
realtime_source_outflow = float(source_outflow_dict[pattern_name])
|
||
multiply_factor = realtime_source_outflow / history_source_outflow_flow
|
||
# print(multiply_factor)
|
||
pattern = get_pattern(name_c, pattern_name)
|
||
pattern["factors"][modify_index] = (
|
||
multiply_factor * history_source_outflow_factor
|
||
)
|
||
# print(pattern['factors'][modify_index])
|
||
cs = ChangeSet()
|
||
cs.append(pattern)
|
||
set_pattern(name_c, cs)
|
||
if globals.realtime_pipe_flow_pattern_id:
|
||
# 基于实时的pipe_flow类数据,修改pipe_flow类绑定的pattern
|
||
realtime_pipe_flow_SCADA_data_dict = (
|
||
TimescaleInternalQueries.query_scada_by_ids_time(
|
||
device_ids=list(globals.realtime_pipe_flow_pattern_id.values()),
|
||
query_time=modify_pattern_start_time,
|
||
)
|
||
)
|
||
realtime_pipe_flow_dict = {
|
||
key: realtime_pipe_flow_SCADA_data_dict[value]
|
||
for key, value in globals.realtime_pipe_flow_pattern_id.items()
|
||
}
|
||
for pattern_name in realtime_pipe_flow_dict.keys():
|
||
history_pipe_flow_flow_list, history_pipe_flow_factor_list = (
|
||
get_history_pattern_info(name_c, pattern_name)
|
||
)
|
||
history_pipe_flow_flow = history_pipe_flow_flow_list[modify_index]
|
||
history_pipe_flow_factor = history_pipe_flow_factor_list[modify_index]
|
||
if realtime_pipe_flow_dict[pattern_name]:
|
||
realtime_pipe_flow = float(realtime_pipe_flow_dict[pattern_name])
|
||
multiply_factor = realtime_pipe_flow / history_pipe_flow_flow
|
||
pattern = get_pattern(name_c, pattern_name)
|
||
pattern["factors"][modify_index] = (
|
||
multiply_factor * history_pipe_flow_factor
|
||
)
|
||
cs = ChangeSet()
|
||
cs.append(pattern)
|
||
set_pattern(name_c, cs)
|
||
if globals.pipe_flow_region_patterns:
|
||
# 基于实时的pipe_flow类数据,修改pipe_flow分区流量计范围内的non_realtime的demand绑定的pattern
|
||
temp_realtime_pipe_flow_pattern_id = {}
|
||
# 遍历 pipe_flow_region_patterns 字典的 key
|
||
for (
|
||
pipe_flow_region,
|
||
demand_patterns,
|
||
) in globals.pipe_flow_region_patterns.items():
|
||
# 获取对应的实时值
|
||
query_api_id = globals.realtime_pipe_flow_pattern_id.get(pipe_flow_region)
|
||
temp_realtime_pipe_flow_pattern_id[pipe_flow_region] = query_api_id
|
||
temp_realtime_pipe_flow_SCADA_data_dict = (
|
||
TimescaleInternalQueries.query_scada_by_ids_time(
|
||
device_ids=list(temp_realtime_pipe_flow_pattern_id.values()),
|
||
query_time=modify_pattern_start_time,
|
||
)
|
||
)
|
||
temp_realtime_pipe_flow_dict = {
|
||
key: temp_realtime_pipe_flow_SCADA_data_dict[value]
|
||
for key, value in temp_realtime_pipe_flow_pattern_id.items()
|
||
}
|
||
for pattern_name in temp_realtime_pipe_flow_dict.keys():
|
||
temp_history_pipe_flow_flow_list, temp_history_pipe_flow_factor_list = (
|
||
get_history_pattern_info(name_c, pattern_name)
|
||
)
|
||
temp_history_pipe_flow_flow = temp_history_pipe_flow_flow_list[modify_index]
|
||
if temp_realtime_pipe_flow_dict[pattern_name]:
|
||
temp_realtime_pipe_flow = float(
|
||
temp_realtime_pipe_flow_dict[pattern_name]
|
||
)
|
||
temp_multiply_factor = (
|
||
temp_realtime_pipe_flow / temp_history_pipe_flow_flow
|
||
)
|
||
temp_non_realtime_demand_pattern_list = (
|
||
globals.pipe_flow_region_patterns[pattern_name]
|
||
)
|
||
for demand_pattern_name in temp_non_realtime_demand_pattern_list:
|
||
(
|
||
history_non_realtime_demand_flow_list,
|
||
history_non_realtime_demand_factor_list,
|
||
) = get_history_pattern_info(name_c, demand_pattern_name)
|
||
history_non_realtime_demand_factor = (
|
||
history_non_realtime_demand_factor_list[modify_index]
|
||
)
|
||
pattern = get_pattern(name_c, demand_pattern_name)
|
||
pattern["factors"][modify_index] = (
|
||
temp_multiply_factor * history_non_realtime_demand_factor
|
||
)
|
||
cs = ChangeSet()
|
||
cs.append(pattern)
|
||
set_pattern(name_c, cs)
|
||
if globals.source_outflow_region:
|
||
# 根据associated_source_outflow_id进行分区,各分区用(出厂的流量计 - 实时的pipe_flow和demand)进行数据更新
|
||
for region in globals.source_outflow_region.keys():
|
||
temp_source_outflow_region_id = globals.source_outflow_region_id.get(
|
||
region, []
|
||
)
|
||
temp_realtime_region_pipe_flow_and_demand_id = (
|
||
globals.realtime_region_pipe_flow_and_demand_id.get(region, [])
|
||
)
|
||
temp_source_outflow_region_patterns = (
|
||
globals.source_outflow_region_patterns.get(region, [])
|
||
)
|
||
temp_realtime_region_pipe_flow_and_demand_patterns = (
|
||
globals.realtime_region_pipe_flow_and_demand_patterns.get(region, [])
|
||
)
|
||
temp_non_realtime_region_patterns = (
|
||
globals.non_realtime_region_patterns.get(region, [])
|
||
)
|
||
region_source_outflow_data_dict = (
|
||
TimescaleInternalQueries.query_scada_by_ids_time(
|
||
device_ids=temp_source_outflow_region_id,
|
||
query_time=modify_pattern_start_time,
|
||
)
|
||
)
|
||
region_realtime_region_pipe_flow_and_demand_data_dict = (
|
||
TimescaleInternalQueries.query_scada_by_ids_time(
|
||
device_ids=temp_realtime_region_pipe_flow_and_demand_id,
|
||
query_time=modify_pattern_start_time,
|
||
)
|
||
)
|
||
# 2025/02/12 确保 region_source_outflow_data_dict 和
|
||
# region_realtime_region_pipe_flow_and_demand_data_dict中的每个值都不是 None 且不为 0
|
||
region_source_outflow_valid_values = [
|
||
float(value)
|
||
for value in region_source_outflow_data_dict.values()
|
||
if value not in [None, 0]
|
||
]
|
||
valid_values = [
|
||
float(value)
|
||
for value in region_realtime_region_pipe_flow_and_demand_data_dict.values()
|
||
if value not in [None, 0]
|
||
]
|
||
# 如果都非空,则执行 sum 操作
|
||
if region_source_outflow_valid_values and valid_values:
|
||
region_total_source_outflow = sum(region_source_outflow_valid_values)
|
||
history_region_total_source_outflow = 0
|
||
for source_outflow_pattern_name in temp_source_outflow_region_patterns:
|
||
(
|
||
temp_history_source_outflow_flow_list,
|
||
temp_history_source_outflow_factor_list,
|
||
) = get_history_pattern_info(name_c, source_outflow_pattern_name)
|
||
history_region_total_source_outflow += (
|
||
temp_history_source_outflow_flow_list[modify_index]
|
||
)
|
||
region_total_realtime_region_pipe_flow_and_demand = sum(valid_values)
|
||
history_region_total_realtime_region_pipe_flow_and_demand = 0
|
||
for (
|
||
pipe_flow_and_demand_pattern_name
|
||
) in temp_realtime_region_pipe_flow_and_demand_patterns:
|
||
(
|
||
temp_history_pipe_flow_and_demand_flow_list,
|
||
temp_history_pipe_flow_and_demand_factor_list,
|
||
) = get_history_pattern_info(
|
||
name_c, pipe_flow_and_demand_pattern_name
|
||
)
|
||
history_region_total_realtime_region_pipe_flow_and_demand += (
|
||
temp_history_pipe_flow_and_demand_flow_list[modify_index]
|
||
)
|
||
temp_multiply_factor = (
|
||
region_total_source_outflow
|
||
- region_total_realtime_region_pipe_flow_and_demand
|
||
) / (
|
||
history_region_total_source_outflow
|
||
- history_region_total_realtime_region_pipe_flow_and_demand
|
||
)
|
||
for (
|
||
non_realtime_region_pattern_name
|
||
) in temp_non_realtime_region_patterns:
|
||
(
|
||
history_non_realtime_region_pattern_flow_list,
|
||
history_non_realtime_region_pattern_factor_list,
|
||
) = get_history_pattern_info(
|
||
name_c, non_realtime_region_pattern_name
|
||
)
|
||
history_non_realtime_region_pattern_factor = (
|
||
history_non_realtime_region_pattern_factor_list[modify_index]
|
||
)
|
||
pattern = get_pattern(name_c, non_realtime_region_pattern_name)
|
||
pattern["factors"][modify_index] = (
|
||
temp_multiply_factor
|
||
* history_non_realtime_region_pattern_factor
|
||
)
|
||
cs = ChangeSet()
|
||
cs.append(pattern)
|
||
set_pattern(name_c, cs)
|
||
# 根据输入的参数进行数据修改,后面修改的可以覆盖前面的,用于EXTENDED类的方案模拟
|
||
# 修改清水池(reservoir)液位的pattern
|
||
if modify_reservoir_head_pattern:
|
||
for reservoir_name in modify_reservoir_head_pattern.keys():
|
||
# 这句代码的作用是判断modify_reservoir_head_pattern[reservoir_name][0]是否不是NaN。
|
||
# 如果modify_reservoir_head_pattern[reservoir_name][0]不是NaN,则条件成立,代码块会执行
|
||
if not np.isnan(modify_reservoir_head_pattern[reservoir_name][0]):
|
||
# 给 list 中的所有元素加上 RESERVOIR_BASIC_HEIGHT
|
||
modified_values = [
|
||
value + globals.RESERVOIR_BASIC_HEIGHT
|
||
for value in modify_reservoir_head_pattern[reservoir_name]
|
||
]
|
||
reservoir_pattern = get_pattern(
|
||
name_c, get_reservoir(name_c, reservoir_name)["pattern"]
|
||
)
|
||
reservoir_pattern["factors"][
|
||
modify_index : modify_index + len(modified_values)
|
||
] = modified_values
|
||
cs = ChangeSet()
|
||
cs.append(reservoir_pattern)
|
||
set_pattern(name_c, cs)
|
||
# 修改调节池(tank)初始液位
|
||
if modify_tank_initial_level:
|
||
for tank_name in modify_tank_initial_level.keys():
|
||
if (not np.isnan(modify_tank_initial_level[tank_name])) and (
|
||
modify_tank_initial_level[tank_name] != 0
|
||
):
|
||
tank = get_tank(name_c, tank_name)
|
||
tank["init_level"] = modify_tank_initial_level[tank_name]
|
||
cs = ChangeSet()
|
||
cs.append(tank)
|
||
set_tank(name_c, cs)
|
||
# 修改节点(junction)基础水量(demand)
|
||
if modify_junction_base_demand:
|
||
for junction_name in modify_junction_base_demand.keys():
|
||
if not np.isnan(modify_junction_base_demand[junction_name]):
|
||
junction = get_demand(name_c, junction_name)
|
||
junction["demand"] = modify_junction_base_demand[junction_name]
|
||
cs = ChangeSet()
|
||
cs.append(junction)
|
||
set_demand(name_c, cs)
|
||
# 修改节点(junction)的水量模式(pattern)
|
||
if modify_junction_damand_pattern:
|
||
for pattern_name in modify_junction_damand_pattern.keys():
|
||
if not np.isnan(modify_junction_damand_pattern[pattern_name][0]):
|
||
junction_pattern = get_pattern(name_c, pattern_name)
|
||
junction_pattern["factors"][
|
||
modify_index : modify_index
|
||
+ len(modify_junction_damand_pattern[pattern_name])
|
||
] = modify_junction_damand_pattern[pattern_name]
|
||
cs = ChangeSet()
|
||
cs.append(junction_pattern)
|
||
set_pattern(name_c, cs)
|
||
# 修改工频水泵(fixed_pump)的pattern
|
||
if modify_fixed_pump_pattern:
|
||
for pump_name in modify_fixed_pump_pattern.keys():
|
||
if not np.isnan(modify_fixed_pump_pattern[pump_name][0]):
|
||
pump_pattern = get_pattern(
|
||
name_c, get_pump(name_c, pattern_name)["pattern"]
|
||
)
|
||
pump_pattern["factors"][
|
||
modify_index : modify_index + len(modify_fixed_pump_pattern)
|
||
] = modify_fixed_pump_pattern[pump_name]
|
||
cs = ChangeSet()
|
||
cs.append(pump_pattern)
|
||
set_pattern(name_c, cs)
|
||
# 修改变频水泵(variable_pump)的pattern
|
||
if modify_variable_pump_pattern:
|
||
for pump_name in modify_variable_pump_pattern.keys():
|
||
if not np.isnan(modify_variable_pump_pattern[pump_name][0]):
|
||
# 给 list 中的所有元素除以 50Hz
|
||
modified_values = [
|
||
value / 50 for value in modify_variable_pump_pattern[pump_name]
|
||
]
|
||
pump_pattern = get_pattern(
|
||
name_c, get_pump(name_c, pattern_name)["pattern"]
|
||
)
|
||
pump_pattern["factors"][
|
||
modify_index : modify_index + len(modified_values)
|
||
] = modified_values
|
||
cs = ChangeSet()
|
||
cs.append(pump_pattern)
|
||
set_pattern(name_c, cs)
|
||
# 修改阀门(valve)的状态setting和status
|
||
if modify_valve_opening:
|
||
for valve_name in modify_valve_opening.keys():
|
||
if not np.isnan(modify_valve_opening[valve_name]):
|
||
valve_status = get_status(name_c, valve_name)
|
||
if modify_valve_opening[valve_name] == 0:
|
||
valve_status["status"] = "CLOSED"
|
||
valve_status["setting"] = 0
|
||
if modify_valve_opening[valve_name] < 1:
|
||
valve_status["status"] = "OPEN"
|
||
valve_status["setting"] = 0.1036 * pow(
|
||
modify_valve_opening[valve_name], -3.105
|
||
)
|
||
if modify_valve_opening[valve_name] == 1:
|
||
valve_status["status"] = "OPEN"
|
||
valve_status["setting"] = 0
|
||
cs = ChangeSet()
|
||
cs.append(valve_status)
|
||
set_status(name_c, cs)
|
||
# 运行并返回结果
|
||
run_project(name_c)
|
||
time_cost_end = time.perf_counter()
|
||
print(
|
||
"{} -- Hydraulic simulation finished, cost time: {:.2f} s.".format(
|
||
datetime.now(pytz.timezone("Asia/Shanghai")).strftime("%Y-%m-%d %H:%M:%S"),
|
||
time_cost_end - time_cost_start,
|
||
)
|
||
)
|
||
# DingZQ 下面这几句一定要这样,不然读取不了
|
||
# time.sleep(5) # wait 5 seconds
|
||
|
||
# TODO: 2025/03/24
|
||
# DingZQ 这个名字要用随机数来处理
|
||
tmp_file = f"./temp/simulation_{uuid.uuid4()}.result.out"
|
||
shutil.copy(f"./temp/{name_c}.db.opt", tmp_file)
|
||
|
||
output = Output(tmp_file)
|
||
node_result = output.node_results()
|
||
link_result = output.link_results()
|
||
|
||
# link_flow = []
|
||
# for link in link_result:
|
||
# link_flow.append(link['result'][-1]['flow'])
|
||
# print(link_flow)
|
||
num_periods_result = output.times()["num_periods"]
|
||
print("simulation_type", simulation_type)
|
||
print("before store result")
|
||
# print(num_periods_result)
|
||
# print(node_result)
|
||
# 存储
|
||
starttime = time.time()
|
||
if simulation_type.upper() == "REALTIME":
|
||
TimescaleInternalStorage.store_realtime_simulation(
|
||
node_result, link_result, modify_pattern_start_time
|
||
)
|
||
elif simulation_type.upper() == "EXTENDED":
|
||
TimescaleInternalStorage.store_scheme_simulation(
|
||
scheme_type,
|
||
scheme_name,
|
||
node_result,
|
||
link_result,
|
||
modify_pattern_start_time,
|
||
num_periods_result,
|
||
)
|
||
endtime = time.time()
|
||
logging.info("store time: %f", endtime - starttime)
|
||
# 暂不需要再次存储 SCADA 模拟信息
|
||
# TimescaleInternalQueries.fill_scheme_simulation_result_to_SCADA(scheme_type=scheme_type, scheme_name=scheme_name)
|
||
|
||
# if simulation_type.upper() == "REALTIME":
|
||
# influxdb_api.store_realtime_simulation_result_to_influxdb(
|
||
# node_result, link_result, modify_pattern_start_time
|
||
# )
|
||
# elif simulation_type.upper() == "EXTENDED":
|
||
# influxdb_api.store_scheme_simulation_result_to_influxdb(
|
||
# node_result,
|
||
# link_result,
|
||
# modify_pattern_start_time,
|
||
# num_periods_result,
|
||
# scheme_type,
|
||
# scheme_name,
|
||
# )
|
||
# 暂不需要再次存储 SCADA 模拟信息
|
||
# influxdb_api.fill_scheme_simulation_result_to_SCADA(scheme_type=scheme_type, scheme_name=scheme_name)
|
||
|
||
print("after store result")
|
||
|
||
del output
|
||
os.remove(tmp_file)
|
||
|
||
|
||
if __name__ == "__main__":
|
||
# 计算前,获取scada_info中的信息,按照设定的方法修改pg数据库
|
||
query_corresponding_element_id_and_query_id(project_info.name)
|
||
query_corresponding_pattern_id_and_query_id(project_info.name)
|
||
region_result = query_non_realtime_region(project_info.name)
|
||
|
||
globals.source_outflow_region_id = get_source_outflow_region_id(
|
||
project_info.name, region_result
|
||
)
|
||
globals.realtime_region_pipe_flow_and_demand_id = (
|
||
query_realtime_region_pipe_flow_and_demand_id(project_info.name, region_result)
|
||
)
|
||
globals.pipe_flow_region_patterns = query_pipe_flow_region_patterns(
|
||
project_info.name
|
||
)
|
||
|
||
globals.non_realtime_region_patterns = query_non_realtime_region_patterns(
|
||
project_info.name, region_result
|
||
)
|
||
(
|
||
globals.source_outflow_region_patterns,
|
||
globals.realtime_region_pipe_flow_and_demand_patterns,
|
||
) = get_realtime_region_patterns(
|
||
project_info.name,
|
||
globals.source_outflow_region_id,
|
||
globals.realtime_region_pipe_flow_and_demand_id,
|
||
)
|
||
|
||
# 基础日期和时间(日期部分保持不变)
|
||
base_date = datetime(2025, 5, 4)
|
||
|
||
# 循环生成96个时间点(15分钟间隔)
|
||
for i in range(96):
|
||
# 计算当前时间偏移
|
||
time_offset = timedelta(minutes=15 * i)
|
||
|
||
# 生成完整时间对象
|
||
current_time = base_date + time_offset
|
||
|
||
# 格式化成ISO8601带时区格式
|
||
iso_time = current_time.strftime("%Y-%m-%dT%H:%M:%S") + "+08:00"
|
||
|
||
# 执行函数调用
|
||
run_simulation(
|
||
name=project_info.name,
|
||
simulation_type="realtime",
|
||
modify_pattern_start_time=iso_time,
|
||
)
|
||
|
||
# 打印字典内容以验证
|
||
# print("Reservoirs ID:", globals.reservoirs_id)
|
||
# print("Tanks ID:", globals.tanks_id)
|
||
# print("Fixed Pumps ID:", globals.fixed_pumps_id)
|
||
# print("Variable Pumps ID:", globals.variable_pumps_id)
|
||
# print("Pressure ID:", globals.pressure_id)
|
||
# print("Demand ID:", globals.demand_id)
|
||
# print("Quality ID:", globals.quality_id)
|
||
# print("Source Outflow Pattern ID:", globals.source_outflow_pattern_id)
|
||
# print("Realtime Pipe Flow Pattern ID:", globals.realtime_pipe_flow_pattern_id)
|
||
# print("Pipe Flow Region Patterns:", globals.pipe_flow_region_patterns)
|
||
# print("Source Outflow Region:", region_result)
|
||
# print('Source Outflow Region ID:', globals.source_outflow_region_id)
|
||
# print('Source Outflow Region Patterns:', globals.source_outflow_region_patterns)
|
||
# print("Non Realtime Region Patterns:", globals.non_realtime_region_patterns)
|
||
# print("Realtime Region Pipe Flow And Demand ID:", globals.realtime_region_pipe_flow_and_demand_id)
|
||
# print("Realtime Region Pipe Flow And Demand Patterns:", globals.realtime_region_pipe_flow_and_demand_patterns)
|
||
|
||
# dump_inp(name='bb', inp="sensor_placement.inp", version='2')
|
||
# 模拟示例1
|
||
# run_simulation(name='bb', simulation_type="realtime", modify_pattern_start_time='2025-02-25T23:45:00+08:00')
|
||
# 模拟示例2
|
||
# run_simulation(name='bb', simulation_type="extended", modify_pattern_start_time='2025-03-10T12:00:00+08:00',
|
||
# modify_total_duration=1800, scheme_type="burst_Analysis", scheme_name="scheme1")
|
||
|
||
# 查询示例1:query_SCADA_ID_corresponding_info
|
||
# result = query_SCADA_ID_corresponding_info(name='bb', SCADA_ID='P10755')
|
||
# print(result)
|