Files
TJWaterServer/main.py
2022-12-03 14:09:03 +08:00

1346 lines
47 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
import os
import io
import json
from typing import *
from urllib.request import Request
from xml.dom import minicompat
from pydantic import BaseModel
from starlette.responses import FileResponse, JSONResponse
from fastapi import FastAPI, File, UploadFile, Response, status, Request, Body
from fastapi.responses import PlainTextResponse
from tjnetwork import *
JUNCTION = 0
RESERVOIR = 1
TANK = 2
PIPE = 1
NODE_COUNT = 0
LINK_COUNT = 2
prjs = []
inpDir = "C:/inpfiles/"
tmpDir = "C:/tmpfiles/"
lockedPrjs = {}
if not os.path.exists(inpDir):
os.mkdir(inpDir)
if not os.path.exists(tmpDir):
os.mkdir(tmpDir)
app = FastAPI()
### project
@app.get('/listprojects/')
async def fastapi_list_projects() -> list[str]:
return list_project()
@app.get("/haveproject/")
async def fastapi_have_project(network: str):
return have_project(network)
@app.post("/createproject/")
async def fastapi_create_project(network: str):
create_project(network)
return network
@app.post("/deleteproject/")
async def fastapi_delete_project(network: str):
delete_project(network)
return True
@app.get("/isprojectopen/")
async def fastapi_is_project_open(network: str):
return is_project_open(network)
@app.get('/getprojectopencount/')
async def fastapi_get_project_open_count(network: str) -> int:
return get_project_open_count(network)
@app.post("/openproject/")
async def fastapi_open_project(network: str):
open_project(network)
return network
@app.post("/closeproject/")
async def fastapi_close_project(network: str):
close_project(network)
return True
@app.post("/copyproject/")
async def fastapi_copy_project(source: str, target: str):
copy_project(source, target)
return True
@app.post("/readinp/")
async def fastapi_read_inp(network: str, inp: str) -> None:
return read_inp(network, inp)
@app.get("/dumpinp/")
async def fastapi_dump_inp(network: str, inp: str) -> None:
return dump_inp(network, inp)
# 必须用这个PlainTextResponse不然每个key都有引号
@app.get("/runproject/", response_class = PlainTextResponse)
async def fastapi_run_project(network: str) -> str:
return run_project(network)
# put in inp folder, name without extension
@app.get("/runinp/")
async def fastapi_run_inp(network: str) -> str:
return run_inp(network)
# path is absolute path
@app.get("/dumpoutput/")
async def fastapi_dump_output(output: str) -> str:
return dump_output(output)
@app.get("/isprojectlocked/")
async def fastapi_is_locked(network: str):
return lockedPrjs.get(network) != None
@app.post("/lockproject/")
async def fastapi_lock_project(network: str, id: str):
lockedPrjs[network] = id
return True
@app.post("/unlockproject/")
async def fastapi_unlock_project(network: str, id: str):
if lockedPrjs[network] == id:
del lockedPrjs[network]
return True
### operations
@app.get('/getcurrentoperationid/')
async def fastapi_get_current_operaiton_id(network: str) -> int:
return get_current_operation(network)
@app.post('/undo/')
async def fastapi_undo(network: str):
return execute_undo(network)
@app.post('/redo/')
async def fastapi_redo(network: str):
return execute_redo(network)
@app.get('/havesnapshot/')
async def fastapi_have_snapshot(network: str, tag: str) -> bool:
return have_snapshot(network, tag)
@app.post('/takesnapshot/')
def fastapi_take_snapshot(network: str, tag: str) -> int | None:
return take_snapshot(network, tag)
@app.post('/picksnapshot/')
def fastapi_pick_snapshot(network: str, tag: str, discard: bool = False) -> ChangeSet:
return pick_snapshot(network, tag, discard)
@app.get("/syncwithserver/")
async def fastapi_sync_with_server(network: str, operationid: int) -> ChangeSet:
return sync_with_server(network, operationid)
@app.post("/batch/")
async def fastapi_execute_batch_commands(network: str, req: Request)-> ChangeSet:
jo_root = await req.json()
cs: ChangeSet = ChangeSet()
cs.operations = jo_root['operations']
return execute_batch_commands(network, cs)
@app.post("/compressedbatch/")
async def fastapi_execute_compressed_batch_commands(network: str, req: Request)-> ChangeSet:
jo_root = await req.json()
cs: ChangeSet = ChangeSet()
cs.operations = jo_root['operations']
return execute_batch_command(network, cs)
@app.get("/getrestoreoperation/")
async def fastapi_get_restore_operation(network : str) -> int:
return get_restore_operation(network)
############################################################
# type
############################################################
@app.get('/isnode/')
async def fastapi_is_node(network: str, node: str) -> bool:
return is_node(network, node)
@app.get('/isjunction/')
async def fastapi_is_junction(network: str, node: str) -> bool:
return is_junction(network, node)
@app.get('/isreservoir/')
async def fastapi_is_reservoir(network: str, node: str) -> bool:
return is_reservoir(network, node)
@app.get('/istank/')
async def fastapi_is_tank(network: str, node: str) -> bool:
return is_tank(network, node)
@app.get('/islink/')
async def fastapi_is_link(network: str, link: str) -> bool:
return is_link(network, link)
@app.get('/ispipe/')
async def fastapi_is_pipe(network: str, link: str) -> bool:
return is_pipe(network, link)
@app.get('/ispump/')
async def fastapi_is_pump(network: str, link: str) -> bool:
return is_pump(network, link)
@app.get('/isvalve/')
async def fastapi_is_valve(network: str, link: str) -> bool:
return is_valve(network, link)
@app.get('/iscurve/')
async def fastapi_is_curve(network: str, curve: str) -> bool:
return is_curve(network, curve)
@app.get('/ispattern/')
async def fastapi_is_pattern(network: str, pattern: str) -> bool:
return is_pattern(network, pattern)
@app.get("/getnodes/")
async def fastapi_get_nodes(network: str) -> list[str]:
return get_nodes(network)
@app.get("/getlinks/")
async def fastapi_get_links(network: str) -> list[str]:
return get_links(network)
@app.get("/getcurves/")
async def fastapi_get_curves(network: str) -> list[str]:
return get_curves(network)
@app.get("/getpatterns/")
async def fastapi_get_patterns(network: str) -> list[str]:
return get_patterns(network)
@app.get("/getnodelinks/")
def get_node_links(network: str, node: str) -> list[str]:
return get_node_links(network, node)
############################################################
# title 1.[TITLE]
############################################################
@app.get('/gettitleschema/')
async def fast_get_title_schema(network: str) -> dict[str, dict[str, Any]]:
return get_title_schema(network)
@app.get('/gettitle/')
async def fast_get_title(network: str) -> dict[str, Any]:
return get_title(network)
@app.get('/settitle/')
async def fastapi_set_title(network: str, req: Request) -> ChangeSet:
props = await req.json()
return set_title(network, ChangeSet(props))
############################################################
# junction 2.[JUNCTIONS]
############################################################
@app.get('/getjunctionschema')
async def fast_get_junction_schema(network: str) -> dict[str, dict[str, Any]]:
return get_junction_schema(network)
@app.post("/addjunction/")
async def fastapi_add_junction(network: str, junction: str, x: float, y: float, z: float) -> ChangeSet:
ps = { 'id' : junction,
'x' : x,
'y' : y,
'elevation' : z }
return add_junction(network, ChangeSet(ps))
@app.post("/deletejunction/")
async def fastapi_delete_junction(network: str, junction: str) -> ChangeSet:
ps = {'id' : junction}
return delete_junction(network, ChangeSet(ps))
@app.get("/getjunctionelevation/")
async def fastapi_get_junction_elevation(network: str, junction: str) -> float:
ps = get_junction(network, junction)
return ps['elevation']
@app.get("/getjunctionx/")
async def fastapi_get_junction_x(network: str, junction: str) -> float:
ps = get_junction(network, junction)
return ps['x']
@app.get("/getjunctiony/")
async def fastapi_get_junction_x(network: str, junction: str) -> float:
ps = get_junction(network, junction)
return ps['y']
@app.get("/getjunctioncoord/")
async def fastapi_get_junction_coord(network: str, junction: str) -> dict[str, float]:
ps = get_junction(network, junction)
coord = { 'x' : ps['x'],
'y' : ps['y'] }
return coord
@app.get("/getjunctiondemand/")
async def fastapi_get_junction_demand(network: str, junction: str) -> float:
ps = get_junction(network, junction)
return ps['demand']
@app.get("/getjunctionpattern/")
async def fastapi_get_junction_pattern(network: str, junction: str) -> str:
ps = get_junction(network, junction)
return ps['pattern']
@app.post("/setjunctionelevation/")
async def fastapi_set_junction_elevation(network: str, junction: str, elevation: float) -> ChangeSet:
ps = { 'id' : junction,
'elevation' : elevation }
return set_junction(network, ChangeSet(ps))
@app.post("/setjunctionx/")
async def fastapi_set_junction_x(network: str, junction: str, x: float) -> ChangeSet:
ps = { 'id' : junction,
'x' : x }
return set_junction(network, ChangeSet(ps))
@app.post("/setjunctiony/")
async def fastapi_set_junction_y(network: str, junction: str, y: float) -> ChangeSet:
ps = { 'id' : junction,
'y' : y }
return set_junction(network, ChangeSet(ps))
@app.post("/setjunctioncoord/")
async def fastapi_set_junction_coord(network: str, junction: str, x: float, y: float) -> ChangeSet:
ps = { 'id' : junction,
'x' : x,
'y' : y }
return set_junction(network, ChangeSet(ps))
@app.post("/setjunctiondemand/")
async def fastapi_set_junction_demand(network: str, junction: str, demand: float) -> ChangeSet:
ps = { 'id' : junction,
'demand' : demand }
return set_junction(network, ChangeSet(ps))
@app.post("/setjunctionpattern/")
async def fastapi_set_junction_pattern(network: str, junction: str, pattern: str) -> ChangeSet:
ps = { 'id' : junction,
'pattern' : pattern }
return set_junction(network, ChangeSet(ps))
@app.get("/getjunctionproperties/")
async def fastapi_get_junction_properties(network: str, junction: str) -> dict[str, Any]:
return get_junction(network, junction)
@app.post("/setjunctionproperties/")
async def fastapi_set_junction_properties(network: str, junction: str, req: Request) -> ChangeSet:
props = await req.json()
ps = { 'id' : junction } | props
return set_junction(network, ChangeSet(ps))
############################################################
# reservoir 3.[RESERVOIRS]
############################################################
@app.get('/getreservoirschema')
async def fast_get_reservoir_schema(network: str) -> dict[str, dict[str, Any]]:
return get_reservoir_schema(network)
@app.post("/addreservoir/")
async def fastapi_add_reservoir(network: str, reservoir: str, x: float, y: float, head: float) -> ChangeSet:
ps = { 'id' : reservoir,
'x' : x,
'y' : y,
'head' : head }
return add_reservoir(network, ChangeSet(ps))
@app.post("/deletereservoir/")
async def fastapi_delete_reservoir(network: str, reservoir: str) -> ChangeSet:
ps = { 'id' : reservoir }
return delete_reservoir(network, ChangeSet(ps))
@app.get("/getreservoirhead/")
async def fastapi_get_reservoir_head(network: str, reservoir: str) -> float | None:
ps = get_reservoir(network, reservoir)
return ps['head']
@app.get("/getreservoirpattern/")
async def fastapi_get_reservoir_pattern(network: str, reservoir: str) -> str | None:
ps = get_reservoir(network, reservoir)
return ps['pattern']
@app.get("/getreservoirx/")
async def fastapi_get_reservoir_x(network: str, reservoir: str) -> dict[str, float] | None:
ps = get_reservoir(network, reservoir)
return ps['x']
@app.get("/getreservoiry/")
async def fastapi_get_reservoir_y(network: str, reservoir: str) -> dict[str, float] | None:
ps = get_reservoir(network, reservoir)
return ps['y']
@app.get("/getreservoircoord/")
async def fastapi_get_reservoir_y(network: str, reservoir: str) -> dict[str, float] | None:
ps = get_reservoir(network, reservoir)
coord = { 'id' : reservoir,
'x' : ps['x'],
'y' : ps['y'] }
return coord
@app.post("/setreservoirhead/")
async def fastapi_set_reservoir_head(network: str, reservoir: str, head: float) -> ChangeSet:
ps = { 'id' : reservoir,
'head' : head }
return set_reservoir(network, ChangeSet(ps))
@app.post("/setreservoirpattern/")
async def fastapi_set_reservoir_pattern(network: str, reservoir: str, pattern: str) -> ChangeSet:
ps = { 'id' : reservoir,
'pattern' : pattern }
return set_reservoir(network, ChangeSet(ps))
@app.post("/setreservoirx/")
async def fastapi_set_reservoir_x(network: str, reservoir: str, x: float) -> ChangeSet:
ps = { 'id' : reservoir,
'x' : x }
return set_reservoir(network, ChangeSet(ps))
@app.post("/setreservoirx/")
async def fastapi_set_reservoir_y(network: str, reservoir: str, y: float) -> ChangeSet:
ps = { 'id' : reservoir,
'y' : y }
return set_reservoir(network, ChangeSet(ps))
@app.post("/setreservoircoord/")
async def fastapi_set_reservoir_y(network: str, reservoir: str, x: float, y: float) -> ChangeSet:
ps = { 'id' : reservoir,
'x' : x,
'y' : y }
return set_reservoir(network, ChangeSet(ps))
@app.get("/getreservoirproperties/")
async def fastapi_get_reservoir_properties(network: str, reservoir: str) -> dict[str, Any]:
return get_reservoir(network, reservoir)
@app.post("/setreservoirproperties/")
async def fastapi_set_reservoir_properties(network: str, reservoir: str
, req: Request) -> ChangeSet:
props = await req.json()
ps = { 'id' : reservoir } | props
return set_reservoir(network, ChangeSet(ps))
############################################################
# tank 4.[TANKS]
############################################################
@app.get('/gettankschema')
async def fast_get_tank_schema(network: str) -> dict[str, dict[str, Any]]:
return get_tank_schema(network)
@app.post("/addtank/")
async def fastapi_add_tank(network: str, tank: str, x: float, y: float, elevation: float, init_level: float = 0, min_level: float = 0, max_level: float = 0, diameter: float = 0, min_vol: float = 0) -> ChangeSet:
ps = { 'id' : tank,
'x' : x,
'y' : y,
'elevation' : elevation,
'init_level' : init_level,
'min_level' : min_level,
'max_level' : max_level,
'diameter' : diameter,
'min_vol' : min_vol
}
return add_tank(network, ChangeSet(ps))
@app.post("/deletetank/")
async def fastapi_delete_tank(network: str, tank: str) -> ChangeSet:
ps = { 'id' : tank }
return delete_tank(network, ChangeSet(ps))
@app.get("/gettankelevation/")
async def fastapi_get_tank_elevation(network: str, tank: str) -> float | None:
ps = get_tank(network, tank)
return ps['elevation']
@app.get("/gettankinitlevel/")
async def fastapi_get_tank_init_level(network: str, tank: str) -> float | None:
ps = get_tank(network, tank)
return ps['init_level']
@app.get("/gettankminlevel/")
async def fastapi_get_tank_min_level(network: str, tank: str) -> float | None:
ps = get_tank(network, tank)
return ps['min_level']
@app.get("/gettankmaxlevel/")
async def fastapi_get_tank_max_level(network: str, tank: str) -> float | None:
ps = get_tank(network, tank)
return ps['max_level']
@app.get("/gettankdiameter/")
async def fastapi_get_tank_diameter(network: str, tank: str) -> float | None:
ps = get_tank(network, tank)
return ps['diameter']
@app.get("/gettankminvol/")
async def fastapi_get_tank_min_vol(network: str, tank: str) -> float | None:
ps = get_tank(network, tank)
return ps['min_vol']
@app.get("/gettankvolcurve/")
async def fastapi_get_tank_vol_curve(network: str, tank: str) -> str | None:
ps = get_tank(network, tank)
return ps['vol_curve']
@app.get("/gettankoverflow/")
async def fastapi_get_tank_overflow(network: str, tank: str) -> str | None:
ps = get_tank(network, tank)
return ps['overflow']
@app.get("/gettankx/")
async def fastapi_get_tank_x(network: str, tank: str) -> float:
ps = get_tank(network, tank)
return ps['x']
@app.get("/gettanky/")
async def fastapi_get_tank_x(network: str, tank: str) -> float:
ps = get_tank(network, tank)
return ps['y']
@app.get("/gettankcoord/")
async def fastapi_get_tank_coord(network: str, tank: str) -> dict[str, float]:
ps = get_tank(network, tank)
coord = { 'x' : ps['x'],
'y' : ps['y'] }
return coord
@app.post("/settankelevation/")
async def fastapi_set_tank_elevation(network: str, tank: str, elevation: float) -> ChangeSet:
ps = { 'id' : tank,
'elevation' : elevation }
return set_tank(network, ChangeSet(ps))
@app.post("/settankinitlevel/")
async def fastapi_set_tank_init_level(network: str, tank: str, init_level: float) -> ChangeSet:
ps = { 'id' : tank,
'init_level' : init_level }
return set_tank(network, ChangeSet(ps))
@app.post("/settankminlevel/")
async def fastapi_set_tank_min_level(network: str, tank: str, min_level: float) -> ChangeSet:
ps = { 'id' : tank,
'min_level' : min_level }
return set_tank(network, ChangeSet(ps))
@app.post("/settankmaxlevel/")
async def fastapi_set_tank_max_level(network: str, tank: str, max_level: float) -> ChangeSet:
ps = { 'id' : tank,
'max_level' : max_level }
return set_tank(network, ChangeSet(ps))
@app.post("settankdiameter//")
async def fastapi_set_tank_diameter(network: str, tank: str, diameter: float) -> ChangeSet:
ps = { 'id' : tank,
'diameter' : diameter }
return set_tank(network, ChangeSet(ps))
@app.post("/settankminvol/")
async def fastapi_set_tank_min_vol(network: str, tank: str, min_vol: float) -> ChangeSet:
ps = { 'id' : tank,
'min_vol' : min_vol }
return set_tank(network, ChangeSet(ps))
@app.post("/settankvolcurve/")
async def fastapi_set_tank_vol_curve(network: str, tank: str, vol_curve: str) -> ChangeSet:
ps = { 'id' : tank,
'vol_curve' : vol_curve}
return set_tank(network, ChangeSet(ps))
@app.post("/settankoverflow/")
async def fastapi_set_tank_overflow(network: str, tank: str, overflow: str) -> ChangeSet:
ps = { 'id' : tank,
'overflow' : overflow }
return set_tank(network, ChangeSet(ps))
@app.post("/settankx/")
async def fastapi_set_tank_x(network: str, tank: str, x: float) -> ChangeSet:
ps = { 'id' : tank,
'x' : x }
return set_tank(network, ChangeSet(ps))
@app.post("/settanky/")
async def fastapi_set_tank_y(network: str, tank: str, y: float) -> ChangeSet:
ps = { 'id' : tank,
'y' : y }
return set_tank(network, ChangeSet(ps))
@app.post("/settankcoord/")
async def fastapi_set_tank_coord(network: str, tank: str, x: float, y: float) -> ChangeSet:
ps = { 'id' : tank,
'x' : x,
'y' : y }
return set_tank(network, ChangeSet(ps))
@app.get("/gettankproperties/")
async def fastapi_get_tank_properties(network: str, tank: str) -> dict[str, Any]:
return get_tank(network, tank)
@app.post("/settankproperties/")
async def fastapi_set_tank_properties(network: str, tank: str, req: Request) -> ChangeSet:
props = await req.json()
ps = { 'id' : tank } | props
return set_tank(network, ChangeSet(ps))
############################################################
# pipe 4.[PIPES]
############################################################
@app.get('/getpipeschema')
async def fastapi_get_pipe_schema(network: str) -> dict[str, dict[str, Any]]:
return get_pipe_schema(network)
@app.post("/addpipe/")
async def fastapi_add_pipe(network: str, pipe: str, node1: str, node2: str, length: float = 0,
diameter: float = 0, roughness: float = 0, minor_loss: float = 0, status: str = PIPE_STATUS_OPEN) -> ChangeSet:
ps = { 'id' : pipe,
'node1' : node1,
'node2' : node2,
'length' : length,
'diameter' : diameter,
'roughness' : roughness,
'minor_loss' : minor_loss,
'status' : status }
return add_pipe(network, ChangeSet(ps))
@app.post("/deletepipe/")
async def fastapi_delete_pipe(network: str, pipe: str) -> ChangeSet:
ps = {'id' : pipe}
return delete_pipe(network, ChangeSet(ps))
@app.get("/getpipenode1/")
async def fastapi_get_pipe_node1(network: str, pipe: str) -> str | None:
ps = get_pipe(network, pipe)
return ps['node1']
@app.get("/getpipenode2/")
async def fastapi_get_pipe_node2(network: str, pipe: str) -> str | None:
ps = get_pipe(network, pipe)
return ps['node2']
@app.get("/getpipelength/")
async def fastapi_get_pipe_length(network: str, pipe: str) -> float | None:
ps = get_pipe(network, pipe)
return ps['length']
@app.get("/getpipediameter/")
async def fastapi_get_pipe_diameter(network: str, pipe: str) -> float | None:
ps = get_pipe(network, pipe)
return ps['diameter']
@app.get("/getpiperoughness/")
async def fastapi_get_pipe_roughness(network: str, pipe: str) -> float | None:
ps = get_pipe(network, pipe)
return ps['roughness']
@app.get("/getpipeminorloss/")
async def fastapi_get_pipe_minor_loss(network: str, pipe: str) -> float | None:
ps = get_pipe(network, pipe)
return ps['minor_loss']
@app.get("/getpipestatus/")
async def fastapi_get_pipe_status(network: str, pipe: str) -> str | None:
ps = get_pipe(network, pipe)
return ps['status']
@app.post("/setpipenode1/")
async def fastapi_set_pipe_node1(network: str, pipe: str, node1: str) -> ChangeSet:
ps = { 'id' : pipe,
'node1' : node1 }
return set_pipe(network, ChangeSet(ps))
@app.post("/setpipenode2/")
async def fastapi_set_pipe_node2(network: str, pipe: str, node2: str) -> ChangeSet:
ps = { 'id' : pipe,
'node2' : node2 }
return set_pipe(network, ChangeSet(ps))
@app.post("/setpipelength/")
async def fastapi_set_pipe_length(network: str, pipe: str, length: float) -> ChangeSet:
ps = { 'id' : pipe,
'length' : length }
return set_pipe(network, ChangeSet(ps))
@app.post("/setpipediameter/")
async def fastapi_set_pipe_diameter(network: str, pipe: str, diameter: float) -> ChangeSet:
ps = { 'id' : pipe,
'diameter' : diameter }
return set_pipe(network, ChangeSet(ps))
@app.post("/setpiperoughness/")
async def fastapi_set_pipe_roughness(network: str, pipe: str, roughness: float) -> ChangeSet:
ps = { 'id' : pipe,
'roughness' : roughness }
return set_pipe(network, ChangeSet(ps))
@app.post("/setpipeminorloss/")
async def fastapi_set_pipe_minor_loss(network: str, pipe: str, minor_loss: float) -> ChangeSet:
ps = { 'id' : pipe,
'minor_loss' : minor_loss }
return set_pipe(network, ChangeSet(ps))
@app.post("/setpipestatus/")
async def fastapi_set_pipe_status(network: str, pipe: str, status: float) -> ChangeSet:
ps = { 'id' : pipe,
'status' : status }
return set_pipe(network, ChangeSet(ps))
@app.get("/getpipeproperties/")
async def fastapi_get_pipe_properties(network: str, pipe: str) -> dict[str, Any]:
return get_pipe(network, pipe)
@app.post("/setpipeproperties/")
async def fastapi_set_pipe_properties(network: str, pipe: str, req: Request) -> ChangeSet:
props = await req.json()
ps = { 'id' : pipe } | props
return set_pipe(network, ChangeSet(ps))
############################################################
# pump 4.[PUMPS]
############################################################
@app.get('/getpumpschema')
async def fastapi_get_pump_schema(network: str) -> dict[str, dict[str, Any]]:
return get_pump_schema(network)
@app.post("/addpump/")
async def fastapi_add_pump(network: str, pump: str, node1: str, node2: str, power: float = 0.0) -> ChangeSet:
ps = { 'id' : pump,
'node1' : node1,
'node2' : node2,
'power' : power
}
return add_pump(network, ChangeSet(ps))
@app.post("/deletepump/")
async def fastapi_delete_pump(network: str, pump: str) -> ChangeSet:
ps = { 'id' : pump }
return delete_pump(network, ChangeSet(ps))
@app.get("/getpumpnode1/")
async def fastapi_get_pump_node1(network: str, pump: str) -> str | None:
ps = get_pump(network, pump)
return ps['node1']
@app.get("/getpumpnode2/")
async def fastapi_get_pump_node2(network: str, pump: str) -> str | None:
ps = get_pump(network, pump)
return ps['node2']
@app.post("/setpumpnode1/")
async def fastapi_set_pump_node1(network: str, pump: str, node1: str) -> ChangeSet:
ps = { 'id' : pump,
'node1' : node1 }
return set_pump(network, ChangeSet(ps))
@app.post("/setpumpnode2/")
async def fastapi_set_pump_node2(network: str, pump: str, node2: str) -> ChangeSet:
ps = { 'id' : pump,
'node2' : node2 }
return set_pump(network, ChangeSet(ps))
@app.get("/getpumpproperties/")
async def fastapi_get_pump_properties(network: str, pump: str) -> dict[str, Any]:
return get_pump(network, pump)
@app.post("/setpumpproperties/")
async def fastapi_set_pump_properties(network: str, pump: str, req: Request) -> ChangeSet:
props = await req.json()
ps = { 'id' : pump } | props
return set_pump(network, ChangeSet(ps))
############################################################
# valve 4.[VALVES]
############################################################
@app.get('/getvalveschema')
async def fastapi_get_valve_schema(network: str) -> dict[str, dict[str, Any]]:
return get_valve_schema(network)
@app.post("/addvalve/")
async def fastapi_add_valve(network: str, valve: str, node1: str, node2: str, diameter: float = 0, v_type: str = VALVES_TYPE_PRV, setting: float = 0, minor_loss: float = 0) -> ChangeSet:
ps = { 'id' : valve,
'node1' : node1,
'node2' : node2,
'diameter' : diameter,
'v_type' : v_type,
'setting' : setting,
'minor_loss' : minor_loss }
return add_valve(network, ChangeSet(ps))
@app.post("/deletevalve/")
async def fastapi_delete_valve(network: str, valve: str) -> ChangeSet:
ps = { 'id' : valve }
return delete_valve(network, ChangeSet(ps))
@app.get("/getvalvenode1/")
async def fastapi_get_valve_node1(network: str, valve: str) -> str | None:
ps = get_valve(network, valve)
return ps['node1']
@app.get("/getvalvenode2/")
async def fastapi_get_valve_node2(network: str, valve: str) -> str | None:
ps = get_valve(network, valve)
return ps['node2']
@app.get("/getvalvediameter/")
async def fastapi_get_valve_diameter(network: str, valve: str) -> float | None:
ps = get_valve(network, valve)
return ps['diameter']
@app.get("/getvalvetype/")
async def fastapi_get_valve_type(network: str, valve: str) -> str | None:
ps = get_valve(network, valve)
return ps['type']
@app.get("/getvalvesetting/")
async def fastapi_get_valve_setting(network: str, valve: str) -> float | None:
ps = get_valve(network, valve)
return ps['setting']
@app.get("/getvalveminorloss/")
async def fastapi_get_valve_minor_loss(network: str, valve: str) -> float | None:
ps = get_valve(network, valve)
return ps['minor_loss']
@app.post("/setvalvenode1/")
async def fastapi_set_valve_node1(network: str, valve: str, node1: str) -> ChangeSet:
ps = { 'id' : valve,
'node1' : node1 }
return set_valve(network, ChangeSet(ps))
@app.post("/setvalvenode2/")
async def fastapi_set_valve_node2(network: str, valve: str, node2: str) -> ChangeSet:
ps = { 'id' : valve,
'node2' : node2 }
return set_valve(network, ChangeSet(ps))
@app.post("/setvalvenodediameter/")
async def fastapi_set_valve_diameter(network: str, valve: str, diameter: float) -> ChangeSet:
ps = { 'id' : valve,
'diameter' : diameter }
return set_valve(network, ChangeSet(ps))
@app.post("/setvalvetype/")
async def fastapi_set_valve_type(network: str, valve: str, type: str) -> ChangeSet:
ps = { 'id' : valve,
'type' : type }
return set_valve(network, ChangeSet(ps))
@app.post("/setvalvesetting/")
async def fastapi_set_valve_setting(network: str, valve: str, setting: float) -> ChangeSet:
ps = { 'id' : valve,
'setting' : setting }
return set_valve(network, ChangeSet(ps))
@app.get("/getvalveproperties/")
async def fastapi_get_valve_properties(network: str, valve: str) -> dict[str, Any]:
return get_valve(network, valve)
@app.post("/setvalveproperties/")
async def fastapi_set_valve_properties(network: str, valve: str, req: Request) -> ChangeSet:
props = await req.json()
ps = { 'id' : valve } | props
return set_valve(network, ChangeSet(ps))
# node & link
@app.post("/deletenode/")
async def fastapi_delete_node(network: str, node: str) -> ChangeSet:
ps = {'id' : node}
if is_junction(network, node):
return delete_junction(network, ChangeSet(ps))
elif is_reservoir(network, node):
return delete_reservoir(network, ChangeSet(ps))
elif is_tank(network, node):
return delete_tank(network, ChangeSet(ps))
@app.post("/deletelink/")
async def fastapi_delete_link(network: str, link: str) -> ChangeSet:
ps = {'id' : link}
if is_pipe(network, link):
return delete_pipe(network, ChangeSet(ps))
elif is_pump(network, link):
return delete_pump(network, ChangeSet(ps))
elif is_valve(network, link):
return delete_valve(network, ChangeSet(ps))
############################################################
# tag 8.[TAGS]
############################################################
#
# TAG_TYPE_NODE = api.TAG_TYPE_NODE
# TAG_TYPE_LINK = api.TAG_TYPE_LINK
#
@app.get('/gettagschema/')
async def fastapi_get_tag_schema(network: str) -> dict[str, dict[str, Any]]:
return get_tag_schema(network)
@app.get('/gettag/')
async def fastapi_get_tag(network: str, t_type: str, id: str) -> dict[str, Any]:
return get_tag(network, t_type, id)
# example:
# set_tag(p, ChangeSet({'t_type': TAG_TYPE_NODE, 'id': 'j1', 'tag': 'j1t' }))
# set_tag(p, ChangeSet({'t_type': TAG_TYPE_LINK, 'id': 'p0', 'tag': 'p0t' }))
@app.post('/settag/')
async def fastapi_set_tag(network: str, req: Request) -> ChangeSet:
props = await req.json()
return set_tag(network, ChangeSet(props))
############################################################
# demand 9.[DEMANDS]
############################################################
@app.get('/getdemandschema')
async def fastapi_get_demand_schema(network: str) -> dict[str, dict[str, Any]]:
return get_demand_schema(network)
@app.get("/getdemandproperties/")
async def fastapi_get_demand_properties(network: str, junction: str) -> dict[str, Any]:
return get_demand(network, junction)
# example: set_demand(p, ChangeSet({'junction': 'j1', 'demands': [{'demand': 10.0, 'pattern': None, 'category': 'x'}, {'demand': 20.0, 'pattern': None, 'category': None}]}))
@app.post("/setdemandproperties/")
async def fastapi_set_demand_properties(network: str, junction: str, req: Request) -> ChangeSet:
props = await req.json()
ps = { 'junction' : junction } | props
return set_demand(network, ChangeSet(ps))
############################################################
# status 10.[STATUS] init_status
############################################################
@app.get('/getstatusschema')
async def fastapi_get_status_schema(network: str) -> dict[str, dict[str, Any]]:
return get_status_schema(network)
@app.get("/getstatus/")
async def fastapi_get_status(network: str, link: str) -> dict[str, Any]:
return get_status(network, link)
# example: set_status(p, ChangeSet({'link': 'p0', 'status': LINK_STATUS_OPEN, 'setting': 10.0}))
@app.post("/setstatus/")
async def fastapi_set_status_properties(network: str, link: str, req: Request) -> ChangeSet:
props = await req.json()
ps = { 'link' : link } | props
return set_status(network, ChangeSet(ps))
############################################################
# pattern 11.[PATTERNS]
############################################################
@app.get('/getpatternschema')
async def fastapi_get_pattern_schema(network: str) -> dict[str, dict[str, Any]]:
return get_pattern_schema(network)
@app.post("/addpattern/")
async def fastapi_add_pattern(network: str, pattern: str, req: Request) -> ChangeSet:
props = await req.json()
ps = {
'id' : pattern,
} | props
return add_pattern(network, ChangeSet(ps))
@app.post("/deletepattern/")
async def fastapi_delete_pattern(network: str, pattern: str) -> ChangeSet:
ps = { 'id' : pattern }
return delete_pattern(network, ChangeSet(ps))
@app.get("/getpatternproperties/")
async def fastapi_get_pattern_properties(network: str, pattern: str) -> dict[str, Any]:
return get_pattern(network, pattern)
# example: set_pattern(p, ChangeSet({'id' : 'p0', 'factors': [1.0, 2.0, 3.0]}))
@app.post("/setpatternproperties/")
async def fastapi_set_pattern_properties(network: str, pattern: str, req: Request) -> ChangeSet:
props = await req.json()
ps = { 'id' : pattern } | props
return set_pattern(network, ChangeSet(ps))
############################################################
# curve 12.[CURVES]
############################################################
@app.get('/getcurveschema')
async def fastapi_get_curve_schema(network: str) -> dict[str, dict[str, Any]]:
return get_curve_schema(network)
@app.post("/addcurve/")
async def fastapi_add_curve(network: str, curve: str, c_type: str, req: Request) -> ChangeSet:
props = await req.json()
ps = {
'id' : curve,
'c_type': c_type
} | props
return add_curve(network, ChangeSet(ps))
@app.post("/deletecurve/")
async def fastapi_delete_curve(network: str, curve: str) -> ChangeSet:
ps = { 'id' : curve }
return delete_curve(network, ChangeSet(ps))
@app.get("/getcurveproperties/")
async def fastapi_get_curve_properties(network: str, curve: str) -> dict[str, Any]:
return get_curve(network, curve)
# example: set_curve(p, ChangeSet({'id' : 'c0', 'c_type' : CURVE_TYPE_PUMP, 'coords': [{'x': 1.0, 'y': 2.0}, {'x': 2.0, 'y': 1.0}]}))
@app.post("/setcurveproperties/")
async def fastapi_set_curve_properties(network: str, curve: str, req: Request) -> ChangeSet:
props = await req.json()
# c_type放到request中
ps = { 'id' : curve } | props
return set_curve(network, ChangeSet(ps))
############################################################
# control 13.[CONTROLS]
############################################################
@app.get('/getcontrolschema/')
async def fastapi_get_control_schema(network: str) -> dict[str, dict[str, Any]]:
return get_control_schema(network)
@app.get("/getcontrolproperties/")
async def fastapi_get_control_properties(network: str) -> dict[str, Any]:
return get_control(network)
# example: set_control(p, ChangeSet({'control': 'x'}))
@app.post("/setcontrolproperties/")
async def fastapi_set_control_properties(network: str, req: Request) -> ChangeSet:
props = await req.json()
return set_control(network, ChangeSet(props))
############################################################
# rule 14.[RULES]
############################################################
@app.get("/getruleschema/")
async def fastapi_get_rule_schema(network: str) -> dict[str, dict[str, Any]]:
return get_rule_schema(network)
@app.get("/getruleproperties/")
async def fastapi_get_rule_properties(network: str) -> dict[str, Any]:
return get_rule(network)
# example: set_rule(p, ChangeSet({'rule': 'x'}))
@app.post("/setruleproperties/")
async def fastapi_set_rule_properties(network: str, req: Request) -> ChangeSet:
props = await req.json()
return set_rule(network, ChangeSet(props))
############################################################
# energy 15.[ENERGY]
############################################################
@app.get("/getenergyschema/")
async def fastapi_get_energy_schema(network: str) -> dict[str, dict[str, Any]]:
return get_energy_schema(network)
@app.get("/getenergyproperties/")
async def fastapi_get_energy_properties(network: str) -> dict[str, Any]:
return get_energy(network)
@app.post("/setenergyproperties/")
async def fastapi_set_energy_properties(network: str, req: Request) -> ChangeSet:
props = await req.json()
return set_energy(network, ChangeSet(props))
@app.get("/getpumpenergyschema/")
async def fastapi_get_pump_energy_schema(network: str) -> dict[str, dict[str, Any]]:
return get_pump_energy_schema(network)
@app.get("/getpumpenergyproperties//")
async def fastapi_get_pump_energy_proeprties(network: str, pump: str) -> dict[str, Any]:
return get_pump_energy(network, pump)
@app.get("/setpumpenergyproperties//")
async def fastapi_set_pump_energy_properties(network: str, pump: str, req: Request) -> ChangeSet:
props = await req.json()
ps = { 'id' : pump } | props
return set_pump_energy(network, ChangeSet(ps))
############################################################
# emitter 16.[EMITTERS]
############################################################
@app.get('/getemitterschema')
async def fastapi_get_emitter_schema(network: str) -> dict[str, dict[str, Any]]:
return get_emitter_schema(network)
@app.get("/getemitterproperties/")
async def fastapi_get_emitter_properties(network: str, junction: str) -> dict[str, Any]:
return get_emitter(network, junction)
# example: set_emitter(p, ChangeSet({'junction': 'j1', 'coefficient': 10.0}))
@app.post("/setemitterproperties/")
async def fastapi_set_emitter_properties(network: str, junction: str, req: Request) -> ChangeSet:
props = await req.json()
ps = { 'junction' : junction } | props
return set_emitter(network, ChangeSet(ps))
############################################################
# quality 17.[QUALITY]
############################################################
@app.get('/getqualityschema/')
async def fastapi_get_quality_schema(network: str) -> dict[str, dict[str, Any]]:
return get_quality_schema(network)
@app.get('/getqualityproperties/')
async def fastapi_get_quality_properties(network: str, node: str) -> dict[str, Any]:
return get_quality(network, node)
# example: set_quality(p, ChangeSet({'node': 'j1', 'quality': 10.0}))
@app.post("/setqualityproperties/")
async def fastapi_set_quality_properties(network: str, req: Request) -> ChangeSet:
props = await req.json()
return set_quality(network, ChangeSet(props))
############################################################
# source 18.[SOURCES]
############################################################
@app.get('/getsourcechema/')
async def fastapi_get_source_schema(network: str) -> dict[str, dict[str, Any]]:
return get_source_schema(network)
@app.get('/getsource/')
async def fastapi_get_source(network: str, node: str) -> dict[str, Any]:
return get_source(network, node)
@app.post('/setsource/')
async def fastapi_set_source(network: str, req: Request) -> ChangeSet:
props = await req.json()
return set_source(network, ChangeSet(props))
# example: add_source(p, ChangeSet({'node': 'j0', 's_type': SOURCE_TYPE_CONCEN, 'strength': 10.0, 'pattern': 'p0'}))
@app.post('/addsource/')
async def fastapi_add_source(network: str, req: Request) -> ChangeSet:
props = await req.json()
return add_source(network, ChangeSet(props))
@app.post('/deletesource/')
async def fastapi_delete_source(network: str, node: str) -> ChangeSet:
props = { 'node': node }
return delete_source(network, ChangeSet(props))
############################################################
# reaction 19.[REACTIONS]
############################################################
@app.get('/getreactionschema/')
async def fastapi_get_reaction_schema(network: str) -> dict[str, dict[str, Any]]:
return get_reaction_schema(network)
@app.get('/getreaction/')
async def fastapi_get_reaction(network: str) -> dict[str, Any]:
return get_reaction(network)
@app.post('/setreaction/')
# set_reaction(p, ChangeSet({ 'ORDER BULK' : '10' }))
async def fastapi_set_reaction(network: str, req: Request) -> ChangeSet:
props = await req.json()
return set_reaction(network, ChangeSet(props))
@app.get('/getpipereactionschema/')
async def fastapi_get_pipe_reaction_schema(network: str) -> dict[str, dict[str, Any]]:
return get_pipe_reaction_schema(network)
@app.get('/getpipereaction/')
async def fastapi_get_pipe_reaction(network: str, pipe: str) -> dict[str, Any]:
return get_pipe_reaction(network, pipe)
@app.post('/setpipereaction/')
async def fastapi_set_pipe_reaction(network: str, req: Request) -> ChangeSet:
props = await req.json()
return set_pipe_reaction(network, ChangeSet(props))
@app.get('/gettankreactionschema/')
async def fastapi_get_tank_reaction_schema(network: str) -> dict[str, dict[str, Any]]:
return get_tank_reaction_schema(network)
@app.get('/gettankreaction/')
async def fastapi_get_tank_reaction(network: str, tank: str) -> dict[str, Any]:
return get_tank_reaction(network, tank)
@app.post('/settankreaction/')
async def fastapi_set_tank_reaction(network: str, req: Request) -> ChangeSet:
props = await req.json()
return set_tank_reaction(network, ChangeSet(props))
############################################################
# mixing 20.[MIXING]
############################################################
@app.get('/getmixingschema/')
async def fastapi_get_mixing_schema(network: str) -> dict[str, dict[str, Any]]:
return get_mixing_schema(network)
@app.get('/getmixing/')
async def fastapi_get_mixing(network: str, tank: str) -> dict[str, Any]:
return get_mixing(network, tank)
@app.post('/setmixing/')
async def fastapi_set_mixing(network: str, req: Request) -> ChangeSet:
props = await req.json()
return api.set_mixing(network, ChangeSet(props))
# example: add_mixing(p, ChangeSet({'tank': 't0', 'model': MIXING_MODEL_MIXED, 'value': 10.0}))
@app.post('/addmixing/')
async def fastapi_add_mixing(network: str, req: Request) -> ChangeSet:
props = await req.json()
return add_mixing(network, ChangeSet(props))
@app.post('/deletemixing/')
async def fastapi_delete_mixing(network: str, req: Request) -> ChangeSet:
props = await req.json()
return delete_mixing(network, ChangeSet(props))
############################################################
# time 21.[TIME]
############################################################
@app.get('/gettimeschema')
async def fastapi_get_time_schema(network: str) -> dict[str, dict[str, Any]]:
return get_time_schema(network)
@app.get("/gettimeproperties/")
async def fastapi_get_time_properties(network: str) -> dict[str, Any]:
return get_time(network)
@app.post("/settimeproperties/")
async def fastapi_set_time_properties(network: str, req: Request) -> ChangeSet:
props = await req.json()
return set_time(network, ChangeSet(props))
############################################################
# option 23.[OPTIONS]
############################################################
@app.get('/getoptionschema/')
async def fastapi_get_option_schema(network: str) -> dict[str, dict[str, Any]]:
return get_option_schema(network)
@app.get("/getoptionproperties/")
async def fastapi_get_option_properties(network: str) -> dict[str, Any]:
return get_option(network)
@app.post("/setoptionproperties/")
async def fastapi_set_option_properties(network: str, req: Request) -> ChangeSet:
props = await req.json()
return set_option(network, ChangeSet(props))
############################################################
# coord 24.[COORDINATES]
############################################################
@app.get("/getnodecoord/")
async def fastapi_get_node_coord(network: str, node: str) -> dict[str, float] | None:
return get_node_coord(network, node)
############################################################
# vertex 25.[VERTICES]
############################################################
@app.get('/getvertexschema/')
async def fastapi_get_vertex_schema(network: str) -> dict[str, dict[str, Any]]:
return get_vertex_schema(network)
@app.get('/getvertexproperties/')
async def fastapi_get_vertex_properties(network: str, link: str) -> dict[str, Any]:
return get_vertex(network, link)
# set_vertex(p, ChangeSet({'link' : 'p0', 'coords': [{'x': 1.0, 'y': 2.0}, {'x': 2.0, 'y': 1.0}]}))
@app.post('/setvertexproperties/')
async def fastapi_set_vertex_properties(network: str, req: Request) -> ChangeSet:
props = await req.json()
return set_vertex(network, ChangeSet(props))
@app.post('/addvertex/')
async def fastapi_add_vertex(network: str, req: Request) -> ChangeSet:
props = await req.json()
return add_vertex(network, ChangeSet(props))
@app.post('/deletevertex/')
async def fastapi_delete_vertex(network: str, req: Request) -> ChangeSet:
props = await req.json()
return api.delete_vertex(network, ChangeSet(props))
############################################################
# label 26.[LABELS]
############################################################
@app.get('/getlabelschema/')
async def fastapi_get_label_schema(network: str) -> dict[str, dict[str, Any]]:
return get_label_schema(network)
@app.get('/getlabelproperties/')
async def fastapi_get_label_properties(network: str, x: float, y: float) -> dict[str, Any]:
return get_label(network, x, y)
@app.post('/setlabelproperties/')
async def fastapi_set_label_properties(network: str, req: Request) -> ChangeSet:
props = await req.json()
return set_label(network, ChangeSet(props))
@app.post('/addlabel/')
async def fastapi_add_label(network: str, req: Request) -> ChangeSet:
props = await req.json()
return add_label(network, ChangeSet(props))
@app.post('/deletelabel/')
async def fastapi_delete_label(network: str, req: Request) -> ChangeSet:
props = await req.json()
return delete_label(network, ChangeSet(props))
############################################################
# backdrop 27.[BACKDROP]
############################################################
@app.get('/getbackdropschema/')
async def fastapi_get_backdrop_schema(network: str) -> dict[str, dict[str, Any]]:
return get_backdrop_schema(network)
@app.get('/getbackdropproperties/')
async def fastapi_get_backdrop_properties(network: str) -> dict[str, Any]:
return get_backdrop(network)
@app.post('/setbackdropproperties/')
async def fastapi_set_backdrop_properties(network: str, req: Request) -> ChangeSet:
props = await req.json()
return set_backdrop(network, ChangeSet(props))
# inp file
@app.post("/uploadinp/", status_code=status.HTTP_200_OK)
async def upload_inp(file: bytes = File(), name: str = None):
filePath = inpDir + str(name)
f = open(filePath, 'wb')
f.write(file)
f.close()
return True
@app.get("/downloadinp/", status_code=status.HTTP_200_OK)
async def download_inp(name: str, response: Response):
filePath = inpDir + name
if os.path.exists(filePath):
return FileResponse(filePath, media_type='application/octet-stream', filename="inp.inp")
else:
response.status_code = status.HTTP_400_BAD_REQUEST
return True
@app.get("/getjson/")
async def get_json():
return JSONResponse(
status_code = status.HTTP_400_BAD_REQUEST,
content={
'code': 400,
'message': "this is message",
'data': 123,
}
)