Add WMH code

This commit is contained in:
DingZQ
2025-02-07 22:58:15 +08:00
parent c40903289e
commit 15147b97fc
7 changed files with 3565 additions and 63 deletions

View File

@@ -0,0 +1,76 @@
import influxdb_api
import globals
from datetime import datetime, timedelta, timezone
import schedule
import time
from influxdb_client import InfluxDBClient, BucketsApi, WriteApi, OrganizationsApi, Point, QueryApi
# 2025/02/06
def get_next_period_time() -> str:
    """Return the next 6-hour boundary (00/06/12/18 h) as 'YYYY-MM-DDTHH:00:00+08:00'.

    NOTE(review): uses the machine's local clock but hard-codes a '+08:00'
    suffix — assumes the host runs on Beijing time; confirm on deployment.

    :return: string timestamp of the next 6-hour execution point
    """
    now = datetime.now()
    # Midnight of the current day, then step forward a whole number of
    # 6-hour periods; the timedelta handles the roll-over past 24:00.
    day_start = now.replace(hour=0, minute=0, second=0, microsecond=0)
    boundary = day_start + timedelta(hours=6 * (now.hour // 6 + 1))
    return boundary.strftime('%Y-%m-%dT%H:%M:%S+08:00')
# 2025/02/06
def store_non_realtime_SCADA_data_job() -> None:
    """Six-hourly job: at 00/06/12/18:00 push non-realtime SCADA data to InfluxDB.

    Invoked once a minute by store_non_realtime_SCADA_data_task but only acts
    when the wall clock sits exactly on a 6-hour boundary.

    :return: None
    """
    now = datetime.now()
    # Guard: do nothing unless we are at hh:00 with hh a multiple of 6.
    if now.hour % 6 != 0 or now.minute != 0:
        return
    # End of the history window, e.g. '2025-02-06T12:00:00+08:00'.
    end_time: str = get_next_period_time()
    influxdb_api.store_non_realtime_SCADA_data_to_influxdb(end_time)
    print('{} -- Successfully store non realtime SCADA data.'.format(
        datetime.now().strftime('%Y-%m-%d %H:%M:%S')))
# 2025/02/06
def store_non_realtime_SCADA_data_task() -> None:
    """Run the 6-hourly scheduler loop forever.

    Registers store_non_realtime_SCADA_data_job on a one-minute schedule and
    then polls for due jobs; the job itself filters for 6-hour boundaries.

    :return: None
    """
    try:
        schedule.every(1).minute.do(store_non_realtime_SCADA_data_job)
        while True:
            schedule.run_pending()  # fire any jobs that are due
            time.sleep(1)  # avoid a busy-wait between polls
    except Exception as e:
        # NOTE(review): a broad catch ends the loop silently after a single
        # failure; consider logging and restarting the loop instead.
        print(f"Error occurred in store_non_realtime_SCADA_data_task: {e}")
if __name__ == "__main__":
    # NOTE(review): credentials are hard-coded in source — move the token to
    # configuration or an environment variable.
    url = "http://localhost:8086"  # replace with your InfluxDB instance address
    token = "Z4UZj9HuLwLlwoApywvT2nGVP3bwLy18y-sJQ7enzZlJd8YMzMWbBA6F-q4gBiZ-7-IqdxR5aR9LvicKiSNmnA=="  # replace with your InfluxDB token
    org_name = "beibei"  # replace with your Organization name
    # Client is created but not passed on; influxdb_api presumably manages its
    # own connection — TODO confirm.
    client = InfluxDBClient(url=url, token=token)
    # Step 2: read scada_info from the PG database, then store SCADA data into
    # the SCADA_data bucket.
    influxdb_api.query_pg_scada_info_non_realtime('bb')
    # Start the scheduler loop (blocks forever).
    store_non_realtime_SCADA_data_task()

View File

@@ -0,0 +1,61 @@
import influxdb_api
import globals
from datetime import datetime, timedelta, timezone
import schedule
import time
from influxdb_client import InfluxDBClient, BucketsApi, WriteApi, OrganizationsApi, Point, QueryApi
# 2025/02/01
def get_next_time() -> str:
    """Return the current time truncated to the minute as 'YYYY-MM-DDTHH:MM:00+08:00'.

    NOTE(review): despite the name, this is the *current* minute with seconds
    and microseconds zeroed, not the next one. The '+08:00' suffix is
    hard-coded while the clock is naive local time — assumes the host runs on
    Beijing time; confirm on deployment.

    :return: string timestamp of the current minute boundary
    """
    # Naive local time.
    now = datetime.now()
    # Zero out seconds/microseconds to land on the minute boundary.
    current_time = now.replace(second=0, microsecond=0)
    return current_time.strftime('%Y-%m-%dT%H:%M:%S+08:00')
# 2025/02/06
def store_realtime_SCADA_data_job() -> None:
    """Per-minute job: push realtime SCADA data stamped with the current minute.

    :return: None
    """
    # Minute-boundary timestamp, e.g. '2025-02-01T18:45:00+08:00'.
    minute_stamp: str = get_next_time()
    influxdb_api.store_realtime_SCADA_data_to_influxdb(minute_stamp)
    print('{} -- Successfully store realtime SCADA data.'.format(
        datetime.now().strftime('%Y-%m-%d %H:%M:%S')))
# 2025/02/06
def store_realtime_SCADA_data_task() -> None:
    """Run the per-minute realtime scheduler loop forever.

    Registers store_realtime_SCADA_data_job on a one-minute schedule, then
    polls for due jobs once per second.

    :return: None
    """
    try:
        # Fire store_realtime_SCADA_data_job once per minute.
        schedule.every(1).minute.do(store_realtime_SCADA_data_job)
        while True:
            schedule.run_pending()  # run any jobs that are due
            time.sleep(1)  # avoid a busy-wait between polls
    except Exception as e:
        # Consistent with store_non_realtime_SCADA_data_task: report the
        # failure instead of crashing the process with an unhandled exception.
        print(f"Error occurred in store_realtime_SCADA_data_task: {e}")
if __name__ == "__main__":
    # NOTE(review): credentials are hard-coded in source — move the token to
    # configuration or an environment variable.
    url = "http://localhost:8086"  # replace with your InfluxDB instance address
    token = "Z4UZj9HuLwLlwoApywvT2nGVP3bwLy18y-sJQ7enzZlJd8YMzMWbBA6F-q4gBiZ-7-IqdxR5aR9LvicKiSNmnA=="  # replace with your InfluxDB token
    org_name = "beibei"  # replace with your Organization name
    # Client is created but not passed on; influxdb_api presumably manages its
    # own connection — TODO confirm.
    client = InfluxDBClient(url=url, token=token)
    # Step 2: read scada_info from the PG database, then store SCADA data into
    # the SCADA_data bucket.
    influxdb_api.query_pg_scada_info_realtime('bb')
    # Start the scheduler loop (blocks forever).
    store_realtime_SCADA_data_task()

54
globals.py Normal file
View File

@@ -0,0 +1,54 @@
# Global variables used by simulation.py
# Reservoir basic height (datum) — presumably metres; confirm against simulation.py.
RESERVOIR_BASIC_HEIGHT = float(250.35)
# Realtime data: element_id -> api_query_id lookups, one dict per element type.
reservoirs_id = {}
tanks_id = {}
fixed_pumps_id ={}
variable_pumps_id = {}
pressure_id = {}
demand_id = {}
quality_id = {}
# Realtime data: pattern_id -> api_query_id lookups.
source_outflow_pattern_id = {}
realtime_pipe_flow_pattern_id = {}
pipe_flow_region_patterns = {}  # partitions non-realtime demand by realtime pipe_flow
# Region (zone) queries, keyed by region.
source_outflow_region = {}  # values: the pipes bound to the region
source_outflow_region_id = {}  # values: api_query_id
source_outflow_region_patterns = {}  # values: associated_pattern
# Patterns for non-realtime data.
non_realtime_region_patterns = {}  # split according to source_outflow_region
realtime_region_pipe_flow_and_demand_id = {}  # per source_outflow_region: api_query_ids of realtime pipe_flow/demand; later used as region flow minus metered flow
realtime_region_pipe_flow_and_demand_patterns = {}  # per source_outflow_region: associated_patterns of realtime pipe_flow/demand; later used as region flow minus metered flow
# ---------------------------------------------------------
# Global variables used by influxdb_api.py
# Lists of realtime api_query_id values, one list per measurement type.
reservoir_liquid_level_realtime_ids = []
tank_liquid_level_realtime_ids = []
fixed_pump_realtime_ids = []
variable_pump_realtime_ids = []
source_outflow_realtime_ids = []
pipe_flow_realtime_ids = []
pressure_realtime_ids = []
demand_realtime_ids = []
quality_realtime_ids = []
# Maximum transmission_frequency; populated elsewhere, starts unset.
transmission_frequency = None
hydraulic_timestep = None
# Lists of non-realtime api_query_id values, one list per measurement type.
reservoir_liquid_level_non_realtime_ids = []
tank_liquid_level_non_realtime_ids = []
fixed_pump_non_realtime_ids = []
variable_pump_non_realtime_ids = []
source_outflow_non_realtime_ids = []
pipe_flow_non_realtime_ids = []
pressure_non_realtime_ids = []
demand_non_realtime_ids = []
quality_non_realtime_ids = []

File diff suppressed because it is too large Load Diff

570
main.py
View File

@@ -7,7 +7,9 @@ from urllib.request import Request
from xml.dom import minicompat
from pydantic import BaseModel
from starlette.responses import FileResponse, JSONResponse
from fastapi import FastAPI, File, UploadFile, Response, status, Request, Body, HTTPException
from starlette.middleware.base import BaseHTTPMiddleware
from starlette.types import Receive
from fastapi import FastAPI, File, UploadFile, Response, status, Request, Body, HTTPException,Query
from fastapi.responses import PlainTextResponse
from fastapi.middleware.gzip import GZipMiddleware
from tjnetwork import *
@@ -15,8 +17,14 @@ import asyncio
import threading
import uvicorn
from multiprocessing import Value
import uvicorn
from run_simulation import run_simulation, run_simulation_ex
from online_Analysis import *
import logging
from fastapi.middleware.cors import CORSMiddleware
import random
from datetime import datetime
import shutil
import logging
import redis
import datetime
@@ -39,6 +47,7 @@ tmpDir = "C:/tmpfiles/"
lockedPrjs = {}
if not os.path.exists(inpDir):
os.mkdir(inpDir)
@@ -58,7 +67,11 @@ influx_org_name = "TJWATERORG" # 替换为你的Organization名称
influx_client = InfluxDBClient(url=influx_url, token=influx_token, org=influx_org_name)
# 配置日志记录器
logging.basicConfig(level=logging.INFO)
logging.basicConfig(
level=logging.INFO,
format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
datefmt='%Y-%m-%d %H:%M:%S'
)
logger = logging.getLogger(__name__)
# 配置 CORS 中间件
@@ -69,11 +82,69 @@ app.add_middleware(
allow_methods=["*"], # 允许所有 HTTP 方法
allow_headers=["*"], # 允许所有 HTTP 头
)
# 定义一个共享变量
lock_simulation = Value('i', 0)
app.add_middleware(GZipMiddleware, minimum_size=1000)
async def receive_with_body(body: bytes) -> Receive:
    """Build an ASGI ``receive`` callable that replays a cached request body.

    Used by the logging middleware to hand an already-consumed body back to
    downstream handlers as a single, complete ``http.request`` message.
    """
    async def replay() -> dict:
        # One complete message: the whole body, no further chunks.
        return {"type": "http.request", "body": body, "more_body": False}
    return replay
@app.middleware("http")
async def log_requests(request: Request, call_next):
    """HTTP middleware: log every request/response pair with timing.

    Skips favicon requests. For non-multipart requests the body is read
    (which consumes the stream) and then replayed to downstream handlers via
    a rebuilt Request; file uploads are only logged as 'File' so large bodies
    are not buffered.
    """
    if request.url.path == "/favicon.ico":
        return Response(status_code=204)
    # Timestamp of request arrival.
    request_time = time.time()
    request_time_str = datetime.fromtimestamp(request_time).strftime('%Y-%m-%d %H:%M:%S')
    # Detect file uploads so their bodies are not read into memory.
    is_file_upload = request.headers.get("content-type", "").startswith("multipart/form-data")
    # Log the incoming request line.
    logging.info(f"Received request: {request.method} {request.url} at {request_time_str}")
    if not is_file_upload:
        request_body = await request.body()
        # NOTE(review): decode('utf-8') raises for non-UTF-8 binary bodies —
        # confirm all non-multipart clients send text.
        logging.info(f"Request body: {request_body.decode('utf-8')}")
        # Rebuild the Request with a receive() that replays the cached body,
        # since request.body() above consumed the original stream.
        receive = await receive_with_body(request_body)
        request = Request(request.scope, receive=receive)
    else:
        logging.info(f"Request body: File")
    # Forward to the actual route handler.
    response = await call_next(request)
    # Timestamp of response and total processing time.
    response_time = time.time()
    response_time_str = datetime.fromtimestamp(response_time).strftime('%Y-%m-%d %H:%M:%S')
    processing_time = response_time - request_time
    # Response-body logging intentionally disabled (it would consume the
    # body iterator):
    # response_body = b""
    # async for chunk in response.body_iterator:
    #     response_body += chunk
    # Status codes below 400 are treated as success.
    success_status = response.status_code < 400
    logging.info(f"Response status: {response.status_code} at {response_time_str}, success: {success_status}")
    # logging.info(f"Response body: {response_body.decode('utf-8')}")
    logging.info(f"Processing time: {processing_time:.3f} seconds")
    return response
@app.on_event("startup")
async def startup_db():
logger.info('\n')
@@ -82,7 +153,8 @@ async def startup_db():
logger.info("TJWater CloudService is starting...")
logger.info('**********************************************************')
logger.info('\n')
############################################################
# extension_data
############################################################
@@ -382,7 +454,7 @@ async def fastapi_is_pipe(network: str, link: str) -> bool:
@app.get('/ispump/')
async def fastapi_is_pump(network: str, link: str) -> bool:
return is_pump(network, lin)
return is_pump(network, link)
@app.get('/isvalve/')
async def fastapi_is_valve(network: str, link: str) -> bool:
@@ -1712,7 +1784,7 @@ async def fastapi_get_scada_element_schema(network: str) -> dict[str, dict[str,
@app.get('/getscadaelements/')
async def fastapi_get_scada_elements(network: str) -> list[str]:
return get_all_scada_elements(network)
return get_scada_elements(network)
@app.get('/getscadaelement/')
async def fastapi_get_scada_element(network: str, id: str) -> dict[str, Any]:
@@ -2050,8 +2122,492 @@ def generate_openapi_json():
with open(openapi_json_path, "w") as file:
json.dump(app.openapi(), file, indent=4)
############################################################
# real_time api 37
# example: http://127.0.0.1:8000/runsimulation?network=beibeizone&start_time=2024-04-01T08:00:00Z
############################################################
# 必须用这个PlainTextResponse不然每个key都有引号
# @app.get("/runsimulation/", response_class = PlainTextResponse)
# async def fastapi_run_project(network: str,start_time:str,end_time=None) -> str:
# filename = 'c:/lock.simulation'
# filename2 = 'c:/lock.simulation2'
# if os.path.exists(filename2):
# print('file exists')
# raise HTTPException(status_code=409, detail="is in simulation")
# else:
# print('file doesnt exists')
# #os.rename(filename, filename2)
# result = run_simulation(network,start_time,end_time)
# #os.rename(filename2, filename)
# return result
############################################################
# real_time api 37
# example: http://127.0.0.1:8000/runsimulation?network=beibeizone&start_time=2024-04-01T08:00:00Z
############################################################
# 必须用这个PlainTextResponse不然每个key都有引号
@app.get("/runsimulation/", response_class = PlainTextResponse)
async def fastapi_run_project(network: str,start_time:str,end_time=None) -> str:
    """Run a realtime simulation; 409 while another simulation is in progress."""
    busy_lock = 'c:/lock.simulation2'  # its presence marks a running simulation
    idle_lock = 'c:/lock.simulation'   # kept for the (disabled) rename hand-off below
    if os.path.exists(busy_lock):
        print('file exists')
        raise HTTPException(status_code=409, detail="is in simulation")
    print('file doesnt exists')
    # os.rename(idle_lock, busy_lock)  # lock hand-off disabled
    result = run_simulation_ex(name=network, simulation_type='realtime',
                               start_datetime=start_time, end_datetime=end_time)
    # os.rename(busy_lock, idle_lock)
    return result
############################################################
# real_time api 37.5
# example:
# response = requests.post("http://127.0.0.1:8000/runsimulation",
# data=json.dumps({'network': 'bb_server', 'simulation_type': 'extended',
# 'start_time': '2024-05-17T09:30:00Z', 'duration': 900,
# 'pump_control': {'1#': [0, 0], '2#': [1, 1], '3#': [1, 1], '4#': [1, 0],
# '5#': [45, 43], '6#': [0, 0], '7#': [0, 0]}}),
# headers={'accept': 'application/json', 'Content-Type': 'application/json'})
############################################################
# class RunSimuItem(BaseModel):
# network: str
# simulation_type: str
# start_time: str
# end_time: Optional[str] = None
# duration: Optional[int] = 900
# pump_control: Optional[dict] = None
#
#
# @app.post("/runsimulation/")
# async def fastapi_run_project(item: RunSimuItem) -> str:
# item = item.dict()
# filename = 'c:/lock.simulation'
# filename2 = 'c:/lock.simulation2'
# if os.path.exists(filename2):
# print('file exists')
# raise HTTPException(status_code=409, detail="is in simulation")
# else:
# print('file doesnt exists')
# #os.rename(filename, filename2)
# result = run_simulation_ex(item['network'], item['simulation_type'],
# item['start_time'], item['end_time'],
# item['duration'], item['pump_control'])
# #os.rename(filename2, filename)
# return result
############################################################
# burst analysis api 38
#example:http://127.0.0.1:8000/burst_analysis?network=beibeizone&start_time=2024-04-01T08:00:00Z&burst_ID=ZBBGXSZW000001&burst_size=200&duration=1800
############################################################
# @app.get("/burst_analysis/", response_class = PlainTextResponse)
# async def fastapi_burst_analysis(network: str,start_time:str,burst_ID:str,burst_size:float,burst_flow:float=None,duration:int=None) -> str:
# filename = 'c:/lock.simulation'
# filename2 = 'c:/lock.simulation2'
# if os.path.exists(filename2):
# print('file exists')
# raise HTTPException(status_code=409, detail="is in simulation")
# else:
# print('file doesnt exists')
# #os.rename(filename, filename2)
# result = burst_analysis(network,start_time,burst_ID,burst_size,burst_flow,duration)
# #os.rename(filename2, filename)
# return result
############################################################
# burst analysis api 38.5
# example:
# response = requests.post("http://127.0.0.1:8000/burst_analysis",
# data=json.dumps({'network': 'bb_server',
# 'start_time': '2024-05-17T09:30:00Z',
# 'burst_ID': ['ZBBGXSZW000001'],
# 'burst_size': [200],
# 'duration': 1800,
# 'pump_control': {'1#': [0, 0, 0], '2#': [1, 1, 1], '3#': [1, 1, 1], '4#': [1, 1, 1],
# '5#': [45, 45, 45], '6#': [0, 0, 0], '7#': [0, 0, 0]}
# 'valve_closed': ['GSD2307192058576667FF7B41FF']),
# headers={'accept': 'application/json', 'Content-Type': 'application/json'})
############################################################
class BurstAnalysis(BaseModel):
    """Request body for POST /burst_analysis/."""
    network: str  # project / network name
    start_time: str  # simulation start, e.g. '2024-05-17T09:30:00Z'
    burst_ID: list[str] | str  # burst pipe ID(s)
    burst_size: list[float] | float  # burst opening size(s); <= 0 falls back to 1/8 of pipe area — units per burst_analysis, TODO confirm
    duration: int  # simulation duration — presumably seconds; confirm in run_simulation_ex
    pump_control: Optional[dict] = None  # optional pump status/speed overrides
    valve_closed: Optional[list] = None  # optional valve IDs to close during the run
@app.post("/burst_analysis/")
async def fastapi_burst_analysis(data: BurstAnalysis) -> str:
    """Run a burst analysis; 409 while another simulation is in progress."""
    payload = data.dict()
    busy_lock = 'c:/lock.simulation2'  # its presence marks a running simulation
    idle_lock = 'c:/lock.simulation'   # kept for the (disabled) rename hand-off below
    if os.path.exists(busy_lock):
        print('file exists')
        raise HTTPException(status_code=409, detail="is in simulation")
    print('file doesnt exists')
    # os.rename(idle_lock, busy_lock)  # lock hand-off disabled
    result = burst_analysis(prj_name=payload['network'],
                            date_time=payload['start_time'],
                            burst_ID=payload['burst_ID'],
                            burst_size=payload['burst_size'],
                            duration=payload['duration'],
                            pump_control=payload['pump_control'],
                            valve_closed=payload['valve_closed'])
    # os.rename(busy_lock, idle_lock)
    return result
############################################################
# valve close analysis api 39
#example:http://127.0.0.1:8000/valve_close_analysis?network=beibeizone&start_time=2024-04-01T08:00:00Z&valves=GSD2307192058577780A3287D78&valves=GSD2307192058572E953B707226(S2)&duration=1800
############################################################
@app.get("/valve_close_analysis/", response_class = PlainTextResponse)
async def fastapi_valve_close_analysis(network: str,start_time:str,valves:Annotated[list[str], Query()],duration:int=None) -> str:
    """Run a valve-closing analysis; 409 while another simulation is in progress."""
    busy_lock = 'c:/lock.simulation2'  # its presence marks a running simulation
    idle_lock = 'c:/lock.simulation'   # kept for the (disabled) rename hand-off below
    if os.path.exists(busy_lock):
        print('file exists')
        raise HTTPException(status_code=409, detail="is in simulation")
    print('file doesnt exists')
    # os.rename(idle_lock, busy_lock)  # lock hand-off disabled
    result = valve_close_analysis(network,start_time,valves,duration)
    # os.rename(busy_lock, idle_lock)
    return result
############################################################
# pipe flushing analysis api 40
#example:http://127.0.0.1:8000/flushing_analysis?network=beibeizone&start_time=2024-04-01T08:00:00Z&valves=GSD230719205857733F8F5214FF&valves=GSD230719205857C0AF65B6A170&valves_k=0.5&valves_k=0.5&drainage_node_ID=GSD2307192058570DEDF28E4F73&flush_flow=0&duration=1800
############################################################
@app.get("/flushing_analysis/", response_class = PlainTextResponse)
async def fastapi_flushing_analysis(network: str,start_time:str,valves:Annotated[list[str], Query()],valves_k:Annotated[list[float], Query()],drainage_node_ID:str,flush_flow:float=0,duration:int=None) -> str:
    """Run a pipe-flushing analysis; 409 while another simulation is in progress."""
    busy_lock = 'c:/lock.simulation2'  # its presence marks a running simulation
    idle_lock = 'c:/lock.simulation'   # kept for the (disabled) rename hand-off below
    if os.path.exists(busy_lock):
        print('file exists')
        raise HTTPException(status_code=409, detail="is in simulation")
    print('file doesnt exists')
    # os.rename(idle_lock, busy_lock)  # lock hand-off disabled
    result = flushing_analysis(network,start_time,valves,valves_k,drainage_node_ID,flush_flow,duration)
    # os.rename(busy_lock, idle_lock)
    return result
############################################################
# contaminant_simulation api 41
#example:http://127.0.0.1:8000/contaminant_simulation?network=beibeizone&start_time=2024-04-01T08:00:00Z&source=ZBBDTZDP002677&concentration=100&duration=1800
############################################################
@app.get("/contaminant_simulation/", response_class = PlainTextResponse)
async def fastapi_contaminant_simulation(network: str,start_time:str,source:str,concentration:float,duration:int=900,pattern:str=None) -> str:
    """Run a contaminant-intrusion simulation; 409 while another simulation runs."""
    busy_lock = 'c:/lock.simulation2'  # its presence marks a running simulation
    idle_lock = 'c:/lock.simulation'   # kept for the (disabled) rename hand-off below
    if os.path.exists(busy_lock):
        print('file exists')
        raise HTTPException(status_code=409, detail="is in simulation")
    print('file doesnt exists')
    # os.rename(idle_lock, busy_lock)  # lock hand-off disabled
    result = contaminant_simulation(network,start_time,source,concentration,duration,pattern)
    # os.rename(busy_lock, idle_lock)
    return result
############################################################
# age analysis api 42
#example:http://127.0.0.1:8000/age_analysis/?network=bb&start_time=2024-04-01T00:00:00Z&end_time=2024-04-01T08:00:00Z&duration=28800
############################################################
@app.get("/age_analysis/", response_class = PlainTextResponse)
async def fastapi_age_analysis(network: str, start_time:str, end_time:str, duration:int) -> str:
    """Run a water-age analysis; 409 while another simulation is in progress."""
    busy_lock = 'c:/lock.simulation2'  # its presence marks a running simulation
    idle_lock = 'c:/lock.simulation'   # kept for the (disabled) rename hand-off below
    if os.path.exists(busy_lock):
        print('file exists')
        raise HTTPException(status_code=409, detail="is in simulation")
    print('file doesnt exists')
    # os.rename(idle_lock, busy_lock)  # lock hand-off disabled
    result = age_analysis(network,start_time,end_time,duration)
    # os.rename(busy_lock, idle_lock)
    return result
############################################################
# scheduling analysis api 43
############################################################
class SchedulingAnalysis(BaseModel):
    """Request body for POST /scheduling_analysis/."""
    network: str  # project / network name
    start_time: str  # simulation start time string
    pump_control: dict  # pump status/speed settings
    tank_id: str  # monitored tank element ID
    water_plant_output_id: str  # water-plant outlet element ID
    time_delta: Optional[int] = 300  # step size — presumably seconds; confirm in scheduling_simulation
@app.post("/scheduling_analysis/")
async def fastapi_scheduling_analysis(data: SchedulingAnalysis) -> str:
    """Run a pump-scheduling analysis; 409 while another simulation is in progress."""
    payload = data.dict()
    busy_lock = 'c:/lock.simulation2'  # its presence marks a running simulation
    idle_lock = 'c:/lock.simulation'   # kept for the (disabled) rename hand-off below
    if os.path.exists(busy_lock):
        print('file exists')
        raise HTTPException(status_code=409, detail="is in simulation")
    print('file doesnt exists')
    # os.rename(idle_lock, busy_lock)  # lock hand-off disabled
    result = scheduling_simulation(payload['network'], payload['start_time'],
                                   payload['pump_control'], payload['tank_id'],
                                   payload['water_plant_output_id'], payload['time_delta'])
    # os.rename(busy_lock, idle_lock)
    return result
############################################################
# pressure_regulating api 44
# example:
# response = requests.post("http://127.0.0.1:8000/pressure_regulating",
# data=json.dumps({'network': 'bb_server',
# 'start_time': '2024-05-17T09:30:00Z',
# 'pump_control': {'1#': [0, 0], '2#': [1, 1], '3#': [1, 1], '4#': [1, 1],
# '5#': [45, 45], '6#': [0, 0], '7#': [0, 0]}
# 'tank_init_level': {'ZBBDTJSC000002': 2, 'ZBBDTJSC000001': 2}}),
# headers={'accept': 'application/json', 'Content-Type': 'application/json'})
############################################################
class PressureRegulation(BaseModel):
    """Request body for POST /pressure_regulation/."""
    network: str  # project / network name
    start_time: str  # simulation start time string
    pump_control: dict  # pump status/speed settings
    tank_init_level: Optional[dict] = None  # tank ID -> initial level override
@app.post("/pressure_regulation/")
async def fastapi_pressure_regulation(data: PressureRegulation) -> str:
    """Run a pressure-regulation analysis; 409 while another simulation runs."""
    payload = data.dict()
    busy_lock = 'c:/lock.simulation2'  # its presence marks a running simulation
    idle_lock = 'c:/lock.simulation'   # kept for the (disabled) rename hand-off below
    if os.path.exists(busy_lock):
        print('file exists')
        raise HTTPException(status_code=409, detail="is in simulation")
    print('file doesnt exists')
    # os.rename(idle_lock, busy_lock)  # lock hand-off disabled
    result = pressure_regulation(prj_name=payload['network'],
                                 start_datetime=payload['start_time'],
                                 pump_control=payload['pump_control'],
                                 tank_initial_level_control=payload['tank_init_level'])
    # os.rename(busy_lock, idle_lock)
    return result
############################################################
# project_management api 45
# example:
# response = requests.post("http://127.0.0.1:8000/project_management",
# data=json.dumps({'network': 'bb_server',
# 'start_time': '2024-05-17T00:00:00Z',
# 'pump_control':
# {'1#':(list:97), '2#':(list:97), '3#':(list:97), '4#':(list:97),
# '5#':(list:97), '6#':(list:97), '7#':(list:97)}
# 'tank_init_level': {'ZBBDTJSC000002': 2, 'ZBBDTJSC000001': 2}
# 'region_demand': {'hp': 150000, 'lp': 40000}}),
# headers={'accept': 'application/json', 'Content-Type': 'application/json'})
############################################################
class ProjectManagement(BaseModel):
    """Request body for POST /project_management/."""
    network: str  # project / network name
    start_time: str  # simulation start time string
    pump_control: dict  # pump ID -> per-step control list (97 steps in the example)
    tank_init_level: Optional[dict] = None  # tank ID -> initial level
    region_demand: Optional[dict] = None  # region name -> demand target
@app.post("/project_management/")
async def fastapi_project_management(data: ProjectManagement) -> str:
    """Run a project-management simulation; 409 while another simulation runs."""
    payload = data.dict()
    busy_lock = 'c:/lock.simulation2'  # its presence marks a running simulation
    idle_lock = 'c:/lock.simulation'   # kept for the (disabled) rename hand-off below
    if os.path.exists(busy_lock):
        print('file exists')
        raise HTTPException(status_code=409, detail="is in simulation")
    print('file doesnt exists')
    # os.rename(idle_lock, busy_lock)  # lock hand-off disabled
    result = project_management(prj_name=payload['network'],
                                start_datetime=payload['start_time'],
                                pump_control=payload['pump_control'],
                                tank_initial_level_control=payload['tank_init_level'],
                                region_demand_control=payload['region_demand'])
    # os.rename(busy_lock, idle_lock)
    return result
############################################################
# project_management api 46
# example:
# with open('./inp/bb_temp.inp', 'rb') as file:
# response = requests.post("http://127.0.0.1:8000/network_project",
# files={'file': file})
############################################################
@app.post("/network_project/")
async def fastapi_network_project(file: UploadFile = File()) -> str:
    """Save an uploaded .inp network file and run it.

    The upload is stored as ./inp/network_project_<YYYYMMDD>.inp and handed to
    run_inp by its extension-less name.

    :raises HTTPException: 409 when the busy lock file marks a running simulation.
    """
    # Reject up front — before writing anything to disk — if another
    # simulation is already in progress.
    filename2 = 'c:/lock.simulation2'
    if os.path.exists(filename2):
        print('file exists')
        raise HTTPException(status_code=409, detail="is in simulation")
    print('file doesnt exists')
    inp_dir = './inp/'
    if not os.path.exists(inp_dir):
        os.mkdir(inp_dir)
    # Name keyed by date; same-day uploads overwrite each other.
    temp_file_name = f'network_project_{datetime.now().strftime("%Y%m%d")}'
    temp_file_path = f'{inp_dir}{temp_file_name}.inp'
    with open(temp_file_path, "wb") as buffer:
        # 'with' closes the file on exit; no explicit close() needed.
        shutil.copyfileobj(file.file, buffer)
    result = run_inp(temp_file_name)
    return result
############################################################
# daily scheduling analysis api 47
############################################################
class DailySchedulingAnalysis(BaseModel):
    """Request body for POST /daily_scheduling_analysis/."""
    network: str  # project / network name
    start_time: str  # simulation start time string
    pump_control: dict  # pump status/speed settings
    reservoir_id: str  # monitored reservoir element ID
    tank_id: str  # monitored tank element ID
    water_plant_output_id: str  # water-plant outlet element ID
    time_delta: Optional[int] = 300  # NOTE(review): accepted but never forwarded to daily_scheduling_simulation — confirm intended
@app.post("/daily_scheduling_analysis/")
async def fastapi_daily_scheduling_analysis(data: DailySchedulingAnalysis) -> str:
    """Run a daily scheduling analysis; 409 while another simulation runs."""
    payload = data.dict()
    busy_lock = 'c:/lock.simulation2'  # its presence marks a running simulation
    idle_lock = 'c:/lock.simulation'   # kept for the (disabled) rename hand-off below
    if os.path.exists(busy_lock):
        print('file exists')
        raise HTTPException(status_code=409, detail="is in simulation")
    print('file doesnt exists')
    # os.rename(idle_lock, busy_lock)  # lock hand-off disabled
    result = daily_scheduling_simulation(payload['network'], payload['start_time'],
                                         payload['pump_control'], payload['reservoir_id'], payload['tank_id'],
                                         payload['water_plant_output_id'])
    # os.rename(busy_lock, idle_lock)
    return result
############################################################
# network_update api 48
############################################################
@app.post("/network_update/")
async def fastapi_network_update(file: UploadFile = File()) -> str:
    """Save an uploaded network file and apply it to the project database.

    The upload is written to ./network_update_<YYYYMMDD> and handed to
    network_update(); both failure paths surface as HTTP 500.

    :raises HTTPException: 500 when saving the file or updating the DB fails.
    """
    default_folder = './'
    # Name keyed by date only — NOTE(review): same-day uploads overwrite each
    # other and the temp file is never cleaned up; confirm this is intended.
    temp_file_name = f'network_update_{datetime.now().strftime("%Y%m%d")}'
    temp_file_path = os.path.join(default_folder, temp_file_name)
    # Persist the uploaded file to disk.
    try:
        # 'with' closes the file on exit; the explicit close() was redundant.
        with open(temp_file_path, "wb") as buffer:
            shutil.copyfileobj(file.file, buffer)
        print(f"文件 {temp_file_name} 已成功保存。")
    except Exception as e:
        raise HTTPException(status_code=500, detail=f"文件保存失败: {e}")
    # Apply the update to the database.
    try:
        network_update(temp_file_path)
        return json.dumps({"message": "管网更新成功"})
    except Exception as e:
        raise HTTPException(status_code=500, detail=f"数据库操作失败: {e}")
############################################################
# pump failure api 49
############################################################
class PumpFailureState(BaseModel):
    """Request body for POST /pump_failure/."""
    time: str  # report timestamp supplied by the caller
    pump_status: dict  # {'first' | 'second': {pump_id: status}}; statuses are coerced with int()
@app.post("/pump_failure/")
async def fastapi_pump_failure(data: PumpFailureState) -> str:
    """Record a pump-failure report and update the persisted pump status file.

    Appends the raw message to pump_failure_message.txt, merges the reported
    statuses into pump_failure_status.txt (line 1 = first-stage pumps, last
    line = second-stage pumps), and rewrites that file.

    :return: JSON string 'SUCCESS', or an 'ERROR: ...' description on bad input.
    """
    import ast  # local import: parse the persisted dict literals safely
    item = data.dict()
    with open('./pump_failure_message.txt', 'a', encoding='utf-8-sig') as f1:
        f1.write('[{}] {}\n'.format(datetime.now().strftime('%Y-%m-%d %H:%M:%S'), item))  # save message
    status_info = item.copy()
    with open('./pump_failure_status.txt', 'r', encoding='utf-8-sig') as f2:
        lines = f2.readlines()
    # Security fix: ast.literal_eval replaces eval(), parsing the stored dict
    # literals without executing arbitrary code from the status file. The old
    # json.dumps/json.loads round trip was a no-op for these str-keyed dicts.
    first_stage_pump_status_dict = ast.literal_eval(lines[0])
    second_stage_pump_status_dict = ast.literal_eval(lines[-1])
    pump_status_dict = {'first': first_stage_pump_status_dict,  # first-stage pumps
                        'second': second_stage_pump_status_dict}  # second-stage pumps
    for pump_type in status_info['pump_status'].keys():  # 'first' or 'second'
        if pump_type not in pump_status_dict:
            return json.dumps('ERROR: Wrong Pump Type')
        if not all(pump_id in pump_status_dict[pump_type]
                   for pump_id in status_info['pump_status'][pump_type].keys()):
            return json.dumps('ERROR: Wrong Pump ID')
        for pump_id in status_info['pump_status'][pump_type].keys():
            # Coerce to int so the persisted file stays uniform.
            pump_status_dict[pump_type][pump_id] = int(
                status_info['pump_status'][pump_type][pump_id])
    with open('./pump_failure_status.txt', 'w', encoding='utf-8-sig') as f2_:
        f2_.write('{}\n{}'.format(pump_status_dict['first'], pump_status_dict['second']))  # save local file
    return json.dumps('SUCCESS')
# DingZQ, 2024-12-31, run main
# if __name__ == "__main__":
# generate_openapi_json()
# uvicorn.run(app, host="127.0.0.1", port=80)
# uvicorn.run(app, host="127.0.0.1", port=80)

776
online_Analysis.py Normal file
View File

@@ -0,0 +1,776 @@
import os
from tjnetwork import *
from api.project import CopyProjectEx
from run_simulation import run_simulation_ex, from_clock_to_seconds_2
from math import sqrt, pi
from epanet.epanet import Output
import json
from datetime import datetime
import time
import pytz
import psycopg
from psycopg import sql
import pandas as pd
import csv
import chardet
############################################################
# burst analysis 01
############################################################
def burst_analysis(prj_name, date_time, burst_ID: list | str,
                   burst_size: list | float | int = None, duration=900, pump_control=None, valve_closed=None) -> str:
    """Simulate pipe burst(s) on a scratch copy of the project.

    Copies *prj_name* to a scratch project, models each burst as an emitter on
    a junction endpoint of the burst pipe, switches the hydraulics to
    pressure-driven (PDA), runs a realtime simulation at *date_time*, then
    deletes the scratch project.

    :param burst_size: opening size per burst; <= 0 (or omitted, encoded as -1)
        falls back to 1/8 of the pipe cross-section area.
    :return: simulation result string, or a JSON-encoded error message.
    """
    print(datetime.now(pytz.timezone('Asia/Shanghai')).strftime("%Y-%m-%d %H:%M:%S") + " -- Start Analysis.")
    new_name = f'burst_Anal_{prj_name}'
    # Drop any stale scratch copy left over from a previous run.
    if have_project(new_name):
        if is_project_open(new_name):
            close_project(new_name)
        delete_project(new_name)
    if is_project_open(prj_name):
        close_project(prj_name)
    print(datetime.now(pytz.timezone('Asia/Shanghai')).strftime("%Y-%m-%d %H:%M:%S") + " -- Start Copying Database.")
    CopyProjectEx()(prj_name, new_name,
                    ['operation', 'current_operation', 'restore_operation', 'batch_operation', 'operation_table'])
    print(datetime.now(pytz.timezone('Asia/Shanghai')).strftime("%Y-%m-%d %H:%M:%S") + " -- Start Opening Database.")
    open_project(new_name)
    print(datetime.now(pytz.timezone('Asia/Shanghai')).strftime("%Y-%m-%d %H:%M:%S") + " -- Database Loading OK.")
    ## step 1: set the emitter coefficient on an end node of each burst pipe.
    # Normalise burst_ID / burst_size to parallel lists, rejecting mismatches.
    if isinstance(burst_ID, list):
        if (burst_size is not None) and (type(burst_size) is not list):
            return json.dumps('Type mismatch.')
    elif isinstance(burst_ID, str):
        burst_ID = [burst_ID]
        if burst_size is not None:
            if isinstance(burst_size, float) or isinstance(burst_size, int):
                burst_size = [burst_size]
            else:
                return json.dumps('Type mismatch.')
    else:
        return json.dumps('Type mismatch.')
    # Pad missing sizes with -1 (sentinel meaning "use the default opening").
    if burst_size is None:
        burst_size = [-1] * len(burst_ID)
    elif len(burst_size) < len(burst_ID):
        burst_size += [-1] * (len(burst_ID) - len(burst_size))
    elif len(burst_size) > len(burst_ID):
        # burst_size = burst_size[:len(burst_ID)]
        return json.dumps('Length mismatch.')
    for burst_ID_, burst_size_ in zip(burst_ID, burst_size):
        pipe = get_pipe(new_name, burst_ID_)
        str_start_node = pipe['node1']
        str_end_node = pipe['node2']
        d_pipe = pipe['diameter'] / 1000.0  # mm -> m — presumably; confirm units
        if burst_size_ <= 0:
            # Default opening: 1/8 of the pipe cross-section area.
            burst_size_ = 3.14 * d_pipe * d_pipe / 4 / 8
        else:
            burst_size_ = burst_size_ / 10000  # cm^2 -> m^2 — presumably; confirm units
        emitter_coeff = 0.65 * burst_size_ * sqrt(19.6) * 1000  # orifice equation; 1/8 opening area as coeff
        emitter_node = ''
        # Prefer the downstream node; fall back to the upstream one.
        if is_junction(new_name, str_end_node):
            emitter_node = str_end_node
        elif is_junction(new_name, str_start_node):
            emitter_node = str_start_node
        # NOTE(review): if neither endpoint is a junction, emitter_node stays ''
        # and get_emitter is called with an empty ID — confirm upstream
        # guarantees at least one junction endpoint.
        old_emitter = get_emitter(new_name, emitter_node)
        if(old_emitter != None):
            old_emitter['coefficient'] = emitter_coeff  # set the burst emitter coefficient
        else:
            old_emitter = {'junction': emitter_node, 'coefficient': emitter_coeff}
        new_emitter = ChangeSet()
        new_emitter.append(old_emitter)
        set_emitter(new_name, new_emitter)
    # step 2. run simulation
    # Closing valves can otherwise still show flow; switch to pressure-driven (PDA).
    options = get_option(new_name)
    options['DEMAND MODEL'] = OPTION_DEMAND_MODEL_PDA
    options['REQUIRED PRESSURE'] = '20.0000'
    cs_options = ChangeSet()
    cs_options.append(options)
    set_option(new_name, cs_options)
    valve_control = None
    if valve_closed is not None:
        valve_control = {}
        for valve in valve_closed:
            valve_control[valve] = {'status': 'CLOSED'}
    result = run_simulation_ex(new_name,'realtime', date_time,
                               end_datetime=date_time,
                               duration=duration,
                               pump_control=pump_control,
                               valve_control=valve_control,
                               downloading_prohibition=True)
    # step 3. restore the base model status
    # execute_undo(prj_name)  # questionable; scratch project is discarded instead
    if is_project_open(new_name):
        close_project(new_name)
    delete_project(new_name)
    return result
############################################################
# valve closing analysis 02
############################################################
def valve_close_analysis(prj_name, date_time, valves, duration=None)->str:
    """
    Valve closing analysis (02): simulate the hydraulic impact of closing the
    given valves at `date_time` on a throwaway copy of the project.

    :param prj_name: name of the base project (database) to analyse
    :param date_time: simulation start datetime string, e.g. '2024-04-01T08:00:00Z'
    :param valves: iterable of valve element IDs to set to CLOSED
    :param duration: simulation duration in seconds; None keeps the engine default
    :return: simulation result string on success, or an error message when an
             ID in `valves` is not a valve element
    """
    print(datetime.now(pytz.timezone('Asia/Shanghai')).strftime("%Y-%m-%d %H:%M:%S") + " -- Start Analysis.")
    new_name = f'valve_close_Anal_{prj_name}'
    # Drop any leftover working copy from a previous run.
    if have_project(new_name):
        if is_project_open(new_name):
            close_project(new_name)
        delete_project(new_name)
    if is_project_open(prj_name):
        close_project(prj_name)
    print(datetime.now(pytz.timezone('Asia/Shanghai')).strftime("%Y-%m-%d %H:%M:%S") + " -- Start Copying Database.")
    # Work on a copy so the base model is never modified; operation tables are excluded.
    CopyProjectEx()(prj_name, new_name,
                    ['operation', 'current_operation', 'restore_operation', 'batch_operation', 'operation_table'])
    print(datetime.now(pytz.timezone('Asia/Shanghai')).strftime("%Y-%m-%d %H:%M:%S") + " -- Start Opening Database.")
    open_project(new_name)
    print(datetime.now(pytz.timezone('Asia/Shanghai')).strftime("%Y-%m-%d %H:%M:%S") + " -- Database Loading OK.")
    #step 1. change the valves status to 'closed'
    for valve in valves:
        if not is_valve(new_name,valve):
            result='ID:{}is not a valve type'.format(valve)
            # Clean up the freshly opened working copy before bailing out, so
            # the error path does not leave a stale project behind.
            if is_project_open(new_name):
                close_project(new_name)
            delete_project(new_name)
            return result
        cs=ChangeSet()
        status=get_status(new_name,valve)
        status['status']='CLOSED'
        cs.append(status)
        set_status(new_name,cs)
    #step 2. run simulation
    # Closing valves may leave residual demand-driven flow, so switch the
    # analysis to pressure-driven (PDA).
    options = get_option(new_name)
    options['DEMAND MODEL'] = OPTION_DEMAND_MODEL_PDA
    options['REQUIRED PRESSURE'] = '20.0000'
    cs_options = ChangeSet()
    cs_options.append(options)
    set_option(new_name, cs_options)
    result = run_simulation_ex(new_name,'realtime', date_time, date_time, duration,
                               downloading_prohibition=True)
    #step 3. discard the working copy (the base model was never touched)
    if is_project_open(new_name):
        close_project(new_name)
    delete_project(new_name)
    return result
############################################################
# flushing analysis 03
#Pipe_Flushing_Analysis(prj_name,date_time, Valve_id_list, Drainage_Node_Id, Flushing_flow[opt], Flushing_duration[opt])->out_file:string
############################################################
def flushing_analysis(prj_name, date_time, valves, valves_k, drainage_node_ID, flushing_flow=0, duration=None)->str:
    """
    Pipe flushing analysis (03): throttle/close the given valves, open a
    drainage point at `drainage_node_ID`, and simulate the flushing operation.

    :param prj_name: base project (database) name
    :param date_time: simulation start datetime string
    :param valves: valve element IDs to operate
    :param valves_k: relative opening per valve (parallel to `valves`):
                     0 closes the valve, 0 < k < 1 sets a throttled OPEN
                     setting, k >= 1 leaves the current status unchanged
    :param drainage_node_ID: junction used as the drainage (discharge) point
    :param flushing_flow: extra drainage flow (presumably m^3/h -- confirm
                          units); when 0 an emitter sized from the largest
                          connected pipe is used instead
    :param duration: simulation duration in seconds
    :return: simulation result string, or an error message for a wrong node type
    """
    print(datetime.now(pytz.timezone('Asia/Shanghai')).strftime("%Y-%m-%d %H:%M:%S") + " -- Start Analysis.")
    new_name = f'flushing_Anal_{prj_name}'
    # Drop any leftover working copy from a previous run.
    if have_project(new_name):
        if is_project_open(new_name):
            close_project(new_name)
        delete_project(new_name)
    if is_project_open(prj_name):
        close_project(prj_name)
    print(datetime.now(pytz.timezone('Asia/Shanghai')).strftime("%Y-%m-%d %H:%M:%S") + " -- Start Copying Database.")
    CopyProjectEx()(prj_name, new_name,
                    ['operation', 'current_operation', 'restore_operation', 'batch_operation', 'operation_table'])
    print(datetime.now(pytz.timezone('Asia/Shanghai')).strftime("%Y-%m-%d %H:%M:%S") + " -- Start Opening Database.")
    open_project(new_name)
    print(datetime.now(pytz.timezone('Asia/Shanghai')).strftime("%Y-%m-%d %H:%M:%S") + " -- Database Loading OK.")
    if not is_junction(new_name,drainage_node_ID):
        # Clean up the freshly opened working copy before the error return.
        if is_project_open(new_name):
            close_project(new_name)
        delete_project(new_name)
        return 'Wrong Drainage node type'
    #step 1. operate the valves according to their relative opening
    for valve, valve_k in zip(valves, valves_k):
        cs=ChangeSet()
        status=get_status(new_name,valve)
        if valve_k == 0:
            status['status'] = 'CLOSED'
        elif valve_k < 1:
            status['status'] = 'OPEN'
            # Empirical conversion from relative opening to valve setting.
            status['setting'] = 0.1036 * pow(valve_k, -3.105)
        # NOTE(review): for valve_k >= 1 the unchanged status is still written
        # back -- presumably a deliberate no-op; confirm.
        cs.append(status)
        set_status(new_name,cs)
    #step 2. set the drainage: extra demand when a flow is given, otherwise an emitter
    emitter_demand=get_demand(new_name,drainage_node_ID)
    cs=ChangeSet()
    if flushing_flow>0:
        for r in emitter_demand['demands']:
            r['demand']+=(flushing_flow/3.6)
        cs.append(emitter_demand)
        set_demand(new_name,cs)
    else:
        # Size the discharge from the largest pipe connected to the node
        # (floor of 50 mm), then convert to metres.
        pipes=get_node_links(new_name,drainage_node_ID)
        flush_diameter=50
        for pipe in pipes:
            d=get_pipe(new_name,pipe)['diameter']
            if flush_diameter<d:
                flush_diameter=d
        flush_diameter/=1000
        # Emitter coefficient from the fully open cross-section area.
        emitter_coeff=0.65*3.14*(flush_diameter*flush_diameter/4)*sqrt(19.6)*1000
        old_emitter=get_emitter(new_name,drainage_node_ID)
        if old_emitter is not None:
            old_emitter['coefficient']=emitter_coeff
        else:
            old_emitter={'junction':drainage_node_ID,'coefficient':emitter_coeff}
        new_emitter=ChangeSet()
        new_emitter.append(old_emitter)
        set_emitter(new_name,new_emitter)
    #step 3. run simulation
    # Valve closing may leave residual demand-driven flow, so run
    # pressure-driven (PDA).
    options = get_option(new_name)
    options['DEMAND MODEL'] = OPTION_DEMAND_MODEL_PDA
    options['REQUIRED PRESSURE'] = '20.0000'
    cs_options = ChangeSet()
    cs_options.append(options)
    set_option(new_name, cs_options)
    result = run_simulation_ex(new_name,'realtime', date_time, date_time, duration,
                               downloading_prohibition=True)
    #step 4. discard the working copy
    if is_project_open(new_name):
        close_project(new_name)
    delete_project(new_name)
    return result
############################################################
# Contaminant simulation 04
#
############################################################
def contaminant_simulation(prj_name:str,date_time:str, source:str,concentration:float,duration=900,pattern:str=None)->str:
    """
    Contaminant simulation (04): inject a chemical at node `source` with the
    given concentration and run a water-quality simulation on a throwaway
    copy of the project.

    :param prj_name: base project (database) name
    :param date_time: simulation start datetime string
    :param source: node ID where the contaminant enters the network
    :param concentration: source strength (concentration) of the contaminant
    :param duration: simulation duration in seconds (default 900); None falls
                     back to one hydraulic timestep
    :param pattern: optional existing pattern ID for the source; when None a
                    constant all-ones pattern 'contam_pt' spanning `duration`
                    is created
    :return: simulation result string, or an error message when `pattern`
             does not exist
    """
    print(datetime.now(pytz.timezone('Asia/Shanghai')).strftime("%Y-%m-%d %H:%M:%S") + " -- Start Analysis.")
    new_name = f'contaminant_Sim_{prj_name}'
    # Drop any leftover working copy from a previous run.
    if have_project(new_name):
        if is_project_open(new_name):
            close_project(new_name)
        delete_project(new_name)
    if is_project_open(prj_name):
        close_project(prj_name)
    print(datetime.now(pytz.timezone('Asia/Shanghai')).strftime("%Y-%m-%d %H:%M:%S") + " -- Start Copying Database.")
    CopyProjectEx()(prj_name, new_name,
                    ['operation', 'current_operation', 'restore_operation', 'batch_operation', 'operation_table'])
    print(datetime.now(pytz.timezone('Asia/Shanghai')).strftime("%Y-%m-%d %H:%M:%S") + " -- Start Opening Database.")
    open_project(new_name)
    print(datetime.now(pytz.timezone('Asia/Shanghai')).strftime("%Y-%m-%d %H:%M:%S") + " -- Database Loading OK.")
    # Use a 5-minute quality timestep for the transport calculation.
    dic_time = get_time(new_name)
    dic_time['QUALITY TIMESTEP'] = '0:05:00'
    cs = ChangeSet()
    cs.operations.append(dic_time)
    set_time(new_name, cs)  # set QUALITY TIMESTEP
    time_option = get_time(new_name)
    hydraulic_step = time_option['HYDRAULIC TIMESTEP']
    secs = from_clock_to_seconds_2(hydraulic_step)
    #step 1. default the duration to one hydraulic timestep
    if duration is None:
        duration = secs
    #step 2. resolve an existing pattern or build a constant one
    if pattern is not None:
        pt = get_pattern(new_name, pattern)
        if pt is None:
            return 'cant find pattern'
        # NOTE(review): assumes the dict returned by get_pattern carries an
        # 'id' key used below -- confirm against get_pattern.
    else:
        cs_pattern = ChangeSet()
        # One factor of 1.0 per hydraulic timestep covering `duration`.
        factors = []
        tmp_duration = duration
        while tmp_duration > 0:
            factors.append(1.0)
            tmp_duration = tmp_duration - secs
        pt = {'id': 'contam_pt', 'factors': factors}
        cs_pattern.append(pt)
        add_pattern(new_name, cs_pattern)
    #step 3. set source quality at the injection node
    cs_source = ChangeSet()
    source_schema = {'node':source,'s_type':SOURCE_TYPE_CONCEN,'strength':concentration,'pattern':pt['id']}
    cs_source.append(source_schema)
    source_node = get_source(new_name, source)
    if len(source_node) == 0:
        add_source(new_name, cs_source)
    else:
        set_source(new_name, cs_source)
    # Mark the source node as an inflow: fixed demand of -1 and no pattern.
    # (Mutate each demand record directly; the original list.index() lookup
    # was O(n^2) and wrong for duplicate records.)
    dict_demand = get_demand(new_name, source)
    for demand_record in dict_demand['demands']:
        demand_record['demand'] = -1
        demand_record['pattern'] = None
    cs = ChangeSet()
    cs.append(dict_demand)
    set_demand(new_name, cs)  # set inflow node
    #step 4. switch the quality option to CHEMICAL
    opt = get_option(new_name)
    opt['QUALITY'] = OPTION_QUALITY_CHEMICAL
    cs_option = ChangeSet()
    cs_option.append(opt)
    set_option(new_name, cs_option)
    #step 5. run simulation, then discard the working copy
    result = run_simulation_ex(new_name,'realtime', date_time, date_time, duration,
                               downloading_prohibition=True)
    if is_project_open(new_name):
        close_project(new_name)
    delete_project(new_name)
    return result
############################################################
# age analysis 05
############################################################
def age_analysis(prj_name, start_time, end_time, duration) -> str:
    """
    Water-age analysis (05): run a realtime simulation on a throwaway copy of
    `prj_name` and return the final water age (quality value) of every node
    and link as a JSON string {'nodes': [...], 'links': [...]}.
    """
    print(datetime.now(pytz.timezone('Asia/Shanghai')).strftime("%Y-%m-%d %H:%M:%S") + " -- Start Analysis.")
    new_name = f'age_Anal_{prj_name}'
    # Remove any stale working copy, then detach the base project.
    if have_project(new_name):
        if is_project_open(new_name):
            close_project(new_name)
        delete_project(new_name)
    if is_project_open(prj_name):
        close_project(prj_name)
    print(datetime.now(pytz.timezone('Asia/Shanghai')).strftime("%Y-%m-%d %H:%M:%S") + " -- Start Copying Database.")
    CopyProjectEx()(prj_name, new_name,
                    ['operation', 'current_operation', 'restore_operation', 'batch_operation', 'operation_table'])
    print(datetime.now(pytz.timezone('Asia/Shanghai')).strftime("%Y-%m-%d %H:%M:%S") + " -- Start Opening Database.")
    open_project(new_name)
    print(datetime.now(pytz.timezone('Asia/Shanghai')).strftime("%Y-%m-%d %H:%M:%S") + " -- Database Loading OK.")
    # step 1. run the simulation on the working copy
    result = run_simulation_ex(new_name, 'realtime', start_time, end_time, duration,
                               downloading_prohibition=True)
    # step 2. discard the working copy; the binary .out file stays on disk
    if is_project_open(new_name):
        close_project(new_name)
    delete_project(new_name)
    # step 3. collect the last-timestep quality (= water age) of each element
    output = Output("./temp/{}.db.out".format(new_name))
    nodes_age = [record['result'][-1]['quality'] for record in output.node_results()]
    links_age = [record['result'][-1]['quality'] for record in output.link_results()]
    return json.dumps({'nodes': nodes_age, 'links': links_age})
############################################################
# pressure regulation 06
############################################################
def pressure_regulation(prj_name, start_datetime, pump_control, tank_initial_level_control=None) -> str:
    """
    Pressure regulation (06): run a 15-minute (900 s) realtime simulation on a
    working copy of `prj_name` under the given pump controls and optional tank
    initial levels, and return the simulation result.
    """
    def _now():
        # Beijing-time timestamp used in the progress log lines.
        return datetime.now(pytz.timezone('Asia/Shanghai')).strftime("%Y-%m-%d %H:%M:%S")

    print(_now() + " -- Start Analysis.")
    work_name = f'pressure_regulation_{prj_name}'
    if have_project(work_name):
        if is_project_open(work_name):
            close_project(work_name)
        delete_project(work_name)
    if is_project_open(prj_name):
        close_project(prj_name)
    print(_now() + " -- Start Copying Database.")
    CopyProjectEx()(prj_name, work_name,
                    ['operation', 'current_operation', 'restore_operation', 'batch_operation', 'operation_table'])
    print(_now() + " -- Start Opening Database.")
    open_project(work_name)
    print(_now() + " -- Database Loading OK.")
    # All pumps may be switched off, which demand-driven analysis handles
    # poorly, so run the simulation pressure-driven (PDA).
    pda_options = get_option(work_name)
    pda_options['DEMAND MODEL'] = OPTION_DEMAND_MODEL_PDA
    pda_options['REQUIRED PRESSURE'] = '15.0000'
    option_changes = ChangeSet()
    option_changes.append(pda_options)
    set_option(work_name, option_changes)
    outcome = run_simulation_ex(name=work_name,
                                simulation_type='realtime',
                                start_datetime=start_datetime,
                                duration=900,
                                pump_control=pump_control,
                                tank_initial_level_control=tank_initial_level_control,
                                downloading_prohibition=True)
    if is_project_open(work_name):
        close_project(work_name)
    delete_project(work_name)
    return outcome
############################################################
# project management 07
############################################################
def project_management(prj_name, start_datetime, pump_control,
                       tank_initial_level_control=None, region_demand_control=None) -> str:
    """
    Project management analysis (07): simulate one full day (86400 s) on a
    working copy of `prj_name` under the supplied pump / tank-level /
    regional-demand controls and return the simulation result.
    """
    def _log(step):
        # Progress line with a Beijing-time timestamp.
        print(datetime.now(pytz.timezone('Asia/Shanghai')).strftime("%Y-%m-%d %H:%M:%S") + " -- " + step)

    _log("Start Analysis.")
    copy_name = f'project_management_{prj_name}'
    if have_project(copy_name):
        if is_project_open(copy_name):
            close_project(copy_name)
        delete_project(copy_name)
    if is_project_open(prj_name):
        close_project(prj_name)
    _log("Start Copying Database.")
    excluded_tables = ['operation', 'current_operation', 'restore_operation', 'batch_operation', 'operation_table']
    CopyProjectEx()(prj_name, copy_name, excluded_tables)
    _log("Start Opening Database.")
    open_project(copy_name)
    _log("Database Loading OK.")
    sim_result = run_simulation_ex(name=copy_name,
                                   simulation_type='realtime',
                                   start_datetime=start_datetime,
                                   duration=86400,
                                   pump_control=pump_control,
                                   tank_initial_level_control=tank_initial_level_control,
                                   region_demand_control=region_demand_control,
                                   downloading_prohibition=True)
    if is_project_open(copy_name):
        close_project(copy_name)
    delete_project(copy_name)
    return sim_result
############################################################
# scheduling analysis 08
############################################################
def scheduling_simulation(prj_name, start_time, pump_control, tank_id, water_plant_output_id, time_delta=300) -> str:
    """
    Scheduling analysis (08): run a single-step realtime simulation under the
    given pump controls, then forecast the water-plant outlet pressure and the
    tank level `time_delta` seconds ahead from the simulated tank inflow.

    :param prj_name: base project (database) name
    :param start_time: simulation start datetime string
    :param pump_control: pump control settings passed to run_simulation_ex
    :param tank_id: element ID of the tank whose level is forecast
    :param water_plant_output_id: node ID of the water-plant outlet
    :param time_delta: forecast horizon in seconds (default 300)
    :return: JSON string with 'water_plant_output_pressure' (MPa),
             'tank_init_level' (m) and the predicted 'tank_level' (m)
    """
    print(datetime.now(pytz.timezone('Asia/Shanghai')).strftime("%Y-%m-%d %H:%M:%S") + " -- Start Analysis.")
    new_name = f'scheduling_{prj_name}'
    # Drop any leftover working copy from a previous run.
    if have_project(new_name):
        if is_project_open(new_name):
            close_project(new_name)
        delete_project(new_name)
    if is_project_open(prj_name):
        close_project(prj_name)
    print(datetime.now(pytz.timezone('Asia/Shanghai')).strftime("%Y-%m-%d %H:%M:%S") + " -- Start Copying Database.")
    CopyProjectEx()(prj_name, new_name,
                    ['operation', 'current_operation', 'restore_operation', 'batch_operation', 'operation_table'])
    print(datetime.now(pytz.timezone('Asia/Shanghai')).strftime("%Y-%m-%d %H:%M:%S") + " -- Start Opening Database.")
    open_project(new_name)
    print(datetime.now(pytz.timezone('Asia/Shanghai')).strftime("%Y-%m-%d %H:%M:%S") + " -- Database Loading OK.")
    run_simulation_ex(new_name, 'realtime', start_time, duration=0, pump_control=pump_control)
    # The simulation may leave the project closed; reopen it to read the tank.
    if not is_project_open(new_name):
        open_project(new_name)
    tank = get_tank(new_name, tank_id) # tank element record
    tank_floor_space = pi * pow(tank['diameter'] / 2, 2) # tank floor area (m^2)
    tank_init_level = tank['init_level'] # initial tank level (m)
    tank_pipes_id = tank['links'] # pipes connected to the tank
    tank_pipe_flow_direction = {} # flow-sign correction per pipe: +1 when the tank is the downstream node, -1 when upstream
    for pipe_id in tank_pipes_id:
        if get_pipe(new_name, pipe_id)['node2'] == tank_id: # tank is this pipe's downstream node
            tank_pipe_flow_direction[pipe_id] = 1
        else:
            tank_pipe_flow_direction[pipe_id] = -1
    output = Output("./temp/{}.db.out".format(new_name))
    node_results = output.node_results() # [{'node': str, 'result': [{'pressure': float}]}]
    water_plant_output_pressure = 0
    for node_result in node_results:
        if node_result['node'] == water_plant_output_id: # plant outlet pressure (m)
            water_plant_output_pressure = node_result['result'][-1]['pressure']
    water_plant_output_pressure /= 100 # predicted plant outlet pressure (MPa)
    pipe_results = output.link_results() # [{'link': str, 'result': [{'flow': float}]}]
    tank_inflow = 0
    for pipe_result in pipe_results:
        for pipe_id in tank_pipes_id: # only pipes connected to the tank
            if pipe_result['link'] == pipe_id: # signed tank inflow (L/s)
                tank_inflow += pipe_result['result'][-1]['flow'] * tank_pipe_flow_direction[pipe_id]
    tank_inflow /= 1000 # tank inflow (m^3/s)
    tank_level_delta = tank_inflow * time_delta / tank_floor_space # level change over the horizon (m)
    tank_level = tank_init_level + tank_level_delta # predicted tank level (m)
    simulation_results = {'water_plant_output_pressure': water_plant_output_pressure,
                          'tank_init_level': tank_init_level,
                          'tank_level': tank_level}
    # Discard the working copy before returning.
    if is_project_open(new_name):
        close_project(new_name)
    delete_project(new_name)
    return json.dumps(simulation_results)
def daily_scheduling_simulation(prj_name, start_time, pump_control,
                                reservoir_id, tank_id, water_plant_output_id,
                                reservoir_bottom_head=250.35) -> str:
    """
    Daily scheduling simulation: run a 24 h (86400 s) realtime simulation
    under the given pump controls and extract the per-timestep series of
    plant outlet pressure, clear-water reservoir level and tank level.

    :param prj_name: base project (database) name
    :param start_time: simulation start datetime string
    :param pump_control: pump control settings passed to run_simulation_ex
    :param reservoir_id: node ID of the clear-water reservoir
    :param tank_id: node ID of the regulating tank
    :param water_plant_output_id: node ID of the water-plant outlet
    :param reservoir_bottom_head: elevation head (m) subtracted from the
        reservoir node head to obtain its water level; defaults to the
        previously hard-coded 250.35 of the existing model
    :return: JSON string with lists 'water_plant_output_pressure' (MPa),
             'reservoir_level' (m) and 'tank_level' (m)
    """
    print(datetime.now(pytz.timezone('Asia/Shanghai')).strftime("%Y-%m-%d %H:%M:%S") + " -- Start Analysis.")
    new_name = f'daily_scheduling_{prj_name}'
    # Drop any leftover working copy from a previous run.
    if have_project(new_name):
        if is_project_open(new_name):
            close_project(new_name)
        delete_project(new_name)
    if is_project_open(prj_name):
        close_project(prj_name)
    print(datetime.now(pytz.timezone('Asia/Shanghai')).strftime("%Y-%m-%d %H:%M:%S") + " -- Start Copying Database.")
    CopyProjectEx()(prj_name, new_name,
                    ['operation', 'current_operation', 'restore_operation', 'batch_operation', 'operation_table'])
    print(datetime.now(pytz.timezone('Asia/Shanghai')).strftime("%Y-%m-%d %H:%M:%S") + " -- Start Opening Database.")
    open_project(new_name)
    print(datetime.now(pytz.timezone('Asia/Shanghai')).strftime("%Y-%m-%d %H:%M:%S") + " -- Database Loading OK.")
    run_simulation_ex(new_name, 'realtime', start_time, duration=86400, pump_control=pump_control)
    # The simulation may leave the project closed; reopen it to read results.
    if not is_project_open(new_name):
        open_project(new_name)
    output = Output("./temp/{}.db.out".format(new_name))
    node_results = output.node_results()  # [{'node': str, 'result': [{'pressure': float, 'head': float}]}]
    water_plant_output_pressure = []
    reservoir_level = []
    tank_level = []
    for node_result in node_results:
        if node_result['node'] == water_plant_output_id:
            for result in node_result['result']:
                water_plant_output_pressure.append(result['pressure'] / 100)  # plant outlet pressure (MPa)
        elif node_result['node'] == reservoir_id:
            for result in node_result['result']:
                reservoir_level.append(result['head'] - reservoir_bottom_head)  # reservoir level (m)
        elif node_result['node'] == tank_id:
            for result in node_result['result']:
                tank_level.append(result['pressure'])  # tank level (m)
    simulation_results = {'water_plant_output_pressure': water_plant_output_pressure,
                          'reservoir_level': reservoir_level,
                          'tank_level': tank_level}
    # Discard the working copy before returning.
    if is_project_open(new_name):
        close_project(new_name)
    delete_project(new_name)
    return json.dumps(simulation_results)
############################################################
# network_update 10
############################################################
def network_update(file_path: str) -> None:
    """
    Network update (10): load an EPANET .inp file into project 'bb' and, when
    './history_pattern_flow.csv' exists, bulk-insert its rows into the
    `history_patterns_flows` table of the 'bb' PostgreSQL database.

    :param file_path: path of the .inp network file to import
    :return: None
    """
    read_inp('bb', file_path)
    csv_path = './history_pattern_flow.csv'
    if os.path.exists(csv_path):
        print("history_patterns_flows文件存在开始处理...")
        # Loop-invariant statement, built once outside the per-row loop.
        insert_sql = sql.SQL("""
            INSERT INTO history_patterns_flows (id, factor, flow)
            VALUES (%s, %s, %s);
        """)
        # utf-8-sig transparently strips a BOM written by spreadsheet tools.
        with psycopg.connect("dbname=bb host=127.0.0.1") as conn:
            with conn.cursor() as cur:
                with open(csv_path, newline='', encoding='utf-8-sig') as csvfile:
                    reader = csv.DictReader(csvfile)
                    for row in reader:
                        # Insert every row as-is; no uniqueness check is performed.
                        cur.execute(insert_sql, (row['id'], row['factor'], row['flow']))
            conn.commit()
        print("数据成功导入到 'history_patterns_flows' 表格。")
    else:
        print("history_patterns_flows文件不存在。")
def submit_scada_info(name: str, coord_id: str) -> None:
    """
    Import the SCADA info table ('./scada_info.csv') into the PostgreSQL
    database `name`, fully replacing any existing rows in `scada_info`.

    :param name: project name (PostgreSQL database name)
    :param coord_id: SRID of the source coordinate system, e.g. '4326'
    :return: None
    """
    scada_info_path = './scada_info.csv'
    # Guard clause: nothing to do when the CSV is absent.
    if not os.path.exists(scada_info_path):
        print("scada_info文件不存在。")
        return
    print("scada_info文件存在开始处理...")
    # Auto-detect the file encoding (exports may be GBK, UTF-8, ...).
    with open(scada_info_path, 'rb') as file:
        raw_data = file.read()
    detected = chardet.detect(raw_data)
    file_encoding = detected['encoding']
    print(f"检测到的文件编码:{file_encoding}")
    # Loop invariants hoisted out of the per-row loop: the dynamic
    # associated_source_outflow_id1..20 column list and the prepared INSERT.
    associated_columns = [f"associated_source_outflow_id{i}" for i in range(1, 21)]
    insert_sql = sql.SQL("""
        INSERT INTO scada_info (
            id, type, associated_element_id, associated_pattern,
            associated_pipe_flow_id, {associated_columns},
            API_query_id, transmission_mode, transmission_frequency,
            X_coor, Y_coor, coord
        )
        VALUES (
            %s, %s, %s, %s, %s, {associated_placeholders},
            %s, %s, %s, %s, %s, %s
        );
    """).format(
        associated_columns=sql.SQL(", ").join(sql.Identifier(col) for col in associated_columns),
        associated_placeholders=sql.SQL(", ").join(sql.Placeholder() for _ in associated_columns)
    )
    try:
        # Connect to the target database by name.
        conn_string = f"dbname={name} host=127.0.0.1"
        with psycopg.connect(conn_string) as conn:
            with conn.cursor() as cur:
                # Start from an empty table so the import fully replaces old data.
                cur.execute("SELECT COUNT(*) FROM scada_info;")
                count = cur.fetchone()[0]
                if count > 0:
                    print("scada_info表中已有数据正在清空记录...")
                    cur.execute("DELETE FROM scada_info;")
                    print("表记录已清空。")
                with open(scada_info_path, newline='', encoding=file_encoding) as csvfile:
                    reader = csv.DictReader(csvfile)
                    for row in reader:
                        # Empty CSV cells become SQL NULLs.
                        cleaned_row = {key: (value if value.strip() else None) for key, value in row.items()}
                        associated_values = [
                            (cleaned_row.get(col).strip() if cleaned_row.get(col) and cleaned_row.get(
                                col).strip() else None)
                            for col in associated_columns
                        ]
                        x_coor = float(cleaned_row['X_coor']) if cleaned_row['X_coor'] else None
                        y_coor = float(cleaned_row['Y_coor']) if cleaned_row['Y_coor'] else None
                        # Compare against None, not truthiness, so a legitimate
                        # 0.0 coordinate still produces a geometry point.
                        if x_coor is not None and y_coor is not None:
                            coord = f"SRID={coord_id};POINT({x_coor} {y_coor})"
                        else:
                            coord = None
                        cur.execute(insert_sql, (
                            cleaned_row['id'], cleaned_row['type'], cleaned_row['associated_element_id'],
                            cleaned_row.get('associated_pattern'), cleaned_row.get('associated_pipe_flow_id'),
                            *associated_values, cleaned_row.get('API_query_id'),
                            cleaned_row['transmission_mode'], cleaned_row['transmission_frequency'],
                            x_coor, y_coor, coord
                        ))
            conn.commit()
        print("数据成功导入到 'scada_info' 表格。")
    except Exception as e:
        print(f"导入时出错:{e}")
if __name__ == '__main__':
    # Manual smoke-test entry point. The commented lines below are ad-hoc
    # invocations of the analysis functions kept for reference; only the
    # SCADA-info import into database 'bb' (SRID 4490) actually runs.
    # contaminant_simulation('bb_model','2024-06-24T00:00:00Z','ZBBDTZDP009034',30,1800)
    # flushing_analysis('bb_model','2024-04-01T08:00:00Z',{'GSD230719205857733F8F5214FF','GSD230719205857C0AF65B6A170'},'GSD2307192058570DEDF28E4F73',0,duration=900)
    # flushing_analysis('bb_model', '2024-08-26T08:00:00Z', ['GSD2307192058572E5C0E14D83E'], [0.5], 'ZBBDTZDP009410', 0,
    #                   duration=1800)
    # valve_close_analysis('bb_model','2024-04-01T08:00:00Z',['GSD2307192058576122D929EE99(L)'],duration=1800)
    # burst_analysis('bb','2024-04-01T08:00:00Z','ZBBGXSZW000001',burst_size=200,duration=1800)
    #run_simulation('beibeizone','2024-04-01T08:00:00Z')
    # str_dump=dump_output('h:\\OneDrive\\tjwaterserver\\temp\\beibeizone.db_no_burst.out')
    # with open("out_dump.txt", "w") as f:
    #     f.write(str_dump)
    # str_dump=dump_output('h:\\OneDrive\\tjwaterserver\\temp\\beibeizone.db_busrtID(ZBBGXSZW000001).out')
    # with open("burst_out_dump.txt", "w") as f:
    #     f.write(str_dump)
    # network_update('model22_1223.inp')
    submit_scada_info('bb', '4490')

1063
simulation.py Normal file

File diff suppressed because it is too large Load Diff