Files
TJWaterServer/online_Analysis.py
2025-02-07 22:58:15 +08:00

777 lines
33 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
import os
from tjnetwork import *
from api.project import CopyProjectEx
from run_simulation import run_simulation_ex, from_clock_to_seconds_2
from math import sqrt, pi
from epanet.epanet import Output
import json
from datetime import datetime
import time
import pytz
import psycopg
from psycopg import sql
import pandas as pd
import csv
import chardet
############################################################
# burst analysis 01
############################################################
def burst_analysis(prj_name, date_time, burst_ID: list | str,
burst_size: list | float | int = None, duration=900, pump_control=None, valve_closed=None) -> str:
print(datetime.now(pytz.timezone('Asia/Shanghai')).strftime("%Y-%m-%d %H:%M:%S") + " -- Start Analysis.")
new_name = f'burst_Anal_{prj_name}'
if have_project(new_name):
if is_project_open(new_name):
close_project(new_name)
delete_project(new_name)
if is_project_open(prj_name):
close_project(prj_name)
print(datetime.now(pytz.timezone('Asia/Shanghai')).strftime("%Y-%m-%d %H:%M:%S") + " -- Start Copying Database.")
CopyProjectEx()(prj_name, new_name,
['operation', 'current_operation', 'restore_operation', 'batch_operation', 'operation_table'])
print(datetime.now(pytz.timezone('Asia/Shanghai')).strftime("%Y-%m-%d %H:%M:%S") + " -- Start Opening Database.")
open_project(new_name)
print(datetime.now(pytz.timezone('Asia/Shanghai')).strftime("%Y-%m-%d %H:%M:%S") + " -- Database Loading OK.")
##step 1 set the emitter coefficient of end node of busrt pipe
if isinstance(burst_ID, list):
if (burst_size is not None) and (type(burst_size) is not list):
return json.dumps('Type mismatch.')
elif isinstance(burst_ID, str):
burst_ID = [burst_ID]
if burst_size is not None:
if isinstance(burst_size, float) or isinstance(burst_size, int):
burst_size = [burst_size]
else:
return json.dumps('Type mismatch.')
else:
return json.dumps('Type mismatch.')
if burst_size is None:
burst_size = [-1] * len(burst_ID)
elif len(burst_size) < len(burst_ID):
burst_size += [-1] * (len(burst_ID) - len(burst_size))
elif len(burst_size) > len(burst_ID):
# burst_size = burst_size[:len(burst_ID)]
return json.dumps('Length mismatch.')
for burst_ID_, burst_size_ in zip(burst_ID, burst_size):
pipe = get_pipe(new_name, burst_ID_)
str_start_node = pipe['node1']
str_end_node = pipe['node2']
d_pipe = pipe['diameter'] / 1000.0
if burst_size_ <= 0:
burst_size_ = 3.14 * d_pipe * d_pipe / 4 / 8
else:
burst_size_ = burst_size_ / 10000
emitter_coeff = 0.65 * burst_size_ * sqrt(19.6) * 1000#1/8开口面积作为coeff
emitter_node = ''
if is_junction(new_name, str_end_node):
emitter_node = str_end_node
elif is_junction(new_name, str_start_node):
emitter_node = str_start_node
old_emitter = get_emitter(new_name, emitter_node)
if(old_emitter != None):
old_emitter['coefficient'] = emitter_coeff #爆管的emitter coefficient设置
else:
old_emitter = {'junction': emitter_node, 'coefficient': emitter_coeff}
new_emitter = ChangeSet()
new_emitter.append(old_emitter)
set_emitter(new_name, new_emitter)
#step 2. run simulation
# 涉及关阀计算可能导致关阀后仍有流量改为压力驱动PDA
options = get_option(new_name)
options['DEMAND MODEL'] = OPTION_DEMAND_MODEL_PDA
options['REQUIRED PRESSURE'] = '20.0000'
cs_options = ChangeSet()
cs_options.append(options)
set_option(new_name, cs_options)
valve_control = None
if valve_closed is not None:
valve_control = {}
for valve in valve_closed:
valve_control[valve] = {'status': 'CLOSED'}
result = run_simulation_ex(new_name,'realtime', date_time,
end_datetime=date_time,
duration=duration,
pump_control=pump_control,
valve_control=valve_control,
downloading_prohibition=True)
#step 3. restore the base model status
# execute_undo(prj_name) #有疑惑
if is_project_open(new_name):
close_project(new_name)
delete_project(new_name)
return result
############################################################
# valve closing analysis 02
############################################################
def valve_close_analysis(prj_name, date_time, valves, duration=None)->str:
print(datetime.now(pytz.timezone('Asia/Shanghai')).strftime("%Y-%m-%d %H:%M:%S") + " -- Start Analysis.")
new_name = f'valve_close_Anal_{prj_name}'
if have_project(new_name):
if is_project_open(new_name):
close_project(new_name)
delete_project(new_name)
if is_project_open(prj_name):
close_project(prj_name)
print(datetime.now(pytz.timezone('Asia/Shanghai')).strftime("%Y-%m-%d %H:%M:%S") + " -- Start Copying Database.")
CopyProjectEx()(prj_name, new_name,
['operation', 'current_operation', 'restore_operation', 'batch_operation', 'operation_table'])
print(datetime.now(pytz.timezone('Asia/Shanghai')).strftime("%Y-%m-%d %H:%M:%S") + " -- Start Opening Database.")
open_project(new_name)
print(datetime.now(pytz.timezone('Asia/Shanghai')).strftime("%Y-%m-%d %H:%M:%S") + " -- Database Loading OK.")
#step 1. change the valves status to 'closed'
for valve in valves:
if not is_valve(new_name,valve):
result='ID:{}is not a valve type'.format(valve)
return result
cs=ChangeSet()
status=get_status(new_name,valve)
status['status']='CLOSED'
cs.append(status)
set_status(new_name,cs)
#step 2. run simulation
# 涉及关阀计算可能导致关阀后仍有流量改为压力驱动PDA
options = get_option(new_name)
options['DEMAND MODEL'] = OPTION_DEMAND_MODEL_PDA
options['REQUIRED PRESSURE'] = '20.0000'
cs_options = ChangeSet()
cs_options.append(options)
set_option(new_name, cs_options)
result = run_simulation_ex(new_name,'realtime', date_time, date_time, duration,
downloading_prohibition=True)
#step 3. restore the base model
# for valve in valves:
# execute_undo(prj_name)
if is_project_open(new_name):
close_project(new_name)
delete_project(new_name)
return result
############################################################
# flushing analysis 03
#Pipe_Flushing_Analysis(prj_name,date_time, Valve_id_list, Drainage_Node_Id, Flushing_flow[opt], Flushing_duration[opt])->out_file:string
############################################################
def flushing_analysis(prj_name, date_time, valves, valves_k, drainage_node_ID, flushing_flow=0, duration=None)->str:
print(datetime.now(pytz.timezone('Asia/Shanghai')).strftime("%Y-%m-%d %H:%M:%S") + " -- Start Analysis.")
new_name = f'flushing_Anal_{prj_name}'
if have_project(new_name):
if is_project_open(new_name):
close_project(new_name)
delete_project(new_name)
if is_project_open(prj_name):
close_project(prj_name)
print(datetime.now(pytz.timezone('Asia/Shanghai')).strftime("%Y-%m-%d %H:%M:%S") + " -- Start Copying Database.")
CopyProjectEx()(prj_name, new_name,
['operation', 'current_operation', 'restore_operation', 'batch_operation', 'operation_table'])
print(datetime.now(pytz.timezone('Asia/Shanghai')).strftime("%Y-%m-%d %H:%M:%S") + " -- Start Opening Database.")
open_project(new_name)
print(datetime.now(pytz.timezone('Asia/Shanghai')).strftime("%Y-%m-%d %H:%M:%S") + " -- Database Loading OK.")
if not is_junction(new_name,drainage_node_ID):
return 'Wrong Drainage node type'
#step 1. change the valves status to 'closed'
for valve, valve_k in zip(valves, valves_k):
cs=ChangeSet()
status=get_status(new_name,valve)
# status['status']='CLOSED'
if valve_k == 0:
status['status'] = 'CLOSED'
elif valve_k < 1:
status['status'] = 'OPEN'
status['setting'] = 0.1036 * pow(valve_k, -3.105)
cs.append(status)
set_status(new_name,cs)
#step 2. set the emitter coefficient of drainage node or add flush flow to the drainage node
emitter_demand=get_demand(new_name,drainage_node_ID)
cs=ChangeSet()
if flushing_flow>0:
for r in emitter_demand['demands']:
r['demand']+=(flushing_flow/3.6)
cs.append(emitter_demand)
set_demand(new_name,cs)
else:
pipes=get_node_links(new_name,drainage_node_ID)
flush_diameter=50
for pipe in pipes:
d=get_pipe(new_name,pipe)['diameter']
if flush_diameter<d:
flush_diameter=d
flush_diameter/=1000
emitter_coeff=0.65*3.14*(flush_diameter*flush_diameter/4)*sqrt(19.6)*1000 #全开口面积作为coeff
old_emitter=get_emitter(new_name,drainage_node_ID)
if(old_emitter!=None):
old_emitter['coefficient']=emitter_coeff #爆管的emitter coefficient设置
else:
old_emitter={'junction':drainage_node_ID,'coefficient':emitter_coeff}
new_emitter=ChangeSet()
new_emitter.append(old_emitter)
set_emitter(new_name,new_emitter)
#step 3. run simulation
# 涉及关阀计算可能导致关阀后仍有流量改为压力驱动PDA
options = get_option(new_name)
options['DEMAND MODEL'] = OPTION_DEMAND_MODEL_PDA
options['REQUIRED PRESSURE'] = '20.0000'
cs_options = ChangeSet()
cs_options.append(options)
set_option(new_name, cs_options)
result = run_simulation_ex(new_name,'realtime', date_time, date_time, duration,
downloading_prohibition=True)
#step 4. restore the base model
if is_project_open(new_name):
close_project(new_name)
delete_project(new_name)
return result
############################################################
# Contaminant simulation 04
#
############################################################
def contaminant_simulation(prj_name:str,date_time:str, source:str,concentration:float,duration=900,pattern:str=None)->str:
print(datetime.now(pytz.timezone('Asia/Shanghai')).strftime("%Y-%m-%d %H:%M:%S") + " -- Start Analysis.")
new_name = f'contaminant_Sim_{prj_name}'
if have_project(new_name):
if is_project_open(new_name):
close_project(new_name)
delete_project(new_name)
if is_project_open(prj_name):
close_project(prj_name)
print(datetime.now(pytz.timezone('Asia/Shanghai')).strftime("%Y-%m-%d %H:%M:%S") + " -- Start Copying Database.")
CopyProjectEx()(prj_name, new_name,
['operation', 'current_operation', 'restore_operation', 'batch_operation', 'operation_table'])
print(datetime.now(pytz.timezone('Asia/Shanghai')).strftime("%Y-%m-%d %H:%M:%S") + " -- Start Opening Database.")
open_project(new_name)
print(datetime.now(pytz.timezone('Asia/Shanghai')).strftime("%Y-%m-%d %H:%M:%S") + " -- Database Loading OK.")
dic_time = get_time(new_name)
dic_time['QUALITY TIMESTEP'] = '0:05:00'
cs = ChangeSet()
cs.operations.append(dic_time)
set_time(new_name, cs) # set QUALITY TIMESTEP
time_option=get_time(new_name)
hydraulic_step=time_option['HYDRAULIC TIMESTEP']
secs=from_clock_to_seconds_2(hydraulic_step)
operation_step=0
#step 1. set duration
if duration==None:
duration=secs
#step 2. set pattern
if pattern!=None:
pt=get_pattern(new_name,pattern)
if pt==None:
str_response=str('cant find pattern')
return str_response
else:
cs_pattern=ChangeSet()
pt={}
factors=[]
tmp_duration=duration
while tmp_duration>0:
factors.append(1.0)
tmp_duration=tmp_duration-secs
pt['id']='contam_pt'
pt['factors']=factors
cs_pattern.append(pt)
add_pattern(new_name,cs_pattern)
operation_step+=1
#step 3. set source/initial quality
# source quality
cs_source=ChangeSet()
source_schema={'node':source,'s_type':SOURCE_TYPE_CONCEN,'strength':concentration,'pattern':pt['id']}
cs_source.append(source_schema)
source_node=get_source(new_name,source)
if len(source_node)==0:
add_source(new_name,cs_source)
else:
set_source(new_name,cs_source)
dict_demand = get_demand(new_name, source)
for demands in dict_demand['demands']:
dict_demand['demands'][dict_demand['demands'].index(demands)]['demand'] = -1
dict_demand['demands'][dict_demand['demands'].index(demands)]['pattern'] = None
cs = ChangeSet()
cs.append(dict_demand)
set_demand(new_name, cs) # set inflow node
# # initial quality
# dict_quality = get_quality(new_name, source)
# dict_quality['quality'] = concentration
# cs = ChangeSet()
# cs.append(dict_quality)
# set_quality(new_name, cs)
operation_step+=1
#step 4 set option of quality to chemical
opt=get_option(new_name)
opt['QUALITY']=OPTION_QUALITY_CHEMICAL
cs_option=ChangeSet()
cs_option.append(opt)
set_option(new_name,cs_option)
operation_step+=1
#step 5. run simulation
result = run_simulation_ex(new_name,'realtime', date_time, date_time, duration,
downloading_prohibition=True)
# for i in range(1,operation_step):
# execute_undo(prj_name)
if is_project_open(new_name):
close_project(new_name)
delete_project(new_name)
return result
############################################################
# age analysis 05
############################################################
def age_analysis(prj_name, start_time, end_time, duration) -> str:
print(datetime.now(pytz.timezone('Asia/Shanghai')).strftime("%Y-%m-%d %H:%M:%S") + " -- Start Analysis.")
new_name = f'age_Anal_{prj_name}'
if have_project(new_name):
if is_project_open(new_name):
close_project(new_name)
delete_project(new_name)
if is_project_open(prj_name):
close_project(prj_name)
print(datetime.now(pytz.timezone('Asia/Shanghai')).strftime("%Y-%m-%d %H:%M:%S") + " -- Start Copying Database.")
CopyProjectEx()(prj_name, new_name,
['operation', 'current_operation', 'restore_operation', 'batch_operation', 'operation_table'])
print(datetime.now(pytz.timezone('Asia/Shanghai')).strftime("%Y-%m-%d %H:%M:%S") + " -- Start Opening Database.")
open_project(new_name)
print(datetime.now(pytz.timezone('Asia/Shanghai')).strftime("%Y-%m-%d %H:%M:%S") + " -- Database Loading OK.")
# step 1. run simulation
result = run_simulation_ex(new_name, 'realtime', start_time, end_time, duration,
downloading_prohibition=True)
# step 2. restore the base model status
# execute_undo(prj_name) #有疑惑
if is_project_open(new_name):
close_project(new_name)
delete_project(new_name)
output = Output("./temp/{}.db.out".format(new_name))
# element_name = output.element_name()
# node_name = element_name['nodes']
# link_name = element_name['links']
nodes_age = []
node_result = output.node_results()
for node in node_result:
nodes_age.append(node['result'][-1]['quality'])
links_age = []
link_result = output.link_results()
for link in link_result:
links_age.append(link['result'][-1]['quality'])
age_result = {'nodes': nodes_age, 'links': links_age}
# age_result = {'nodes': nodes_age, 'links': links_age, 'nodeIDs': node_name, 'linkIDs': link_name}
return json.dumps(age_result)
############################################################
# pressure regulation 06
############################################################
def pressure_regulation(prj_name, start_datetime, pump_control, tank_initial_level_control=None) -> str:
print(datetime.now(pytz.timezone('Asia/Shanghai')).strftime("%Y-%m-%d %H:%M:%S") + " -- Start Analysis.")
new_name = f'pressure_regulation_{prj_name}'
if have_project(new_name):
if is_project_open(new_name):
close_project(new_name)
delete_project(new_name)
if is_project_open(prj_name):
close_project(prj_name)
print(datetime.now(pytz.timezone('Asia/Shanghai')).strftime("%Y-%m-%d %H:%M:%S") + " -- Start Copying Database.")
CopyProjectEx()(prj_name, new_name,
['operation', 'current_operation', 'restore_operation', 'batch_operation', 'operation_table'])
print(datetime.now(pytz.timezone('Asia/Shanghai')).strftime("%Y-%m-%d %H:%M:%S") + " -- Start Opening Database.")
open_project(new_name)
print(datetime.now(pytz.timezone('Asia/Shanghai')).strftime("%Y-%m-%d %H:%M:%S") + " -- Database Loading OK.")
# 全部关泵后压力计算不合理改为压力驱动PDA
options = get_option(new_name)
options['DEMAND MODEL'] = OPTION_DEMAND_MODEL_PDA
options['REQUIRED PRESSURE'] = '15.0000'
cs_options = ChangeSet()
cs_options.append(options)
set_option(new_name, cs_options)
result = run_simulation_ex(name=new_name,
simulation_type='realtime',
start_datetime=start_datetime,
duration=900,
pump_control=pump_control,
tank_initial_level_control=tank_initial_level_control,
downloading_prohibition=True)
if is_project_open(new_name):
close_project(new_name)
delete_project(new_name)
return result
############################################################
# project management 07
############################################################
def project_management(prj_name, start_datetime, pump_control,
tank_initial_level_control=None, region_demand_control=None) -> str:
print(datetime.now(pytz.timezone('Asia/Shanghai')).strftime("%Y-%m-%d %H:%M:%S") + " -- Start Analysis.")
new_name = f'project_management_{prj_name}'
if have_project(new_name):
if is_project_open(new_name):
close_project(new_name)
delete_project(new_name)
if is_project_open(prj_name):
close_project(prj_name)
print(datetime.now(pytz.timezone('Asia/Shanghai')).strftime("%Y-%m-%d %H:%M:%S") + " -- Start Copying Database.")
CopyProjectEx()(prj_name, new_name,
['operation', 'current_operation', 'restore_operation', 'batch_operation', 'operation_table'])
print(datetime.now(pytz.timezone('Asia/Shanghai')).strftime("%Y-%m-%d %H:%M:%S") + " -- Start Opening Database.")
open_project(new_name)
print(datetime.now(pytz.timezone('Asia/Shanghai')).strftime("%Y-%m-%d %H:%M:%S") + " -- Database Loading OK.")
result = run_simulation_ex(name=new_name,
simulation_type='realtime',
start_datetime=start_datetime,
duration=86400,
pump_control=pump_control,
tank_initial_level_control=tank_initial_level_control,
region_demand_control=region_demand_control,
downloading_prohibition=True)
if is_project_open(new_name):
close_project(new_name)
delete_project(new_name)
return result
############################################################
# scheduling analysis 08
############################################################
def scheduling_simulation(prj_name, start_time, pump_control, tank_id, water_plant_output_id, time_delta=300) -> str:
print(datetime.now(pytz.timezone('Asia/Shanghai')).strftime("%Y-%m-%d %H:%M:%S") + " -- Start Analysis.")
new_name = f'scheduling_{prj_name}'
if have_project(new_name):
if is_project_open(new_name):
close_project(new_name)
delete_project(new_name)
if is_project_open(prj_name):
close_project(prj_name)
print(datetime.now(pytz.timezone('Asia/Shanghai')).strftime("%Y-%m-%d %H:%M:%S") + " -- Start Copying Database.")
CopyProjectEx()(prj_name, new_name,
['operation', 'current_operation', 'restore_operation', 'batch_operation', 'operation_table'])
print(datetime.now(pytz.timezone('Asia/Shanghai')).strftime("%Y-%m-%d %H:%M:%S") + " -- Start Opening Database.")
open_project(new_name)
print(datetime.now(pytz.timezone('Asia/Shanghai')).strftime("%Y-%m-%d %H:%M:%S") + " -- Database Loading OK.")
run_simulation_ex(new_name, 'realtime', start_time, duration=0, pump_control=pump_control)
if not is_project_open(new_name):
open_project(new_name)
tank = get_tank(new_name, tank_id) # 水塔信息
tank_floor_space = pi * pow(tank['diameter'] / 2, 2) # 水塔底面积(m^2)
tank_init_level = tank['init_level'] # 水塔初始水位(m)
tank_pipes_id = tank['links'] # pipes list
tank_pipe_flow_direction = {} # 管道流向修正系数, 水塔为下游节点时为1, 水塔为上游节点时为-1
for pipe_id in tank_pipes_id:
if get_pipe(new_name, pipe_id)['node2'] == tank_id: # 水塔为下游节点
tank_pipe_flow_direction[pipe_id] = 1
else:
tank_pipe_flow_direction[pipe_id] = -1
output = Output("./temp/{}.db.out".format(new_name))
node_results = output.node_results() # [{'node': str, 'result': [{'pressure': float}]}]
water_plant_output_pressure = 0
for node_result in node_results:
if node_result['node'] == water_plant_output_id: # 水厂出水压力(m)
water_plant_output_pressure = node_result['result'][-1]['pressure']
water_plant_output_pressure /= 100 # 预计水厂出水压力(Mpa)
pipe_results = output.link_results() # [{'link': str, 'result': [{'flow': float}]}]
tank_inflow = 0
for pipe_result in pipe_results:
for pipe_id in tank_pipes_id: # 遍历与水塔相连的管道
if pipe_result['link'] == pipe_id: # 水塔入流流量(L/s)
tank_inflow += pipe_result['result'][-1]['flow'] * tank_pipe_flow_direction[pipe_id]
tank_inflow /= 1000 # 水塔入流流量(m^3/s)
tank_level_delta = tank_inflow * time_delta / tank_floor_space # 水塔水位改变值(m)
tank_level = tank_init_level + tank_level_delta # 预计水塔水位(m)
simulation_results = {'water_plant_output_pressure': water_plant_output_pressure,
'tank_init_level': tank_init_level,
'tank_level': tank_level}
if is_project_open(new_name):
close_project(new_name)
delete_project(new_name)
return json.dumps(simulation_results)
def daily_scheduling_simulation(prj_name, start_time, pump_control,
reservoir_id, tank_id, water_plant_output_id) -> str:
print(datetime.now(pytz.timezone('Asia/Shanghai')).strftime("%Y-%m-%d %H:%M:%S") + " -- Start Analysis.")
new_name = f'daily_scheduling_{prj_name}'
if have_project(new_name):
if is_project_open(new_name):
close_project(new_name)
delete_project(new_name)
if is_project_open(prj_name):
close_project(prj_name)
print(datetime.now(pytz.timezone('Asia/Shanghai')).strftime("%Y-%m-%d %H:%M:%S") + " -- Start Copying Database.")
CopyProjectEx()(prj_name, new_name,
['operation', 'current_operation', 'restore_operation', 'batch_operation', 'operation_table'])
print(datetime.now(pytz.timezone('Asia/Shanghai')).strftime("%Y-%m-%d %H:%M:%S") + " -- Start Opening Database.")
open_project(new_name)
print(datetime.now(pytz.timezone('Asia/Shanghai')).strftime("%Y-%m-%d %H:%M:%S") + " -- Database Loading OK.")
run_simulation_ex(new_name, 'realtime', start_time, duration=86400, pump_control=pump_control)
if not is_project_open(new_name):
open_project(new_name)
output = Output("./temp/{}.db.out".format(new_name))
node_results = output.node_results() # [{'node': str, 'result': [{'pressure': float, 'head': float}]}]
water_plant_output_pressure = []
reservoir_level = []
tank_level = []
for node_result in node_results:
if node_result['node'] == water_plant_output_id:
for result in node_result['result']:
water_plant_output_pressure.append(result['pressure'] / 100) # 水厂出水压力(Mpa)
elif node_result['node'] == reservoir_id:
for result in node_result['result']:
reservoir_level.append(result['head'] - 250.35) # 清水池液位(m)
elif node_result['node'] == tank_id:
for result in node_result['result']:
tank_level.append(result['pressure']) # 调节池液位(m)
simulation_results = {'water_plant_output_pressure': water_plant_output_pressure,
'reservoir_level': reservoir_level,
'tank_level': tank_level}
if is_project_open(new_name):
close_project(new_name)
delete_project(new_name)
return json.dumps(simulation_results)
############################################################
# network_update 10
############################################################
def network_update(file_path: str) -> None:
read_inp('bb', file_path)
csv_path = './history_pattern_flow.csv'
# # 检查文件是否存在
# if os.path.exists(csv_path):
# print(f"history_patterns_flows文件存在开始处理...")
#
# # 读取 CSV 文件
# df = pd.read_csv(csv_path)
#
# # 连接到 PostgreSQL 数据库(这里是数据库 "bb"
# with psycopg.connect("dbname=bb host=127.0.0.1") as conn:
# with conn.cursor() as cur:
# for index, row in df.iterrows():
# # 直接将数据插入,不进行唯一性检查
# insert_sql = sql.SQL("""
# INSERT INTO history_patterns_flows (id, factor, flow)
# VALUES (%s, %s, %s);
# """)
# # 将数据插入数据库
# cur.execute(insert_sql, (row['id'], row['factor'], row['flow']))
# conn.commit()
# print("数据成功导入到 'history_patterns_flows' 表格。")
# else:
# print(f"history_patterns_flows文件不存在。")
# 检查文件是否存在
if os.path.exists(csv_path):
print(f"history_patterns_flows文件存在开始处理...")
# 连接到 PostgreSQL 数据库(这里是数据库 "bb"
with psycopg.connect("dbname=bb host=127.0.0.1") as conn:
with conn.cursor() as cur:
with open(csv_path, newline='', encoding='utf-8-sig') as csvfile:
reader = csv.DictReader(csvfile)
for row in reader:
# 直接将数据插入,不进行唯一性检查
insert_sql = sql.SQL("""
INSERT INTO history_patterns_flows (id, factor, flow)
VALUES (%s, %s, %s);
""")
# 将数据插入数据库
cur.execute(insert_sql, (row['id'], row['factor'], row['flow']))
conn.commit()
print("数据成功导入到 'history_patterns_flows' 表格。")
else:
print(f"history_patterns_flows文件不存在。")
def submit_scada_info(name: str, coord_id: str) -> None:
"""
将scada信息表导入pg数据库
:param name: 项目名称(数据库名称)
:param coord_id: 坐标系的id如4326根据原始坐标信息输入
:return:
"""
scada_info_path = './scada_info.csv'
# 检查文件是否存在
if os.path.exists(scada_info_path):
print(f"scada_info文件存在开始处理...")
# 自动检测文件编码
with open(scada_info_path, 'rb') as file:
raw_data = file.read()
detected = chardet.detect(raw_data)
file_encoding = detected['encoding']
print(f"检测到的文件编码:{file_encoding}")
try:
# 动态替换数据库名称
conn_string = f"dbname={name} host=127.0.0.1"
# 连接到 PostgreSQL 数据库(这里是数据库 "bb"
with psycopg.connect(conn_string) as conn:
with conn.cursor() as cur:
# 检查 scada_info 表是否为空
cur.execute("SELECT COUNT(*) FROM scada_info;")
count = cur.fetchone()[0]
if count > 0:
print("scada_info表中已有数据正在清空记录...")
cur.execute("DELETE FROM scada_info;")
print("表记录已清空。")
with open(scada_info_path, newline='', encoding=file_encoding) as csvfile:
reader = csv.DictReader(csvfile)
for row in reader:
# 将CSV单元格值为空的字段转换为 None
cleaned_row = {key: (value if value.strip() else None) for key, value in row.items()}
# 处理 associated_source_outflow_id 列动态变化
associated_columns = [f"associated_source_outflow_id{i}" for i in range(1, 21)]
associated_values = [
(cleaned_row.get(col).strip() if cleaned_row.get(col) and cleaned_row.get(
col).strip() else None)
for col in associated_columns
]
# 将 X_coor 和 Y_coor 转换为 geometry 类型
x_coor = float(cleaned_row['X_coor']) if cleaned_row['X_coor'] else None
y_coor = float(cleaned_row['Y_coor']) if cleaned_row['Y_coor'] else None
coord = f"SRID={coord_id};POINT({x_coor} {y_coor})" if x_coor and y_coor else None
# 准备插入 SQL 语句
insert_sql = sql.SQL("""
INSERT INTO scada_info (
id, type, associated_element_id, associated_pattern,
associated_pipe_flow_id, {associated_columns},
API_query_id, transmission_mode, transmission_frequency,
X_coor, Y_coor, coord
)
VALUES (
%s, %s, %s, %s, %s, {associated_placeholders},
%s, %s, %s, %s, %s, %s
);
""").format(
associated_columns=sql.SQL(", ").join(sql.Identifier(col) for col in associated_columns),
associated_placeholders=sql.SQL(", ").join(sql.Placeholder() for _ in associated_columns)
)
# 将数据插入数据库
cur.execute(insert_sql, (
cleaned_row['id'], cleaned_row['type'], cleaned_row['associated_element_id'],
cleaned_row.get('associated_pattern'), cleaned_row.get('associated_pipe_flow_id'),
*associated_values, cleaned_row.get('API_query_id'),
cleaned_row['transmission_mode'], cleaned_row['transmission_frequency'],
x_coor, y_coor, coord
))
conn.commit()
print("数据成功导入到 'scada_info' 表格。")
except Exception as e:
print(f"导入时出错:{e}")
else:
print(f"scada_info文件不存在。")
if __name__ == '__main__':
# contaminant_simulation('bb_model','2024-06-24T00:00:00Z','ZBBDTZDP009034',30,1800)
# flushing_analysis('bb_model','2024-04-01T08:00:00Z',{'GSD230719205857733F8F5214FF','GSD230719205857C0AF65B6A170'},'GSD2307192058570DEDF28E4F73',0,duration=900)
# flushing_analysis('bb_model', '2024-08-26T08:00:00Z', ['GSD2307192058572E5C0E14D83E'], [0.5], 'ZBBDTZDP009410', 0,
# duration=1800)
# valve_close_analysis('bb_model','2024-04-01T08:00:00Z',['GSD2307192058576122D929EE99(L)'],duration=1800)
# burst_analysis('bb','2024-04-01T08:00:00Z','ZBBGXSZW000001',burst_size=200,duration=1800)
#run_simulation('beibeizone','2024-04-01T08:00:00Z')
# str_dump=dump_output('h:\\OneDrive\\tjwaterserver\\temp\\beibeizone.db_no_burst.out')
# with open("out_dump.txt", "w") as f:
# f.write(str_dump)
# str_dump=dump_output('h:\\OneDrive\\tjwaterserver\\temp\\beibeizone.db_busrtID(ZBBGXSZW000001).out')
# with open("burst_out_dump.txt", "w") as f:
# f.write(str_dump)
# network_update('model22_1223.inp')
submit_scada_info('bb', '4490')