Compare commits

...

10 Commits

Author SHA1 Message Date
xinzish
32bbe3ddcd fix bug and refine ,end of 2025 2025-12-31 16:11:28 +08:00
xinzish
38fb35a333 Merge branch 'dingsu/szh' of https://e.coding.net/tjwater/tjwaterbackend/TJWaterServer into dingsu/szh 2025-08-16 09:21:05 +08:00
unknown
c15924b8e4 modify project name, change influxdb token, remove log simulation result 2025-08-15 19:00:01 +08:00
DingZQ
92e2122743 Refine 2025-08-15 14:59:36 +08:00
DingZQ
3b1a4b4b95 Refine 2025-08-15 14:50:58 +08:00
DingZQ
40f3ae02fb Remove logger: 2025-08-15 14:49:22 +08:00
DingZQ
065bcded8e Update document for time format 2025-08-14 20:19:50 +08:00
DingZQ
2ed288830f Update influxdb token 2025-08-14 19:59:20 +08:00
DingZQ
e4b1648041 Change bb to project_info.name 2025-08-13 18:12:05 +08:00
DingZQ
d745e6f011 Update Online_Analysis 2025-08-13 18:01:37 +08:00
33 changed files with 3244 additions and 216 deletions

11
.env Normal file
View File

@@ -0,0 +1,11 @@
DB_NAME=szh
DB_HOST=127.0.0.1
DB_PORT=5433
DB_USER=tjwater
DB_PASSWORD=Tjwater@123456
TIMESCALEDB_DB_NAME=szh
TIMESCALEDB_DB_HOST=127.0.0.1
TIMESCALEDB_DB_PORT=5435
TIMESCALEDB_DB_USER=tjwater
TIMESCALEDB_DB_PASSWORD=Tjwater@123456

View File

@@ -3,6 +3,7 @@ import auto_store_non_realtime_SCADA_data
import asyncio import asyncio
import influxdb_api import influxdb_api
import influxdb_info import influxdb_info
import project_info
# 为了让多个任务并发运行,我们可以用 asyncio.to_thread 分别启动它们 # 为了让多个任务并发运行,我们可以用 asyncio.to_thread 分别启动它们
async def main(): async def main():
@@ -16,8 +17,8 @@ if __name__ == "__main__":
token = influxdb_info.token token = influxdb_info.token
org_name = influxdb_info.org org_name = influxdb_info.org
influxdb_api.query_pg_scada_info_realtime('bb') influxdb_api.query_pg_scada_info_realtime(project_info.name)
influxdb_api.query_pg_scada_info_non_realtime('bb') influxdb_api.query_pg_scada_info_non_realtime(project_info.name)
# 用 asyncio 并发启动两个任务 # 用 asyncio 并发启动两个任务
asyncio.run(main()) asyncio.run(main())

View File

@@ -1,6 +1,6 @@
from .project import list_project, have_project, create_project, delete_project, clean_project from .project_backup import list_project, have_project, create_project, delete_project, clean_project
from .project import is_project_open, open_project, close_project from .project_backup import is_project_open, open_project, close_project
from .project import copy_project from .project_backup import copy_project
#DingZQ, 2024-12-28, convert inp v3 to v2 #DingZQ, 2024-12-28, convert inp v3 to v2
from .inp_in import read_inp, import_inp, convert_inp_v3_to_v2 from .inp_in import read_inp, import_inp, convert_inp_v3_to_v2

View File

@@ -1,6 +1,6 @@
import datetime import datetime
import os import os
from .project import * from .project_backup import *
from .database import ChangeSet, write from .database import ChangeSet, write
from .sections import * from .sections import *
from .s0_base import get_region_type from .s0_base import get_region_type

View File

@@ -1,5 +1,5 @@
import os import os
from .project import * from .project_backup import *
from .database import ChangeSet from .database import ChangeSet
from .sections import * from .sections import *
from .s1_title import inp_out_title from .s1_title import inp_out_title

36
api/postgresql_info.py Normal file
View File

@@ -0,0 +1,36 @@
"""Load PostgreSQL connection settings from the environment (.env file)."""
from dotenv import load_dotenv
import os

# Pull variables from a .env file in the working directory into os.environ.
load_dotenv()

# Connection settings; any variable missing from the environment is None.
pg_name = os.getenv("DB_NAME")
pg_host = os.getenv("DB_HOST")
pg_port = os.getenv("DB_PORT")
pg_user = os.getenv("DB_USER")
pg_password = os.getenv("DB_PASSWORD")  # secret -- do not log
def get_pgconn_string(
    db_name=pg_name,
    db_host=pg_host,
    db_port=pg_port,
    db_user=pg_user,
    db_password=pg_password,
):
    """Build a libpq-style PostgreSQL connection string.

    Defaults come from the module-level values loaded from the
    environment; any field can be overridden per call
    (e.g. ``get_pgconn_string(db_name="other")``).
    """
    fields = (
        ("dbname", db_name),
        ("host", db_host),
        ("port", db_port),
        ("user", db_user),
        ("password", db_password),
    )
    return " ".join(f"{key}={value}" for key, value in fields)
def get_pg_config():
    """Return the non-secret PostgreSQL settings as a dict.

    The password is not included here; it is exposed separately via
    ``get_pg_password()``.
    """
    config = dict(
        name=pg_name,
        host=pg_host,
        port=pg_port,
        user=pg_user,
    )
    return config
def get_pg_password():
    """Return the PostgreSQL password (sensitive -- avoid logging it)."""
    secret = pg_password
    return secret

View File

@@ -2,33 +2,37 @@ import os
import psycopg as pg import psycopg as pg
from psycopg.rows import dict_row from psycopg.rows import dict_row
from .connection import g_conn_dict as conn from .connection import g_conn_dict as conn
from .postgresql_info import get_pgconn_string, get_pg_config, get_pg_password
# no undo/redo # no undo/redo
_server_databases = ['template0', 'template1', 'postgres', 'project'] _server_databases = ["template0", "template1", "postgres", "project"]
def list_project() -> list[str]: def list_project() -> list[str]:
ps = [] ps = []
with pg.connect(conninfo="dbname=postgres host=127.0.0.1", autocommit=True) as conn: with pg.connect(conninfo=get_pgconn_string(), autocommit=True) as conn:
with conn.cursor(row_factory=dict_row) as cur: with conn.cursor(row_factory=dict_row) as cur:
for p in cur.execute(f"select datname from pg_database where datname <> 'postgres' and datname <> 'template0' and datname <> 'template1' and datname <> 'project'"): for p in cur.execute(
ps.append(p['datname']) f"select datname from pg_database where datname <> 'postgres' and datname <> 'template0' and datname <> 'template1' and datname <> 'project'"
):
ps.append(p["datname"])
return ps return ps
def have_project(name: str) -> bool: def have_project(name: str) -> bool:
with pg.connect(conninfo="dbname=postgres host=127.0.0.1", autocommit=True) as conn: with pg.connect(conninfo=get_pgconn_string(), autocommit=True) as conn:
with conn.cursor() as cur: with conn.cursor() as cur:
cur.execute(f"select * from pg_database where datname = '{name}'") cur.execute(f"select * from pg_database where datname = '{name}'")
return cur.rowcount > 0 return cur.rowcount > 0
def copy_project(source: str, new: str) -> None: def copy_project(source: str, new: str) -> None:
with pg.connect(conninfo="dbname=postgres host=127.0.0.1", autocommit=True) as conn: with pg.connect(conninfo=get_pgconn_string(), autocommit=True) as conn:
with conn.cursor() as cur: with conn.cursor() as cur:
cur.execute(f'create database "{new}" with template = {source}') cur.execute(f'create database "{new}" with template = {source}')
# 2025-02-07, WMH # 2025-02-07, WMH
# copyproject会把pg中operation这个表的全部内容也加进去我们实际项目运行一周后operation这个表会变得特别大导致CopyProject花费的时间很长CopyProjectEx把operation的在复制时没有一块复制过去节省时间 # copyproject会把pg中operation这个表的全部内容也加进去我们实际项目运行一周后operation这个表会变得特别大导致CopyProject花费的时间很长CopyProjectEx把operation的在复制时没有一块复制过去节省时间
class CopyProjectEx: class CopyProjectEx:
@@ -39,105 +43,116 @@ class CopyProjectEx:
connection.commit() connection.commit()
@staticmethod @staticmethod
def execute_pg_dump(hostname, source_db, exclude_table_list): def execute_pg_dump(source_db, exclude_table_list):
dump_command_structure = (
f'pg_dump -h {hostname} -F c -s -f source_db_structure.dump {source_db}' os.environ["PGPASSWORD"] = get_pg_password() # 设置密码环境变量
) pg_config = get_pg_config()
host = pg_config["host"]
port = pg_config["port"]
user = pg_config["user"]
dump_command_structure = f"pg_dump -h {host} -p {port} -U {user} -F c -s -f source_db_structure.dump {source_db}"
os.system(dump_command_structure) os.system(dump_command_structure)
if exclude_table_list is not None: if exclude_table_list is not None:
exclude_table = ' '.join(['-T {}'.format(i) for i in exclude_table_list]) exclude_table = " ".join(["-T {}".format(i) for i in exclude_table_list])
dump_command_db = ( dump_command_db = f"pg_dump -h {host} -p {port} -U {user} -F c -a {exclude_table} -f source_db.dump {source_db}"
f'pg_dump -h {hostname} -F c -a {exclude_table} -f source_db.dump {source_db}'
)
else: else:
dump_command_db = ( dump_command_db = f"pg_dump -h {host} -p {port} -U {user} -F c -a -f source_db.dump {source_db}"
f'pg_dump -h {hostname} -F c -a -f source_db.dump {source_db}'
)
os.system(dump_command_db) os.system(dump_command_db)
@staticmethod @staticmethod
def execute_pg_restore(hostname, new_db): def execute_pg_restore(new_db):
restore_command_structure = ( os.environ["PGPASSWORD"] = get_pg_password() # 设置密码环境变量
f'pg_restore -h {hostname} -d {new_db} source_db_structure.dump' pg_config = get_pg_config()
) host = pg_config["host"]
port = pg_config["port"]
user = pg_config["user"]
restore_command_structure = f"pg_restore -h {host} -p {port} -U {user} -d {new_db} source_db_structure.dump"
os.system(restore_command_structure) os.system(restore_command_structure)
restore_command_db = ( restore_command_db = (
f'pg_restore -h {hostname} -d {new_db} source_db.dump' f"pg_restore -h {host} -p {port} -U {user} -d {new_db} source_db.dump"
) )
os.system(restore_command_db) os.system(restore_command_db)
@staticmethod @staticmethod
def init_operation_table(connection, excluded_table): def init_operation_table(connection, excluded_table):
with connection.cursor() as cursor: with connection.cursor() as cursor:
if 'operation' in excluded_table: if "operation" in excluded_table:
insert_query \ insert_query = "insert into operation (id, redo, undo, redo_cs, undo_cs) values (0, '', '', '', '')"
= "insert into operation (id, redo, undo, redo_cs, undo_cs) values (0, '', '', '', '')"
cursor.execute(insert_query) cursor.execute(insert_query)
if 'current_operation' in excluded_table: if "current_operation" in excluded_table:
insert_query \ insert_query = "insert into current_operation (id) values (0)"
= "insert into current_operation (id) values (0)"
cursor.execute(insert_query) cursor.execute(insert_query)
if 'restore_operation' in excluded_table: if "restore_operation" in excluded_table:
insert_query \ insert_query = "insert into restore_operation (id) values (0)"
= "insert into restore_operation (id) values (0)"
cursor.execute(insert_query) cursor.execute(insert_query)
if 'batch_operation' in excluded_table: if "batch_operation" in excluded_table:
insert_query \ insert_query = "insert into batch_operation (id, redo, undo, redo_cs, undo_cs) values (0, '', '', '', '')"
= "insert into batch_operation (id, redo, undo, redo_cs, undo_cs) values (0, '', '', '', '')"
cursor.execute(insert_query) cursor.execute(insert_query)
if 'operation_table' in excluded_table: if "operation_table" in excluded_table:
insert_query \ insert_query = (
= "insert into operation_table (option) values ('operation')" "insert into operation_table (option) values ('operation')"
)
cursor.execute(insert_query) cursor.execute(insert_query)
connection.commit() connection.commit()
def __call__(self, source: str, new: str, excluded_table: [str] = None) -> None: def __call__(self, source: str, new_db: str, excluded_tables: [str] = None) -> None:
connection = pg.connect(conninfo="dbname=postgres host=127.0.0.1", autocommit=True) source_connection = pg.connect(conninfo=get_pgconn_string(), autocommit=True)
self.create_database(connection, new) self.create_database(source_connection, new_db)
self.execute_pg_dump('127.0.0.1', source, excluded_table)
self.execute_pg_restore('127.0.0.1', new)
connection = pg.connect(conninfo=f"dbname='{new}' host=127.0.0.1", autocommit=True) self.execute_pg_dump(source, excluded_tables)
self.init_operation_table(connection, excluded_table) self.execute_pg_restore(new_db)
source_connection.close()
new_db_connection = pg.connect(
conninfo=get_pgconn_string(db_name=new_db), autocommit=True
)
self.init_operation_table(new_db_connection, excluded_tables)
new_db_connection.close()
def create_project(name: str) -> None: def create_project(name: str) -> None:
return copy_project('project', name) return copy_project("project", name)
def delete_project(name: str) -> None: def delete_project(name: str) -> None:
with pg.connect(conninfo="dbname=postgres host=127.0.0.1", autocommit=True) as conn: with pg.connect(conninfo=get_pgconn_string(), autocommit=True) as conn:
with conn.cursor() as cur: with conn.cursor() as cur:
cur.execute(f"select pg_terminate_backend(pid) from pg_stat_activity where datname = '{name}'") cur.execute(
f"select pg_terminate_backend(pid) from pg_stat_activity where datname = '{name}'"
)
cur.execute(f'drop database "{name}"') cur.execute(f'drop database "{name}"')
def clean_project(excluded: list[str] = []) -> None: def clean_project(excluded: list[str] = []) -> None:
projects = list_project() projects = list_project()
with pg.connect(conninfo="dbname=postgres host=127.0.0.1", autocommit=True) as conn: with pg.connect(conninfo=get_pgconn_string(), autocommit=True) as conn:
with conn.cursor(row_factory=dict_row) as cur: with conn.cursor(row_factory=dict_row) as cur:
row = cur.execute(f"select current_database()").fetchone() row = cur.execute(f"select current_database()").fetchone()
if row != None: if row != None:
current_db = row['current_database'] current_db = row["current_database"]
if current_db in projects: if current_db in projects:
projects.remove(current_db) projects.remove(current_db)
for project in projects: for project in projects:
if project in _server_databases or project in excluded: if project in _server_databases or project in excluded:
continue continue
cur.execute(f"select pg_terminate_backend(pid) from pg_stat_activity where datname = '{project}'") cur.execute(
f"select pg_terminate_backend(pid) from pg_stat_activity where datname = '{project}'"
)
cur.execute(f'drop database "{project}"') cur.execute(f'drop database "{project}"')
def open_project(name: str) -> None: def open_project(name: str) -> None:
if name not in conn: if name not in conn:
conn[name] = pg.connect(conninfo=f"dbname={name} host=127.0.0.1", autocommit=True) conn[name] = pg.connect(
conninfo=get_pgconn_string(db_name=name), autocommit=True
)
def is_project_open(name: str) -> bool: def is_project_open(name: str) -> bool:
@@ -148,4 +163,3 @@ def close_project(name: str) -> None:
if name in conn: if name in conn:
conn[name].close() conn[name].close()
del conn[name] del conn[name]

152
api/project_backup.py Normal file
View File

@@ -0,0 +1,152 @@
"""Project-database management helpers (backup copy of api/project.py)."""
import os
import psycopg as pg
from psycopg.rows import dict_row
# Shared cache of open project connections, keyed by project (database) name.
from .connection import g_conn_dict as conn
from .postgresql_info import get_pgconn_string

# Databases that must never be listed or dropped as "projects".
# NOTE: operations in this module perform no undo/redo bookkeeping.
_server_databases = ['template0', 'template1', 'postgres', 'project']
def list_project() -> list[str]:
    """Return the names of all project databases on the server.

    Excludes the server-internal databases listed in ``_server_databases``
    (previously duplicated as a hardcoded SQL condition; now passed as a
    query parameter so the two stay in sync).
    """
    with pg.connect(conninfo=get_pgconn_string(), autocommit=True) as db_conn:
        with db_conn.cursor(row_factory=dict_row) as cur:
            rows = cur.execute(
                "select datname from pg_database where datname <> all(%s)",
                (_server_databases,),
            )
            return [row["datname"] for row in rows]
def have_project(name: str) -> bool:
    """Return True if a database called *name* exists on the server.

    Bug fix: the previous version connected to ``db_name=name`` itself,
    so the connect raised exactly when the database did not exist --
    the case this function must report as False.  Connect to the default
    maintenance database instead and query the catalog.  The name is also
    passed as a query parameter rather than interpolated into the SQL.
    """
    with pg.connect(conninfo=get_pgconn_string(), autocommit=True) as db_conn:
        with db_conn.cursor() as cur:
            cur.execute("select 1 from pg_database where datname = %s", (name,))
            return cur.rowcount > 0
def copy_project(source: str, new: str) -> None:
    """Create database *new* as a copy of *source* via CREATE DATABASE ... TEMPLATE.

    Fix: the target identifier was quoted but the template was not, so a
    mixed-case or otherwise non-plain *source* name would fail; both are
    quoted now.  The template database must have no active connections
    while the copy runs.
    """
    with pg.connect(conninfo=get_pgconn_string(), autocommit=True) as db_conn:
        with db_conn.cursor() as cur:
            cur.execute(f'create database "{new}" with template = "{source}"')
# 2025-02-07, WMH
# Plain copy_project duplicates everything including the `operation` table,
# which grows very large in production and makes the copy slow.
# CopyProjectEx skips selected tables during the copy to save time.
class CopyProjectEx:
    """Copy a project database while skipping selected (large) tables.

    Dumps/restores the database with ``pg_dump``/``pg_restore``, excluding
    the given tables from the data dump, then re-seeds the excluded
    operation-related tables with their initial rows.
    """

    @staticmethod
    def create_database(connection, new_db):
        """Create an empty target database on the server *connection* points at."""
        with connection.cursor() as cursor:
            cursor.execute(f'create database "{new_db}"')
        connection.commit()

    @staticmethod
    def execute_pg_dump(hostname, source_db, exclude_table_list):
        """Dump *source_db*: schema to one file, data (minus excluded tables) to another."""
        dump_command_structure = (
            f'pg_dump -h {hostname} -F c -s -f source_db_structure.dump {source_db}'
        )
        os.system(dump_command_structure)
        if exclude_table_list is not None:
            # -T excludes a table from the data dump.
            exclude_table = ' '.join('-T {}'.format(i) for i in exclude_table_list)
            dump_command_db = (
                f'pg_dump -h {hostname} -F c -a {exclude_table} -f source_db.dump {source_db}'
            )
        else:
            dump_command_db = (
                f'pg_dump -h {hostname} -F c -a -f source_db.dump {source_db}'
            )
        os.system(dump_command_db)

    @staticmethod
    def execute_pg_restore(hostname, new_db):
        """Restore schema first, then data, into the freshly created *new_db*."""
        os.system(f'pg_restore -h {hostname} -d {new_db} source_db_structure.dump')
        os.system(f'pg_restore -h {hostname} -d {new_db} source_db.dump')

    @staticmethod
    def init_operation_table(connection, excluded_table):
        """Re-insert the seed row for each operation table that was excluded.

        Bug fix: *excluded_table* may be None (nothing excluded) -- the
        previous code crashed with ``TypeError: argument of type 'NoneType'``
        on the ``in`` test in that case.
        """
        if not excluded_table:
            return
        # Seed rows, in the same order the original code checked them.
        seed_rows = {
            'operation': "insert into operation (id, redo, undo, redo_cs, undo_cs) values (0, '', '', '', '')",
            'current_operation': "insert into current_operation (id) values (0)",
            'restore_operation': "insert into restore_operation (id) values (0)",
            'batch_operation': "insert into batch_operation (id, redo, undo, redo_cs, undo_cs) values (0, '', '', '', '')",
            'operation_table': "insert into operation_table (option) values ('operation')",
        }
        with connection.cursor() as cursor:
            for table, insert_query in seed_rows.items():
                if table in excluded_table:
                    cursor.execute(insert_query)
        connection.commit()

    def __call__(self, source: str, new: str, excluded_table: list[str] = None) -> None:
        """Copy *source* into new database *new*, skipping *excluded_table* tables.

        Fix: connections are now closed when done (they previously leaked).
        """
        admin_conn = pg.connect(conninfo=get_pgconn_string(), autocommit=True)
        try:
            self.create_database(admin_conn, new)
        finally:
            admin_conn.close()
        # NOTE(review): dump/restore host is hard-coded while the psycopg
        # connections use the env-configured host -- confirm they always match.
        self.execute_pg_dump('127.0.0.1', source, excluded_table)
        self.execute_pg_restore('127.0.0.1', new)
        new_db_conn = pg.connect(conninfo=get_pgconn_string(db_name=new), autocommit=True)
        try:
            self.init_operation_table(new_db_conn, excluded_table)
        finally:
            new_db_conn.close()
def create_project(name: str) -> None:
    """Create project *name* by cloning the ``project`` template database."""
    template_db = 'project'
    return copy_project(template_db, name)
def delete_project(name: str) -> None:
    """Drop the project database *name*, first killing any open sessions.

    A backend still connected to the database would block DROP DATABASE,
    so all connected sessions are terminated first.  The name is passed
    as a query parameter (previously interpolated into the SQL string).
    """
    with pg.connect(conninfo=get_pgconn_string(), autocommit=True) as db_conn:
        with db_conn.cursor() as cur:
            cur.execute(
                "select pg_terminate_backend(pid) from pg_stat_activity where datname = %s",
                (name,),
            )
            cur.execute(f'drop database "{name}"')
def clean_project(excluded: list[str] = None) -> None:
    """Drop every project database except server databases and *excluded*.

    The database the current connection is attached to is also preserved.
    Fixes: mutable default argument (``[]``) replaced by None sentinel;
    ``!= None`` replaced by ``is not None``; terminate query parameterized.
    """
    excluded = [] if excluded is None else excluded
    projects = list_project()
    with pg.connect(conninfo=get_pgconn_string(), autocommit=True) as db_conn:
        with db_conn.cursor(row_factory=dict_row) as cur:
            row = cur.execute("select current_database()").fetchone()
            if row is not None:
                current_db = row['current_database']
                if current_db in projects:
                    # Never drop the database we are currently connected to.
                    projects.remove(current_db)
            for project in projects:
                if project in _server_databases or project in excluded:
                    continue
                cur.execute(
                    "select pg_terminate_backend(pid) from pg_stat_activity where datname = %s",
                    (project,),
                )
                cur.execute(f'drop database "{project}"')
def open_project(name: str) -> None:
    """Open (and cache) a connection to project database *name*.

    An already-open project is left untouched.
    """
    if name in conn:
        return
    conn[name] = pg.connect(conninfo=get_pgconn_string(db_name=name), autocommit=True)
def is_project_open(name: str) -> bool:
    """Return True if a cached connection exists for project *name*."""
    opened = name in conn
    return opened
def close_project(name: str) -> None:
    """Close and forget the cached connection for project *name*, if any."""
    connection = conn.pop(name, None)
    if connection is not None:
        connection.close()

View File

@@ -1,6 +1,6 @@
import os import os
import ctypes import ctypes
from .project import have_project from .project_backup import have_project
from .inp_out import dump_inp from .inp_out import dump_inp
def calculate_service_area(name: str) -> list[dict[str, list[str]]]: def calculate_service_area(name: str) -> list[dict[str, list[str]]]:

2408
api_ex/burst_locate_SCADA.py Normal file

File diff suppressed because it is too large Load Diff

109
api_ex/kmeans_sensor.py Normal file
View File

@@ -0,0 +1,109 @@
import wntr
import pandas as pd
import numpy as np
import matplotlib.pyplot as plt
import sklearn.cluster
import os
class QD_KMeans(object):
    """Select pressure-monitoring junctions by k-means clustering of coordinates.

    Junction coordinates are min-max normalized per axis, clustered into
    ``num_monitors`` groups, and the junction nearest each cluster center
    is chosen as a monitoring point.
    """

    def __init__(self, wn, num_monitors):
        self.cluster_num = num_monitors  # number of clusters == number of monitoring points
        self.wn = wn                     # wntr WaterNetworkModel
        self.monitor_nodes = []          # selected junction names (filled by select_monitoring_points)
        self.coords = []                 # coordinates, same order as wn.junction_name_list
        self.junction_nodes = {}         # junction name -> coordinates

    def get_junctions_coordinates(self):
        """Collect the coordinates of every junction in the network."""
        for junction_name in self.wn.junction_name_list:
            junction = self.wn.get_node(junction_name)
            self.junction_nodes[junction_name] = junction.coordinates
            self.coords.append(junction.coordinates)

    def select_monitoring_points(self):
        """Return ``cluster_num`` junction names nearest the k-means centers.

        NOTE(review): two cluster centers can map to the same nearest
        junction, so the result may contain duplicates -- confirm whether
        callers require unique sensors.
        """
        if not self.coords:
            self.get_junctions_coordinates()
        coords = np.array(self.coords)
        # Min-max normalize each axis.  Fix: a zero range (all junctions
        # share a coordinate on some axis) previously divided by zero and
        # fed NaN into KMeans; substitute 1 for zero ranges instead.
        low = coords.min(axis=0)
        span = coords.max(axis=0) - low
        span[span == 0] = 1
        coords_normalized = (coords - low) / span
        kmeans = sklearn.cluster.KMeans(n_clusters=self.cluster_num, random_state=42)
        kmeans.fit(coords_normalized)
        for center in kmeans.cluster_centers_:
            distances = np.sum((coords_normalized - center) ** 2, axis=1)
            nearest_node = self.wn.junction_name_list[np.argmin(distances)]
            self.monitor_nodes.append(nearest_node)
        return self.monitor_nodes

    def visualize_network(self):
        """Plot the network with the selected monitoring points highlighted."""
        ax = wntr.graphics.plot_network(self.wn,
                                        node_attribute=self.monitor_nodes,
                                        node_size=30,
                                        title='Optimal sensor')
        plt.show()
def kmeans_sensor_placement(name: str, sensor_num: int, min_diameter: int) -> list:
    """Place ``sensor_num`` pressure sensors on project *name* via k-means.

    Loads the exported INP file ``./db_inp/<name>.db.inp`` and returns the
    selected junction IDs.

    NOTE(review): ``min_diameter`` is accepted but currently unused --
    confirm whether diameter-based filtering was intended.
    """
    inp_path = f'./db_inp/{name}.db.inp'
    network = wntr.network.WaterNetworkModel(inp_path)
    clusterer = QD_KMeans(network, sensor_num)
    return clusterer.select_monitoring_points()
if __name__ == "__main__":
    # Manual smoke test: place 50 sensors on the 'szh' project network.
    #sensorindex = get_ID(name='suzhouhe_2024_cloud_0817', sensor_num=30, min_diameter=500)
    sensorindex = kmeans_sensor_placement(name='szh', sensor_num=50, min_diameter=300)
    print(sensorindex)

View File

@@ -10,6 +10,7 @@ import shutil
from influxdb_client import InfluxDBClient, BucketsApi, WriteApi, OrganizationsApi, Point, QueryApi from influxdb_client import InfluxDBClient, BucketsApi, WriteApi, OrganizationsApi, Point, QueryApi
import simulation import simulation
import influxdb_info import influxdb_info
import project_info
def setup_logger(): def setup_logger():
# 创建日志目录 # 创建日志目录
@@ -103,19 +104,19 @@ def run_simulation_job() -> None:
if current_time.minute % 15 == 0: if current_time.minute % 15 == 0:
print(f"{current_time.strftime('%Y-%m-%d %H:%M:%S')} -- Start simulation task.") print(f"{current_time.strftime('%Y-%m-%d %H:%M:%S')} -- Start simulation task.")
# 计算前获取scada_info中的信息按照设定的方法修改pg数据库 # 计算前获取scada_info中的信息按照设定的方法修改pg数据库
simulation.query_corresponding_element_id_and_query_id("bb") simulation.query_corresponding_element_id_and_query_id(project_info.name)
simulation.query_corresponding_pattern_id_and_query_id('bb') simulation.query_corresponding_pattern_id_and_query_id(project_info.name)
region_result = simulation.query_non_realtime_region('bb') region_result = simulation.query_non_realtime_region(project_info.name)
globals.source_outflow_region_id = simulation.get_source_outflow_region_id('bb', region_result) globals.source_outflow_region_id = simulation.get_source_outflow_region_id(project_info.name, region_result)
globals.realtime_region_pipe_flow_and_demand_id = simulation.query_realtime_region_pipe_flow_and_demand_id('bb', region_result) globals.realtime_region_pipe_flow_and_demand_id = simulation.query_realtime_region_pipe_flow_and_demand_id(project_info.name, region_result)
globals.pipe_flow_region_patterns = simulation.query_pipe_flow_region_patterns('bb') globals.pipe_flow_region_patterns = simulation.query_pipe_flow_region_patterns(project_info.name)
globals.non_realtime_region_patterns = simulation.query_non_realtime_region_patterns('bb', region_result) globals.non_realtime_region_patterns = simulation.query_non_realtime_region_patterns(project_info.name, region_result)
globals.source_outflow_region_patterns, realtime_region_pipe_flow_and_demand_patterns = simulation.get_realtime_region_patterns('bb', globals.source_outflow_region_patterns, realtime_region_pipe_flow_and_demand_patterns = simulation.get_realtime_region_patterns(project_info.name,
globals.source_outflow_region_id, globals.source_outflow_region_id,
globals.realtime_region_pipe_flow_and_demand_id) globals.realtime_region_pipe_flow_and_demand_id)
modify_pattern_start_time: str = get_next_15minute_time() # 获取下一个15分钟时间点 modify_pattern_start_time: str = get_next_15minute_time() # 获取下一个15分钟时间点
# print(modify_pattern_start_time) # print(modify_pattern_start_time)
simulation.run_simulation(name='bb', simulation_type="realtime", modify_pattern_start_time=modify_pattern_start_time) simulation.run_simulation(name=project_info.name, simulation_type="realtime", modify_pattern_start_time=modify_pattern_start_time)
logger.info('{} -- Successfully run simulation and store realtime simulation result.'.format(datetime.now().strftime('%Y-%m-%d %H:%M:%S'))) logger.info('{} -- Successfully run simulation and store realtime simulation result.'.format(datetime.now().strftime('%Y-%m-%d %H:%M:%S')))
else: else:
@@ -150,6 +151,6 @@ if __name__ == "__main__":
client = InfluxDBClient(url=url, token=token) client = InfluxDBClient(url=url, token=token)
# step2: 先查询pg数据库中scada_info的信息然后存储SCADA数据到SCADA_data这个bucket里 # step2: 先查询pg数据库中scada_info的信息然后存储SCADA数据到SCADA_data这个bucket里
influxdb_api.query_pg_scada_info_realtime('bb') influxdb_api.query_pg_scada_info_realtime(project_info.name)
# 自动执行 # 自动执行
realtime_task() realtime_task()

View File

@@ -8,6 +8,7 @@ from logging.handlers import TimedRotatingFileHandler
import time import time
from influxdb_client import InfluxDBClient, BucketsApi, WriteApi, OrganizationsApi, Point, QueryApi from influxdb_client import InfluxDBClient, BucketsApi, WriteApi, OrganizationsApi, Point, QueryApi
import influxdb_info import influxdb_info
import project_info
def setup_logger(): def setup_logger():
# 创建日志目录 # 创建日志目录
@@ -133,6 +134,6 @@ if __name__ == "__main__":
client = InfluxDBClient(url=url, token=token) client = InfluxDBClient(url=url, token=token)
# step2: 先查询pg数据库中scada_info的信息然后存储SCADA数据到SCADA_data这个bucket里 # step2: 先查询pg数据库中scada_info的信息然后存储SCADA数据到SCADA_data这个bucket里
influxdb_api.query_pg_scada_info_non_realtime('bb') influxdb_api.query_pg_scada_info_non_realtime(project_info.name)
# 自动执行 # 自动执行
store_non_realtime_SCADA_data_task() store_non_realtime_SCADA_data_task()

6
build_pyd_singelfile.py Normal file
View File

@@ -0,0 +1,6 @@
"""Cython build script: compile api/project.py into a single extension module.

Usage: python build_pyd_singelfile.py build_ext --inplace
"""
# Fix: distutils was removed from the standard library in Python 3.12;
# setuptools provides a drop-in replacement for setup().
from setuptools import setup
from Cython.Build import cythonize

setup(ext_modules=cythonize([
    "api/project.py"
]))

View File

@@ -0,0 +1,85 @@
#!/usr/bin/env python3
"""Rename duplicate valve IDs in the [VALVES] section of an EPANET .inp file.

Finds the [VALVES] section, keeps the first occurrence of each valve ID,
renames every later duplicate, then writes a fixed copy of the INP plus a
mapping file listing every rename.
"""
from pathlib import Path
import re

# Hard-coded input/output paths (Windows layout of this repo).
inp = Path(r"d:\TJWaterServer\epanet\szhskeleton-patternfixed-ascii.inp")
out = Path(r"d:\TJWaterServer\epanet\szhskeleton-patternfixed-ascii-fixed2.inp")
mapout = Path(r"d:\TJWaterServer\epanet\szhskeleton-patternfixed-ascii-fixed2.mapping.txt")

text = inp.read_text(encoding='utf-8')
lines = text.splitlines()

# Locate the [VALVES] section: 'start' is its header line, 'end' is the
# next section header (or end of file).
start = None
for i,l in enumerate(lines):
    if l.strip().upper() == '[VALVES]':
        start = i
        break
if start is None:
    print('No [VALVES] section found')
    raise SystemExit(1)
end = len(lines)
for j in range(start+1, len(lines)):
    if re.match(r"^\s*\[.+\]", lines[j]):
        end = j
        break

# Collect valve definition lines (skipping blanks and ';' comments) with
# their absolute line numbers; the first whitespace token is the valve ID.
valve_entries = []  # (absolute_line_index, token, line)
for idx in range(start+1, end):
    l = lines[idx]
    if not l.strip() or l.strip().startswith(';'):
        continue
    tok = l.split()[0]
    valve_entries.append((idx, tok, l))

from collections import defaultdict
# Map valve ID -> list of absolute line numbers where it appears.
positions = defaultdict(list)
for ln, tok, l in valve_entries:
    positions[tok].append(ln)
# IDs appearing on more than one line need renaming.
dups = {tok:lns for tok,lns in positions.items() if len(lns)>1}
print('Found', sum(1 for _ in valve_entries), 'valve entries; duplicates:', len(dups))

replacements = []  # (line_index, old, new)
counter = 1  # global suffix counter, bumped only when a candidate collides
for tok, lns in dups.items():
    # skip first occurrence, rename others
    for occ_index, ln in enumerate(lns):
        if occ_index == 0:
            continue
        # produce new name: prefix V if starts with digit (keeps the ID
        # identifier-like), then append the occurrence index
        if re.fullmatch(r"\d+", tok) or re.match(r"^\d", tok):
            base = 'V' + tok
        else:
            base = tok
        new = f'{base}_{occ_index}'
        # ensure uniqueness globally: the new name must match neither an
        # existing valve ID nor an already generated replacement
        while any(rn == new for _,_,rn in replacements) or any(new == t for t in positions.keys()):
            counter += 1
            new = f'{base}_{occ_index}_{counter}'
        replacements.append((ln, tok, new))

# Apply replacements on the given absolute lines, touching only the ID
# (the line's first token).
for ln, old, new in replacements:
    line = lines[ln]
    # replace only first token occurrence
    parts = line.split()
    if parts:
        # find start of token in line (preserve spacing)
        m = re.search(re.escape(parts[0]), line)
        if m:
            startpos = m.start()
            endpos = m.end()
            newline = line[:startpos] + new + line[endpos:]
            lines[ln] = newline

# Write the fixed INP and a human-readable rename mapping.
out.write_text('\n'.join(lines) + '\n', encoding='utf-8')
with mapout.open('w', encoding='utf-8') as f:
    for ln, old, new in replacements:
        f.write(f'line {ln+1}: {old} -> {new}\n')
print('Wrote', out, 'with', len(replacements), 'replacements; mapping at', mapout)

View File

@@ -9,7 +9,7 @@ import subprocess
import logging import logging
from typing import Any from typing import Any
sys.path.append("..") sys.path.append("..")
from api import project from api import project_backup
from api import inp_out from api import inp_out
@@ -243,7 +243,7 @@ def dump_output_binary(path: str) -> str:
#DingZQ, 2025-02-04, 返回dict[str, Any] #DingZQ, 2025-02-04, 返回dict[str, Any]
def run_project_return_dict(name: str, readable_output: bool = False) -> dict[str, Any]: def run_project_return_dict(name: str, readable_output: bool = False) -> dict[str, Any]:
if not project.have_project(name): if not project_backup.have_project(name):
raise Exception(f'Not found project [{name}]') raise Exception(f'Not found project [{name}]')
dir = os.path.abspath(os.getcwd()) dir = os.path.abspath(os.getcwd())
@@ -276,7 +276,7 @@ def run_project_return_dict(name: str, readable_output: bool = False) -> dict[st
# original code # original code
def run_project(name: str, readable_output: bool = False) -> str: def run_project(name: str, readable_output: bool = False) -> str:
if not project.have_project(name): if not project_backup.have_project(name):
raise Exception(f'Not found project [{name}]') raise Exception(f'Not found project [{name}]')
dir = os.path.abspath(os.getcwd()) dir = os.path.abspath(os.getcwd())
@@ -315,7 +315,7 @@ def run_project(name: str, readable_output: bool = False) -> str:
data['output'] = dump_output_binary(opt) data['output'] = dump_output_binary(opt)
data['report'] = dump_report(rpt) data['report'] = dump_report(rpt)
logging.info(f"Report: {data['report']}") #logging.info(f"Report: {data['report']}")
return json.dumps(data) return json.dumps(data)

Binary file not shown.

View File

@@ -0,0 +1,64 @@
#!/usr/bin/env python3
"""
Fix non-ASCII ID tokens in an EPANET .inp file by mapping each unique
non-ASCII-containing token to an ASCII-safe name.  Outputs a new INP and a
mapping file for review.

Usage: python fix_inp_nonascii.py input.inp [output.inp]
"""
import re
import sys
from pathlib import Path

if len(sys.argv) < 2:
    print("Usage: python fix_inp_nonascii.py input.inp [output.inp]")
    sys.exit(2)

src = Path(sys.argv[1])
if len(sys.argv) > 2:
    dst = Path(sys.argv[2])
else:
    # Default output: '<stem>-ascii<suffix>' next to the input file.
    dst = src.with_name(src.stem + '-ascii' + src.suffix)

text = src.read_text(encoding='utf-8')

# Tokens that contain at least one non-ASCII char.  A token is a maximal
# run of non-whitespace characters.
nonascii_tokens = set(re.findall(r"\S*[^\x00-\x7F]\S*", text))
if not nonascii_tokens:
    print("No non-ASCII tokens found. Copying source to destination unchanged.")
    dst.write_text(text, encoding='utf-8')
    sys.exit(0)

used = set()
mapping = {}
counter = 1
# Sort tokens to get deterministic output.
for t in sorted(nonascii_tokens):
    # Build an ASCII prefix from the safe characters (alnum, underscore, hyphen).
    prefix = ''.join(ch for ch in t if ord(ch) < 128 and (ch.isalnum() or ch in '_-'))
    if not prefix:
        prefix = 'ID'
    candidate = prefix
    # Ensure the candidate is unique among generated names.
    while candidate in used:
        candidate = f"{prefix}_x{counter}"
        counter += 1
    # If the candidate accidentally equals the original token (rare), force a suffix.
    if candidate == t:
        candidate = f"{prefix}_x{counter}"
        counter += 1
    mapping[t] = candidate
    used.add(candidate)

# Replace whole tokens only.  BUG FIX: the original used a bare re.sub on
# the escaped token (despite a comment claiming lookarounds), which could
# rewrite a token appearing as a substring of a longer non-ASCII token and
# corrupt it before its own replacement ran.  The lookarounds below anchor
# the match to whitespace / line / file boundaries so only complete tokens
# are replaced.
new_text = text
for src_token, dst_token in mapping.items():
    pattern = r"(?<!\S)" + re.escape(src_token) + r"(?!\S)"
    new_text = re.sub(pattern, dst_token, new_text)

# Write the converted INP and the old -> new mapping for review.
dst.write_text(new_text, encoding='utf-8')
mapfile = dst.with_suffix(dst.suffix + '.mapping.txt')
with mapfile.open('w', encoding='utf-8') as f:
    for k, v in mapping.items():
        f.write(f"{k} -> {v}\n")
print(f"Wrote: {dst}\nMapping: {mapfile}\nReplaced {len(mapping)} non-ASCII tokens.")

144
epanet/fix_valve_ids.py Normal file
View File

@@ -0,0 +1,144 @@
#!/usr/bin/env python3
"""Fix duplicate valve IDs in the [VALVES] section of an EPANET INP file.

Reads the ASCII-fied INP produced by fix_inp_nonascii.py, renames every
occurrence of a duplicated valve ID *after the first*, then writes the fixed
INP plus a text file describing each rename.
"""
import re
from collections import defaultdict
from pathlib import Path

inp = Path(r"d:\TJWaterServer\epanet\szhskeleton-patternfixed-ascii.inp")
mapf = Path(r"d:\TJWaterServer\epanet\szhskeleton-patternfixed-ascii.inp.mapping.txt")
out = Path(r"d:\TJWaterServer\epanet\szhskeleton-patternfixed-ascii-fixed.inp")
outmap = out.with_suffix(out.suffix + '.mapping.txt')

text = inp.read_text(encoding='utf-8')

# Parse the mapping file left by the non-ASCII fixer (original -> mapped).
map_original_to_mapped = {}
if mapf.exists():
    for line in mapf.read_text(encoding='utf-8').splitlines():
        if '->' in line:
            a, b = line.split('->', 1)
            map_original_to_mapped[a.strip()] = b.strip()

# Locate the [VALVES] block: from the header up to the next section or EOF.
m = re.search(r"(?mi)^\[VALVES\]\s*(?:;.*\n)?(.*?)(?=^\[|\Z)", text, flags=re.S | re.M)
if not m:
    print('No [VALVES] section found')
    raise SystemExit(1)
block = m.group(1)

# Extract IDs: the first token of each non-empty, non-comment line.
ids = []
lines = block.splitlines()
for l in lines:
    if not l.strip() or l.strip().startswith(';'):
        continue
    toks = l.split()
    if toks:
        ids.append(toks[0])

# Record the occurrence positions of every ID; >1 position means a duplicate.
count = defaultdict(list)
for idx, token in enumerate(ids):
    count[token].append(idx)
dups = {k: v for k, v in count.items() if len(v) > 1}
print(f'Found {len(ids)} valve IDs; {len(dups)} duplicates')
for k, v in list(dups.items())[:40]:
    print(k, 'occurs', len(v), 'times')

# Also report mapped collisions: several originals mapped onto one ASCII token.
mapped_rev = defaultdict(list)
for orig, mapped in map_original_to_mapped.items():
    mapped_rev[mapped].append(orig)
collisions = {tok: origlist for tok, origlist in mapped_rev.items() if len(origlist) > 1}
print('\nMapped collisions (same mapped token from multiple originals):', len(collisions))
for tok, ol in list(collisions.items())[:40]:
    print(tok, ' <- ', ol[:5])

# Pick a unique new name for every occurrence of a duplicate beyond the first.
# BUGFIX: the previous version ran this naming pass twice; the first (dead)
# pass already inserted the preferred candidates into `used`, so the real
# pass always fell through to a numeric suffix (e.g. VAL_5_1 instead of
# VAL_5) and left bogus entries in the change map.  One pass, one map.
used = set(ids)  # every name already present in the valves block
suffix_counter = 1
occ_to_new = {}  # (token, occurrence_index) -> new name
for token, positions in dups.items():
    for pos_index in range(1, len(positions)):  # occurrence 0 keeps its name
        # Digit-leading IDs get a VAL_ prefix; the rest get a _dup suffix.
        if re.match(r"^\d", token):
            candidate = f'VAL_{token}'
        else:
            candidate = f'{token}_dup'
        while candidate in used:
            candidate = f'{candidate}_{suffix_counter}'
            suffix_counter += 1
        used.add(candidate)
        occ_to_new[(token, pos_index)] = candidate

# Rebuild the block, renaming only the selected occurrences.
new_lines = []
occ_seen = defaultdict(int)
for l in lines:
    if not l.strip() or l.strip().startswith(';'):
        new_lines.append(l)
        continue
    token = l.split()[0]
    occ_idx = occ_seen[token]
    occ_seen[token] += 1
    new_token = occ_to_new.get((token, occ_idx))
    if new_token is None:
        new_lines.append(l)
        continue
    # Replace only the leading token, preserving the line's leading whitespace.
    m2 = re.match(r"(\s*)" + re.escape(token), l)
    new_lines.append(m2.group(1) + new_token + l[m2.end():])

# Splice the fixed block back into the file and write it out.
new_block = '\n'.join(new_lines) + '\n'
new_text = text[:m.start(1)] + new_block + text[m.end(1):]
out.write_text(new_text, encoding='utf-8')

# Human-readable change log: which occurrence of which ID became what.
with outmap.open('w', encoding='utf-8') as f:
    f.write('Changes applied to fix duplicate valve IDs:\n')
    for (token, occ), v in occ_to_new.items():
        f.write(f'{token} occurrence {occ} -> {v}\n')
    f.write('\nNote: These replacements are only for valve ID occurrences beyond the first.\n')
print('Wrote', out, 'and mapping', outmap)
print('Replacements:', len(occ_to_new))
print('If you want different naming (e.g. prefix with V_), rerun with that preference.')

65
epanet/fix_valve_ids2.py Normal file
View File

@@ -0,0 +1,65 @@
#!/usr/bin/env python3
"""Scan the [VALVES] section of an EPANET INP file and report duplicate
and numeric-only valve IDs, both to stdout and to a text report file."""
from pathlib import Path
from collections import defaultdict
import re

inp = Path(r"d:\TJWaterServer\epanet\szhskeleton-patternfixed-ascii.inp")
text = inp.read_text(encoding='utf-8')
all_lines = text.splitlines()

# Locate the [VALVES] header (case-insensitive) ...
start = next((i for i, line in enumerate(all_lines)
              if line.strip().upper() == '[VALVES]'), None)
if start is None:
    print('No [VALVES] section found')
    raise SystemExit(1)
# ... and the next section header (or EOF) that terminates the block.
end = next((j for j in range(start + 1, len(all_lines))
            if re.match(r"^\s*\[.+\]", all_lines[j])), len(all_lines))

# Collect (line-number, first-token, raw-line) for every valve entry,
# skipping blanks and comment lines.
ids = []
for lineno, raw in enumerate(all_lines[start + 1:end], start=start + 1):
    stripped = raw.strip()
    if not stripped or stripped.startswith(';'):
        continue
    ids.append((lineno, raw.split()[0], raw))

# Group line numbers by token; a token seen on more than one line is a duplicate.
count = defaultdict(list)
for lineno, token, _ in ids:
    count[token].append(lineno)
dups = {token: where for token, where in count.items() if len(where) > 1}

print('Total valve entries found:', len(ids))
print('Duplicate token count:', len(dups))
if dups:
    print('\nSample duplicates:')
    for token, where in list(dups.items())[:20]:
        print(token, 'lines:', where)

# Flag IDs made up of digits only.
num_only = [token for _, token, _ in ids if re.fullmatch(r'\d+', token)]
print('\nNumeric-only valve IDs count:', len(num_only))
if num_only:
    print('Examples:', num_only[:20])

# Persist the same findings next to the input file.
rep = inp.with_name(inp.stem + '-valves-report.txt')
with rep.open('w', encoding='utf-8') as f:
    f.write(f'Total valve entries: {len(ids)}\n')
    f.write(f'Duplicate tokens: {len(dups)}\n')
    for token, where in dups.items():
        f.write(f'{token}: lines {where}\n')
    f.write('\nNumeric-only tokens:\n')
    for token in sorted(set(num_only)):
        f.write(token + '\n')
print('Wrote report to', rep)

Binary file not shown.

View File

@@ -17,6 +17,7 @@ import pandas as pd
import openpyxl import openpyxl
import pytz import pytz
import influxdb_info import influxdb_info
import project_info
import time_api import time_api
# influxdb数据库连接信息 # influxdb数据库连接信息

View File

@@ -1,7 +1,8 @@
# influxdb数据库连接信息 # influxdb数据库连接信息
url = "http://localhost:8086" # 替换为你的InfluxDB实例地址 url = "http://localhost:8086" # 替换为你的InfluxDB实例地址
token = "Vq8F5tzxqmjH6JYPBP5xqwo6nJbzRqCnahlcoMVyZGMPm3H92swD08VX-5lTH1laN_JG1x7EZ80WOQoycanmBw==" # 替换为你的InfluxDB Token #token = "Vq8F5tzxqmjH6JYPBP5xqwo6nJbzRqCnahlcoMVyZGMPm3H92swD08VX-5lTH1laN_JG1x7EZ80WOQoycanmBw==" # 替换为你的InfluxDB Token
token = "blgSuYzqnybW7nxVghtyuA7Ma45lY6raHNXPwHYB2gZaTwrE4xgxIqQoJ2xY6_kahGwai83WVb4V4juRrmfHqg=="
# _ENCODED_TOKEN = "eEdETTVSWnFSSkF1ekFHUy1vdFhVZEMyTkZkWTc1cUpBalJMcUFCNHA1V2NJSUFsSVVwT3BUOF95QTE2QU9IbUpXZXJ3UV8wOGd3Yjg0c3k0MmpuWlE9PQ==" # _ENCODED_TOKEN = "eEdETTVSWnFSSkF1ekFHUy1vdFhVZEMyTkZkWTc1cUpBalJMcUFCNHA1V2NJSUFsSVVwT3BUOF95QTE2QU9IbUpXZXJ3UV8wOGd3Yjg0c3k0MmpuWlE9PQ=="
# token = base64.b64decode(_ENCODED_TOKEN).decode("utf-8") # token = base64.b64decode(_ENCODED_TOKEN).decode("utf-8")
org = "TJWATERORG" # 替换为你的Organization名称 org = "TJWATERORG" # 替换为你的Organization名称

104
main.py
View File

@@ -35,6 +35,7 @@ from fastapi import FastAPI, APIRouter, Depends, HTTPException, status, Request
from fastapi.security import OAuth2PasswordBearer from fastapi.security import OAuth2PasswordBearer
from fastapi import FastAPI, Depends, HTTPException, Header from fastapi import FastAPI, Depends, HTTPException, Header
from typing import Annotated from typing import Annotated
import project_info
JUNCTION = 0 JUNCTION = 0
RESERVOIR = 1 RESERVOIR = 1
@@ -143,44 +144,6 @@ redis_client = redis.Redis(host="localhost", port=6379, db=0)
# influx_org_name = influxdb_info.org # influx_org_name = influxdb_info.org
# influx_client = InfluxDBClient(url=influx_url, token=influx_token, org=influx_org_name, timeout=100*1000) # 100 seconds # influx_client = InfluxDBClient(url=influx_url, token=influx_token, org=influx_org_name, timeout=100*1000) # 100 seconds
def setup_logger():
# 创建日志目录
log_dir = "logs"
os.makedirs(log_dir, exist_ok=True)
# 配置基础日志格式
log_format = "%(asctime)s - %(levelname)s - %(message)s"
formatter = logging.Formatter(log_format)
# 创建主 Logger
logger = logging.getLogger()
logger.setLevel(logging.INFO) # 全局日志级别
# --- 1. 按日期分割的日志文件 Handler ---
log_file = os.path.join(log_dir, "fastapi.log")
file_handler = TimedRotatingFileHandler(
filename=log_file,
when="midnight", # 每天午夜轮转
interval=1,
backupCount=7,
encoding="utf-8"
)
file_handler.suffix = "fastapi-%Y-%m-%d.log" # 文件名格式
file_handler.setFormatter(formatter)
file_handler.setLevel(logging.INFO) # 文件记录所有级别日志
# --- 2. 控制台实时输出 Handler ---
console_handler = logging.StreamHandler() # 默认输出到 sys.stderr (控制台)
console_handler.setFormatter(formatter)
console_handler.setLevel(logging.INFO) # 控制台仅显示 INFO 及以上级别
# 将 Handler 添加到 Logger
logger.addHandler(file_handler)
# logger.addHandler(console_handler)
return logger
logger = setup_logger()
# 配置 CORS 中间件 # 配置 CORS 中间件
app.add_middleware( app.add_middleware(
@@ -196,62 +159,8 @@ lock_simulation = Value('i', 0)
app.add_middleware(GZipMiddleware, minimum_size=1000) app.add_middleware(GZipMiddleware, minimum_size=1000)
logger = logging.getLogger()
async def receive_with_body(body: bytes) -> Receive: logger.setLevel(logging.INFO)
async def receive() -> dict:
return {
"type": "http.request",
"body": body,
"more_body": False,
}
return receive
@app.middleware("http")
async def log_requests(request: Request, call_next):
if request.url.path == "/favicon.ico":
return Response(status_code=204)
# 记录接收请求的时间
request_time = time.time()
request_time_str = datetime.fromtimestamp(request_time).strftime('%Y-%m-%d %H:%M:%S')
# 判断是否为文件上传
is_file_upload = request.headers.get("content-type", "").startswith("multipart/form-data")
# 记录接收的请求数据
logger.info(f"Received request: {request.method} {request.url} at {request_time_str}")
if not is_file_upload:
request_body = await request.body()
logger.info(f"Request body: {request_body.decode('utf-8')}")
# 创建新的 Request 对象,传递缓存的请求体
receive = await receive_with_body(request_body)
request = Request(request.scope, receive=receive)
else:
logger.info(f"Request body: File")
# 处理请求
response = await call_next(request)
# 记录发送响应的时间
response_time = time.time()
response_time_str = datetime.fromtimestamp(response_time).strftime('%Y-%m-%d %H:%M:%S')
processing_time = response_time - request_time
# 记录发送的响应数据
# response_body = b""
# async for chunk in response.body_iterator:
# response_body += chunk
# 记录响应的状态码以及是否成功
success_status = response.status_code < 400
logger.info(f"Response status: {response.status_code} at {response_time_str}, success: {success_status}")
# logging.info(f"Response body: {response_body.decode('utf-8')}")
logger.info(f"Processing time: {processing_time:.3f} seconds")
return response
@app.on_event("startup") @app.on_event("startup")
async def startup_db(): async def startup_db():
@@ -261,7 +170,7 @@ async def startup_db():
logger.info('**********************************************************') logger.info('**********************************************************')
# open 'szh' by default # open 'szh' by default
open_project("szh") open_project(project_info.name)
############################################################ ############################################################
# auth # auth
@@ -2914,8 +2823,9 @@ async def fastapi_run_simulation_manually_by_date(data: Run_Simulation_Manually_
thread.start() thread.start()
# thread.join() # thread.join()
matched_keys = redis_client.keys(f"*{item['simulation_date']}*") # DingZQ 08152025
redis_client.delete(*matched_keys) # matched_keys = redis_client.keys(f"*{item['simulation_date']}*")
# redis_client.delete(*matched_keys)
############################################################ ############################################################

View File

@@ -1,6 +1,6 @@
import os import os
from tjnetwork import * from tjnetwork import *
from api.project import CopyProjectEx from api.project_backup import CopyProjectEx
from run_simulation import run_simulation_ex, from_clock_to_seconds_2 from run_simulation import run_simulation_ex, from_clock_to_seconds_2
from math import sqrt, pi from math import sqrt, pi
from epanet.epanet import Output from epanet.epanet import Output
@@ -705,7 +705,7 @@ def network_update(file_path: str) -> None:
print(f"history_patterns_flows文件存在开始处理...") print(f"history_patterns_flows文件存在开始处理...")
# 连接到 PostgreSQL 数据库(这里是数据库 "bb" # 连接到 PostgreSQL 数据库(这里是数据库 "bb"
with psycopg.connect("dbname=bb host=127.0.0.1") as conn: with psycopg.connect(f"dbname={project_info.name} host=127.0.0.1") as conn:
with conn.cursor() as cur: with conn.cursor() as cur:
with open(csv_path, newline='', encoding='utf-8-sig') as csvfile: with open(csv_path, newline='', encoding='utf-8-sig') as csvfile:
reader = csv.DictReader(csvfile) reader = csv.DictReader(csvfile)
@@ -1082,10 +1082,10 @@ if __name__ == '__main__':
# f.write(str_dump) # f.write(str_dump)
# 更新inp文件并插入history_patterns_flows # 更新inp文件并插入history_patterns_flows
network_update('suzhouhe-202505.inp') network_update('fx0217-mass injection.inp')
# 更新scada_info文件 # 更新scada_info文件
# submit_scada_info('bb', '4490') submit_scada_info(project_info.name, '4490')
# 示例scheme_name_exists # 示例scheme_name_exists
# if scheme_name_exists(name='bb', scheme_name='burst_scheme'): # if scheme_name_exists(name='bb', scheme_name='burst_scheme'):
@@ -1098,24 +1098,25 @@ if __name__ == '__main__':
# burst_ID='GSD230112144241FA18292A84CB', burst_size=400, modify_total_duration=1800, scheme_Name='GSD230112144241FA18292A84CB_400') # burst_ID='GSD230112144241FA18292A84CB', burst_size=400, modify_total_duration=1800, scheme_Name='GSD230112144241FA18292A84CB_400')
# 示例create_user # 示例create_user
create_user(name='szh', username='admin', password='123456') create_user(name=project_info.name, username='admin', password='123456')
# 示例delete_user # 示例delete_user
# delete_user(name='bb', username='admin_test') delete_user(name=project_info.name, username='admin_test')
# 示例query_scheme_list # 示例query_scheme_list
# result = query_scheme_list(name='bb') result = query_scheme_list(name=project_info.name)
# print(result) print(result)
# 示例delete_scheme_info # 示例delete_scheme_info
# delete_scheme_info(name='bb', scheme_name='burst_scheme') delete_scheme_info(name=project_info.name, scheme_name='burst_scheme')
# 示例upload_shp_to_pg # 示例upload_shp_to_pg
upload_shp_to_pg(name='bb', table_name='GIS_pipe', role='86158', shp_file_path='市政管线.shp') # 这里的role是 电脑的用户名,服务器上是 Administrator
upload_shp_to_pg(name=project_info.name, table_name='GIS_pipe', role='Administrator', shp_file_path='市政管线.shp')
# 示例submit_risk_probability_result # 示例submit_risk_probability_result
# submit_risk_probability_result(name='bb', result_file_path='./北碚市政管线风险评价结果.xlsx') submit_risk_probability_result(name=project_info.name, result_file_path='./北碚市政管线风险评价结果.xlsx')
# 示例pressure_sensor_placement_sensitivity # 示例pressure_sensor_placement_sensitivity
# pressure_sensor_placement_sensitivity(name='bb', scheme_name='20250517', sensor_number=10, min_diameter=300, username='admin') pressure_sensor_placement_sensitivity(name=project_info.name, scheme_name='20250517', sensor_number=10, min_diameter=300, username='admin')

View File

@@ -1,7 +1 @@
name='szh'
# influxdb数据库连接信息
url = "http://localhost:8086" # 替换为你的InfluxDB实例地址
token = "Vq8F5tzxqmjH6JYPBP5xqwo6nJbzRqCnahlcoMVyZGMPm3H92swD08VX-5lTH1laN_JG1x7EZ80WOQoycanmBw==" # 替换为你的InfluxDB Token
# _ENCODED_TOKEN = "eEdETTVSWnFSSkF1ekFHUy1vdFhVZEMyTkZkWTc1cUpBalJMcUFCNHA1V2NJSUFsSVVwT3BUOF95QTE2QU9IbUpXZXJ3UV8wOGd3Yjg0c3k0MmpuWlE9PQ=="
# token = base64.b64decode(_ENCODED_TOKEN).decode("utf-8")
org = "TJWATERORG" # 替换为你的Organization名称

18
run_server.py Normal file
View File

@@ -0,0 +1,18 @@
import asyncio
import sys
import uvicorn
if __name__ == "__main__":
# Windows 设置事件循环策略
if sys.platform == "win32":
asyncio.set_event_loop_policy(asyncio.WindowsSelectorEventLoopPolicy())
# 用 uvicorn.run 支持 workers 参数
uvicorn.run(
"main:app",
host="0.0.0.0",
port=8000,
workers=2, # 这里可以设置多进程
loop="asyncio",
)

View File

@@ -8,7 +8,7 @@ import json
import pytz import pytz
import requests import requests
import time import time
import project_info
url_path = 'http://10.101.15.16:9000/loong' # 内网 url_path = 'http://10.101.15.16:9000/loong' # 内网
# url_path = 'http://183.64.62.100:9057/loong' # 外网 # url_path = 'http://183.64.62.100:9057/loong' # 外网
@@ -848,7 +848,7 @@ if __name__ == '__main__':
# run_simulation("beibeizone","2024-04-01T08:00:00Z") # run_simulation("beibeizone","2024-04-01T08:00:00Z")
# read_inp('bb_server', 'model20_en.inp') # read_inp('bb_server', 'model20_en.inp')
run_simulation_ex( run_simulation_ex(
name='bb', simulation_type='extended', start_datetime='2024-11-09T02:30:00Z', name=project_info.name, simulation_type='extended', start_datetime='2024-11-09T02:30:00Z',
# end_datetime='2024-05-30T16:00:00Z', # end_datetime='2024-05-30T16:00:00Z',
# duration=0, # duration=0,
# pump_control={'PU00006': [45, 40]} # pump_control={'PU00006': [45, 40]}

View File

@@ -19,7 +19,7 @@ from spopt.region import Skater
from shapely.geometry import Point from shapely.geometry import Point
import geopandas as gpd import geopandas as gpd
from sklearn.metrics import pairwise_distances from sklearn.metrics import pairwise_distances
import project_info
# 2025/03/12 # 2025/03/12
# Step1: 获取节点坐标 # Step1: 获取节点坐标
@@ -639,7 +639,7 @@ def get_ID(name: str, sensor_num: int, min_diameter: int) -> list[str]:
if __name__ == '__main__': if __name__ == '__main__':
sensorindex = get_ID(name='bb', sensor_num=20, min_diameter=300) sensorindex = get_ID(name=project_info.name, sensor_num=20, min_diameter=300)
print(sensorindex) print(sensorindex)
# 将 sensor_coord 字典转换为 DataFrame # 将 sensor_coord 字典转换为 DataFrame

View File

@@ -12,7 +12,7 @@ from wntr.epanet.toolkit import EpanetException
from numpy.linalg import slogdet from numpy.linalg import slogdet
import random import random
from tjnetwork import * from tjnetwork import *
import project_info
# 2025/03/12 # 2025/03/12
# Step1: 获取节点坐标 # Step1: 获取节点坐标
@@ -472,7 +472,7 @@ def get_sensor_coord(name: str, sensor_num: int) -> dict[str, float]:
if __name__ == '__main__': if __name__ == '__main__':
sensor_coord = get_sensor_coord(name='bb', sensor_num=20) sensor_coord = get_sensor_coord(name=project_info.name, sensor_num=20)
print(sensor_coord) print(sensor_coord)
# ''' # '''
# 初始测压点布置根据灵敏度来布置计算初始情况下的校准过程的error # 初始测压点布置根据灵敏度来布置计算初始情况下的校准过程的error

View File

@@ -51,12 +51,12 @@ Password Tjwater@123456
Organizatio TJWATEORG Organizatio TJWATEORG
Bucket TJWATERBUCKET Bucket TJWATERBUCKET
API Token : cpuAmRnJqSMd7F34q1VjG6JgwZfO0S0w0vK2ZmAvA6zvf6m-6UAobUKSW3xhGr_nxZI5HsFlpfZHT1i8sI3LyQ== API Token : LsqvuqtBqtBv44z_CWh5bWfn9hs1QKcYu5kWahF_cdF6JyqtwuUxU5CK7HWP7BOtP5_2f5mjx76qXyuPLOHWdw==
influx config create --config-name onboarding ` influx config create --config-name onboarding `
--host-url "http://localhost:8086" ` --host-url "http://localhost:8086" `
--org "TJWATERORG" ` --org "TJWATERORG" `
--token "cpuAmRnJqSMd7F34q1VjG6JgwZfO0S0w0vK2ZmAvA6zvf6m-6UAobUKSW3xhGr_nxZI5HsFlpfZHT1i8sI3LyQ==" ` --token "LsqvuqtBqtBv44z_CWh5bWfn9hs1QKcYu5kWahF_cdF6JyqtwuUxU5CK7HWP7BOtP5_2f5mjx76qXyuPLOHWdw=="
--active --active
@@ -83,6 +83,8 @@ Setup instructions for WMH's work
4submit_risk_probability_result 4submit_risk_probability_result
5pressure_sensor_placement_sensitivity 5pressure_sensor_placement_sensitivity
6. In the times table, all time values should be in the format "hh:mm:ss"
NOTES: NOTES:
1. SCADA设备的时候单位要转换现在模拟算出来的单位是L/sSCADA数据是m3/hL/s要乘3.6才是m3/h 1. SCADA设备的时候单位要转换现在模拟算出来的单位是L/sSCADA数据是m3/hL/s要乘3.6才是m3/h
2. 之前的SCADA压力应该转过了SCADA数据的单位是MPa要乘以100才是m 2. 之前的SCADA压力应该转过了SCADA数据的单位是MPa要乘以100才是m

View File

@@ -18,6 +18,7 @@ import psycopg
import logging import logging
import globals import globals
import uuid import uuid
import project_info
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s') logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')
@@ -592,6 +593,9 @@ def run_simulation(name: str, simulation_type: str, modify_pattern_start_time: s
# 打开数据库 # 打开数据库
open_project(name_c) open_project(name_c)
dic_time = get_time(name_c) dic_time = get_time(name_c)
print(dic_time)
# 获取水力模拟步长0:15:00 # 获取水力模拟步长0:15:00
globals.hydraulic_timestep = dic_time['HYDRAULIC TIMESTEP'] globals.hydraulic_timestep = dic_time['HYDRAULIC TIMESTEP']
# 将时间字符串转换为 timedelta 对象 # 将时间字符串转换为 timedelta 对象
@@ -889,7 +893,7 @@ def run_simulation(name: str, simulation_type: str, modify_pattern_start_time: s
cs.append(valve_status) cs.append(valve_status)
set_status(name_c, cs) set_status(name_c, cs)
# 运行并返回结果 # 运行并返回结果
result = run_project(name_c) run_project(name_c)
time_cost_end = time.perf_counter() time_cost_end = time.perf_counter()
print('{} -- Hydraulic simulation finished, cost time: {:.2f} s.'.format( datetime.now(pytz.timezone('Asia/Shanghai')).strftime('%Y-%m-%d %H:%M:%S'), time_cost_end - time_cost_start)) print('{} -- Hydraulic simulation finished, cost time: {:.2f} s.'.format( datetime.now(pytz.timezone('Asia/Shanghai')).strftime('%Y-%m-%d %H:%M:%S'), time_cost_end - time_cost_start))
# DingZQ 下面这几句一定要这样,不然读取不了 # DingZQ 下面这几句一定要这样,不然读取不了
@@ -929,22 +933,22 @@ def run_simulation(name: str, simulation_type: str, modify_pattern_start_time: s
if __name__ == "__main__": if __name__ == "__main__":
# 计算前获取scada_info中的信息按照设定的方法修改pg数据库 # 计算前获取scada_info中的信息按照设定的方法修改pg数据库
query_corresponding_element_id_and_query_id("bb") query_corresponding_element_id_and_query_id(project_info.name)
query_corresponding_pattern_id_and_query_id('bb') query_corresponding_pattern_id_and_query_id(project_info.name)
region_result = query_non_realtime_region('bb') region_result = query_non_realtime_region(project_info.name)
globals.source_outflow_region_id = get_source_outflow_region_id('bb', region_result) globals.source_outflow_region_id = get_source_outflow_region_id(project_info.name, region_result)
globals.realtime_region_pipe_flow_and_demand_id = query_realtime_region_pipe_flow_and_demand_id('bb', region_result) globals.realtime_region_pipe_flow_and_demand_id = query_realtime_region_pipe_flow_and_demand_id(project_info.name, region_result)
globals.pipe_flow_region_patterns = query_pipe_flow_region_patterns('bb') globals.pipe_flow_region_patterns = query_pipe_flow_region_patterns(project_info.name)
globals.non_realtime_region_patterns = query_non_realtime_region_patterns('bb', region_result) globals.non_realtime_region_patterns = query_non_realtime_region_patterns(project_info.name, region_result)
globals.source_outflow_region_patterns, globals.realtime_region_pipe_flow_and_demand_patterns = get_realtime_region_patterns('bb', globals.source_outflow_region_id, globals.realtime_region_pipe_flow_and_demand_id) globals.source_outflow_region_patterns, globals.realtime_region_pipe_flow_and_demand_patterns = get_realtime_region_patterns(project_info.name, globals.source_outflow_region_id, globals.realtime_region_pipe_flow_and_demand_id)
# 基础日期和时间(日期部分保持不变) # 基础日期和时间(日期部分保持不变)
base_date = datetime(2025, 5, 4) base_date = datetime(2025, 8, 12)
# 循环生成96个时间点15分钟间隔 # 循环生成96个时间点15分钟间隔
for i in range(96): for i in range(4):
# 计算当前时间偏移 # 计算当前时间偏移
time_offset = timedelta(minutes=15 * i) time_offset = timedelta(minutes=15 * i)
@@ -956,7 +960,7 @@ if __name__ == "__main__":
# 执行函数调用 # 执行函数调用
run_simulation( run_simulation(
name='bb', name=project_info.name,
simulation_type="realtime", simulation_type="realtime",
modify_pattern_start_time=iso_time modify_pattern_start_time=iso_time
) )

View File

@@ -1,9 +1,9 @@
REM f: REM f:
REM cd "f:\DEV\GitHub\TJWaterServer" REM cd "f:\DEV\GitHub\TJWaterServer"
git pull REM git pull
REM call startpg.bat REM call startpg.bat
cd C:\SourceCode\Server cd d:\TJWaterServer\
REM uvicorn main:app --host 0.0.0.0 --port 80 --reload REM uvicorn main:app --host 0.0.0.0 --port 80 --reload
uvicorn main:app --host 0.0.0.0 --port 80 python -m uvicorn main:app --host 0.0.0.0 --port 8000 --reload