Files
TJWaterServer/main.py
2022-10-21 19:59:59 +08:00

817 lines
27 KiB
Python

from cmath import tan
from mimetypes import init
import os
import io
import json
from platform import node
from typing import *
from urllib.request import Request
from xml.dom import minicompat
from fastapi import FastAPI, File, UploadFile
from pydantic import BaseModel
from starlette.responses import FileResponse, JSONResponse
from fastapi import FastAPI, Response, status, Request, Body
from tjnetwork import *
JUNCTION = 0
RESERVOIR = 1
TANK = 2
PIPE = 1
NODE_COUNT = 0
LINK_COUNT = 2
prjs = []
inpDir = "C:/inpfiles/"
tmpDir = "C:/tmpfiles/"
lockedPrjs = {}
if not os.path.exists(inpDir):
os.mkdir(inpDir)
if not os.path.exists(tmpDir):
os.mkdir(tmpDir)
app = FastAPI()
# project operations
@app.get("/haveproject/")
async def fastapi_have_project(network: str):
return have_project(network)
@app.post("/createproject/")
async def fastapi_create_project(network: str):
create_project(network)
return network
@app.get("/isprojectopen/")
async def fastapi_is_project_open(network: str):
return is_project_open(network)
@app.post("/openproject/")
async def fastapi_open_project(network: str):
open_project(network)
return network
@app.post("/closeproject/")
async def fastapi_close_project(network: str):
close_project(network)
return True
@app.post("/deleteproject/")
async def fastapi_delete_project(network: str):
delete_project(network)
return True
@app.post("/copyproject/")
async def fastapi_copy_project(source: str, target: str):
copy_project(source, target)
return True
@app.get("/isprojectlocked/")
async def fastapi_is_locked(network: str):
return lockedPrjs.get(network) != None
@app.post("/lockproject/")
async def fastapi_lock_project(network: str, id: str):
lockedPrjs[network] = id
return True
@app.post("/unlockproject/")
async def fastapi_unlock_project(network: str, id: str):
if lockedPrjs[network] == id:
del lockedPrjs[network]
return True
# undo/redo
@app.post("/undo/")
async def fastapi_undo(network: str):
return execute_undo(network)
@app.post("/redo/")
async def fastapi_redo(network: str):
return execute_redo(network)
@app.get("/getcurrentoperationid/")
async def fastapi_get_current_operaiton_id(network: str) -> int:
return get_current_operation(network)
@app.get("/syncwithserver/")
async def fastapi_sync_with_server(network: str, operationid: int) -> ChangeSet:
return sync_with_server(network, operationid)
@app.post("/batch/")
async def fastapi_execute_batch_commands(network: str, req: Request)-> ChangeSet:
jo_root = await req.json()
cs: ChangeSet = ChangeSet()
cs.operations = jo_root['operations']
print(cs.operations)
print(cs)
return execute_batch_commands(network, cs)
# node
@app.get("/getnodes/")
async def fastapi_get_nodes(network: str) -> list[str]:
return get_nodes(network)
@app.get("/isnode/")
async def fastapi_is_node(network: str, node: str) -> bool:
return is_node(network, node)
@app.get("/isjunction/")
async def fastapi_is_junction(network: str, node: str) -> bool:
return is_junction(network, node)
@app.get("/isreservoir/")
async def fastapi_is_reservoir(network: str, node: str) -> bool:
return is_reservoir(network, node)
@app.get("/istank/")
async def fastapi_is_tank(network: str, node: str) -> bool:
return is_tank(network, node)
# link
@app.get("/getlinks/")
async def fastapi_get_links(network: str) -> list[str]:
return get_links(network)
@app.get("/islink/")
async def fastapi_is_link(network: str, link: str) -> bool:
return is_link(network, link)
@app.get("/ispipe/")
async def fastapi_is_pipe(network: str, link: str) -> bool:
return is_pipe(network, link)
@app.get("/ispump/")
async def fastapi_is_pump(network: str, link: str) -> bool:
return is_pump(network, link)
@app.get("/isvalve/")
async def is_valve(network: str, link: str) -> bool:
return is_valve(network, link)
# curve
@app.get("/getcurves/")
async def fastapi_get_curves(network: str) -> list[str]:
return get_curves(network)
@app.get("/iscurve/")
async def fastapi_is_curve(network: str, curve: str) -> bool:
return is_curve(network, curve)
# parttern
@app.get("/getpatterns/")
async def fastapi_get_patterns(network: str) -> list[str]:
return get_patterns(network)
@app.get("/ispattern/")
async def fastapi_is_curve(network: str, pattern: str) -> bool:
return is_pattern(network, pattern)
############################################################
# junction 2.[JUNCTIONS]
############################################################
@app.get('/getjunctionschema')
async def fast_get_junction_schema(network: str) -> dict[str, dict[str, Any]]:
return get_junction_schema(network)
@app.post("/addjunction/")
async def fastapi_add_junction(network: str, junction: str, x: float, y: float, z: float) -> ChangeSet:
ps = { 'id' : junction,
'x' : x,
'y' : y,
'elevation' : z }
return add_junction(network, ChangeSet(ps))
@app.post("/deletejunction/")
async def fastapi_delete_junction(network: str, junction: str) -> ChangeSet:
ps = {'id' : junction}
return delete_junction(network, ChangeSet(ps))
@app.get("/getjunctionelevation/")
async def fastapi_get_junction_elevation(network: str, junction: str) -> float:
ps = get_junction(network, junction)
return ps['elevation']
@app.get("/getjunctionx/")
async def fastapi_get_junction_x(network: str, junction: str) -> float:
ps = get_junction(network, junction)
return ps['x']
@app.get("/getjunctiony/")
async def fastapi_get_junction_x(network: str, junction: str) -> float:
ps = get_junction(network, junction)
return ps['y']
@app.get("/getjunctioncoord/")
async def fastapi_get_junction_coord(network: str, junction: str) -> dict[str, float]:
ps = get_junction(network, junction)
coord = { 'x' : ps['x'],
'y' : ps['y'] }
return coord
@app.get("/getjunctiondemand/")
async def fastapi_get_junction_demand(network: str, junction: str) -> float:
ps = get_junction(network, junction)
return ps['demand']
@app.get("/getjunctionpattern/")
async def fastapi_get_junction_pattern(network: str, junction: str) -> str:
ps = get_junction(network, junction)
return ps['pattern']
@app.post("/setjunctionelevation/")
async def fastapi_set_junction_elevation(network: str, junction: str, elevation: float) -> ChangeSet:
ps = { 'id' : junction,
'elevation' : elevation }
return set_junction(network, ChangeSet(ps))
@app.post("/setjunctionx/")
async def fastapi_set_junction_x(network: str, junction: str, x: float) -> ChangeSet:
ps = { 'id' : junction,
'x' : x }
return set_junction(network, ChangeSet(ps))
@app.post("/setjunctiony/")
async def fastapi_set_junction_y(network: str, junction: str, y: float) -> ChangeSet:
ps = { 'id' : junction,
'y' : y }
return set_junction(network, ChangeSet(ps))
@app.post("/setjunctioncoord/")
async def fastapi_set_junction_coord(network: str, junction: str, x: float, y: float) -> ChangeSet:
ps = { 'id' : junction,
'x' : x,
'y' : y }
return set_junction(network, ChangeSet(ps))
@app.post("/setjunctiondemand/")
async def fastapi_set_junction_demand(network: str, junction: str, demand: float) -> ChangeSet:
ps = { 'id' : junction,
'demand' : demand }
return set_junction(network, ChangeSet(ps))
@app.post("/setjunctionpattern/")
async def fastapi_set_junction_pattern(network: str, junction: str, pattern: str) -> ChangeSet:
ps = { 'id' : junction,
'pattern' : pattern }
return set_junction(network, ChangeSet(ps))
@app.get("/getjunctionproperties/")
async def fastapi_get_junction_properties(network: str, junction: str) -> dict[str, Any]:
return get_junction(network, junction)
@app.post("/setjunctionproperties/")
async def fastapi_set_junction_properties(network: str, junction: str, req: Request) -> ChangeSet:
props = await req.json()
ps = { 'id' : junction } | props
return set_junction(network, ChangeSet(ps))
############################################################
# reservoir 3.[RESERVOIRS]
############################################################
@app.get('/getreservoirschema')
async def fast_get_reservoir_schema(network: str) -> dict[str, dict[str, Any]]:
return get_reservoir_schema(network)
@app.post("/addreservoir/")
async def fastapi_add_reservoir(network: str, reservoir: str, x: float, y: float, head: float) -> ChangeSet:
ps = { 'id' : reservoir,
'x' : x,
'y' : y,
'head' : head }
return add_reservoir(network, ChangeSet(ps))
@app.post("/deletereservoir/")
async def fastapi_delete_reservoir(network: str, reservoir: str) -> ChangeSet:
ps = { 'id' : reservoir }
return delete_reservoir(network, ChangeSet(ps))
@app.get("/getreservoirhead/")
async def fastapi_get_reservoir_head(network: str, reservoir: str) -> float | None:
ps = get_reservoir(network, reservoir)
return ps['head']
@app.get("/getreservoirpattern/")
async def fastapi_get_reservoir_pattern(network: str, reservoir: str) -> str | None:
ps = get_reservoir(network, reservoir)
return ps['pattern']
@app.get("/getreservoirx/")
async def fastapi_get_reservoir_x(network: str, reservoir: str) -> dict[str, float] | None:
ps = get_reservoir(network, reservoir)
return ps['x']
@app.get("/getreservoiry/")
async def fastapi_get_reservoir_y(network: str, reservoir: str) -> dict[str, float] | None:
ps = get_reservoir(network, reservoir)
return ps['y']
@app.get("/getreservoircoord/")
async def fastapi_get_reservoir_y(network: str, reservoir: str) -> dict[str, float] | None:
ps = get_reservoir(network, reservoir)
coord = { 'id' : reservoir,
'x' : ps['x'],
'y' : ps['y'] }
return coord
@app.post("/setreservoirhead/")
async def fastapi_set_reservoir_head(network: str, reservoir: str, head: float) -> ChangeSet:
ps = { 'id' : reservoir,
'head' : head }
return set_reservoir(network, ChangeSet(ps))
@app.post("/setreservoirpattern/")
async def fastapi_set_reservoir_pattern(network: str, reservoir: str, pattern: str) -> ChangeSet:
ps = { 'id' : reservoir,
'pattern' : pattern }
return set_reservoir(network, ChangeSet(ps))
@app.post("/setreservoirx/")
async def fastapi_set_reservoir_x(network: str, reservoir: str, x: float) -> ChangeSet:
ps = { 'id' : reservoir,
'x' : x }
return set_reservoir(network, ChangeSet(ps))
@app.post("/setreservoirx/")
async def fastapi_set_reservoir_y(network: str, reservoir: str, y: float) -> ChangeSet:
ps = { 'id' : reservoir,
'y' : y }
return set_reservoir(network, ChangeSet(ps))
@app.post("/setreservoircoord/")
async def fastapi_set_reservoir_y(network: str, reservoir: str, x: float, y: float) -> ChangeSet:
ps = { 'id' : reservoir,
'x' : x,
'y' : y }
return set_reservoir(network, ChangeSet(ps))
@app.get("/getreservoirproperties/")
async def fastapi_get_reservoir_properties(network: str, reservoir: str) -> dict[str, Any]:
return get_reservoir(network, reservoir)
@app.post("/setreservoirproperties/")
async def fastapi_set_reservoir_properties(network: str, reservoir: str
, req: Request) -> ChangeSet:
props = await req.json()
ps = { 'id' : reservoir } | props
return set_reservoir(network, ChangeSet(ps))
############################################################
# tank 4.[TANKS]
############################################################
@app.get('/gettankschema')
async def fast_get_tank_schema(network: str) -> dict[str, dict[str, Any]]:
return get_tank_schema(network)
@app.post("/addtank/")
async def fastapi_add_tank(network: str, tank: str, x: float, y: float, elevation: float, init_level: float = 0, min_level: float = 0, max_level: float = 0, diameter: float = 0, min_vol: float = 0) -> ChangeSet:
ps = { 'id' : tank,
'x' : x,
'y' : y,
'elevation' : elevation,
'init_level' : init_level,
'min_level' : min_level,
'max_level' : max_level,
'diameter' : diameter,
'min_vol' : min_vol
}
return add_tank(network, ChangeSet(ps))
@app.post("/deletetank/")
async def fastapi_delete_tank(network: str, tank: str) -> ChangeSet:
ps = { 'id' : tank }
return delete_tank(network, ChangeSet(ps))
@app.get("/gettankelevation/")
async def fastapi_get_tank_elevation(network: str, tank: str) -> float | None:
ps = get_tank(network, tank)
return ps['elevation']
@app.get("/gettankinitlevel/")
async def fastapi_get_tank_init_level(network: str, tank: str) -> float | None:
ps = get_tank(network, tank)
return ps['init_level']
@app.get("/gettankminlevel/")
async def fastapi_get_tank_min_level(network: str, tank: str) -> float | None:
ps = get_tank(network, tank)
return ps['min_level']
@app.get("/gettankmaxlevel/")
async def fastapi_get_tank_max_level(network: str, tank: str) -> float | None:
ps = get_tank(network, tank)
return ps['max_level']
@app.get("/gettankdiameter/")
async def fastapi_get_tank_diameter(network: str, tank: str) -> float | None:
ps = get_tank(network, tank)
return ps['diameter']
@app.get("/gettankminvol/")
async def fastapi_get_tank_min_vol(network: str, tank: str) -> float | None:
ps = get_tank(network, tank)
return ps['min_vol']
@app.get("/gettankvolcurve/")
async def fastapi_get_tank_vol_curve(network: str, tank: str) -> str | None:
ps = get_tank(network, tank)
return ps['vol_curve']
@app.get("/gettankoverflow/")
async def fastapi_get_tank_overflow(network: str, tank: str) -> str | None:
ps = get_tank(network, tank)
return ps['overflow']
@app.get("/gettankx/")
async def fastapi_get_tank_x(network: str, tank: str) -> float:
ps = get_tank(network, tank)
return ps['x']
@app.get("/gettanky/")
async def fastapi_get_tank_x(network: str, tank: str) -> float:
ps = get_tank(network, tank)
return ps['y']
@app.get("/gettankcoord/")
async def fastapi_get_tank_coord(network: str, tank: str) -> dict[str, float]:
ps = get_tank(network, tank)
coord = { 'x' : ps['x'],
'y' : ps['y'] }
return coord
@app.post("/settankelevation/")
async def fastapi_set_tank_elevation(network: str, tank: str, elevation: float) -> ChangeSet:
ps = { 'id' : tank,
'elevation' : elevation }
return set_tank(network, ChangeSet(ps))
@app.post("/settankinitlevel/")
async def fastapi_set_tank_init_level(network: str, tank: str, init_level: float) -> ChangeSet:
ps = { 'id' : tank,
'init_level' : init_level }
return set_tank(network, ChangeSet(ps))
@app.post("/settankminlevel/")
async def fastapi_set_tank_min_level(network: str, tank: str, min_level: float) -> ChangeSet:
ps = { 'id' : tank,
'min_level' : min_level }
return set_tank(network, ChangeSet(ps))
@app.post("/settankmaxlevel/")
async def fastapi_set_tank_max_level(network: str, tank: str, max_level: float) -> ChangeSet:
ps = { 'id' : tank,
'max_level' : max_level }
return set_tank(network, ChangeSet(ps))
@app.post("settankdiameter//")
async def fastapi_set_tank_diameter(network: str, tank: str, diameter: float) -> ChangeSet:
ps = { 'id' : tank,
'diameter' : diameter }
return set_tank(network, ChangeSet(ps))
@app.post("/settankminvol/")
async def fastapi_set_tank_min_vol(network: str, tank: str, min_vol: float) -> ChangeSet:
ps = { 'id' : tank,
'min_vol' : min_vol }
return set_tank(network, ChangeSet(ps))
@app.post("/settankvolcurve/")
async def fastapi_set_tank_vol_curve(network: str, tank: str, vol_curve: str) -> ChangeSet:
ps = { 'id' : tank,
'vol_curve' : vol_curve}
return set_tank(network, ChangeSet(ps))
@app.post("/settankoverflow/")
async def fastapi_set_tank_overflow(network: str, tank: str, overflow: str) -> ChangeSet:
ps = { 'id' : tank,
'overflow' : overflow }
return set_tank(network, ChangeSet(ps))
@app.post("/settankx/")
async def fastapi_set_tank_x(network: str, tank: str, x: float) -> ChangeSet:
ps = { 'id' : tank,
'x' : x }
return set_tank(network, ChangeSet(ps))
@app.post("/settanky/")
async def fastapi_set_tank_y(network: str, tank: str, y: float) -> ChangeSet:
ps = { 'id' : tank,
'y' : y }
return set_tank(network, ChangeSet(ps))
@app.post("/settankcoord/")
async def fastapi_set_tank_coord(network: str, tank: str, x: float, y: float) -> ChangeSet:
ps = { 'id' : tank,
'x' : x,
'y' : y }
return set_tank(network, ChangeSet(ps))
@app.get("/gettankproperties/")
async def fastapi_get_tank_properties(network: str, tank: str) -> dict[str, Any]:
return get_tank(network, tank)
@app.post("/settankproperties/")
async def fastapi_set_tank_properties(network: str, tank: str, req: Request) -> ChangeSet:
props = await req.json()
ps = { 'id' : tank } | props
return set_tank(network, ChangeSet(ps))
############################################################
# pipe 4.[PIPES]
############################################################
@app.get('/getpipeschema')
async def fast_get_pipe_schema(network: str) -> dict[str, dict[str, Any]]:
return get_pipe_schema(network)
@app.post("/addpipe/")
async def fastapi_add_pipe(network: str, pipe: str, node1: str, node2: str, length: float = 0,
diameter: float = 0, roughness: float = 0, minor_loss: float = 0, status: str = PIPE_STATUS_OPEN) -> ChangeSet:
ps = { 'id' : pipe,
'node1' : node1,
'node2' : node2,
'length' : length,
'diameter' : diameter,
'roughness' : roughness,
'minor_loss' : minor_loss,
'status' : status }
return add_pipe(network, ChangeSet(ps))
@app.post("/deletepipe/")
async def fastapi_delete_pipe(network: str, pipe: str) -> ChangeSet:
ps = {'id' : pipe}
return delete_pipe(network, ChangeSet(ps))
@app.get("/getpipenode1/")
async def fastapi_get_pipe_node1(network: str, pipe: str) -> str | None:
ps = get_pipe(network, pipe)
return ps['node1']
@app.get("/getpipenode2/")
async def fastapi_get_pipe_node2(network: str, pipe: str) -> str | None:
ps = get_pipe(network, pipe)
return ps['node2']
@app.get("/getpipelength/")
async def fastapi_get_pipe_length(network: str, pipe: str) -> float | None:
ps = get_pipe(network, pipe)
return ps['length']
@app.get("/getpipediameter/")
async def fastapi_get_pipe_diameter(network: str, pipe: str) -> float | None:
ps = get_pipe(network, pipe)
return ps['diameter']
@app.get("/getpiperoughness/")
async def fastapi_get_pipe_roughness(network: str, pipe: str) -> float | None:
ps = get_pipe(network, pipe)
return ps['roughness']
@app.get("/getpipeminorloss/")
async def fastapi_get_pipe_minor_loss(network: str, pipe: str) -> float | None:
ps = get_pipe(network, pipe)
return ps['minor_loss']
@app.get("/getpipestatus/")
async def fastapi_get_pipe_status(network: str, pipe: str) -> str | None:
ps = get_pipe(network, pipe)
return ps['status']
@app.post("/setpipenode1/")
async def fastapi_set_pipe_node1(network: str, pipe: str, node1: str) -> ChangeSet:
ps = { 'id' : pipe,
'node1' : node1 }
return set_pipe(network, ChangeSet(ps))
@app.post("/setpipenode2/")
async def fastapi_set_pipe_node2(network: str, pipe: str, node2: str) -> ChangeSet:
ps = { 'id' : pipe,
'node2' : node2 }
return set_pipe(network, ChangeSet(ps))
@app.post("/setpipelength/")
async def fastapi_set_pipe_length(network: str, pipe: str, length: float) -> ChangeSet:
ps = { 'id' : pipe,
'length' : length }
return set_pipe(network, ChangeSet(ps))
@app.post("/setpipediameter/")
async def fastapi_set_pipe_diameter(network: str, pipe: str, diameter: float) -> ChangeSet:
ps = { 'id' : pipe,
'diameter' : diameter }
return set_pipe(network, ChangeSet(ps))
@app.post("/setpiperoughness/")
async def fastapi_set_pipe_roughness(network: str, pipe: str, roughness: float) -> ChangeSet:
ps = { 'id' : pipe,
'roughness' : roughness }
return set_pipe(network, ChangeSet(ps))
@app.post("/setpipeminorloss/")
async def fastapi_set_pipe_minor_loss(network: str, pipe: str, minor_loss: float) -> ChangeSet:
ps = { 'id' : pipe,
'minor_loss' : minor_loss }
return set_pipe(network, ChangeSet(ps))
@app.post("/setpipestatus/")
async def fastapi_set_pipe_status(network: str, pipe: str, status: float) -> ChangeSet:
ps = { 'id' : pipe,
'status' : status }
return set_pipe(network, ChangeSet(ps))
@app.get("/getpipeproperties/")
async def fastapi_get_pipe_properties(network: str, pipe: str) -> dict[str, Any]:
return get_pipe(network, pipe)
@app.post("/setpipeproperties/")
async def fastapi_set_pipe_properties(network: str, pipe: str, req: Request) -> ChangeSet:
props = await req.json()
ps = { 'id' : pipe } | props
return set_pipe(network, ChangeSet(ps))
############################################################
# pump 4.[PUMPS]
############################################################
@app.get('/getpumpschema')
async def fast_get_pump_schema(network: str) -> dict[str, dict[str, Any]]:
return get_pump_schema(network)
@app.post("/addpump/")
async def fastapi_add_pump(network: str, pump: str, node1: str, node2: str) -> ChangeSet:
ps = { 'id' : pump,
'node1' : node1,
'node2' : node2 }
return add_pump(network, ChangeSet(ps))
@app.post("/deletepump/")
async def fastapi_delete_pump(network: str, pump: str) -> ChangeSet:
ps = { 'id' : pump }
return delete_pump(network, ChangeSet(ps))
@app.get("/getpumpnode1/")
async def fastapi_get_pump_node1(network: str, pump: str) -> str | None:
ps = get_pump(network, pump)
return ps['node1']
@app.get("/getpumpnode2/")
async def fastapi_get_pump_node2(network: str, pump: str) -> str | None:
ps = get_pump(network, pump)
return ps['node2']
@app.post("/setpumpnode1/")
async def fastapi_set_pump_node1(network: str, pump: str, node1: str) -> ChangeSet:
ps = { 'id' : pump,
'node1' : node1 }
return set_pump(network, ChangeSet(ps))
@app.post("/setpumpnode2/")
async def fastapi_set_pump_node2(network: str, pump: str, node2: str) -> ChangeSet:
ps = { 'id' : pump,
'node2' : node2 }
return set_pump(network, ChangeSet(ps))
@app.get("/getpumpproperties/")
async def fastapi_get_pump_properties(network: str, pump: str) -> dict[str, Any]:
return get_pump(network, pump)
@app.post("/setpumpproperties/")
async def fastapi_set_pump_properties(network: str, pump: str, req: Request) -> ChangeSet:
props = await req.json()
ps = { 'id' : pump } | props
return set_pump(network, ChangeSet(ps))
############################################################
# valve 4.[VALVES]
############################################################
@app.get('/getvalveschema')
async def fast_get_valve_schema(network: str) -> dict[str, dict[str, Any]]:
return get_valve_schema(network)
@app.post("/addvalve/")
async def fastapi_add_valve(network: str, valve: str, node1: str, node2: str, diameter: float = 0, v_type: str = VALVES_TYPE_PRV, setting: float = 0, minor_loss: float = 0) -> ChangeSet:
ps = { 'id' : valve,
'node1' : node1,
'node2' : node2,
'diameter' : diameter,
'v_type' : v_type,
'setting' : setting,
'minor_loss' : minor_loss }
return add_valve(network, ChangeSet(ps))
@app.post("/deletevalve/")
async def fastapi_delete_valve(network: str, valve: str) -> ChangeSet:
ps = { 'id' : valve }
return delete_valve(network, ChangeSet(ps))
@app.get("/getvalvenode1/")
async def fastapi_get_valve_node1(network: str, valve: str) -> str | None:
ps = get_valve(network, valve)
return ps['node1']
@app.get("/getvalvenode2/")
async def fastapi_get_valve_node2(network: str, valve: str) -> str | None:
ps = get_valve(network, valve)
return ps['node2']
@app.get("/getvalvediameter/")
async def fastapi_get_valve_diameter(network: str, valve: str) -> float | None:
ps = get_valve(network, valve)
return ps['diameter']
@app.get("/getvalvetype/")
async def fastapi_get_valve_type(network: str, valve: str) -> str | None:
ps = get_valve(network, valve)
return ps['type']
@app.get("/getvalvesetting/")
async def fastapi_get_valve_setting(network: str, valve: str) -> float | None:
ps = get_valve(network, valve)
return ps['setting']
@app.get("/getvalveminorloss/")
async def fastapi_get_valve_minor_loss(network: str, valve: str) -> float | None:
ps = get_valve(network, valve)
return ps['minor_loss']
@app.post("/setvalvenode1/")
async def fastapi_set_valve_node1(network: str, valve: str, node1: str) -> ChangeSet:
ps = { 'id' : valve,
'node1' : node1 }
return set_valve(network, ChangeSet(ps))
@app.post("/setvalvenode2/")
async def fastapi_set_valve_node2(network: str, valve: str, node2: str) -> ChangeSet:
ps = { 'id' : valve,
'node2' : node2 }
return set_valve(network, ChangeSet(ps))
@app.post("/setvalvenodediameter/")
async def fastapi_set_valve_diameter(network: str, valve: str, diameter: float) -> ChangeSet:
ps = { 'id' : valve,
'diameter' : diameter }
return set_valve(network, ChangeSet(ps))
@app.post("/setvalvetype/")
async def fastapi_set_valve_type(network: str, valve: str, type: str) -> ChangeSet:
ps = { 'id' : valve,
'type' : type }
return set_valve(network, ChangeSet(ps))
@app.post("/setvalvesetting/")
async def fastapi_set_valve_setting(network: str, valve: str, setting: float) -> ChangeSet:
ps = { 'id' : valve,
'setting' : setting }
return set_valve(network, ChangeSet(ps))
@app.get("/getvalveproperties/")
async def fastapi_get_valve_properties(network: str, valve: str) -> dict[str, Any]:
return get_valve(network, valve)
@app.post("/setvalveproperties/")
async def fastapi_set_valve_properties(network: str, valve: str, req: Request) -> ChangeSet:
props = await req.json()
ps = { 'id' : valve } | props
return set_valve(network, ChangeSet(ps))
# inp file
@app.post("/uploadinp/", status_code=status.HTTP_200_OK)
async def upload_inp(file: bytes = File(), name: str = None):
filePath = inpDir + str(name)
f = open(filePath, 'wb')
f.write(file)
f.close()
return True
@app.get("/downloadinp/", status_code=status.HTTP_200_OK)
async def download_inp(name: str, response: Response):
filePath = inpDir + name
if os.path.exists(filePath):
return FileResponse(filePath, media_type='application/octet-stream', filename="inp.inp")
else:
response.status_code = status.HTTP_400_BAD_REQUEST
return True
@app.get("/getjson/")
async def get_json():
return JSONResponse(
status_code = status.HTTP_400_BAD_REQUEST,
content={
'code': 400,
'message': "this is message",
'data': 123,
}
)