import os
import io
import json
from typing import *
from urllib.request import Request
from fastapi import FastAPI, File, UploadFile
from pydantic import BaseModel
from starlette.responses import FileResponse, JSONResponse
# NOTE: this import rebinds ``Request`` to FastAPI's class, which is the one
# the endpoints below use; keep it after the urllib import above.
from fastapi import FastAPI, Response, status, Request
from tjnetwork import *

JUNCTION = 0
RESERVOIR = 1
TANK = 2
PIPE = 1

NODE_COUNT = 0
LINK_COUNT = 2

prjs = []
inpDir = "C:/inpfiles/"
tmpDir = "C:/tmpfiles/"

# Maps a project name to the id passed to /lockproject/ for it.
lockedPrjs = {}

# exist_ok=True removes the check-then-create race of exists() + mkdir().
os.makedirs(inpDir, exist_ok=True)
os.makedirs(tmpDir, exist_ok=True)

app = FastAPI()


# project operations

@app.get("/haveproject/")
async def fastapi_have_project(network: str):
    """Return the result of ``have_project`` for *network*."""
    return have_project(network)


@app.post("/createproject/")
async def fastapi_create_project(network: str):
    """Create project *network* and echo its name back."""
    create_project(network)
    return network


@app.get("/isprojectopen/")
async def fastapi_is_project_open(network: str):
    """Return the result of ``is_project_open`` for *network*."""
    return is_project_open(network)


@app.post("/openproject/")
async def fastapi_open_project(network: str):
    """Open project *network* and echo its name back."""
    open_project(network)
    return network


@app.post("/closeproject/")
async def fastapi_close_project(network: str):
    """Close project *network*; always returns True."""
    close_project(network)
    return True


@app.post("/deleteproject/")
async def fastapi_delete_project(network: str):
    """Delete project *network*; always returns True."""
    delete_project(network)
    return True


@app.post("/copyproject/")
async def fastapi_copy_project(source: str, target: str):
    """Copy project *source* to *target*; always returns True."""
    copy_project(source, target)
    return True


@app.get("/isprojectlocked/")
async def fastapi_is_locked(network: str):
    """Return True when *network* has an entry in ``lockedPrjs``."""
    return lockedPrjs.get(network) is not None


@app.post("/lockproject/")
async def fastapi_lock_project(network: str, id: str):
    """Record *id* as the lock holder of *network*; always returns True.

    NOTE(review): an existing lock held by a different id is overwritten
    silently - confirm whether that is intended.
    """
    lockedPrjs[network] = id
    return True


@app.post("/unlockproject/")
async def fastapi_unlock_project(network: str, id: str):
    """Release the lock on *network* if it is held by *id*; always returns True."""
    # .get() so that unlocking a project that is not locked is a no-op instead
    # of a KeyError (which surfaced as an HTTP 500).
    if lockedPrjs.get(network) == id:
        del lockedPrjs[network]
    return True


# undo/redo

@app.post("/undo/")
async def fastapi_undo(network: str):
    """Return the result of ``execute_undo`` for *network*."""
    return execute_undo(network)


@app.post("/redo/")
async def fastapi_redo(network: str):
    """Return the result of ``execute_redo`` for *network*."""
    return execute_redo(network)


@app.get("/getcurrentoperationid/")
async def fastapi_get_current_operaiton_id(network: str) -> int:
    """Return the result of ``get_current_operation`` for *network*."""
    return get_current_operation(network)


@app.get("/syncwithserver/")
def fastapi_sync_with_server(network: str, operationid: int) -> ChangeSet:
    """Return the result of ``sync_with_server`` for *network* and *operationid*."""
    return sync_with_server(network, operationid)


@app.post("/batch/")
async def fastapi_execute_batch_commands(network: str, request: Request) -> ChangeSet:
    """Execute the operations listed under ``"operations"`` in the request body.

    The body may be a JSON object, or a JSON string that itself encodes such
    an object (the form the previous ``json.loads`` call implied).
    """
    # Request.json() is a coroutine; it has to be awaited to obtain the body.
    payload = await request.json()
    if isinstance(payload, str):
        payload = json.loads(payload)

    cs: ChangeSet = ChangeSet({})
    for operation in payload['operations']:
        cs.add(operation)
    # NOTE(review): ``api`` is not imported explicitly in this file; presumably
    # it is exported by ``from tjnetwork import *`` - confirm.
    return api.execute_batch_commands(network, cs)


# node

@app.get("/getnodes/")
async def fastapi_get_nodes(network: str) -> list[str]:
    """Return the result of ``get_nodes`` for *network*."""
    return get_nodes(network)


@app.get("/isnode/")
async def fastapi_is_node(network: str, node: str) -> bool:
    """Return the result of ``is_node`` for *node*."""
    return is_node(network, node)


@app.get("/isjunction/")
async def fastapi_is_junction(network: str, node: str) -> bool:
    """Return the result of ``is_junction`` for *node*."""
    return is_junction(network, node)


@app.get("/isreservoir/")
async def fastapi_is_reservoir(network: str, node: str) -> bool:
    """Return the result of ``is_reservoir`` for *node*."""
    return is_reservoir(network, node)


@app.get("/istank/")
async def fastapi_is_tank(network: str, node: str) -> bool:
    """Return the result of ``is_tank`` for *node*."""
    return is_tank(network, node)


# link

@app.get("/getlinks/")
async def fastapi_get_links(network: str) -> list[str]:
    """Return the result of ``get_links`` for *network*."""
    return get_links(network)


@app.get("/islink/")
async def fastapi_is_link(network: str, link: str) -> bool:
    """Return the result of ``is_link`` for *link*."""
    return is_link(network, link)


@app.get("/ispipe/")
async def fastapi_is_pipe(network: str, link: str) -> bool:
    """Return the result of ``is_pipe`` for *link*."""
    return is_pipe(network, link)


@app.get("/ispump/")
async def fastapi_is_pump(network: str, link: str) -> bool:
    """Return the result of ``is_pump`` for *link*."""
    return is_pump(network, link)


@app.get("/isvalve/")
async def fastapi_is_valve(network: str, link: str) -> bool:
    """Return the result of ``is_valve`` for *link*."""
    # The handler used to be named ``is_valve`` itself, which shadowed the
    # imported function and made this call recurse into the handler.
    return is_valve(network, link)


# curve

@app.get("/getcurves/")
async def fastapi_get_curves(network: str) -> list[str]:
    """Return the result of ``get_curves`` for *network*."""
    return get_curves(network)


@app.get("/iscurve/")
async def fastapi_is_curve(network: str, curve: str) -> bool:
    """Return the result of ``is_curve`` for *curve*."""
    return is_curve(network, curve)


# pattern

@app.get("/getpatterns/")
async def fastapi_get_patterns(network: str) -> list[str]:
    """Return the result of ``get_patterns`` for *network*."""
    return get_patterns(network)


@app.get("/ispattern/")
async def fastapi_is_pattern(network: str, pattern: str) -> bool:
    """Return the result of ``is_pattern`` for *pattern*."""
    return is_pattern(network, pattern)


############################################################
# junction 2.[JUNCTIONS]
############################################################

@app.get('/getjunctionschema')
async def fast_get_junction_schema(network: str) -> dict[str, dict[str, Any]]:
    """Return the result of ``get_junction_schema`` for *network*."""
    return get_junction_schema(network)


@app.post("/addjunction/")
async def fastapi_add_junction(network: str, junction: str, x: float, y: float, z: float) -> ChangeSet:
    """Add junction *junction* at (*x*, *y*) with elevation *z*."""
    ps = {'id': junction, 'x': x, 'y': y, 'elevation': z}
    return add_junction(network, ChangeSet(ps))


@app.post("/deletejunction/")
async def fastapi_delete_junction(network: str, junction: str) -> ChangeSet:
    """Delete junction *junction*."""
    return delete_junction(network, ChangeSet({'id': junction}))


@app.get("/getjunctionelevation/")
async def fastapi_get_junction_elevation(network: str, junction: str) -> float:
    """Return the ``'elevation'`` property of *junction*."""
    return get_junction(network, junction)['elevation']


@app.get("/getjunctionx/")
async def fastapi_get_junction_x(network: str, junction: str) -> float:
    """Return the ``'x'`` property of *junction*."""
    return get_junction(network, junction)['x']


@app.get("/getjunctiony/")
async def fastapi_get_junction_y(network: str, junction: str) -> float:
    """Return the ``'y'`` property of *junction*."""
    return get_junction(network, junction)['y']


@app.get("/getjunctioncoord/")
async def fastapi_get_junction_coord(network: str, junction: str) -> dict[str, float]:
    """Return the ``'x'`` and ``'y'`` properties of *junction* as a dict."""
    ps = get_junction(network, junction)
    return {'x': ps['x'], 'y': ps['y']}


@app.get("/getjunctiondemand/")
async def fastapi_get_junction_demand(network: str, junction: str) -> float:
    """Return the ``'demand'`` property of *junction*."""
    return get_junction(network, junction)['demand']


@app.get("/getjunctionpattern/")
async def fastapi_get_junction_pattern(network: str, junction: str) -> str:
    """Return the ``'pattern'`` property of *junction*."""
    return get_junction(network, junction)['pattern']


@app.post("/setjunctionelevation/")
async def fastapi_set_junction_elevation(network: str, junction: str, elevation: float) -> ChangeSet:
    """Set the ``'elevation'`` property of *junction*."""
    return set_junction(network, ChangeSet({'id': junction, 'elevation': elevation}))


@app.post("/setjunctionx/")
async def fastapi_set_junction_x(network: str, junction: str, x: float) -> ChangeSet:
    """Set the ``'x'`` property of *junction*."""
    return set_junction(network, ChangeSet({'id': junction, 'x': x}))


@app.post("/setjunctiony/")
async def fastapi_set_junction_y(network: str, junction: str, y: float) -> ChangeSet:
    """Set the ``'y'`` property of *junction*."""
    return set_junction(network, ChangeSet({'id': junction, 'y': y}))


@app.post("/setjunctioncoord/")
async def fastapi_set_junction_coord(network: str, junction: str, x: float, y: float) -> ChangeSet:
    """Set the ``'x'`` and ``'y'`` properties of *junction*."""
    return set_junction(network, ChangeSet({'id': junction, 'x': x, 'y': y}))


@app.post("/setjunctiondemand/")
async def fastapi_set_junction_demand(network: str, junction: str, demand: float) -> ChangeSet:
    """Set the ``'demand'`` property of *junction*."""
    return set_junction(network, ChangeSet({'id': junction, 'demand': demand}))


@app.post("/setjunctionpattern/")
async def fastapi_set_junction_pattern(network: str, junction: str, pattern: str) -> ChangeSet:
    """Set the ``'pattern'`` property of *junction*."""
    return set_junction(network, ChangeSet({'id': junction, 'pattern': pattern}))


@app.get("/getjunctionproperties/")
async def fastapi_get_junction_properties(network: str, junction: str) -> dict[str, Any]:
    """Return all properties of *junction* as returned by ``get_junction``."""
    return get_junction(network, junction)


@app.post("/setjunctionproperties/")
async def fastapi_set_junction_properties(network: str, junction: str, props: dict[str, Any]) -> ChangeSet:
    """Apply *props* to *junction*.

    The other junction setters pass the id inside the change set, so the id
    is filled in from *junction* when the body does not provide one. An
    ``'id'`` supplied in *props* still takes precedence, as before.
    """
    return set_junction(network, ChangeSet({'id': junction, **props}))


############################################################
# reservoir 3.[RESERVOIRS]
############################################################

@app.get('/getreservoirschema')
async def fast_get_reservoir_schema(network: str) -> dict[str, dict[str, Any]]:
    """Return the result of ``get_reservoir_schema`` for *network*."""
    return get_reservoir_schema(network)


@app.post("/addreservoir/")
async def fastapi_add_reservoir(network: str, reservoir: str, x: float, y: float, head: float) -> ChangeSet:
    """Add reservoir *reservoir* at (*x*, *y*) with the given *head*.

    NOTE(review): the junction endpoints pass a ``ChangeSet`` to the tjnetwork
    API while this one passes positional values - confirm which signature
    ``add_reservoir`` currently expects.
    """
    return add_reservoir(network, reservoir, x, y, head)


@app.post("/deletereservoir/")
async def fastapi_delete_reservoir(network: str, reservoir: str) -> ChangeSet:
    """Delete reservoir *reservoir*."""
    return delete_reservoir(network, reservoir)


@app.get("/getreservoirhead/")
async def fastapi_get_reservoir_head(network: str, reservoir: str) -> float | None:
    """Return the ``'head'`` property of *reservoir*."""
    return get_reservoir(network, reservoir)['head']


@app.get("/getreservoirpattern/")
async def fastapi_get_reservoir_pattern(network: str, reservoir: str) -> str | None:
    """Return the ``'pattern'`` property of *reservoir*."""
    return get_reservoir(network, reservoir)['pattern']


@app.get("/getreservoirx/")
async def fastapi_get_reservoir_x(network: str, reservoir: str) -> float | None:
    """Return the ``'x'`` property of *reservoir*."""
    # Annotated as a scalar: the previous ``dict[str, float]`` annotation did
    # not match the value returned here.
    return get_reservoir(network, reservoir)['x']


@app.get("/getreservoiry/")
async def fastapi_get_reservoir_y(network: str, reservoir: str) -> float | None:
    """Return the ``'y'`` property of *reservoir*."""
    return get_reservoir(network, reservoir)['y']


@app.get("/getreservoircoord/")
async def fastapi_get_reservoir_coord(network: str, reservoir: str) -> dict[str, float] | None:
    """Return the ``'x'`` and ``'y'`` properties of *reservoir* as a dict."""
    ps = get_reservoir(network, reservoir)
    return {'x': ps['x'], 'y': ps['y']}


@app.post("/setreservoirhead/")
async def fastapi_set_reservoir_head(network: str, reservoir: str, head: float) -> ChangeSet:
    """Set the ``'head'`` property of *reservoir*."""
    return set_reservoir(network, reservoir, {'head': head})


@app.post("/setreservoirpattern/")
async def fastapi_set_reservoir_pattern(network: str, reservoir: str, pattern: str) -> ChangeSet:
    """Set the ``'pattern'`` property of *reservoir*."""
    return set_reservoir(network, reservoir, {'pattern': pattern})


@app.post("/setreservoirx/")
async def fastapi_set_reservoir_x(network: str, reservoir: str, x: float) -> ChangeSet:
    """Set the ``'x'`` property of *reservoir*."""
    return set_reservoir(network, reservoir, {'x': x})


# This route was registered as a second "/setreservoirx/", which left the
# y setter unreachable behind the x setter.
@app.post("/setreservoiry/")
async def fastapi_set_reservoir_y(network: str, reservoir: str, y: float) -> ChangeSet:
    """Set the ``'y'`` property of *reservoir*."""
    return set_reservoir(network, reservoir, {'y': y})


@app.post("/setreservoircoord/")
async def fastapi_set_reservoir_coord(network: str, reservoir: str, x: float, y: float) -> ChangeSet:
    """Set the ``'x'`` and ``'y'`` properties of *reservoir*."""
    return set_reservoir(network, reservoir, {'x': x, 'y': y})


@app.get("/getreservoirproperties/")
async def fastapi_get_reservoir_properties(network: str, reservoir: str) -> dict[str, Any]:
    """Return all properties of *reservoir* as returned by ``get_reservoir``."""
    return get_reservoir(network, reservoir)


@app.post("/setreservoirproperties/")
async def fastapi_set_reservoir_properties(network: str, reservoir: str, props: dict[str, Any]) -> ChangeSet:
    """Apply *props* to *reservoir*."""
    return set_reservoir(network, reservoir, props)


############################################################
# tank 4.[TANKS]
############################################################

@app.get('/gettankschema')
async def fast_get_tank_schema(network: str) -> dict[str, dict[str, Any]]:
    """Return the result of ``get_tank_schema`` for *network*."""
    return get_tank_schema(network)


@app.post("/addtank/")
async def fastapi_add_tank(network: str, tank: str, x: float, y: float, elevation: float,
                           init_level: float = 0, min_level: float = 0, max_level: float = 0,
                           diameter: float = 0, min_vol: float = 0) -> ChangeSet:
    """Add tank *tank* at (*x*, *y*); the level/size arguments default to 0."""
    return add_tank(network, tank, x, y, elevation, init_level, min_level, max_level, diameter, min_vol)


@app.post("/deletetank/")
async def fastapi_delete_tank(network: str, tank: str) -> ChangeSet:
    """Delete tank *tank*."""
    return delete_tank(network, tank)


@app.get("/gettankelevation/")
async def fastapi_get_tank_elevation(network: str, tank: str) -> float | None:
    """Return the ``'elevation'`` property of *tank*."""
    return get_tank(network, tank)['elevation']


@app.get("/gettankinitlevel/")
async def fastapi_get_tank_init_level(network: str, tank: str) -> float | None:
    """Return the ``'init_level'`` property of *tank*."""
    return get_tank(network, tank)['init_level']


@app.get("/gettankminlevel/")
async def fastapi_get_tank_min_level(network: str, tank: str) -> float | None:
    """Return the ``'min_level'`` property of *tank*."""
    return get_tank(network, tank)['min_level']


@app.get("/gettankmaxlevel/")
async def fastapi_get_tank_max_level(network: str, tank: str) -> float | None:
    """Return the ``'max_level'`` property of *tank*."""
    return get_tank(network, tank)['max_level']


@app.get("/gettankdiameter/")
async def fastapi_get_tank_diameter(network: str, tank: str) -> float | None:
    """Return the ``'diameter'`` property of *tank*."""
    return get_tank(network, tank)['diameter']


@app.get("/gettankminvol/")
async def fastapi_get_tank_min_vol(network: str, tank: str) -> float | None:
    """Return the ``'min_vol'`` property of *tank*."""
    return get_tank(network, tank)['min_vol']


@app.get("/gettankvolcurve/")
async def fastapi_get_tank_vol_curve(network: str, tank: str) -> str | None:
    """Return the ``'vol_curve'`` property of *tank*."""
    return get_tank(network, tank)['vol_curve']


@app.get("/gettankoverflow/")
async def fastapi_get_tank_overflow(network: str, tank: str) -> str | None:
    """Return the ``'overflow'`` property of *tank*."""
    return get_tank(network, tank)['overflow']


@app.get("/gettankx/")
async def fastapi_get_tank_x(network: str, tank: str) -> float:
    """Return the ``'x'`` property of *tank*."""
    return get_tank(network, tank)['x']


@app.get("/gettanky/")
async def fastapi_get_tank_y(network: str, tank: str) -> float:
    """Return the ``'y'`` property of *tank*."""
    return get_tank(network, tank)['y']


@app.get("/gettankcoord/")
async def fastapi_get_tank_coord(network: str, tank: str) -> dict[str, float]:
    """Return the ``'x'`` and ``'y'`` properties of *tank* as a dict."""
    ps = get_tank(network, tank)
    return {'x': ps['x'], 'y': ps['y']}


@app.post("/settankelevation/")
async def fastapi_set_tank_elevation(network: str, tank: str, elevation: float) -> ChangeSet:
    """Set the ``'elevation'`` property of *tank*."""
    return set_tank(network, tank, {'elevation': elevation})


@app.post("/settankinitlevel/")
async def fastapi_set_tank_init_level(network: str, tank: str, init_level: float) -> ChangeSet:
    """Set the ``'init_level'`` property of *tank*."""
    return set_tank(network, tank, {'init_level': init_level})


@app.post("/settankminlevel/")
async def fastapi_set_tank_min_level(network: str, tank: str, min_level: float) -> ChangeSet:
    """Set the ``'min_level'`` property of *tank*."""
    return set_tank(network, tank, {'min_level': min_level})


@app.post("/settankmaxlevel/")
async def fastapi_set_tank_max_level(network: str, tank: str, max_level: float) -> ChangeSet:
    """Set the ``'max_level'`` property of *tank*."""
    return set_tank(network, tank, {'max_level': max_level})


# The path used to be "settankdiameter//": it lacked the leading slash that
# routed paths require and did not match the naming of its sibling routes.
@app.post("/settankdiameter/")
async def fastapi_set_tank_diameter(network: str, tank: str, diameter: float) -> ChangeSet:
    """Set the ``'diameter'`` property of *tank*."""
    return set_tank(network, tank, {'diameter': diameter})


@app.post("/settankminvol/")
async def fastapi_set_tank_min_vol(network: str, tank: str, min_vol: float) -> ChangeSet:
    """Set the ``'min_vol'`` property of *tank*."""
    return set_tank(network, tank, {'min_vol': min_vol})


@app.post("/settankvolcurve/")
async def fastapi_set_tank_vol_curve(network: str, tank: str, vol_curve: str) -> ChangeSet:
    """Set the ``'vol_curve'`` property of *tank*."""
    return set_tank(network, tank, {'vol_curve': vol_curve})


@app.post("/settankoverflow/")
async def fastapi_set_tank_overflow(network: str, tank: str, overflow: str) -> ChangeSet:
    """Set the ``'overflow'`` property of *tank*."""
    return set_tank(network, tank, {'overflow': overflow})


@app.post("/settankx/")
async def fastapi_set_tank_x(network: str, tank: str, x: float) -> ChangeSet:
    """Set the ``'x'`` property of *tank*."""
    return set_tank(network, tank, {'x': x})


@app.post("/settanky/")
async def fastapi_set_tank_y(network: str, tank: str, y: float) -> ChangeSet:
    """Set the ``'y'`` property of *tank*."""
    return set_tank(network, tank, {'y': y})


@app.post("/settankcoord/")
async def fastapi_set_tank_coord(network: str, tank: str, x: float, y: float) -> ChangeSet:
    """Set the ``'x'`` and ``'y'`` properties of *tank*."""
    return set_tank(network, tank, {'x': x, 'y': y})


@app.get("/gettankproperties/")
async def fastapi_get_tank_properties(network: str, tank: str) -> dict[str, Any]:
    """Return all properties of *tank* as returned by ``get_tank``."""
    return get_tank(network, tank)


@app.post("/settankproperties/")
async def fastapi_set_tank_properties(network: str, tank: str, props: dict[str, Any]) -> ChangeSet:
    """Apply *props* to *tank*."""
    return set_tank(network, tank, props)


############################################################
# pipe 5.[PIPES]
############################################################

@app.get('/getpipeschema')
async def fast_get_pipe_schema(network: str) -> dict[str, dict[str, Any]]:
    """Return the result of ``get_pipe_schema`` for *network*."""
    return get_pipe_schema(network)


@app.post("/addpipe/")
async def fastapi_add_pipe(network: str, pipe: str, node1: str, node2: str, length: float = 0,
                           diameter: float = 0, roughness: float = 0, minorloss: float = 0,
                           status: str = PIPE_STATUS_OPEN) -> ChangeSet:
    """Add pipe *pipe* between *node1* and *node2*.

    The ``minorloss`` query parameter is stored under the ``'minor_loss'`` key.
    """
    props: dict[str, Any] = {
        'id': pipe,
        'node1': node1,
        'node2': node2,
        'length': length,
        'diameter': diameter,
        'roughness': roughness,
        'minor_loss': minorloss,
        'status': status,
    }
    return add_pipe(network, ChangeSet(props))


@app.post("/deletepipe/")
async def fastapi_delete_pipe(network: str, pipe: str) -> ChangeSet:
    """Delete pipe *pipe*."""
    return delete_pipe(network, pipe)


@app.get("/getpipenode1/")
async def fastapi_get_pipe_node1(network: str, pipe: str) -> str | None:
    """Return the ``'node1'`` property of *pipe*."""
    return get_pipe(network, pipe)['node1']


@app.get("/getpipenode2/")
async def fastapi_get_pipe_node2(network: str, pipe: str) -> str | None:
    """Return the ``'node2'`` property of *pipe*."""
    return get_pipe(network, pipe)['node2']


@app.get("/getpipelength/")
async def fastapi_get_pipe_length(network: str, pipe: str) -> float | None:
    """Return the ``'length'`` property of *pipe*."""
    return get_pipe(network, pipe)['length']


@app.get("/getpipediameter/")
async def fastapi_get_pipe_diameter(network: str, pipe: str) -> float | None:
    """Return the ``'diameter'`` property of *pipe*."""
    return get_pipe(network, pipe)['diameter']


@app.get("/getpiperoughness/")
async def fastapi_get_pipe_roughness(network: str, pipe: str) -> float | None:
    """Return the ``'roughness'`` property of *pipe*."""
    return get_pipe(network, pipe)['roughness']


@app.get("/getpipeminorloss/")
async def fastapi_get_pipe_minor_loss(network: str, pipe: str) -> float | None:
    """Return the ``'minor_loss'`` property of *pipe*."""
    return get_pipe(network, pipe)['minor_loss']


@app.get("/getpipestatus/")
async def fastapi_get_pipe_status(network: str, pipe: str) -> str | None:
    """Return the ``'status'`` property of *pipe*."""
    return get_pipe(network, pipe)['status']


@app.post("/setpipenode1/")
async def fastapi_set_pipe_node1(network: str, pipe: str, node1: str) -> ChangeSet:
    """Set the ``'node1'`` property of *pipe*."""
    # Previously this only read the pipe and returned its current node1,
    # ignoring the requested value.
    return set_pipe(network, pipe, {'node1': node1})


@app.post("/setpipenode2/")
async def fastapi_set_pipe_node2(network: str, pipe: str, node2: str) -> ChangeSet:
    """Set the ``'node2'`` property of *pipe*."""
    # Previously this only read the pipe and returned its current node2,
    # ignoring the requested value.
    return set_pipe(network, pipe, {'node2': node2})


@app.post("/setpipelength/")
async def fastapi_set_pipe_length(network: str, pipe: str, length: float) -> ChangeSet:
    """Set the ``'length'`` property of *pipe*."""
    return set_pipe(network, pipe, {'length': length})


@app.post("/setpipediameter/")
async def fastapi_set_pipe_diameter(network: str, pipe: str, diameter: float) -> ChangeSet:
    """Set the ``'diameter'`` property of *pipe*."""
    return set_pipe(network, pipe, {'diameter': diameter})


@app.post("/setpiperoughness/")
async def fastapi_set_pipe_roughness(network: str, pipe: str, roughness: float) -> ChangeSet:
    """Set the ``'roughness'`` property of *pipe*."""
    return set_pipe(network, pipe, {'roughness': roughness})


@app.post("/setpipeminorloss/")
async def fastapi_set_pipe_minor_loss(network: str, pipe: str, minor_loss: float) -> ChangeSet:
    """Set the ``'minor_loss'`` property of *pipe*."""
    return set_pipe(network, pipe, {'minor_loss': minor_loss})


@app.post("/setpipestatus/")
async def fastapi_set_pipe_status(network: str, pipe: str, status: str) -> ChangeSet:
    """Set the ``'status'`` property of *pipe*.

    *status* is a string, matching ``/addpipe/`` and ``/getpipestatus/``; it
    was previously declared as ``float``.
    """
    return set_pipe(network, pipe, {'status': status})


@app.get("/getpipeproperties/")
async def fastapi_get_pipe_properties(network: str, pipe: str) -> dict[str, Any]:
    """Return all properties of *pipe* as returned by ``get_pipe``."""
    return get_pipe(network, pipe)


@app.post("/setpipeproperties/")
async def fastapi_set_pipe_properties(network: str, pipe: str, props: dict[str, Any]) -> ChangeSet:
    """Apply *props* to *pipe*."""
    return set_pipe(network, pipe, props)


############################################################
# pump 6.[PUMPS]
############################################################

@app.get('/getpumpschema')
async def fast_get_pump_schema(network: str) -> dict[str, dict[str, Any]]:
    """Return the result of ``get_pump_schema`` for *network*."""
    return get_pump_schema(network)


@app.post("/addpump/")
async def fastapi_add_pump(network: str, pump: str, node1: str, node2: str) -> ChangeSet:
    """Add pump *pump* between *node1* and *node2*."""
    props: dict[str, Any] = {'id': pump, 'node1': node1, 'node2': node2}
    return add_pump(network, ChangeSet(props))


@app.post("/deletepump/")
async def fastapi_delete_pump(network: str, pump: str) -> ChangeSet:
    """Delete pump *pump*."""
    return delete_pump(network, pump)


@app.get("/getpumpnode1/")
async def fastapi_get_pump_node1(network: str, pump: str) -> str | None:
    """Return the ``'node1'`` property of *pump*."""
    return get_pump(network, pump)['node1']
@app.get("/getpumpnode2/")
async def fastapi_get_pump_node2(network: str, pump: str) -> str | None:
    """Return the ``'node2'`` property of *pump*."""
    return get_pump(network, pump)['node2']


@app.post("/setpumpnode1/")
async def fastapi_set_pump_node1(network: str, pump: str, node1: str) -> ChangeSet:
    """Set the ``'node1'`` property of *pump*."""
    return set_pump(network, pump, {'node1': node1})


@app.post("/setpumpnode2/")
async def fastapi_set_pump_node2(network: str, pump: str, node2: str) -> ChangeSet:
    """Set the ``'node2'`` property of *pump*."""
    return set_pump(network, pump, {'node2': node2})


@app.get("/getpumpproperties/")
async def fastapi_get_pump_properties(network: str, pump: str) -> dict[str, Any]:
    """Return all properties of *pump* as returned by ``get_pump``."""
    return get_pump(network, pump)


@app.post("/setpumpproperties/")
async def fastapi_set_pump_properties(network: str, pump: str, props: dict[str, Any]) -> ChangeSet:
    """Apply *props* to *pump*."""
    return set_pump(network, pump, props)


############################################################
# valve 7.[VALVES]
############################################################

@app.get('/getvalveschema')
async def fast_get_valve_schema(network: str) -> dict[str, dict[str, Any]]:
    """Return the result of ``get_valve_schema`` for *network*."""
    return get_valve_schema(network)


@app.post("/addvalve/")
async def fastapi_add_valve(network: str, valve: str, node1: str, node2: str, diameter: float = 0,
                            type: str = VALVES_TYPE_PRV, setting: float = 0,
                            minor_loss: float = 0) -> ChangeSet:
    """Add valve *valve* between *node1* and *node2*.

    ``type`` shadows the builtin, but it is the public query-parameter name
    and therefore kept.
    """
    return add_valve(network, valve, node1, node2, diameter, type, setting, minor_loss)


@app.post("/deletevalve/")
async def fastapi_delete_valve(network: str, valve: str) -> ChangeSet:
    """Delete valve *valve*."""
    return delete_valve(network, valve)


@app.get("/getvalvenode1/")
async def fastapi_get_valve_node1(network: str, valve: str) -> str | None:
    """Return the ``'node1'`` property of *valve*."""
    return get_valve(network, valve)['node1']


@app.get("/getvalvenode2/")
async def fastapi_get_valve_node2(network: str, valve: str) -> str | None:
    """Return the ``'node2'`` property of *valve*."""
    return get_valve(network, valve)['node2']


@app.get("/getvalvediameter/")
async def fastapi_get_valve_diameter(network: str, valve: str) -> float | None:
    """Return the ``'diameter'`` property of *valve*."""
    return get_valve(network, valve)['diameter']


@app.get("/getvalvetype/")
async def fastapi_get_valve_type(network: str, valve: str) -> str | None:
    """Return the ``'type'`` property of *valve*."""
    return get_valve(network, valve)['type']


@app.get("/getvalvesetting/")
async def fastapi_get_valve_setting(network: str, valve: str) -> float | None:
    """Return the ``'setting'`` property of *valve*."""
    return get_valve(network, valve)['setting']


@app.get("/getvalveminorloss/")
async def fastapi_get_valve_minor_loss(network: str, valve: str) -> float | None:
    """Return the ``'minor_loss'`` property of *valve*."""
    return get_valve(network, valve)['minor_loss']


@app.post("/setvalvenode1/")
async def fastapi_set_valve_node1(network: str, valve: str, node1: str) -> ChangeSet:
    """Set the ``'node1'`` property of *valve*."""
    return set_valve(network, valve, {'node1': node1})


@app.post("/setvalvenode2/")
async def fastapi_set_valve_node2(network: str, valve: str, node2: str) -> ChangeSet:
    """Set the ``'node2'`` property of *valve*."""
    return set_valve(network, valve, {'node2': node2})


# "/setvalvenodediameter/" is the historical path and is kept for existing
# clients; "/setvalvediameter/" is the spelling that matches the getter.
@app.post("/setvalvediameter/")
@app.post("/setvalvenodediameter/")
async def fastapi_set_valve_diameter(network: str, valve: str, diameter: float) -> ChangeSet:
    """Set the ``'diameter'`` property of *valve*."""
    return set_valve(network, valve, {'diameter': diameter})


@app.post("/setvalvetype/")
async def fastapi_set_valve_type(network: str, valve: str, type: str) -> ChangeSet:
    """Set the ``'type'`` property of *valve*."""
    return set_valve(network, valve, {'type': type})


@app.post("/setvalvesetting/")
async def fastapi_set_valve_setting(network: str, valve: str, setting: float) -> ChangeSet:
    """Set the ``'setting'`` property of *valve*."""
    return set_valve(network, valve, {'setting': setting})


@app.get("/getvalveproperties/")
async def fastapi_get_valve_properties(network: str, valve: str) -> dict[str, Any]:
    """Return all properties of *valve* as returned by ``get_valve``."""
    return get_valve(network, valve)


@app.post("/setvalveproperties/")
async def fastapi_set_valve_properties(network: str, valve: str, props: dict[str, Any]) -> ChangeSet:
    """Apply *props* to *valve*."""
    return set_valve(network, valve, props)


# inp file

def _resolve_inp_path(name: str | None) -> str | None:
    """Return the path of *name* inside ``inpDir``, or None if it is unusable.

    *name* comes straight from the HTTP request, so it is resolved and checked
    to lie strictly below ``inpDir``; this rejects empty names and names such
    as ``../x`` that would otherwise escape the directory.
    """
    if not name:
        return None
    root = os.path.realpath(inpDir)
    candidate = os.path.realpath(os.path.join(root, name))
    try:
        inside = os.path.commonpath([root, candidate]) == root
    except ValueError:
        # commonpath raises when the paths cannot be compared (for example
        # different drives); treat that as "outside".
        inside = False
    if not inside or candidate == root:
        return None
    return candidate


@app.post("/uploadinp/", status_code=status.HTTP_200_OK)
async def upload_inp(file: bytes = File(), name: str | None = None):
    """Store the uploaded bytes under *name* inside ``inpDir``.

    Returns True on success. A missing or unsafe *name* yields an HTTP 400
    response with body ``false`` instead of writing a file.
    """
    filePath = _resolve_inp_path(name)
    if filePath is None:
        return JSONResponse(status_code=status.HTTP_400_BAD_REQUEST, content=False)

    # The context manager closes the handle even if the write fails.
    with open(filePath, 'wb') as f:
        f.write(file)
    return True


@app.get("/downloadinp/", status_code=status.HTTP_200_OK)
async def download_inp(name: str, response: Response):
    """Send the file *name* from ``inpDir`` as a download named ``inp.inp``.

    When the file does not exist (or *name* is unsafe) the status code is set
    to 400 and the body is True, as it was before.
    """
    filePath = _resolve_inp_path(name)
    if filePath is not None and os.path.isfile(filePath):
        return FileResponse(filePath, media_type='application/octet-stream', filename="inp.inp")

    response.status_code = status.HTTP_400_BAD_REQUEST
    return True


@app.get("/getjson/")
async def get_json():
    """Return a fixed HTTP 400 JSON payload with code, message and data fields."""
    return JSONResponse(
        status_code=status.HTTP_400_BAD_REQUEST,
        content={
            'code': 400,
            'message': "this is message",
            'data': 123,
        },
    )