# NOTE: the import order below is kept exactly as in the original file because
# `from tjnetwork import *` may export names that later imports are meant to
# override (or vice versa); only the formatting was changed.
import os
from tjnetwork import *
from api.project import CopyProjectEx
from run_simulation import run_simulation_ex, from_clock_to_seconds_2
from math import sqrt, pi
from epanet.epanet import Output
import json
from datetime import datetime
import time
import pytz
import psycopg
from psycopg import sql
import pandas as pd
import csv
import chardet
from contextlib import contextmanager


############################################################
# shared helpers
############################################################

# Time zone used for the timestamps printed by _log().
_LOCAL_TZ = pytz.timezone('Asia/Shanghai')

# Table-name list handed to CopyProjectEx as its third argument.
# NOTE(review): presumably the bookkeeping tables that are treated specially
# when a project is copied -- confirm against CopyProjectEx.
_COPY_TABLES = ('operation', 'current_operation', 'restore_operation',
                'batch_operation', 'operation_table')


def _log(message: str) -> None:
    """Print *message* prefixed with the current Asia/Shanghai timestamp."""
    print(datetime.now(_LOCAL_TZ).strftime("%Y-%m-%d %H:%M:%S") + " -- " + message)


@contextmanager
def _scratch_project(prj_name, new_name):
    """Create, open and finally delete a scratch copy of *prj_name*.

    A stale copy named *new_name* is removed first, the base project is
    closed if it is open, the copy is created and opened, and -- whatever
    happens inside the ``with`` body (normal exit, early ``return`` or an
    exception) -- the copy is closed and deleted again on exit.
    """
    _log("Start Analysis.")
    if have_project(new_name):
        if is_project_open(new_name):
            close_project(new_name)
        delete_project(new_name)
    if is_project_open(prj_name):
        close_project(prj_name)
    _log("Start Copying Database.")
    CopyProjectEx()(prj_name, new_name, list(_COPY_TABLES))
    _log("Start Opening Database.")
    open_project(new_name)
    _log("Database Loading OK.")
    try:
        yield new_name
    finally:
        if is_project_open(new_name):
            close_project(new_name)
        delete_project(new_name)


def _use_pressure_driven_demand(name, required_pressure: str) -> None:
    """Switch project *name* to the PDA demand model with the given required pressure."""
    options = get_option(name)
    options['DEMAND MODEL'] = OPTION_DEMAND_MODEL_PDA
    options['REQUIRED PRESSURE'] = required_pressure
    cs_options = ChangeSet()
    cs_options.append(options)
    set_option(name, cs_options)


def _set_emitter_coefficient(name, node_id, coefficient) -> None:
    """Set the emitter coefficient of *node_id*, reusing its existing emitter record if any."""
    emitter = get_emitter(name, node_id)
    if emitter is not None:
        emitter['coefficient'] = coefficient
    else:
        emitter = {'junction': node_id, 'coefficient': coefficient}
    cs_emitter = ChangeSet()
    cs_emitter.append(emitter)
    set_emitter(name, cs_emitter)


def _normalize_burst_inputs(burst_ID, burst_size):
    """Return ``(ids, sizes)`` as two new, equally long lists.

    A single pipe ID (``str``) is paired with ``None`` or a single number;
    a list of IDs is paired with ``None`` or a list of sizes.  Missing
    sizes are padded with ``-1`` (meaning "use the default opening").
    The caller's lists are never modified.

    :raises ValueError: ``'Type mismatch.'`` for an unsupported combination
        of argument types, ``'Length mismatch.'`` when more sizes than IDs
        are given.
    """
    if isinstance(burst_ID, list):
        if burst_size is not None and not isinstance(burst_size, list):
            raise ValueError('Type mismatch.')
        ids = list(burst_ID)
    elif isinstance(burst_ID, str):
        ids = [burst_ID]
        if burst_size is not None:
            if not isinstance(burst_size, (float, int)):
                raise ValueError('Type mismatch.')
            burst_size = [burst_size]
    else:
        raise ValueError('Type mismatch.')

    sizes = [] if burst_size is None else list(burst_size)
    if len(sizes) > len(ids):
        raise ValueError('Length mismatch.')
    sizes += [-1] * (len(ids) - len(sizes))
    return ids, sizes


def _blank_to_none(value):
    """Return *value* unchanged if it is a non-blank string, otherwise ``None``.

    ``csv.DictReader`` yields ``None`` for cells missing from a short row and
    a list for surplus cells of a long row; both are mapped to ``None``.
    """
    if isinstance(value, str) and value.strip():
        return value
    return None


############################################################
# burst analysis 01
############################################################

def burst_analysis(prj_name, date_time, burst_ID: list | str, burst_size: list | float | int = None,
                   duration=900, pump_control=None, valve_closed=None) -> str:
    '''
    Burst analysis: simulate pipe bursts on a scratch copy of the project.

    :param prj_name: name of the base project; it is copied, never modified
    :param date_time: passed to run_simulation_ex as both start and end datetime
    :param burst_ID: one pipe ID or a list of pipe IDs that burst
    :param burst_size: burst opening per pipe (a number for a single ID, a list
        for a list of IDs); missing or non-positive entries fall back to 1/8 of
        the pipe cross-section.
        NOTE(review): positive values are divided by 10000, presumably cm^2 -> m^2 -- confirm.
    :param duration: simulation duration passed to run_simulation_ex
    :param pump_control: passed through to run_simulation_ex
    :param valve_closed: optional iterable of valve IDs to close during the run
    :return: the result of run_simulation_ex, or a JSON string
        ('Type mismatch.' / 'Length mismatch.') for invalid arguments
    '''
    new_name = f'burst_Anal_{prj_name}'
    with _scratch_project(prj_name, new_name):
        # step 1. set the emitter coefficient at one end node of each burst pipe
        try:
            burst_ids, burst_sizes = _normalize_burst_inputs(burst_ID, burst_size)
        except ValueError as err:
            return json.dumps(str(err))

        for pipe_id, opening in zip(burst_ids, burst_sizes):
            pipe = get_pipe(new_name, pipe_id)
            start_node = pipe['node1']
            end_node = pipe['node2']
            d_pipe = pipe['diameter'] / 1000.0
            if opening <= 0:
                # default: 1/8 of the pipe cross-section is used as the opening area
                opening = 3.14 * d_pipe * d_pipe / 4 / 8
            else:
                opening = opening / 10000
            emitter_coeff = 0.65 * opening * sqrt(19.6) * 1000

            # Prefer the downstream node; fall back to the upstream node.
            # NOTE(review): if neither end is a junction the emitter is set on
            # node '' exactly as before -- confirm whether that can happen.
            emitter_node = ''
            if is_junction(new_name, end_node):
                emitter_node = end_node
            elif is_junction(new_name, start_node):
                emitter_node = start_node
            _set_emitter_coefficient(new_name, emitter_node, emitter_coeff)

        # step 2. run simulation
        # Closing valves under demand-driven analysis can still leave flow
        # behind the closed valves, so switch to pressure-driven analysis (PDA).
        _use_pressure_driven_demand(new_name, '20.0000')

        valve_control = None
        if valve_closed is not None:
            valve_control = {valve: {'status': 'CLOSED'} for valve in valve_closed}

        result = run_simulation_ex(new_name, 'realtime', date_time, end_datetime=date_time,
                                   duration=duration, pump_control=pump_control,
                                   valve_control=valve_control, downloading_prohibition=True)
    # step 3. the scratch copy has been closed and deleted by _scratch_project
    return result


############################################################
# valve closing analysis 02
############################################################

def valve_close_analysis(prj_name, date_time, valves, duration=None) -> str:
    """Simulate the network with the given valves closed.

    :param prj_name: name of the base project; it is copied, never modified
    :param date_time: passed to run_simulation_ex as both start and end datetime
    :param valves: iterable of valve IDs to close
    :param duration: simulation duration passed to run_simulation_ex
    :return: the result of run_simulation_ex, or an error message if one of
        the IDs is not a valve
    """
    new_name = f'valve_close_Anal_{prj_name}'
    with _scratch_project(prj_name, new_name):
        # step 1. change the valves status to 'closed'
        for valve in valves:
            if not is_valve(new_name, valve):
                return 'ID:{}is not a valve type'.format(valve)
            cs = ChangeSet()
            status = get_status(new_name, valve)
            status['status'] = 'CLOSED'
            cs.append(status)
            set_status(new_name, cs)

        # step 2. run simulation
        # Closing valves under demand-driven analysis can still leave flow
        # behind the closed valves, so switch to pressure-driven analysis (PDA).
        _use_pressure_driven_demand(new_name, '20.0000')

        result = run_simulation_ex(new_name, 'realtime', date_time, date_time, duration,
                                   downloading_prohibition=True)
    # step 3. the scratch copy has been closed and deleted by _scratch_project
    return result


############################################################
# flushing analysis 03
# Pipe_Flushing_Analysis(prj_name, date_time, Valve_id_list, Drainage_Node_Id,
#                        Flushing_flow[opt], Flushing_duration[opt]) -> out_file: string
############################################################

def flushing_analysis(prj_name, date_time, valves, valves_k, drainage_node_ID,
                      flushing_flow=0, duration=None) -> str:
    """Simulate pipe flushing through a drainage node.

    :param prj_name: name of the base project; it is copied, never modified
    :param date_time: passed to run_simulation_ex as both start and end datetime
    :param valves: valve IDs whose opening is adjusted
    :param valves_k: opening factor per valve, paired with *valves* by position;
        0 closes the valve, a value below 1 opens it with a setting derived
        from the factor, any other value leaves the status untouched
    :param drainage_node_ID: junction through which the water is drained
    :param flushing_flow: if positive, added to every demand of the drainage
        node (divided by 3.6; NOTE(review): presumably m^3/h -> L/s -- confirm);
        otherwise an emitter is placed on the drainage node
    :param duration: simulation duration passed to run_simulation_ex
    :return: the result of run_simulation_ex, or 'Wrong Drainage node type'
    """
    new_name = f'flushing_Anal_{prj_name}'
    with _scratch_project(prj_name, new_name):
        if not is_junction(new_name, drainage_node_ID):
            return 'Wrong Drainage node type'

        # step 1. apply the requested opening to every valve
        for valve, valve_k in zip(valves, valves_k):
            cs = ChangeSet()
            status = get_status(new_name, valve)
            if valve_k == 0:
                status['status'] = 'CLOSED'
            elif valve_k < 1:
                status['status'] = 'OPEN'
                status['setting'] = 0.1036 * pow(valve_k, -3.105)
            cs.append(status)
            set_status(new_name, cs)

        # step 2. add the flushing flow to the drainage node, or set an
        # emitter coefficient on it
        if flushing_flow > 0:
            emitter_demand = get_demand(new_name, drainage_node_ID)
            cs = ChangeSet()
            for record in emitter_demand['demands']:
                record['demand'] += (flushing_flow / 3.6)
            cs.append(emitter_demand)
            set_demand(new_name, cs)
        else:
            # Use the largest connected pipe diameter, but at least 50.
            flush_diameter = 50
            for pipe in get_node_links(new_name, drainage_node_ID):
                d = get_pipe(new_name, pipe)['diameter']
                if flush_diameter < d:
                    flush_diameter = d
            # NOTE(review): everything from here to the end of this function was
            # lost in the reviewed copy of the file (the text was cut right after
            # `if flush_diameter`) and has been reconstructed by analogy with
            # burst_analysis -- verify against the authoritative source.
            flush_diameter = flush_diameter / 1000
            emitter_coeff = 0.65 * 3.14 * (flush_diameter * flush_diameter / 4) * sqrt(19.6) * 1000
            _set_emitter_coefficient(new_name, drainage_node_ID, emitter_coeff)

        # step 3. run simulation (pressure-driven, see valve_close_analysis)
        _use_pressure_driven_demand(new_name, '20.0000')
        result = run_simulation_ex(new_name, 'realtime', date_time, date_time, duration,
                                   downloading_prohibition=True)
    return result


############################################################
# contaminant simulation 04
############################################################

# NOTE(review): the original `def` line of this function was lost in the
# reviewed copy of the file; the signature below is reconstructed from the
# names used in the body and from the example call in the __main__ section
# -- verify against the authoritative source.
def contaminant_simulation(prj_name, date_time, source, concentration,
                           duration=None, pattern=None) -> str:
    """Simulate the spread of a contaminant injected at a source node.

    :param prj_name: name of the base project; it is copied, never modified
    :param date_time: passed to run_simulation_ex as both start and end datetime
    :param source: ID of the node where the contaminant enters
    :param concentration: source strength
    :param duration: simulation duration; defaults to one hydraulic time step
    :param pattern: ID of an existing pattern for the source; when omitted a
        constant pattern named 'contam_pt' is created
    :return: the result of run_simulation_ex, or 'cant find pattern'
    """
    new_name = f'contaminant_Sim_{prj_name}'
    with _scratch_project(prj_name, new_name):
        # set QUALITY TIMESTEP
        dic_time = get_time(new_name)
        dic_time['QUALITY TIMESTEP'] = '0:05:00'
        cs = ChangeSet()
        cs.operations.append(dic_time)
        set_time(new_name, cs)

        time_option = get_time(new_name)
        secs = from_clock_to_seconds_2(time_option['HYDRAULIC TIMESTEP'])

        # step 1. set duration
        if duration is None:
            duration = secs

        # step 2. set pattern
        if pattern is not None:
            pt = get_pattern(new_name, pattern)
            if pt is None:
                return str('cant find pattern')
        else:
            # one factor of 1.0 per hydraulic step covering the whole duration
            factors = []
            remaining = duration
            while remaining > 0:
                factors.append(1.0)
                remaining = remaining - secs
            pt = {'id': 'contam_pt', 'factors': factors}
            cs_pattern = ChangeSet()
            cs_pattern.append(pt)
            add_pattern(new_name, cs_pattern)

        # step 3. set source quality
        cs_source = ChangeSet()
        cs_source.append({'node': source, 's_type': SOURCE_TYPE_CONCEN,
                          'strength': concentration, 'pattern': pt['id']})
        if len(get_source(new_name, source)) == 0:
            add_source(new_name, cs_source)
        else:
            set_source(new_name, cs_source)

        # turn the source node into an inflow node: demand -1, no pattern
        dict_demand = get_demand(new_name, source)
        for demand_record in dict_demand['demands']:
            demand_record['demand'] = -1
            demand_record['pattern'] = None
        cs = ChangeSet()
        cs.append(dict_demand)
        set_demand(new_name, cs)

        # step 4. set option of quality to chemical
        opt = get_option(new_name)
        opt['QUALITY'] = OPTION_QUALITY_CHEMICAL
        cs_option = ChangeSet()
        cs_option.append(opt)
        set_option(new_name, cs_option)

        # step 5. run simulation
        result = run_simulation_ex(new_name, 'realtime', date_time, date_time, duration,
                                   downloading_prohibition=True)
    return result


############################################################
# age analysis 05
############################################################

def age_analysis(prj_name, start_time, end_time, duration) -> str:
    """Run a simulation and return the last-step quality value of every node and link.

    :param prj_name: name of the base project; it is copied, never modified
    :param start_time: start datetime passed to run_simulation_ex
    :param end_time: end datetime passed to run_simulation_ex
    :param duration: simulation duration passed to run_simulation_ex
    :return: JSON string ``{"nodes": [...], "links": [...]}`` holding the
        'quality' value of the final result step of each node / link
    """
    new_name = f'age_Anal_{prj_name}'
    with _scratch_project(prj_name, new_name):
        # step 1. run simulation (the return value is not used; results are
        # read from the output file below)
        run_simulation_ex(new_name, 'realtime', start_time, end_time, duration,
                          downloading_prohibition=True)

    # step 2. the scratch copy is gone; read the results from the output file
    output = Output("./temp/{}.db.out".format(new_name))
    nodes_age = [node['result'][-1]['quality'] for node in output.node_results()]
    links_age = [link['result'][-1]['quality'] for link in output.link_results()]
    return json.dumps({'nodes': nodes_age, 'links': links_age})


############################################################
# pressure regulation 06
############################################################

def pressure_regulation(prj_name, start_datetime, pump_control, tank_initial_level_control=None) -> str:
    """Run a 900-duration simulation with the given pump and tank-level controls.

    :param prj_name: name of the base project; it is copied, never modified
    :param start_datetime: start datetime passed to run_simulation_ex
    :param pump_control: passed through to run_simulation_ex
    :param tank_initial_level_control: passed through to run_simulation_ex
    :return: the result of run_simulation_ex
    """
    new_name = f'pressure_regulation_{prj_name}'
    with _scratch_project(prj_name, new_name):
        # With all pumps off, demand-driven pressures are unreasonable, so
        # switch to pressure-driven analysis (PDA).
        _use_pressure_driven_demand(new_name, '15.0000')
        result = run_simulation_ex(name=new_name, simulation_type='realtime',
                                   start_datetime=start_datetime, duration=900,
                                   pump_control=pump_control,
                                   tank_initial_level_control=tank_initial_level_control,
                                   downloading_prohibition=True)
    return result


############################################################
# project management 07
############################################################

def project_management(prj_name, start_datetime, pump_control, tank_initial_level_control=None,
                       region_demand_control=None) -> str:
    """Run an 86400-duration simulation with pump, tank-level and region-demand controls.

    :param prj_name: name of the base project; it is copied, never modified
    :param start_datetime: start datetime passed to run_simulation_ex
    :param pump_control: passed through to run_simulation_ex
    :param tank_initial_level_control: passed through to run_simulation_ex
    :param region_demand_control: passed through to run_simulation_ex
    :return: the result of run_simulation_ex
    """
    new_name = f'project_management_{prj_name}'
    with _scratch_project(prj_name, new_name):
        result = run_simulation_ex(name=new_name, simulation_type='realtime',
                                   start_datetime=start_datetime, duration=86400,
                                   pump_control=pump_control,
                                   tank_initial_level_control=tank_initial_level_control,
                                   region_demand_control=region_demand_control,
                                   downloading_prohibition=True)
    return result


############################################################
# scheduling analysis 08
############################################################

def scheduling_simulation(prj_name, start_time, pump_control, tank_id, water_plant_output_id,
                          time_delta=300) -> str:
    """Estimate plant outlet pressure and tank level after *time_delta* seconds.

    A zero-duration simulation is run; the tank level is extrapolated from the
    net flow through the links connected to the tank.

    :param prj_name: name of the base project; it is copied, never modified
    :param start_time: start datetime passed to run_simulation_ex
    :param pump_control: passed through to run_simulation_ex
    :param tank_id: ID of the tank whose level is predicted
    :param water_plant_output_id: ID of the node whose pressure is reported
    :param time_delta: extrapolation horizon in seconds
    :return: JSON string with 'water_plant_output_pressure', 'tank_init_level'
        and 'tank_level'
    """
    new_name = f'scheduling_{prj_name}'
    with _scratch_project(prj_name, new_name):
        run_simulation_ex(new_name, 'realtime', start_time, duration=0, pump_control=pump_control)
        if not is_project_open(new_name):
            open_project(new_name)

        tank = get_tank(new_name, tank_id)
        tank_floor_space = pi * pow(tank['diameter'] / 2, 2)  # tank floor area (m^2)
        tank_init_level = tank['init_level']                  # initial tank level (m)

        # Flow sign correction per connected link: +1 when the tank is the
        # link's downstream node (node2), -1 when it is the upstream node.
        flow_sign = {}
        for pipe_id in tank['links']:
            if get_pipe(new_name, pipe_id)['node2'] == tank_id:
                flow_sign[pipe_id] = 1
            else:
                flow_sign[pipe_id] = -1

        output = Output("./temp/{}.db.out".format(new_name))

        # node_results(): [{'node': str, 'result': [{'pressure': float}]}]
        water_plant_output_pressure = 0
        for node_result in output.node_results():
            if node_result['node'] == water_plant_output_id:
                water_plant_output_pressure = node_result['result'][-1]['pressure']  # (m)
        water_plant_output_pressure /= 100  # predicted plant outlet pressure (MPa)

        # link_results(): [{'link': str, 'result': [{'flow': float}]}]
        tank_inflow = 0
        for pipe_result in output.link_results():
            sign = flow_sign.get(pipe_result['link'])
            if sign is not None:
                tank_inflow += pipe_result['result'][-1]['flow'] * sign  # (L/s)
        tank_inflow /= 1000  # tank inflow (m^3/s)

        tank_level_delta = tank_inflow * time_delta / tank_floor_space  # level change (m)
        tank_level = tank_init_level + tank_level_delta                 # predicted level (m)
        simulation_results = {'water_plant_output_pressure': water_plant_output_pressure,
                              'tank_init_level': tank_init_level,
                              'tank_level': tank_level}
    return json.dumps(simulation_results)


def daily_scheduling_simulation(prj_name, start_time, pump_control, reservoir_id, tank_id,
                                water_plant_output_id, reservoir_base_elevation=250.35) -> str:
    """Run an 86400-duration simulation and return pressure / level time series.

    :param prj_name: name of the base project; it is copied, never modified
    :param start_time: start datetime passed to run_simulation_ex
    :param pump_control: passed through to run_simulation_ex
    :param reservoir_id: node whose head is converted to a reservoir level
    :param tank_id: node whose pressure is reported as the tank level
    :param water_plant_output_id: node whose pressure (divided by 100) is reported
    :param reservoir_base_elevation: value subtracted from the reservoir head to
        obtain its level; defaults to the previously hard-coded 250.35.
        NOTE(review): presumably the reservoir floor elevation in m -- confirm.
    :return: JSON string with the lists 'water_plant_output_pressure',
        'reservoir_level' and 'tank_level'
    """
    new_name = f'daily_scheduling_{prj_name}'
    with _scratch_project(prj_name, new_name):
        run_simulation_ex(new_name, 'realtime', start_time, duration=86400, pump_control=pump_control)
        if not is_project_open(new_name):
            open_project(new_name)

        output = Output("./temp/{}.db.out".format(new_name))
        water_plant_output_pressure = []
        reservoir_level = []
        tank_level = []
        # node_results(): [{'node': str, 'result': [{'pressure': float, 'head': float}]}]
        for node_result in output.node_results():
            node_id = node_result['node']
            if node_id == water_plant_output_id:
                # plant outlet pressure (MPa)
                water_plant_output_pressure.extend(
                    step['pressure'] / 100 for step in node_result['result'])
            elif node_id == reservoir_id:
                # clear-water reservoir level (m)
                reservoir_level.extend(
                    step['head'] - reservoir_base_elevation for step in node_result['result'])
            elif node_id == tank_id:
                # regulating tank level (m)
                tank_level.extend(step['pressure'] for step in node_result['result'])

        simulation_results = {'water_plant_output_pressure': water_plant_output_pressure,
                              'reservoir_level': reservoir_level,
                              'tank_level': tank_level}
    return json.dumps(simulation_results)


############################################################
# network_update 10
############################################################

def network_update(file_path: str) -> None:
    """Load an INP file into project 'bb' and import './history_pattern_flow.csv'.

    Every CSV row (columns 'id', 'factor', 'flow') is inserted into table
    history_patterns_flows of database 'bb' without any uniqueness check.

    :param file_path: path of the INP file handed to read_inp
    """
    read_inp('bb', file_path)

    csv_path = './history_pattern_flow.csv'
    if not os.path.exists(csv_path):
        print("history_patterns_flows文件不存在。")
        return

    print("history_patterns_flows文件存在,开始处理...")
    insert_sql = sql.SQL("""
        INSERT INTO history_patterns_flows (id, factor, flow)
        VALUES (%s, %s, %s);
    """)
    with psycopg.connect("dbname=bb host=127.0.0.1") as conn:
        with conn.cursor() as cur:
            with open(csv_path, newline='', encoding='utf-8-sig') as csvfile:
                for row in csv.DictReader(csvfile):
                    cur.execute(insert_sql, (row['id'], row['factor'], row['flow']))
        conn.commit()
    print("数据成功导入到 'history_patterns_flows' 表格。")


def submit_scada_info(name: str, coord_id: str) -> None:
    """
    Import './scada_info.csv' into table scada_info of a PostgreSQL database.

    Existing rows of scada_info are deleted first.  Blank CSV cells are stored
    as NULL.  Any error is reported on stdout and the import is abandoned.

    :param name: project name (also the database name)
    :param coord_id: SRID of the coordinates in the CSV, e.g. 4326
    :return:
    """
    scada_info_path = './scada_info.csv'
    if not os.path.exists(scada_info_path):
        print("scada_info文件不存在。")
        return

    print("scada_info文件存在,开始处理...")

    # detect the file encoding from the raw bytes
    with open(scada_info_path, 'rb') as file:
        raw_data = file.read()
    file_encoding = chardet.detect(raw_data)['encoding']
    print(f"检测到的文件编码:{file_encoding}")

    # The 20 numbered associated_source_outflow_id columns; built once because
    # neither the column list nor the statement depends on the row.
    associated_columns = [f"associated_source_outflow_id{i}" for i in range(1, 21)]
    insert_sql = sql.SQL("""
        INSERT INTO scada_info (
            id, type, associated_element_id, associated_pattern,
            associated_pipe_flow_id, {associated_columns},
            API_query_id, transmission_mode, transmission_frequency,
            X_coor, Y_coor, coord
        )
        VALUES (
            %s, %s, %s, %s, %s, {associated_placeholders}, %s, %s, %s, %s, %s, %s
        );
    """).format(
        associated_columns=sql.SQL(", ").join(sql.Identifier(col) for col in associated_columns),
        associated_placeholders=sql.SQL(", ").join(sql.Placeholder() for _ in associated_columns)
    )

    try:
        conn_string = f"dbname={name} host=127.0.0.1"
        with psycopg.connect(conn_string) as conn:
            with conn.cursor() as cur:
                # empty the table if it already holds data
                cur.execute("SELECT COUNT(*) FROM scada_info;")
                count = cur.fetchone()[0]
                if count > 0:
                    print("scada_info表中已有数据,正在清空记录...")
                    cur.execute("DELETE FROM scada_info;")
                    print("表记录已清空。")

                with open(scada_info_path, newline='', encoding=file_encoding) as csvfile:
                    for row in csv.DictReader(csvfile):
                        # blank (or missing) cells become None
                        cleaned_row = {key: _blank_to_none(value) for key, value in row.items()}

                        associated_values = [
                            cleaned_row[col].strip() if cleaned_row.get(col) else None
                            for col in associated_columns
                        ]

                        # build the geometry value from X_coor / Y_coor
                        x_coor = float(cleaned_row['X_coor']) if cleaned_row['X_coor'] else None
                        y_coor = float(cleaned_row['Y_coor']) if cleaned_row['Y_coor'] else None
                        # compare against None so that a coordinate of 0.0 is kept
                        if x_coor is not None and y_coor is not None:
                            coord = f"SRID={coord_id};POINT({x_coor} {y_coor})"
                        else:
                            coord = None

                        cur.execute(insert_sql, (
                            cleaned_row['id'], cleaned_row['type'], cleaned_row['associated_element_id'],
                            cleaned_row.get('associated_pattern'), cleaned_row.get('associated_pipe_flow_id'),
                            *associated_values, cleaned_row.get('API_query_id'),
                            cleaned_row['transmission_mode'], cleaned_row['transmission_frequency'],
                            x_coor, y_coor, coord
                        ))
            conn.commit()
        print("数据成功导入到 'scada_info' 表格。")
    except Exception as e:
        # best-effort import: report the failure instead of propagating it
        print(f"导入时出错:{e}")


if __name__ == '__main__':
    # Usage examples:
    # contaminant_simulation('bb_model','2024-06-24T00:00:00Z','ZBBDTZDP009034',30,1800)
    # flushing_analysis('bb_model', '2024-08-26T08:00:00Z', ['GSD2307192058572E5C0E14D83E'], [0.5],
    #                   'ZBBDTZDP009410', 0, duration=1800)
    # valve_close_analysis('bb_model','2024-04-01T08:00:00Z',['GSD2307192058576122D929EE99(L)'],duration=1800)
    # burst_analysis('bb','2024-04-01T08:00:00Z','ZBBGXSZW000001',burst_size=200,duration=1800)
    # network_update('model22_1223.inp')
    submit_scada_info('bb', '4490')