import asyncio, os, io, json, time, pickle, redis, datetime, logging, threading, uvicorn, multiprocessing, asyncio, shutil, random from typing import * from urllib.request import Request from xml.dom import minicompat from pydantic import BaseModel from starlette.responses import FileResponse, JSONResponse from starlette.middleware.base import BaseHTTPMiddleware from starlette.types import Receive from fastapi import FastAPI, File, UploadFile, Response, status, Request, Body, HTTPException,Query from fastapi.responses import PlainTextResponse from fastapi.middleware.gzip import GZipMiddleware from tjnetwork import * from multiprocessing import Value import uvicorn import msgpack from run_simulation import run_simulation, run_simulation_ex from online_Analysis import * from fastapi.middleware.cors import CORSMiddleware from influxdb_client import InfluxDBClient, BucketsApi, WriteApi, OrganizationsApi, Point, QueryApi from typing import List, Dict from datetime import datetime, timedelta, timezone from dateutil import parser import influxdb_info import influxdb_api import py_linq import time_api JUNCTION = 0 RESERVOIR = 1 TANK = 2 PIPE = 1 NODE_COUNT = 0 LINK_COUNT = 2 prjs = [] inpDir = "C:/inpfiles/" tmpDir = "C:/tmpfiles/" lockedPrjs = {} if not os.path.exists(inpDir): os.mkdir(inpDir) if not os.path.exists(tmpDir): os.mkdir(tmpDir) app = FastAPI() # 将 Query的信息 序列号到 redis/json, 默认不支持datetime,需要自定义 # 自定义序列化函数 # 序列化处理器 def encode_datetime(obj): """将datetime转换为可序列化的字典结构""" if isinstance(obj, datetime): return { '__datetime__': True, 'as_str': obj.strftime("%Y%m%dT%H:%M:%S.%f") } return obj # 反序列化处理器 def decode_datetime(obj): """将字典还原为datetime对象""" if '__datetime__' in obj: return datetime.strptime( obj['as_str'], "%Y%m%dT%H:%M:%S.%f" ) return obj # 初始化 Redis 连接 # 用redis 限制并发访u redis_client = redis.Redis(host="localhost", port=6379, db=0) # influxdb数据库连接信息 # influx_url = influxdb_info.url # influx_token = influxdb_info.token # influx_org_name = influxdb_info.org # influx_client = InfluxDBClient(url=influx_url, token=influx_token, org=influx_org_name, timeout=100*1000) # 100 seconds # 配置日志记录器 logging.basicConfig( level=logging.INFO, format='%(asctime)s - %(name)s - %(levelname)s - %(message)s', datefmt='%Y-%m-%d %H:%M:%S' ) logger = logging.getLogger(__name__) # 配置 CORS 中间件 app.add_middleware( CORSMiddleware, allow_origins=["*"], # 允许所有来源 allow_credentials=True, # 允许传递凭证(Cookie、HTTP 头等) allow_methods=["*"], # 允许所有 HTTP 方法 allow_headers=["*"], # 允许所有 HTTP 头 ) # 定义一个共享变量 lock_simulation = Value('i', 0) app.add_middleware(GZipMiddleware, minimum_size=1000) async def receive_with_body(body: bytes) -> Receive: async def receive() -> dict: return { "type": "http.request", "body": body, "more_body": False, } return receive @app.middleware("http") async def log_requests(request: Request, call_next): if request.url.path == "/favicon.ico": return Response(status_code=204) # 记录接收请求的时间 request_time = time.time() request_time_str = datetime.fromtimestamp(request_time).strftime('%Y-%m-%d %H:%M:%S') # 判断是否为文件上传 is_file_upload = request.headers.get("content-type", "").startswith("multipart/form-data") # 记录接收的请求数据 logging.info(f"Received request: {request.method} {request.url} at {request_time_str}") if not is_file_upload: request_body = await request.body() logging.info(f"Request body: {request_body.decode('utf-8')}") # 创建新的 Request 对象,传递缓存的请求体 receive = await receive_with_body(request_body) request = Request(request.scope, receive=receive) else: logging.info(f"Request body: File") # 处理请求 response = await call_next(request) # 记录发送响应的时间 response_time = time.time() response_time_str = datetime.fromtimestamp(response_time).strftime('%Y-%m-%d %H:%M:%S') processing_time = response_time - request_time # 记录发送的响应数据 # response_body = b"" # async for chunk in response.body_iterator: # response_body += chunk # 记录响应的状态码以及是否成功 success_status = response.status_code < 400 logging.info(f"Response status: {response.status_code} at {response_time_str}, success: {success_status}") # logging.info(f"Response body: {response_body.decode('utf-8')}") logging.info(f"Processing time: {processing_time:.3f} seconds") return response @app.on_event("startup") async def startup_db(): logger.info('**********************************************************') logger.info(str(datetime.now())) logger.info("TJWater CloudService is starting...") logger.info('**********************************************************') # open 'bb' by default open_project("bb") ############################################################ # extension_data ############################################################ @app.get('/getallextensiondatakeys/') async def fastapi_get_all_extension_data_keys(network: str) -> list[str]: return get_all_extension_data_keys(network) @app.get('/getallextensiondata/') async def fastapi_get_all_extension_data(network: str) -> dict[str, Any]: return get_all_extension_data(network) @app.get('/getextensiondata/') async def fastapi_get_extension_data(network: str, key: str) -> str | None: return get_extension_data(network, key) @app.post('/setextensiondata',response_model=None) async def fastapi_set_extension_data(network: str, req: Request) -> ChangeSet: props = await req.json() print(props) cs = set_extension_data(network, ChangeSet(props)) print(cs.operations[0]) return cs ############################################################ # project ############################################################ @app.get('/listprojects/') async def fastapi_list_projects() -> list[str]: return list_project() @app.get("/haveproject/") async def fastapi_have_project(network: str): return have_project(network) @app.post("/createproject/") async def fastapi_create_project(network: str): create_project(network) return network @app.post("/deleteproject/") async def fastapi_delete_project(network: str): delete_project(network) return True @app.get("/isprojectopen/") async def fastapi_is_project_open(network: str): return is_project_open(network) @app.post("/openproject/") async def fastapi_open_project(network: str): open_project(network) return network @app.post("/closeproject/") async def fastapi_close_project(network: str): close_project(network) return True @app.post("/copyproject/") async def fastapi_copy_project(source: str, target: str): copy_project(source, target) return True @app.post("/importinp/") async def fastapi_import_inp(network: str, req: Request): jo_root = await req.json() inp_text = jo_root['inp'] ps = { "inp" : inp_text } ret = import_inp(network, ChangeSet(ps)) print(ret) return ret @app.get("/exportinp/",response_model=None) async def fastapi_export_inp(network: str, version: str) -> ChangeSet: cs = export_inp(network, version) op = cs.operations[0] open_project(network) op['vertex'] = json.dumps(get_all_vertices(network)) op['scada'] = json.dumps(get_all_scada_elements(network)) op['dma'] = json.dumps(get_all_district_metering_areas(network)) op['sa'] = json.dumps(get_all_service_areas(network)) op['vd'] = json.dumps(get_all_virtual_districts(network)) op['legend'] = get_extension_data(network, 'legend') db = get_extension_data(network, 'scada_db') print(db) scada_db = '' if db: scada_db = db print(scada_db) op['scada_db'] = scada_db close_project(network) return cs @app.post("/readinp/") async def fastapi_read_inp(network: str, inp: str) -> bool: read_inp(network, inp) return True @app.get("/dumpinp/") async def fastapi_dump_inp(network: str, inp: str) -> bool: dump_inp(network, inp) return True # 必须用这个PlainTextResponse,不然每个key都有引号 @app.get("/runproject/", response_class = PlainTextResponse) async def fastapi_run_project(network: str) -> str: lock_key = "exclusive_api_lock" timeout = 120 # 锁自动过期时间(秒) # 尝试获取锁(NX=True: 不存在时设置,EX=timeout: 过期时间) acquired = redis_client.set(lock_key, "locked", nx=True, ex=timeout) if not acquired: raise HTTPException(status_code=409, detail="is in simulation") else: try: return run_project(network) finally: # 手动释放锁(可选,依赖过期时间自动释放更安全) redis_client.delete(lock_key) # DingZQ, 2025-02-04, 返回dict[str, Any] # output 和 report # output 是 json # report 是 text @app.get("/runprojectreturndict/") async def fastapi_run_project_return_dict(network: str) -> dict[str, Any]: lock_key = "exclusive_api_lock" timeout = 120 # 锁自动过期时间(秒) # 尝试获取锁(NX=True: 不存在时设置,EX=timeout: 过期时间) acquired = redis_client.set(lock_key, "locked", nx=True, ex=timeout) if not acquired: raise HTTPException(status_code=409, detail="is in simulation") else: try: return run_project_return_dict(network) finally: # 手动释放锁(可选,依赖过期时间自动释放更安全) redis_client.delete(lock_key) # put in inp folder, name without extension @app.get("/runinp/") async def fastapi_run_inp(network: str) -> str: return run_inp(network) # path is absolute path @app.get("/dumpoutput/") async def fastapi_dump_output(output: str) -> str: return dump_output(output) @app.get("/isprojectlocked/") async def fastapi_is_locked(network: str, req: Request): return str in lockedPrjs.keys() @app.get("/isprojectlockedbyme/") async def fastapi_is_locked_by_me(network: str, req: Request): client_host = req.client.host return lockedPrjs.get(network) == client_host # 0 successfully locked # 1 already locked by you # 2 locked by others @app.post("/lockproject/") async def fastapi_lock_project(network: str, req: Request): client_host = req.client.host if not network in lockedPrjs.keys(): lockedPrjs[network] = client_host return 0 else: if lockedPrjs.get(network) == client_host: return 1 else: return 2 @app.post("/unlockproject/") def fastapi_unlock_project(network: str, req: Request): client_host = req.client.host if lockedPrjs[network] == client_host: print("delete key") del lockedPrjs[network] return True return False ### operations @app.get('/getcurrentoperationid/') async def fastapi_get_current_operaiton_id(network: str) -> int: return get_current_operation(network) @app.post('/undo/') async def fastapi_undo(network: str): return execute_undo(network) @app.post('/redo/') async def fastapi_redo(network: str): return execute_redo(network) @app.get('/getsnapshots/') def fastapi_list_snapshot(network: str) -> list[tuple[int, str]]: return list_snapshot(network) @app.get('/havesnapshot/') async def fastapi_have_snapshot(network: str, tag: str) -> bool: return have_snapshot(network, tag) @app.get('/havesnapshotforoperation/') async def fastapi_have_snapshot_for_operation(network: str, operation: int) -> bool: return have_snapshot_for_operation(network, operation) @app.get('/havesnapshotforcurrentoperation/') async def fastapi_have_snapshot_for_current_operation(network: str) -> bool: return have_snapshot_for_current_operation(network) @app.post('/takesnapshotforoperation/') async def fastapi_take_snapshot_for_operation(network: str, operation: int, tag: str) -> None: return take_snapshot_for_operation(network, operation, tag) @app.post('takenapshotforcurrentoperation') async def fastapi_take_snapshot_for_current_operation(network: str, tag: str) -> None: return take_snapshot_for_current_operation(network, tag) @app.post('/takesnapshot/') def fastapi_take_snapshot(network: str, tag: str) -> None: return take_snapshot(network, tag) @app.post('/picksnapshot/',response_model=None) def fastapi_pick_snapshot(network: str, tag: str, discard: bool = False) -> ChangeSet: return pick_snapshot(network, tag, discard) @app.post('/pickoperation/',response_model=None) async def fastapi_pick_operation(network: str, operation: int, discard: bool = False) -> ChangeSet: return pick_operation(network, operation, discard) @app.get("/syncwithserver/",response_model=None) async def fastapi_sync_with_server(network: str, operation: int) -> ChangeSet: return sync_with_server(network, operation) @app.post("/batch/",response_model=None) async def fastapi_execute_batch_commands(network: str, req: Request)-> ChangeSet: jo_root = await req.json() cs: ChangeSet = ChangeSet() cs.operations = jo_root['operations'] rcs = execute_batch_commands(network, cs) return rcs @app.post("/compressedbatch/",response_model=None) async def fastapi_execute_compressed_batch_commands(network: str, req: Request)-> ChangeSet: jo_root = await req.json() cs: ChangeSet = ChangeSet() cs.operations = jo_root['operations'] return execute_batch_command(network, cs) @app.get("/getrestoreoperation/") async def fastapi_get_restore_operation(network : str) -> int: return get_restore_operation(network) @app.post("/setrestoreoperation/") async def fastapi_set_restore_operation(network: str, operation: int) -> None: return set_restore_operation(network, operation) ############################################################ # type ############################################################ @app.get('/isnode/') async def fastapi_is_node(network: str, node: str) -> bool: return is_node(network, node) @app.get('/isjunction/') async def fastapi_is_junction(network: str, node: str) -> bool: return is_junction(network, node) @app.get('/isreservoir/') async def fastapi_is_reservoir(network: str, node: str) -> bool: return is_reservoir(network, node) @app.get('/istank/') async def fastapi_is_tank(network: str, node: str) -> bool: return is_tank(network, node) @app.get('/islink/') async def fastapi_is_link(network: str, link: str) -> bool: return is_link(network, link) @app.get('/ispipe/') async def fastapi_is_pipe(network: str, link: str) -> bool: return is_pipe(network, link) @app.get('/ispump/') async def fastapi_is_pump(network: str, link: str) -> bool: return is_pump(network, link) @app.get('/isvalve/') async def fastapi_is_valve(network: str, link: str) -> bool: return is_valve(network, link) # DingZQ, 2025-02-05 @app.get('/getnodetype/') async def fastapi_get_node_type(network: str, node: str) -> str: return get_node_type(network, node) @app.get('/getlinktype/') async def fastapi_get_link_type(network: str, link: str) -> str: return get_link_type(network, link) @app.get('/getelementtype/') async def fastapi_get_element_type(network: str, element: str) -> str: return get_element_type(network, element) @app.get('/getelementtypevalue/') async def fastapi_get_element_type_value(network: str, element: str) -> int: return get_element_type_value(network, element) @app.get('/iscurve/') async def fastapi_is_curve(network: str, curve: str) -> bool: return is_curve(network, curve) @app.get('/ispattern/') async def fastapi_is_pattern(network: str, pattern: str) -> bool: return is_pattern(network, pattern) @app.get("/getnodes/") async def fastapi_get_nodes(network: str) -> list[str]: return get_nodes(network) @app.get("/getlinks/") async def fastapi_get_links(network: str) -> list[str]: return get_links(network) @app.get("/getcurves/") async def fastapi_get_curves(network: str) -> list[str]: return get_curves(network) @app.get("/getpatterns/") async def fastapi_get_patterns(network: str) -> list[str]: return get_patterns(network) @app.get("/getnodelinks/") def get_node_links(network: str, node: str) -> list[str]: return get_node_links(network, node) ############################################################ # DingZQ, 2025-02-05 # 用统一的接口来获取 Node & Link properties, Node和Link的Id可以一样,不能进一步统一成获取Element 的 properties # Node & Link properties ############################################################ @app.get('/getnodeproperties/') async def fast_get_node_properties(network: str, node: str) -> dict[str, Any]: return get_node_properties(network, node) @app.get('/getlinkproperties/') async def fast_get_link_properties(network: str, link: str) -> dict[str, Any]: return get_link_properties(network, link) @app.get('/getscadaproperties/') async def fast_get_scada_properties(network: str, scada: str) -> dict[str, Any]: return get_scada_info(network, scada) @app.get('/getallscadaproperties/') async def fast_get_all_scada_properties(network: str) -> list[dict[str, Any]]: return get_all_scada_info(network) # elementtype can be 'node' or 'link' or 'scada' @app.get('/getelementpropertieswithtype/') async def fast_get_element_properties_with_type(network: str, elementtype: str, element: str) -> dict[str, Any]: return get_element_properties_with_type(network, elementtype, element) # type can be 'node' or 'link' or 'scada' @app.get('/getelementproperties/') async def fast_get_element_properties(network: str, element: str) -> dict[str, Any]: return get_element_properties(network, element) ############################################################ # title 1.[TITLE] ############################################################ @app.get('/gettitleschema/') async def fast_get_title_schema(network: str) -> dict[str, dict[str, Any]]: return get_title_schema(network) @app.get('/gettitle/') async def fast_get_title(network: str) -> dict[str, Any]: return get_title(network) @app.get('/settitle/',response_model=None) async def fastapi_set_title(network: str, req: Request) -> ChangeSet: props = await req.json() return set_title(network, ChangeSet(props)) ############################################################ # junction 2.[JUNCTIONS] ############################################################ @app.get('/getjunctionschema') async def fast_get_junction_schema(network: str) -> dict[str, dict[str, Any]]: return get_junction_schema(network) @app.post("/addjunction/",response_model=None) async def fastapi_add_junction(network: str, junction: str, x: float, y: float, z: float) -> ChangeSet: ps = { 'id' : junction, 'x' : x, 'y' : y, 'elevation' : z } return add_junction(network, ChangeSet(ps)) @app.post("/deletejunction/",response_model=None) async def fastapi_delete_junction(network: str, junction: str) -> ChangeSet: ps = {'id' : junction} return delete_junction(network, ChangeSet(ps)) @app.get("/getjunctionelevation/") async def fastapi_get_junction_elevation(network: str, junction: str) -> float: ps = get_junction(network, junction) return ps['elevation'] @app.get("/getjunctionx/") async def fastapi_get_junction_x(network: str, junction: str) -> float: ps = get_junction(network, junction) return ps['x'] @app.get("/getjunctiony/") async def fastapi_get_junction_x(network: str, junction: str) -> float: ps = get_junction(network, junction) return ps['y'] @app.get("/getjunctioncoord/") async def fastapi_get_junction_coord(network: str, junction: str) -> dict[str, float]: ps = get_junction(network, junction) coord = { 'x' : ps['x'], 'y' : ps['y'] } return coord @app.get("/getjunctiondemand/") async def fastapi_get_junction_demand(network: str, junction: str) -> float: ps = get_junction(network, junction) return ps['demand'] @app.get("/getjunctionpattern/") async def fastapi_get_junction_pattern(network: str, junction: str) -> str: ps = get_junction(network, junction) return ps['pattern'] @app.post("/setjunctionelevation/",response_model=None) async def fastapi_set_junction_elevation(network: str, junction: str, elevation: float) -> ChangeSet: ps = { 'id' : junction, 'elevation' : elevation } return set_junction(network, ChangeSet(ps)) @app.post("/setjunctionx/",response_model=None) async def fastapi_set_junction_x(network: str, junction: str, x: float) -> ChangeSet: ps = { 'id' : junction, 'x' : x } return set_junction(network, ChangeSet(ps)) @app.post("/setjunctiony/",response_model=None) async def fastapi_set_junction_y(network: str, junction: str, y: float) -> ChangeSet: ps = { 'id' : junction, 'y' : y } return set_junction(network, ChangeSet(ps)) @app.post("/setjunctioncoord/",response_model=None) async def fastapi_set_junction_coord(network: str, junction: str, x: float, y: float) -> ChangeSet: ps = { 'id' : junction, 'x' : x, 'y' : y } return set_junction(network, ChangeSet(ps)) @app.post("/setjunctiondemand/",response_model=None) async def fastapi_set_junction_demand(network: str, junction: str, demand: float) -> ChangeSet: ps = { 'id' : junction, 'demand' : demand } return set_junction(network, ChangeSet(ps)) @app.post("/setjunctionpattern/",response_model=None) async def fastapi_set_junction_pattern(network: str, junction: str, pattern: str) -> ChangeSet: ps = { 'id' : junction, 'pattern' : pattern } return set_junction(network, ChangeSet(ps)) @app.get("/getjunctionproperties/") async def fastapi_get_junction_properties(network: str, junction: str) -> dict[str, Any]: return get_junction(network, junction) # DingZQ, 2025-03-29 @app.get("/getalljunctionproperties/") async def fastapi_get_all_junction_properties(network: str) -> list[dict[str, Any]]: # 缓存查询结果提高性能 global redis_client cache_key = f"getalljunctionproperties_{network}" data = redis_client.get(cache_key) if data: # 使用自定义的反序列化函数 loaded_dict = msgpack.unpackb(data, object_hook=decode_datetime) return loaded_dict results = get_all_junctions(network) redis_client.set(cache_key, msgpack.packb(results, default=encode_datetime)) return results @app.post("/setjunctionproperties/",response_model=None) async def fastapi_set_junction_properties(network: str, junction: str, req: Request) -> ChangeSet: props = await req.json() ps = { 'id' : junction } | props return set_junction(network, ChangeSet(ps)) ############################################################ # reservoir 3.[RESERVOIRS] ############################################################ @app.get('/getreservoirschema') async def fast_get_reservoir_schema(network: str) -> dict[str, dict[str, Any]]: return get_reservoir_schema(network) @app.post("/addreservoir/",response_model=None) async def fastapi_add_reservoir(network: str, reservoir: str, x: float, y: float, head: float) -> ChangeSet: ps = { 'id' : reservoir, 'x' : x, 'y' : y, 'head' : head } return add_reservoir(network, ChangeSet(ps)) @app.post("/deletereservoir/",response_model=None) async def fastapi_delete_reservoir(network: str, reservoir: str) -> ChangeSet: ps = { 'id' : reservoir } return delete_reservoir(network, ChangeSet(ps)) @app.get("/getreservoirhead/") async def fastapi_get_reservoir_head(network: str, reservoir: str) -> float | None: ps = get_reservoir(network, reservoir) return ps['head'] @app.get("/getreservoirpattern/") async def fastapi_get_reservoir_pattern(network: str, reservoir: str) -> str | None: ps = get_reservoir(network, reservoir) return ps['pattern'] @app.get("/getreservoirx/") async def fastapi_get_reservoir_x(network: str, reservoir: str) -> dict[str, float] | None: ps = get_reservoir(network, reservoir) return ps['x'] @app.get("/getreservoiry/") async def fastapi_get_reservoir_y(network: str, reservoir: str) -> dict[str, float] | None: ps = get_reservoir(network, reservoir) return ps['y'] @app.get("/getreservoircoord/") async def fastapi_get_reservoir_y(network: str, reservoir: str) -> dict[str, float] | None: ps = get_reservoir(network, reservoir) coord = { 'id' : reservoir, 'x' : ps['x'], 'y' : ps['y'] } return coord @app.post("/setreservoirhead/",response_model=None) async def fastapi_set_reservoir_head(network: str, reservoir: str, head: float) -> ChangeSet: ps = { 'id' : reservoir, 'head' : head } return set_reservoir(network, ChangeSet(ps)) @app.post("/setreservoirpattern/",response_model=None) async def fastapi_set_reservoir_pattern(network: str, reservoir: str, pattern: str) -> ChangeSet: ps = { 'id' : reservoir, 'pattern' : pattern } return set_reservoir(network, ChangeSet(ps)) @app.post("/setreservoirx/",response_model=None) async def fastapi_set_reservoir_x(network: str, reservoir: str, x: float) -> ChangeSet: ps = { 'id' : reservoir, 'x' : x } return set_reservoir(network, ChangeSet(ps)) @app.post("/setreservoirx/",response_model=None) async def fastapi_set_reservoir_y(network: str, reservoir: str, y: float) -> ChangeSet: ps = { 'id' : reservoir, 'y' : y } return set_reservoir(network, ChangeSet(ps)) @app.post("/setreservoircoord/",response_model=None) async def fastapi_set_reservoir_y(network: str, reservoir: str, x: float, y: float) -> ChangeSet: ps = { 'id' : reservoir, 'x' : x, 'y' : y } return set_reservoir(network, ChangeSet(ps)) @app.get("/getreservoirproperties/") async def fastapi_get_reservoir_properties(network: str, reservoir: str) -> dict[str, Any]: return get_reservoir(network, reservoir) # DingZQ, 2025-03-29 @app.get("/getallreservoirproperties/") async def fastapi_get_all_reservoir_properties(network: str) -> list[dict[str, Any]]: # 缓存查询结果提高性能 global redis_client cache_key = f"getallreservoirproperties_{network}" data = redis_client.get(cache_key) if data: # 使用自定义的反序列化函数 loaded_dict = msgpack.unpackb(data, object_hook=decode_datetime) return loaded_dict results = get_all_reservoirs(network) redis_client.set(cache_key, msgpack.packb(results, default=encode_datetime)) return results @app.post("/setreservoirproperties/",response_model=None) async def fastapi_set_reservoir_properties(network: str, reservoir: str , req: Request) -> ChangeSet: props = await req.json() ps = { 'id' : reservoir } | props return set_reservoir(network, ChangeSet(ps)) ############################################################ # tank 4.[TANKS] ############################################################ @app.get('/gettankschema') async def fast_get_tank_schema(network: str) -> dict[str, dict[str, Any]]: return get_tank_schema(network) @app.post("/addtank/",response_model=None) async def fastapi_add_tank(network: str, tank: str, x: float, y: float, elevation: float, init_level: float = 0, min_level: float = 0, max_level: float = 0, diameter: float = 0, min_vol: float = 0) -> ChangeSet: ps = { 'id' : tank, 'x' : x, 'y' : y, 'elevation' : elevation, 'init_level' : init_level, 'min_level' : min_level, 'max_level' : max_level, 'diameter' : diameter, 'min_vol' : min_vol } return add_tank(network, ChangeSet(ps)) @app.post("/deletetank/",response_model=None) async def fastapi_delete_tank(network: str, tank: str) -> ChangeSet: ps = { 'id' : tank } return delete_tank(network, ChangeSet(ps)) @app.get("/gettankelevation/") async def fastapi_get_tank_elevation(network: str, tank: str) -> float | None: ps = get_tank(network, tank) return ps['elevation'] @app.get("/gettankinitlevel/") async def fastapi_get_tank_init_level(network: str, tank: str) -> float | None: ps = get_tank(network, tank) return ps['init_level'] @app.get("/gettankminlevel/") async def fastapi_get_tank_min_level(network: str, tank: str) -> float | None: ps = get_tank(network, tank) return ps['min_level'] @app.get("/gettankmaxlevel/") async def fastapi_get_tank_max_level(network: str, tank: str) -> float | None: ps = get_tank(network, tank) return ps['max_level'] @app.get("/gettankdiameter/") async def fastapi_get_tank_diameter(network: str, tank: str) -> float | None: ps = get_tank(network, tank) return ps['diameter'] @app.get("/gettankminvol/") async def fastapi_get_tank_min_vol(network: str, tank: str) -> float | None: ps = get_tank(network, tank) return ps['min_vol'] @app.get("/gettankvolcurve/") async def fastapi_get_tank_vol_curve(network: str, tank: str) -> str | None: ps = get_tank(network, tank) return ps['vol_curve'] @app.get("/gettankoverflow/") async def fastapi_get_tank_overflow(network: str, tank: str) -> str | None: ps = get_tank(network, tank) return ps['overflow'] @app.get("/gettankx/") async def fastapi_get_tank_x(network: str, tank: str) -> float: ps = get_tank(network, tank) return ps['x'] @app.get("/gettanky/") async def fastapi_get_tank_x(network: str, tank: str) -> float: ps = get_tank(network, tank) return ps['y'] @app.get("/gettankcoord/") async def fastapi_get_tank_coord(network: str, tank: str) -> dict[str, float]: ps = get_tank(network, tank) coord = { 'x' : ps['x'], 'y' : ps['y'] } return coord @app.post("/settankelevation/",response_model=None) async def fastapi_set_tank_elevation(network: str, tank: str, elevation: float) -> ChangeSet: ps = { 'id' : tank, 'elevation' : elevation } return set_tank(network, ChangeSet(ps)) @app.post("/settankinitlevel/",response_model=None) async def fastapi_set_tank_init_level(network: str, tank: str, init_level: float) -> ChangeSet: ps = { 'id' : tank, 'init_level' : init_level } return set_tank(network, ChangeSet(ps)) @app.post("/settankminlevel/",response_model=None) async def fastapi_set_tank_min_level(network: str, tank: str, min_level: float) -> ChangeSet: ps = { 'id' : tank, 'min_level' : min_level } return set_tank(network, ChangeSet(ps)) @app.post("/settankmaxlevel/",response_model=None) async def fastapi_set_tank_max_level(network: str, tank: str, max_level: float) -> ChangeSet: ps = { 'id' : tank, 'max_level' : max_level } return set_tank(network, ChangeSet(ps)) @app.post("settankdiameter//",response_model=None) async def fastapi_set_tank_diameter(network: str, tank: str, diameter: float) -> ChangeSet: ps = { 'id' : tank, 'diameter' : diameter } return set_tank(network, ChangeSet(ps)) @app.post("/settankminvol/",response_model=None) async def fastapi_set_tank_min_vol(network: str, tank: str, min_vol: float) -> ChangeSet: ps = { 'id' : tank, 'min_vol' : min_vol } return set_tank(network, ChangeSet(ps)) @app.post("/settankvolcurve/",response_model=None) async def fastapi_set_tank_vol_curve(network: str, tank: str, vol_curve: str) -> ChangeSet: ps = { 'id' : tank, 'vol_curve' : vol_curve} return set_tank(network, ChangeSet(ps)) @app.post("/settankoverflow/",response_model=None) async def fastapi_set_tank_overflow(network: str, tank: str, overflow: str) -> ChangeSet: ps = { 'id' : tank, 'overflow' : overflow } return set_tank(network, ChangeSet(ps)) @app.post("/settankx/",response_model=None) async def fastapi_set_tank_x(network: str, tank: str, x: float) -> ChangeSet: ps = { 'id' : tank, 'x' : x } return set_tank(network, ChangeSet(ps)) @app.post("/settanky/",response_model=None) async def fastapi_set_tank_y(network: str, tank: str, y: float) -> ChangeSet: ps = { 'id' : tank, 'y' : y } return set_tank(network, ChangeSet(ps)) @app.post("/settankcoord/",response_model=None) async def fastapi_set_tank_coord(network: str, tank: str, x: float, y: float) -> ChangeSet: ps = { 'id' : tank, 'x' : x, 'y' : y } return set_tank(network, ChangeSet(ps)) @app.get("/gettankproperties/") async def fastapi_get_tank_properties(network: str, tank: str) -> dict[str, Any]: return get_tank(network, tank) # DingZQ, 2025-03-29 @app.get("/getalltankproperties/") async def fastapi_get_all_tank_properties(network: str) -> list[dict[str, Any]]: # 缓存查询结果提高性能 global redis_client cache_key = f"getalltankproperties_{network}" data = redis_client.get(cache_key) if data: # 使用自定义的反序列化函数 loaded_dict = msgpack.unpackb(data, object_hook=decode_datetime) return loaded_dict results = get_all_tanks(network) redis_client.set(cache_key, msgpack.packb(results, default=encode_datetime)) return results @app.post("/settankproperties/",response_model=None) async def fastapi_set_tank_properties(network: str, tank: str, req: Request) -> ChangeSet: props = await req.json() ps = { 'id' : tank } | props return set_tank(network, ChangeSet(ps)) ############################################################ # pipe 4.[PIPES] ############################################################ @app.get('/getpipeschema') async def fastapi_get_pipe_schema(network: str) -> dict[str, dict[str, Any]]: return get_pipe_schema(network) @app.post("/addpipe/",response_model=None) async def fastapi_add_pipe(network: str, pipe: str, node1: str, node2: str, length: float = 0, diameter: float = 0, roughness: float = 0, minor_loss: float = 0, status: str = PIPE_STATUS_OPEN) -> ChangeSet: ps = { 'id' : pipe, 'node1' : node1, 'node2' : node2, 'length' : length, 'diameter' : diameter, 'roughness' : roughness, 'minor_loss' : minor_loss, 'status' : status } return add_pipe(network, ChangeSet(ps)) @app.post("/deletepipe/",response_model=None) async def fastapi_delete_pipe(network: str, pipe: str) -> ChangeSet: ps = {'id' : pipe} return delete_pipe(network, ChangeSet(ps)) @app.get("/getpipenode1/") async def fastapi_get_pipe_node1(network: str, pipe: str) -> str | None: ps = get_pipe(network, pipe) return ps['node1'] @app.get("/getpipenode2/") async def fastapi_get_pipe_node2(network: str, pipe: str) -> str | None: ps = get_pipe(network, pipe) return ps['node2'] @app.get("/getpipelength/") async def fastapi_get_pipe_length(network: str, pipe: str) -> float | None: ps = get_pipe(network, pipe) return ps['length'] @app.get("/getpipediameter/") async def fastapi_get_pipe_diameter(network: str, pipe: str) -> float | None: ps = get_pipe(network, pipe) return ps['diameter'] @app.get("/getpiperoughness/") async def fastapi_get_pipe_roughness(network: str, pipe: str) -> float | None: ps = get_pipe(network, pipe) return ps['roughness'] @app.get("/getpipeminorloss/") async def fastapi_get_pipe_minor_loss(network: str, pipe: str) -> float | None: ps = get_pipe(network, pipe) return ps['minor_loss'] @app.get("/getpipestatus/") async def fastapi_get_pipe_status(network: str, pipe: str) -> str | None: ps = get_pipe(network, pipe) return ps['status'] @app.post("/setpipenode1/",response_model=None) async def fastapi_set_pipe_node1(network: str, pipe: str, node1: str) -> ChangeSet: ps = { 'id' : pipe, 'node1' : node1 } return set_pipe(network, ChangeSet(ps)) @app.post("/setpipenode2/",response_model=None) async def fastapi_set_pipe_node2(network: str, pipe: str, node2: str) -> ChangeSet: ps = { 'id' : pipe, 'node2' : node2 } return set_pipe(network, ChangeSet(ps)) @app.post("/setpipelength/",response_model=None) async def fastapi_set_pipe_length(network: str, pipe: str, length: float) -> ChangeSet: ps = { 'id' : pipe, 'length' : length } return set_pipe(network, ChangeSet(ps)) @app.post("/setpipediameter/",response_model=None) async def fastapi_set_pipe_diameter(network: str, pipe: str, diameter: float) -> ChangeSet: ps = { 'id' : pipe, 'diameter' : diameter } return set_pipe(network, ChangeSet(ps)) @app.post("/setpiperoughness/",response_model=None) async def fastapi_set_pipe_roughness(network: str, pipe: str, roughness: float) -> ChangeSet: ps = { 'id' : pipe, 'roughness' : roughness } return set_pipe(network, ChangeSet(ps)) @app.post("/setpipeminorloss/",response_model=None) async def fastapi_set_pipe_minor_loss(network: str, pipe: str, minor_loss: float) -> ChangeSet: ps = { 'id' : pipe, 'minor_loss' : minor_loss } return set_pipe(network, ChangeSet(ps)) @app.post("/setpipestatus/",response_model=None) async def fastapi_set_pipe_status(network: str, pipe: str, status: str) -> ChangeSet: ps = { 'id' : pipe, 'status' : status } print(status) print(ps) ret = set_pipe(network, ChangeSet(ps)) print(ret) return ret @app.get("/getpipeproperties/") async def fastapi_get_pipe_properties(network: str, pipe: str) -> dict[str, Any]: return get_pipe(network, pipe) # DingZQ, 2025-03-29 @app.get('/getallpipeproperties/') async def fastapi_get_all_pipe_properties(network: str) -> list[dict[str, Any]]: # 缓存查询结果提高性能 global redis_client cache_key = f"getallpipeproperties_{network}" data = redis_client.get(cache_key) if data: # 使用自定义的反序列化函数 loaded_dict = msgpack.unpackb(data, object_hook=decode_datetime) return loaded_dict results = get_all_pipes(network) redis_client.set(cache_key, msgpack.packb(results, default=encode_datetime)) return results @app.post("/setpipeproperties/",response_model=None) async def fastapi_set_pipe_properties(network: str, pipe: str, req: Request) -> ChangeSet: props = await req.json() ps = { 'id' : pipe } | props return set_pipe(network, ChangeSet(ps)) ############################################################ # pump 4.[PUMPS] ############################################################ @app.get('/getpumpschema') async def fastapi_get_pump_schema(network: str) -> dict[str, dict[str, Any]]: return get_pump_schema(network) @app.post("/addpump/",response_model=None) async def fastapi_add_pump(network: str, pump: str, node1: str, node2: str, power: float = 0.0) -> ChangeSet: ps = { 'id' : pump, 'node1' : node1, 'node2' : node2, 'power' : power } return add_pump(network, ChangeSet(ps)) @app.post("/deletepump/",response_model=None) async def fastapi_delete_pump(network: str, pump: str) -> ChangeSet: ps = { 'id' : pump } return delete_pump(network, ChangeSet(ps)) @app.get("/getpumpnode1/") async def fastapi_get_pump_node1(network: str, pump: str) -> str | None: ps = get_pump(network, pump) return ps['node1'] @app.get("/getpumpnode2/") async def fastapi_get_pump_node2(network: str, pump: str) -> str | None: ps = get_pump(network, pump) return ps['node2'] @app.post("/setpumpnode1/",response_model=None) async def fastapi_set_pump_node1(network: str, pump: str, node1: str) -> ChangeSet: ps = { 'id' : pump, 'node1' : node1 } return set_pump(network, ChangeSet(ps)) @app.post("/setpumpnode2/",response_model=None) async def fastapi_set_pump_node2(network: str, pump: str, node2: str) -> ChangeSet: ps = { 'id' : pump, 'node2' : node2 } return set_pump(network, ChangeSet(ps)) @app.get("/getpumpproperties/") async def fastapi_get_pump_properties(network: str, pump: str) -> dict[str, Any]: return get_pump(network, pump) # DingZQ, 2025-03-29 @app.get('/getallpumpproperties/') async def fastapi_get_all_pump_properties(network: str) -> list[dict[str, Any]]: # 缓存查询结果提高性能 global redis_client cache_key = f"getallpumpproperties_{network}" data = redis_client.get(cache_key) if data: # 使用自定义的反序列化函数 loaded_dict = msgpack.unpackb(data, object_hook=decode_datetime) return loaded_dict results = get_all_pumps(network) redis_client.set(cache_key, msgpack.packb(results, default=encode_datetime)) return results @app.post("/setpumpproperties/",response_model=None) async def fastapi_set_pump_properties(network: str, pump: str, req: Request) -> ChangeSet: props = await req.json() ps = { 'id' : pump } | props return set_pump(network, ChangeSet(ps)) ############################################################ # valve 4.[VALVES] ############################################################ @app.get('/getvalveschema') async def fastapi_get_valve_schema(network: str) -> dict[str, dict[str, Any]]: return get_valve_schema(network) @app.post("/addvalve/",response_model=None) async def fastapi_add_valve(network: str, valve: str, node1: str, node2: str, diameter: float = 0, v_type: str = VALVES_TYPE_PRV, setting: float = 0, minor_loss: float = 0) -> ChangeSet: ps = { 'id' : valve, 'node1' : node1, 'node2' : node2, 'diameter' : diameter, 'v_type' : v_type, 'setting' : setting, 'minor_loss' : minor_loss } return add_valve(network, ChangeSet(ps)) @app.post("/deletevalve/",response_model=None) async def fastapi_delete_valve(network: str, valve: str) -> ChangeSet: ps = { 'id' : valve } return delete_valve(network, ChangeSet(ps)) @app.get("/getvalvenode1/") async def fastapi_get_valve_node1(network: str, valve: str) -> str | None: ps = get_valve(network, valve) return ps['node1'] @app.get("/getvalvenode2/") async def fastapi_get_valve_node2(network: str, valve: str) -> str | None: ps = get_valve(network, valve) return ps['node2'] @app.get("/getvalvediameter/") async def fastapi_get_valve_diameter(network: str, valve: str) -> float | None: ps = get_valve(network, valve) return ps['diameter'] @app.get("/getvalvetype/") async def fastapi_get_valve_type(network: str, valve: str) -> str | None: ps = get_valve(network, valve) return ps['type'] @app.get("/getvalvesetting/") async def fastapi_get_valve_setting(network: str, valve: str) -> float | None: ps = get_valve(network, valve) return ps['setting'] @app.get("/getvalveminorloss/") async def fastapi_get_valve_minor_loss(network: str, valve: str) -> float | None: ps = get_valve(network, valve) return ps['minor_loss'] @app.post("/setvalvenode1/",response_model=None) async def fastapi_set_valve_node1(network: str, valve: str, node1: str) -> ChangeSet: ps = { 'id' : valve, 'node1' : node1 } return set_valve(network, ChangeSet(ps)) @app.post("/setvalvenode2/",response_model=None) async def fastapi_set_valve_node2(network: str, valve: str, node2: str) -> ChangeSet: ps = { 'id' : valve, 'node2' : node2 } return set_valve(network, ChangeSet(ps)) @app.post("/setvalvenodediameter/",response_model=None) async def fastapi_set_valve_diameter(network: str, valve: str, diameter: float) -> ChangeSet: ps = { 'id' : valve, 'diameter' : diameter } return set_valve(network, ChangeSet(ps)) @app.post("/setvalvetype/",response_model=None) async def fastapi_set_valve_type(network: str, valve: str, type: str) -> ChangeSet: ps = { 'id' : valve, 'type' : type } return set_valve(network, ChangeSet(ps)) @app.post("/setvalvesetting/",response_model=None) async def fastapi_set_valve_setting(network: str, valve: str, setting: float) -> ChangeSet: ps = { 'id' : valve, 'setting' : setting } return set_valve(network, ChangeSet(ps)) @app.get("/getvalveproperties/") async def fastapi_get_valve_properties(network: str, valve: str) -> dict[str, Any]: return get_valve(network, valve) # DingZQ, 2025-03-29 @app.get('/getallvalveproperties/') async def fastapi_get_all_valve_properties(network: str) -> list[dict[str, Any]]: # 缓存查询结果提高性能 global redis_client cache_key = f"getallvalveproperties_{network}" data = redis_client.get(cache_key) if data: # 使用自定义的反序列化函数 loaded_dict = msgpack.unpackb(data, object_hook=decode_datetime) return loaded_dict results = get_all_valves(network) redis_client.set(cache_key, msgpack.packb(results, default=encode_datetime)) return results @app.post("/setvalveproperties/",response_model=None) async def fastapi_set_valve_properties(network: str, valve: str, req: Request) -> ChangeSet: props = await req.json() ps = { 'id' : valve } | props return set_valve(network, ChangeSet(ps)) # node & link @app.post("/deletenode/",response_model=None) async def fastapi_delete_node(network: str, node: str) -> ChangeSet: ps = {'id' : node} if is_junction(network, node): return delete_junction(network, ChangeSet(ps)) elif is_reservoir(network, node): return delete_reservoir(network, ChangeSet(ps)) elif is_tank(network, node): return delete_tank(network, ChangeSet(ps)) @app.post("/deletelink/",response_model=None) async def fastapi_delete_link(network: str, link: str) -> ChangeSet: ps = {'id' : link} if is_pipe(network, link): return delete_pipe(network, ChangeSet(ps)) elif is_pump(network, link): return delete_pump(network, ChangeSet(ps)) elif is_valve(network, link): return delete_valve(network, ChangeSet(ps)) ############################################################ # tag 8.[TAGS] ############################################################ # # TAG_TYPE_NODE = api.TAG_TYPE_NODE # TAG_TYPE_LINK = api.TAG_TYPE_LINK # @app.get('/gettagschema/') async def fastapi_get_tag_schema(network: str) -> dict[str, dict[str, Any]]: return get_tag_schema(network) @app.get('/gettag/') async def fastapi_get_tag(network: str, t_type: str, id: str) -> dict[str, Any]: return get_tag(network, t_type, id) @app.get('/gettags/') async def fastapi_get_tags(network: str) -> list[dict[str, Any]]: tags = get_tags(network) print(tags) return tags # example: # set_tag(p, ChangeSet({'t_type': TAG_TYPE_NODE, 'id': 'j1', 'tag': 'j1t' })) # set_tag(p, ChangeSet({'t_type': TAG_TYPE_LINK, 'id': 'p0', 'tag': 'p0t' })) @app.post('/settag/',response_model=None) async def fastapi_set_tag(network: str, req: Request) -> ChangeSet: props = await req.json() return set_tag(network, ChangeSet(props)) ############################################################ # demand 9.[DEMANDS] ############################################################ @app.get('/getdemandschema') async def fastapi_get_demand_schema(network: str) -> dict[str, dict[str, Any]]: return get_demand_schema(network) @app.get("/getdemandproperties/") async def fastapi_get_demand_properties(network: str, junction: str) -> dict[str, Any]: return get_demand(network, junction) # example: set_demand(p, ChangeSet({'junction': 'j1', 'demands': [{'demand': 10.0, 'pattern': None, 'category': 'x'}, {'demand': 20.0, 'pattern': None, 'category': None}]})) @app.post("/setdemandproperties/",response_model=None) async def fastapi_set_demand_properties(network: str, junction: str, req: Request) -> ChangeSet: props = await req.json() ps = { 'junction' : junction } | props return set_demand(network, ChangeSet(ps)) ############################################################ # status 10.[STATUS] init_status ############################################################ @app.get('/getstatusschema') async def fastapi_get_status_schema(network: str) -> dict[str, dict[str, Any]]: return get_status_schema(network) @app.get("/getstatus/") async def fastapi_get_status(network: str, link: str) -> dict[str, Any]: return get_status(network, link) # example: set_status(p, ChangeSet({'link': 'p0', 'status': LINK_STATUS_OPEN, 'setting': 10.0})) @app.post("/setstatus/",response_model=None) async def fastapi_set_status_properties(network: str, link: str, req: Request) -> ChangeSet: props = await req.json() ps = { 'link' : link } | props return set_status(network, ChangeSet(ps)) ############################################################ # pattern 11.[PATTERNS] ############################################################ @app.get('/getpatternschema') async def fastapi_get_pattern_schema(network: str) -> dict[str, dict[str, Any]]: return get_pattern_schema(network) @app.post("/addpattern/",response_model=None) async def fastapi_add_pattern(network: str, pattern: str, req: Request) -> ChangeSet: props = await req.json() ps = { 'id' : pattern, } | props return add_pattern(network, ChangeSet(ps)) @app.post("/deletepattern/",response_model=None) async def fastapi_delete_pattern(network: str, pattern: str) -> ChangeSet: ps = { 'id' : pattern } return delete_pattern(network, ChangeSet(ps)) @app.get("/getpatternproperties/") async def fastapi_get_pattern_properties(network: str, pattern: str) -> dict[str, Any]: return get_pattern(network, pattern) # example: set_pattern(p, ChangeSet({'id' : 'p0', 'factors': [1.0, 2.0, 3.0]})) @app.post("/setpatternproperties/",response_model=None) async def fastapi_set_pattern_properties(network: str, pattern: str, req: Request) -> ChangeSet: props = await req.json() ps = { 'id' : pattern } | props return set_pattern(network, ChangeSet(ps)) ############################################################ # curve 12.[CURVES] ############################################################ @app.get('/getcurveschema') async def fastapi_get_curve_schema(network: str) -> dict[str, dict[str, Any]]: return get_curve_schema(network) @app.post("/addcurve/",response_model=None) async def fastapi_add_curve(network: str, curve: str, req: Request) -> ChangeSet: props = await req.json() print(props) ps = { 'id' : curve, } | props print(ps) return add_curve(network, ChangeSet(ps)) @app.post("/deletecurve/",response_model=None) async def fastapi_delete_curve(network: str, curve: str) -> ChangeSet: ps = { 'id' : curve } return delete_curve(network, ChangeSet(ps)) @app.get("/getcurveproperties/") async def fastapi_get_curve_properties(network: str, curve: str) -> dict[str, Any]: return get_curve(network, curve) # example: set_curve(p, ChangeSet({'id' : 'c0', 'c_type' : CURVE_TYPE_PUMP, 'coords': [{'x': 1.0, 'y': 2.0}, {'x': 2.0, 'y': 1.0}]})) @app.post("/setcurveproperties/",response_model=None) async def fastapi_set_curve_properties(network: str, curve: str, req: Request) -> ChangeSet: props = await req.json() # c_type放到request中 ps = { 'id' : curve } | props return set_curve(network, ChangeSet(ps)) ############################################################ # control 13.[CONTROLS] ############################################################ @app.get('/getcontrolschema/') async def fastapi_get_control_schema(network: str) -> dict[str, dict[str, Any]]: return get_control_schema(network) @app.get("/getcontrolproperties/") async def fastapi_get_control_properties(network: str) -> dict[str, Any]: return get_control(network) # example: set_control(p, ChangeSet({'control': 'x'})) @app.post("/setcontrolproperties/",response_model=None) async def fastapi_set_control_properties(network: str, req: Request) -> ChangeSet: props = await req.json() return set_control(network, ChangeSet(props)) ############################################################ # rule 14.[RULES] ############################################################ @app.get("/getruleschema/") async def fastapi_get_rule_schema(network: str) -> dict[str, dict[str, Any]]: return get_rule_schema(network) @app.get("/getruleproperties/") async def fastapi_get_rule_properties(network: str) -> dict[str, Any]: return get_rule(network) # example: set_rule(p, ChangeSet({'rule': 'x'})) @app.post("/setruleproperties/",response_model=None) async def fastapi_set_rule_properties(network: str, req: Request) -> ChangeSet: props = await req.json() return set_rule(network, ChangeSet(props)) ############################################################ # energy 15.[ENERGY] ############################################################ @app.get("/getenergyschema/") async def fastapi_get_energy_schema(network: str) -> dict[str, dict[str, Any]]: return get_energy_schema(network) @app.get("/getenergyproperties/") async def fastapi_get_energy_properties(network: str) -> dict[str, Any]: return get_energy(network) @app.post("/setenergyproperties/",response_model=None) async def fastapi_set_energy_properties(network: str, req: Request) -> ChangeSet: props = await req.json() return set_energy(network, ChangeSet(props)) @app.get("/getpumpenergyschema/") async def fastapi_get_pump_energy_schema(network: str) -> dict[str, dict[str, Any]]: return get_pump_energy_schema(network) @app.get("/getpumpenergyproperties//") async def fastapi_get_pump_energy_proeprties(network: str, pump: str) -> dict[str, Any]: return get_pump_energy(network, pump) @app.get("/setpumpenergyproperties//",response_model=None) async def fastapi_set_pump_energy_properties(network: str, pump: str, req: Request) -> ChangeSet: props = await req.json() ps = { 'id' : pump } | props return set_pump_energy(network, ChangeSet(ps)) ############################################################ # emitter 16.[EMITTERS] ############################################################ @app.get('/getemitterschema') async def fastapi_get_emitter_schema(network: str) -> dict[str, dict[str, Any]]: return get_emitter_schema(network) @app.get("/getemitterproperties/") async def fastapi_get_emitter_properties(network: str, junction: str) -> dict[str, Any]: return get_emitter(network, junction) # example: set_emitter(p, ChangeSet({'junction': 'j1', 'coefficient': 10.0})) @app.post("/setemitterproperties/",response_model=None) async def fastapi_set_emitter_properties(network: str, junction: str, req: Request) -> ChangeSet: props = await req.json() ps = { 'junction' : junction } | props return set_emitter(network, ChangeSet(ps)) ############################################################ # quality 17.[QUALITY] ############################################################ @app.get('/getqualityschema/') async def fastapi_get_quality_schema(network: str) -> dict[str, dict[str, Any]]: return get_quality_schema(network) @app.get('/getqualityproperties/') async def fastapi_get_quality_properties(network: str, node: str) -> dict[str, Any]: return get_quality(network, node) # example: set_quality(p, ChangeSet({'node': 'j1', 'quality': 10.0})) @app.post("/setqualityproperties/",response_model=None) async def fastapi_set_quality_properties(network: str, req: Request) -> ChangeSet: props = await req.json() return set_quality(network, ChangeSet(props)) ############################################################ # source 18.[SOURCES] ############################################################ @app.get('/getsourcechema/') async def fastapi_get_source_schema(network: str) -> dict[str, dict[str, Any]]: return get_source_schema(network) @app.get('/getsource/') async def fastapi_get_source(network: str, node: str) -> dict[str, Any]: return get_source(network, node) @app.post('/setsource/',response_model=None) async def fastapi_set_source(network: str, req: Request) -> ChangeSet: props = await req.json() return set_source(network, ChangeSet(props)) # example: add_source(p, ChangeSet({'node': 'j0', 's_type': SOURCE_TYPE_CONCEN, 'strength': 10.0, 'pattern': 'p0'})) @app.post('/addsource/',response_model=None) async def fastapi_add_source(network: str, req: Request) -> ChangeSet: props = await req.json() return add_source(network, ChangeSet(props)) @app.post('/deletesource/',response_model=None) async def fastapi_delete_source(network: str, node: str) -> ChangeSet: props = { 'node': node } return delete_source(network, ChangeSet(props)) ############################################################ # reaction 19.[REACTIONS] ############################################################ @app.get('/getreactionschema/') async def fastapi_get_reaction_schema(network: str) -> dict[str, dict[str, Any]]: return get_reaction_schema(network) @app.get('/getreaction/') async def fastapi_get_reaction(network: str) -> dict[str, Any]: return get_reaction(network) @app.post('/setreaction/',response_model=None) # set_reaction(p, ChangeSet({ 'ORDER BULK' : '10' })) async def fastapi_set_reaction(network: str, req: Request) -> ChangeSet: props = await req.json() return set_reaction(network, ChangeSet(props)) @app.get('/getpipereactionschema/') async def fastapi_get_pipe_reaction_schema(network: str) -> dict[str, dict[str, Any]]: return get_pipe_reaction_schema(network) @app.get('/getpipereaction/') async def fastapi_get_pipe_reaction(network: str, pipe: str) -> dict[str, Any]: return get_pipe_reaction(network, pipe) @app.post('/setpipereaction/',response_model=None) async def fastapi_set_pipe_reaction(network: str, req: Request) -> ChangeSet: props = await req.json() return set_pipe_reaction(network, ChangeSet(props)) @app.get('/gettankreactionschema/') async def fastapi_get_tank_reaction_schema(network: str) -> dict[str, dict[str, Any]]: return get_tank_reaction_schema(network) @app.get('/gettankreaction/') async def fastapi_get_tank_reaction(network: str, tank: str) -> dict[str, Any]: return get_tank_reaction(network, tank) @app.post('/settankreaction/',response_model=None) async def fastapi_set_tank_reaction(network: str, req: Request) -> ChangeSet: props = await req.json() return set_tank_reaction(network, ChangeSet(props)) ############################################################ # mixing 20.[MIXING] ############################################################ @app.get('/getmixingschema/') async def fastapi_get_mixing_schema(network: str) -> dict[str, dict[str, Any]]: return get_mixing_schema(network) @app.get('/getmixing/') async def fastapi_get_mixing(network: str, tank: str) -> dict[str, Any]: return get_mixing(network, tank) @app.post('/setmixing/',response_model=None) async def fastapi_set_mixing(network: str, req: Request) -> ChangeSet: props = await req.json() return api.set_mixing(network, ChangeSet(props)) # example: add_mixing(p, ChangeSet({'tank': 't0', 'model': MIXING_MODEL_MIXED, 'value': 10.0})) @app.post('/addmixing/',response_model=None) async def fastapi_add_mixing(network: str, req: Request) -> ChangeSet: props = await req.json() return add_mixing(network, ChangeSet(props)) @app.post('/deletemixing/',response_model=None) async def fastapi_delete_mixing(network: str, req: Request) -> ChangeSet: props = await req.json() return delete_mixing(network, ChangeSet(props)) ############################################################ # time 21.[TIME] ############################################################ @app.get('/gettimeschema') async def fastapi_get_time_schema(network: str) -> dict[str, dict[str, Any]]: return get_time_schema(network) @app.get("/gettimeproperties/") async def fastapi_get_time_properties(network: str) -> dict[str, Any]: return get_time(network) @app.post("/settimeproperties/",response_model=None) async def fastapi_set_time_properties(network: str, req: Request) -> ChangeSet: props = await req.json() return set_time(network, ChangeSet(props)) ############################################################ # option 23.[OPTIONS] ############################################################ @app.get('/getoptionschema/') async def fastapi_get_option_schema(network: str) -> dict[str, dict[str, Any]]: return get_option_v3_schema(network) @app.get("/getoptionproperties/") async def fastapi_get_option_properties(network: str) -> dict[str, Any]: return get_option_v3(network) @app.post("/setoptionproperties/",response_model=None) async def fastapi_set_option_properties(network: str, req: Request) -> ChangeSet: props = await req.json() return set_option_v3(network, ChangeSet(props)) ############################################################ # coord 24.[COORDINATES] ############################################################ @app.get("/getnodecoord/") async def fastapi_get_node_coord(network: str, node: str) -> dict[str, float] | None: return get_node_coord(network, node) # DingZQ, 2025-01-27, get all node coord/links # nodes: id:type:x:y # links: id:type:node1:node2 # node type: junction, reservoir, tank # link type: pipe, pump, valve @app.get("/getnetworkgeometries/") async def fastapi_get_network_geometries(network: str) -> dict[str, Any] | None: # 获取所有节点坐标# 缓存查询结果提高性能 global redis_client cache_key = f"getnetworkgeometries_{network}" data = redis_client.get(cache_key) if data: # 使用自定义的反序列化函数 loaded_dict = msgpack.unpackb(data, object_hook=decode_datetime) return loaded_dict coords = get_network_node_coords(network) nodes = [] for node_id, coord in coords.items(): nodes.append(f"{node_id}:{coord['type']}:{coord['x']}:{coord['y']}") links = get_network_link_nodes(network) # return list of scadas. scada : id, x, y # scadas = get_all_scada_elements(network) # data from WMH's scada info scadas = get_all_scada_info(network) results = { 'nodes': nodes, 'links': links, 'scadas': scadas } # 缓存查询结果提高性能 redis_client.set(cache_key, msgpack.packb(results, default=encode_datetime)) return results # DingZQ, 2024-12-31, get major node coord # id:type:x:y # type: junction, reservoir, tank @app.get("/getmajornodecoords/") async def fastapi_get_major_node_coords(network: str, diameter: int) -> list[str] | None: start_time = time.time() coords = get_major_node_coords(network, diameter) end_time = time.time() logger.info("get_major_node_coords: %s, time: %s", coords, end_time - start_time) result = [] for node_id, coord in coords.items(): result.append(f"{node_id}:{coord['type']}:{coord['x']}:{coord['y']}") return result # DingZQ, 2025-01-03, get network in extent @app.get("/getnetworkinextent/") async def fastapi_get_network_in_extent(network: str, x1: float, y1: float, x2: float, y2: float) -> dict[str, Any] | None: nodes = api.get_nodes_in_extent(network, x1, y1, x2, y2) links = api.get_links_in_extent(network, x1, y1, x2, y2) return { 'nodes': nodes, 'links': links } # DingZQ, 2024-12-08, get all links' start and end node # link_id:link_type:node_id1:node_id2 @app.get("/getnetworklinknodes/") async def fastapi_get_network_link_nodes(network: str) -> list[str] | None: return get_network_link_nodes(network) # DingZQ 2024-12-31 # 获取直径大于800的管道 @app.get("/getmajorpipenodes/") async def fastapi_get_major_pipe_nodes(network: str, diameter: int) -> list[str] | None: start_time = time.time() result = get_major_pipe_nodes(network, diameter) end_time = time.time() logger.info("get_major_pipe_nodes: %s, time: %s", result, end_time - start_time) return result ############################################################ # vertex 25.[VERTICES] ############################################################ @app.get('/getvertexschema/') async def fastapi_get_vertex_schema(network: str) -> dict[str, dict[str, Any]]: return get_vertex_schema(network) @app.get('/getvertexproperties/') async def fastapi_get_vertex_properties(network: str, link: str) -> dict[str, Any]: return get_vertex(network, link) # set_vertex(p, ChangeSet({'link' : 'p0', 'coords': [{'x': 1.0, 'y': 2.0}, {'x': 2.0, 'y': 1.0}]})) @app.post('/setvertexproperties/',response_model=None) async def fastapi_set_vertex_properties(network: str, req: Request) -> ChangeSet: props = await req.json() return set_vertex(network, ChangeSet(props)) @app.post('/addvertex/',response_model=None) async def fastapi_add_vertex(network: str, req: Request) -> ChangeSet: props = await req.json() return add_vertex(network, ChangeSet(props)) @app.post('/deletevertex/',response_model=None) async def fastapi_delete_vertex(network: str, req: Request) -> ChangeSet: props = await req.json() return api.delete_vertex(network, ChangeSet(props)) @app.get('/getallvertexlinks/', response_class = PlainTextResponse) async def fastapi_get_all_vertex_links(network: str) -> list[str]: return json.dumps(get_all_vertex_links(network)) @app.get('/getallvertices/', response_class = PlainTextResponse) async def fastapi_get_all_vertices(network: str) -> list[dict[str, Any]]: return json.dumps(get_all_vertices(network)) ############################################################ # label 26.[LABELS] ############################################################ @app.get('/getlabelschema/') async def fastapi_get_label_schema(network: str) -> dict[str, dict[str, Any]]: return get_label_schema(network) @app.get('/getlabelproperties/') async def fastapi_get_label_properties(network: str, x: float, y: float) -> dict[str, Any]: return get_label(network, x, y) @app.post('/setlabelproperties/',response_model=None) async def fastapi_set_label_properties(network: str, req: Request) -> ChangeSet: props = await req.json() return set_label(network, ChangeSet(props)) @app.post('/addlabel/',response_model=None) async def fastapi_add_label(network: str, req: Request) -> ChangeSet: props = await req.json() return add_label(network, ChangeSet(props)) @app.post('/deletelabel/',response_model=None) async def fastapi_delete_label(network: str, req: Request) -> ChangeSet: props = await req.json() return delete_label(network, ChangeSet(props)) ############################################################ # backdrop 27.[BACKDROP] ############################################################ @app.get('/getbackdropschema/') async def fastapi_get_backdrop_schema(network: str) -> dict[str, dict[str, Any]]: return get_backdrop_schema(network) @app.get('/getbackdropproperties/') async def fastapi_get_backdrop_properties(network: str) -> dict[str, Any]: return get_backdrop(network) @app.post('/setbackdropproperties/',response_model=None) async def fastapi_set_backdrop_properties(network: str, req: Request) -> ChangeSet: props = await req.json() return set_backdrop(network, ChangeSet(props)) ############################################################ # scada_device 29 ############################################################ @app.get('/getscadadeviceschema/') async def fastapi_get_scada_device_schema(network: str) -> dict[str, dict[str, Any]]: return get_scada_device_schema(network) @app.get('/getscadadevice/') async def fastapi_get_scada_device(network: str, id: str) -> dict[str, Any]: return get_scada_device(network, id) @app.post('/setscadadevice/',response_model=None) async def fastapi_set_scada_device(network: str, req: Request) -> ChangeSet: props = await req.json() return set_scada_device(network, ChangeSet(props)) @app.post('/addscadadevice/',response_model=None) async def fastapi_add_scada_device(network: str, req: Request) -> ChangeSet: props = await req.json() return add_scada_device(network, ChangeSet(props)) @app.post('/deletescadadevice/',response_model=None) async def fastapi_delete_scada_device(network: str, req: Request) -> ChangeSet: props = await req.json() return delete_scada_device(network, ChangeSet(props)) @app.post('/cleanscadadevice/',response_model=None) async def fastapi_clean_scada_device(network: str) -> ChangeSet: return clean_scada_device(network) @app.get('/getallscadadeviceids/') async def fastapi_get_all_scada_device_ids(network: str) -> list[str]: return get_all_scada_device_ids(network) @app.get('/getallscadadevices/', response_class = PlainTextResponse) async def fastapi_get_all_scada_devices(network: str) -> list[dict[str, Any]]: return json.dumps(get_all_scada_devices(network)) ############################################################ # scada_device_data 30 ############################################################ @app.get('/getscadadevicedataschema/') async def fastapi_get_scada_device_data_schema(network: str) -> dict[str, dict[str, Any]]: return get_scada_device_data_schema(network) @app.get('/getscadadevicedata/') async def fastapi_get_scada_device_data(network: str, id: str) -> dict[str, Any]: return get_scada_device_data(network, id) # example: set_scada_device_data(p, ChangeSet({'device_id': 'sm_device', 'data': [{ 'time': '2023-02-10 00:02:22', 'value': 100.0 }, { 'time': '2023-02-10 00:03:22', 'value': 200.0 }]})) # time format must be 'YYYY-MM-DD HH:MM:SS' @app.post('/setscadadevicedata/',response_model=None) async def fastapi_set_scada_device_data(network: str, req: Request) -> ChangeSet: props = await req.json() return set_scada_device_data(network, ChangeSet(props)) # example: add_scada_device_data(p, ChangeSet({'device_id': 'sm_device', 'time': '2023-02-10 00:02:22', 'value': 100.0})) @app.post('/addscadadevicedata/',response_model=None) async def fastapi_add_scada_device_data(network: str, req: Request) -> ChangeSet: props = await req.json() return add_scada_device_data(network, ChangeSet(props)) # example: delete_scada_device_data(p, ChangeSet({'device_id': 'sm_device', 'time': '2023-02-12 00:02:22'})) @app.post('/deletescadadevicedata/',response_model=None) async def fastapi_delete_scada_device_data(network: str, req: Request) -> ChangeSet: props = await req.json() return delete_scada_device_data(network, ChangeSet(props)) @app.post('/cleanscadadevicedata/',response_model=None) async def fastapi_clean_scada_device_data(network: str) -> ChangeSet: return clean_scada_device_data(network) ############################################################ # scada_element 31 ############################################################ @app.get('/getscadaelementschema/') async def fastapi_get_scada_element_schema(network: str) -> dict[str, dict[str, Any]]: return get_scada_element_schema(network) @app.get('/getscadaelements/') async def fastapi_get_scada_elements(network: str) -> list[str]: return get_all_scada_elements(network) @app.get('/getscadaelement/') async def fastapi_get_scada_element(network: str, id: str) -> dict[str, Any]: return get_scada_element(network, id) @app.post('/setscadaelement/',response_model=None) async def fastapi_set_scada_element(network: str, req: Request) -> ChangeSet: props = await req.json() return set_scada_element(network, ChangeSet(props)) @app.post('/addscadaelement/',response_model=None) async def fastapi_add_scada_element(network: str, req: Request) -> ChangeSet: props = await req.json() return add_scada_element(network, ChangeSet(props)) @app.post('/deletescadaelement/',response_model=None) async def fastapi_delete_scada_element(network: str, req: Request) -> ChangeSet: props = await req.json() return delete_scada_element(network, ChangeSet(props)) @app.post('/cleanscadaelement/',response_model=None) async def fastapi_clean_scada_element(network: str) -> ChangeSet: return clean_scada_element(network) ############################################################ # general_region 32 ############################################################ @app.get('/getregionschema/') async def fastapi_get_region_schema(network: str) -> dict[str, dict[str, Any]]: return get_region_schema(network) @app.get('/getregion/') async def fastapi_get_region(network: str, id: str) -> dict[str, Any]: return get_region(network, id) @app.post('/setregion/',response_model=None) async def fastapi_set_region(network : str, req: Request) -> ChangeSet: props = await req.json() return set_region(network, ChangeSet(props)) # example: add_region(p, ChangeSet({'id': 'r', 'boundary': [(0.0, 0.0), (1.0, 0.0), (1.0, 1.0), (0.0, 0.0)]})) @app.post('/addregion/',response_model=None) async def fastapi_add_region(network: str, req: Request) -> ChangeSet: props = await req.json() return add_region(network, ChangeSet(props)) @app.post('/deleteregion/',response_model=None) async def fastapi_delete_region(network: str, req: Request) -> ChangeSet: props = await req.json() return delete_region(network, ChangeSet(props)) ############################################################ # district_metering_area 33 ############################################################ @app.get('/calculatedistrictmeteringareafornodes/') async def fastapi_calculate_district_metering_area_for_nodes(network: str, req: Request) -> list[list[str]]: props = await req.json() nodes = props['nodes'] part_count = props['part_count'] part_type = props['part_type'] return calculate_district_metering_area_for_nodes(network, nodes, part_count, part_type) @app.get('/calculatedistrictmeteringareaforregion/') async def fastapi_calculate_district_metering_area_for_region(network: str, req: Request) -> list[list[str]]: props = await req.json() region = props['region'] part_count = props['part_count'] part_type = props['part_type'] return calculate_district_metering_area_for_region(network, region, part_count, part_type) @app.get('/calculatedistrictmeteringareafornetwork/') async def fastapi_calculate_district_metering_area_for_network(network: str, req: Request) -> list[list[str]]: props = await req.json() part_count = props['part_count'] part_type = props['part_type'] return calculate_district_metering_area_for_network(network, part_count, part_type) @app.get('/getdistrictmeteringareaschema/') async def fastapi_get_district_metering_area_schema(network: str) -> dict[str, dict[str, Any]]: return get_district_metering_area_schema(network) @app.get('/getdistrictmeteringarea/') async def fastapi_get_district_metering_area(network: str, id: str) -> dict[str, Any]: return get_district_metering_area(network, id) @app.post('/setdistrictmeteringarea/',response_model=None) async def fastapi_set_district_metering_area(network: str, req: Request) -> ChangeSet: props = await req.json() return set_district_metering_area(network, ChangeSet(props)) @app.post('/adddistrictmeteringarea/',response_model=None) async def fastapi_add_district_metering_area(network: str, req: Request) -> ChangeSet: props = await req.json() # boundary should be [(x,y), (x,y)] boundary = props['boundary'] newBoundary = [] for pt in boundary: newBoundary.append((pt[0], pt[1])) props['boundary'] = newBoundary return add_district_metering_area(network, ChangeSet(props)) @app.post('/deletedistrictmeteringarea/',response_model=None) async def fastapi_delete_district_metering_area(network: str, req: Request) -> ChangeSet: props = await req.json() return delete_district_metering_area(network, ChangeSet(props)) @app.get('/getalldistrictmeteringareaids/') async def fastapi_get_all_district_metering_area_ids(network: str) -> list[str]: return get_all_district_metering_area_ids(network) @app.get('/getalldistrictmeteringareas/') async def getalldistrictmeteringareas(network: str) -> list[dict[str, Any]]: return get_all_district_metering_areas(network) @app.post('/generatedistrictmeteringarea/',response_model=None) async def fastapi_generate_district_metering_area(network: str, part_count: int, part_type: int, inflate_delta: float) -> ChangeSet: return generate_district_metering_area(network, part_count, part_type, inflate_delta) @app.post('/generatesubdistrictmeteringarea/',response_model=None) async def fastapi_generate_sub_district_metering_area(network: str, dma: str, part_count: int, part_type: int, inflate_delta: float) -> ChangeSet: print(network) print(dma) print(part_count) print(part_type) print(inflate_delta) return generate_sub_district_metering_area(network, dma, part_count, part_type, inflate_delta) ############################################################ # service_area 34 ############################################################ @app.get('/calculateservicearea/') async def fastapi_calculate_service_area(network: str, time_index: int) -> dict[str, Any]: return calculate_service_area(network, time_index) @app.get('/getserviceareaschema/') async def fastapi_get_service_area_schema(network: str) -> dict[str, dict[str, Any]]: return get_service_area_schema(network) @app.get('/getservicearea/') async def fastapi_get_service_area(network: str, id: str) -> dict[str, Any]: return get_service_area(network, id) @app.post('/setservicearea/',response_model=None) async def fastapi_set_service_area(network: str, req: Request) -> ChangeSet: props = await req.json() return set_service_area(network, ChangeSet(props)) @app.post('/addservicearea/',response_model=None) async def fastapi_add_service_area(network: str, req: Request) -> ChangeSet: props = await req.json() return add_service_area(network, ChangeSet(props)) @app.post('/deleteservicearea/',response_model=None) async def fastapi_delete_service_area(network: str, req: Request) -> ChangeSet: props = await req.json() return delete_service_area(network, ChangeSet(props)) @app.get('/getallserviceareas/') async def fastapi_get_all_service_areas(network: str) -> list[dict[str, Any]]: return get_all_service_areas(network) @app.post('/generateservicearea/',response_model=None) async def fastapi_generate_service_area(network: str, inflate_delta: float) -> ChangeSet: return generate_service_area(network, inflate_delta) ############################################################ # virtual_district 35 ############################################################ @app.get('/calculatevirtualdistrict/') async def fastapi_calculate_virtual_district(network: str, centers: list[str]) -> dict[str, list[Any]]: return calculate_virtual_district(network, centers) @app.get('/getvirtualdistrictschema/') async def fastapi_get_virtual_district_schema(network: str) -> dict[str, dict[str, Any]]: return get_virtual_district_schema(network) @app.get('/getvirtualdistrict/') async def fastapi_get_virtual_district(network: str, id: str) -> dict[str, Any]: return get_virtual_district(network, id) @app.post('/setvirtualdistrict/',response_model=None) async def fastapi_set_virtual_district(network: str, req: Request) -> ChangeSet: props = await req.json() return set_virtual_district(network, ChangeSet(props)) @app.post('/addvirtualdistrict/',response_model=None) async def fastapi_add_virtual_district(network: str, req: Request) -> ChangeSet: props = await req.json() return add_virtual_district(network, ChangeSet(props)) @app.post('/deletevirtualdistrict/',response_model=None) async def fastapi_delete_virtual_district(network: str, req: Request) -> ChangeSet: props = await req.json() return delete_virtual_district(network, ChangeSet(props)) @app.get('/getallvirtualdistrict/') async def fastapi_get_all_virtual_district(network: str) -> list[dict[str, Any]]: return get_all_virtual_districts(network) @app.post('/generatevirtualdistrict/',response_model=None) async def fastapi_generate_virtual_district(network: str, inflate_delta: float, req: Request) -> ChangeSet: props = await req.json() return generate_virtual_district(network, props['centers'], inflate_delta) ############################################################ # water_distribution_area 36 ############################################################ @app.get('/calculatedemandtonodes/') async def fastapi_calculate_demand_to_nodes(network: str, req: Request) -> dict[str, float]: props = await req.json() demand = props['demand'] nodes = props['nodes'] return calculate_demand_to_nodes(network, demand, nodes) @app.get('/calculatedemandtoregion/') async def fastapi_calculate_demand_to_region(network: str, req: Request) -> dict[str, float]: props = await req.json() demand = props['demand'] region = props['region'] return calculate_demand_to_region(network, demand, region) @app.get('/calculatedemandtonetwork/') async def fastapi_calculate_demand_to_network(network: str, demand: float) -> dict[str, float]: return calculate_demand_to_network(network, demand) ########################################################### # scada_info 38 || written by WMH ############################################################ @app.get('/getscadainfoschema/') async def fastapi_get_scada_info_schema(network: str) -> dict[str, dict[str, Any]]: return get_scada_info_schema(network) @app.get('/getscadainfo/') async def fastapi_get_scada_info(network: str, id: str) -> dict[str, float]: return get_scada_info(network, id) @app.get('/getallscadainfo/') async def fastapi_get_all_scada_info(network: str) -> list[dict[str, float]]: return get_all_scada_info(network) ########################################################### # user 39 ########################################################### @app.get('/getuserschema/') async def fastapi_get_user_schema(network: str) -> dict[str, dict[Any, Any]]: return get_user_schema(network) @app.get('/getuser/') async def fastapi_get_user(network: str, user_name: str) -> dict[Any, Any]: return get_user(network, user_name) @app.get('/getallusers/') async def fastapi_get_all_users(network: str) -> list[dict[Any, Any]]: return get_all_users(network) ############################################################ # scheme 40 ############################################################ @app.get('/getschemeschema/') async def fastapi_get_scheme_schema(network: str) -> dict[str, dict[Any, Any]]: return get_scheme_schema(network) @app.get('/getscheme/') async def fastapi_get_scheme(network: str, schema_name: str) -> dict[Any, Any]: return get_scheme(network, schema_name) @app.get('/getallschemes/') async def fastapi_get_all_schemes(network: str) -> list[dict[Any, Any]]: return get_all_schemes(network) ############################################################ # pipe_risk_probability 41 ############################################################ @app.get('/getpiperiskprobabilitynow/') async def fastapi_get_pipe_risk_probability_now(network: str, pipe_id: str) -> dict[str, Any]: return get_pipe_risk_probability_now(network, pipe_id) @app.get('/getpiperiskprobability/') async def fastapi_get_pipe_risk_probability(network: str, pipe_id: str) -> dict[str, Any]: return get_pipe_risk_probability(network, pipe_id) @app.get('/getpipesriskprobability/') async def fastapi_get_pipes_risk_probability(network: str, pipe_ids: str) -> list[dict[str, Any]]: pipeids = pipe_ids.split(',') return get_pipes_risk_probability(network, pipeids) @app.get('/getnetworkpiperiskprobabilitynow/') async def fastapi_get_network_pipe_risk_probability_now(network: str) -> list[dict[str, Any]]: return get_network_pipe_risk_probability_now(network) # 返回一个字典,key 是管道的 id,value 是管道的几何信息 # 几何信息是一个字典,包含 start 和 end 两个 key,value 是管道的起点和终点的坐标 # 例如: # "GSD240730154246A51D2C324D1A": { # "start": [ # 106.424759007, # 29.815104642 # ], # "end": [ # 106.424824186, # 29.814950582 # ] # }, @app.get('/getpiperiskprobabilitygeometries/') async def fastapi_get_pipe_risk_probability_geometries(network: str) -> dict[str, Any]: return get_pipe_risk_probability_geometries(network) # inp file @app.post("/uploadinp/", status_code=status.HTTP_200_OK) async def fastapi_upload_inp(afile: bytes, name: str ): filePath = inpDir + str(name) f = open(filePath, 'wb') f.write(afile) f.close() return True @app.get("/downloadinp/", status_code=status.HTTP_200_OK) async def fastapi_download_inp(name: str, response: Response): filePath = inpDir + name if os.path.exists(filePath): return FileResponse(filePath, media_type='application/octet-stream', filename="inp.inp") else: response.status_code = status.HTTP_400_BAD_REQUEST return True # DingZQ, 2024-12-28, convert v3 to v2 @app.get("/convertv3tov2/",response_model=None) async def fastapi_convert_v3_to_v2(req: Request) -> ChangeSet: network = 'v3Tov2' jo_root = await req.json() inp = jo_root['inp'] cs = convert_inp_v3_to_v2(inp) op = cs.operations[0] open_project(network) op['vertex'] = json.dumps(get_all_vertices(network)) op['scada'] = json.dumps(get_all_scada_elements(network)) op['dma'] = json.dumps(get_all_district_metering_areas(network)) op['sa'] = json.dumps(get_all_service_areas(network)) op['vd'] = json.dumps(get_all_virtual_districts(network)) op['legend'] = get_extension_data(network, 'legend') db = get_extension_data(network, 'scada_db') print(db) scada_db = '' if db: scada_db = db print(scada_db) op['scada_db'] = scada_db close_project(network) return cs @app.get("/getjson/") async def fastapi_get_json(): return JSONResponse( status_code = status.HTTP_400_BAD_REQUEST, content={ 'code': 400, 'message': "this is message", 'data': 123, } ) ############################################################ # DingZQ, 2024-12-09, Add sample API to return real time data/simulation result # influx db operation ############################################################ @app.get("/getrealtimedata/") async def fastapi_get_realtimedata(): data = [random.randint(0, 100) for _ in range(100)] return data @app.get("/getsimulationresult/") async def fastapi_get_simulationresult(): data = [random.randint(0, 100) for _ in range(100)] return data # 下面几个query 函数,都是从 influxdb 中查询的,不与 network 绑定,用固定的network 名字 # DingZQ 2025-01-31 # def query_latest_record_by_ID(ID: str, type: str, bucket: str="realtime_data", client: InfluxDBClient=client) -> dict: @app.get("/querynodelatestrecordbyid/") async def fastapi_query_node_latest_record_by_id(id: str): return influxdb_api.query_latest_record_by_ID(id, type='node') @app.get("/querylinklatestrecordbyid/") async def fastapi_query_link_latest_record_by_id(id: str): return influxdb_api.query_latest_record_by_ID(id, type='link') # query scada @app.get("/queryscadalatestrecordbyid/") async def fastapi_query_scada_latest_record_by_id(id: str): return influxdb_api.query_latest_record_by_ID(id, type='scada') # def query_all_record_by_time(query_time: str, bucket: str="realtime_data", client: InfluxDBClient=client) -> tuple: @app.get("/queryallrecordsbytime/") async def fastapi_query_all_records_by_time(querytime: str) -> dict[str, list]: results: tuple = influxdb_api.query_all_records_by_time(query_time=querytime) return { "nodes": results[0], "links": results[1] } @app.get("/queryallrecordsbydate/") async def fastapi_query_all_records_by_date(querydate: str) -> dict: # 缓存查询结果提高性能 global redis_client is_today_or_future = time_api.is_today_or_future(querydate) logger.info(f"isToday or future: {is_today_or_future}") # 今天的不要去缓存 if not is_today_or_future: cache_key = f"queryallrecordsbydate_{querydate}" logger.info(f"cache_key: {cache_key}") data = redis_client.get(cache_key) if data: # 使用自定义的反序列化函数 results = msgpack.unpackb(data, object_hook=decode_datetime) logger.info(f"return from cache redis") return results logger.info(f"query from influxdb") nodes_links: tuple = influxdb_api.query_all_records_by_date(query_date=querydate) results = { "nodes": nodes_links[0], "links": nodes_links[1] } # 今天的不要去缓存 if not is_today_or_future: logger.info(f"save to cache redis") redis_client.set(cache_key, msgpack.packb(results, default=encode_datetime)) logger.info(f"return results") return results @app.get("/queryallrecordsbytimerange/") async def fastapi_query_all_records_by_time_range(starttime: str, endtime: str) -> dict[str, list]: # 缓存查询结果提高性能 global redis_client # 今天的不要去缓存 if not time_api.is_today_or_future(starttime): cache_key = f"queryallrecordsbytimerange_{starttime}_{endtime}" data = redis_client.get(cache_key) if data: # 使用自定义的反序列化函数 loaded_dict = msgpack.unpackb(data, object_hook=decode_datetime) return loaded_dict nodes_links: tuple = influxdb_api.query_all_records_by_time_range(starttime=starttime, endtime=endtime) results = { "nodes": nodes_links[0], "links": nodes_links[1] } # 今天的不要去缓存 if not time_api.is_today_or_future(starttime): redis_client.set(cache_key, msgpack.packb(results, default=encode_datetime)) return results #2025-03-15, DingZQ @app.get("/queryallrecordsbydatewithtype/") async def fastapi_query_all_records_by_date_with_type(querydate: str, querytype: str) -> list: # 缓存查询结果提高性能 global redis_client cache_key = f"queryallrecordsbydatewithtype_{querydate}_{querytype}" data = redis_client.get(cache_key) if data: # 使用自定义的反序列化函数 loaded_dict = msgpack.unpackb(data, object_hook=decode_datetime) return loaded_dict results = influxdb_api.query_all_records_by_date_with_type(query_date=querydate, query_type=querytype) packed = msgpack.packb(results, default=encode_datetime) redis_client.set(cache_key, packed) return results @app.get("/queryallrecordsbyidsdatetype/") async def fastapi_query_all_records_by_ids_date_type(ids:str, querydate: str, querytype: str) -> list: # 缓存查询结果提高性能 global redis_client cache_key = f"queryallrecordsbydatewithtype_{querydate}_{querytype}" data = redis_client.get(cache_key) results = [] if data: # 使用自定义的反序列化函数 results = msgpack.unpackb(data, object_hook=decode_datetime) else: results = influxdb_api.query_all_records_by_date_with_type(query_date=querydate, query_type=querytype) packed = msgpack.packb(results, default=encode_datetime) redis_client.set(cache_key, packed) query_ids = ids.split(",") e_results = py_linq.Enumerable(results) lst_results = e_results.where(lambda x: x['ID'] in query_ids).to_list() return lst_results # 查询指定日期、类型、属性的所有记录 # 返回 [{'time': '2024-01-01T00:00:00Z', 'ID': '1', 'value': 1.0}, {'time': '2024-01-01T00:00:00Z', 'ID': '2', 'value': 2.0}] @app.get("/queryallrecordsbydateproperty/") async def fastapi_query_all_records_by_date_property(querydate: str, querytype: str, property: str) -> list[dict]: # 缓存查询结果提高性能 global redis_client cache_key = f"queryallrecordsbydateproperty_{querydate}_{querytype}_{property}" data = redis_client.get(cache_key) if data: # 使用自定义的反序列化函数 loaded_dict = msgpack.unpackb(data, object_hook=decode_datetime) return loaded_dict result_dict = influxdb_api.query_all_record_by_date_property(query_date=querydate, type=querytype, property=property) packed = msgpack.packb(result_dict, default=encode_datetime) redis_client.set(cache_key, packed) return result_dict # def query_curve_by_ID_property_daterange(ID: str, type: str, property: str, start_date: str, end_date: str, bucket: str="realtime_data", client: InfluxDBClient=client) -> list: @app.get("/querynodecurvebyidpropertydaterange/") async def fastapi_query_node_curve_by_id_property_daterange(id: str, prop: str, startdate: str, enddate: str): return influxdb_api.query_curve_by_ID_property_daterange(id, type='node', property=prop, start_date=startdate, end_date=enddate) @app.get("/querylinkcurvebyidpropertydaterange/") async def fastapi_query_link_curve_by_id_property_daterange(id: str, prop: str, startdate: str, enddate: str): return influxdb_api.query_curve_by_ID_property_daterange(id, type='link', property=prop, start_date=startdate, end_date=enddate) # ids 用,隔开 # 返回 { 'id': value1, 'id2': value2 } # def query_SCADA_data_by_device_ID_and_time(query_ids_list: List[str], query_time: str, bucket: str="SCADA_data", client: InfluxDBClient=client) -> Dict[str, float]: @app.get("/queryscadadatabydeviceidandtime/") async def fastapi_query_scada_data_by_device_id_and_time(ids: str, querytime: str): query_ids = ids.split(',') logger.info(querytime) return influxdb_api.query_SCADA_data_by_device_ID_and_time(query_ids_list=query_ids, query_time=querytime) @app.get("/queryscadadatabydeviceidandtimerange/") async def fastapi_query_scada_data_by_device_id_and_time_range(ids: str, starttime: str, endtime: str): print(f"query_ids: {ids}, starttime: {starttime}, endtime: {endtime}") query_ids = ids.split(',') return influxdb_api.query_SCADA_data_by_device_ID_and_timerange(query_ids_list=query_ids, start_time=starttime, end_time=endtime) # 查询指定时间范围内,多个SCADA设备的清洗后的数据 # DingZQ, 2025-04-19 @app.get("/querycleanedscadadatabydeviceidandtimerange/") async def fastapi_query_cleaned_scada_data_by_device_id_and_time_range(ids: str, starttime: str, endtime: str): print(f"query_ids: {ids}, starttime: {starttime}, endtime: {endtime}") query_ids = ids.split(',') return influxdb_api.query_cleaned_SCADA_data_by_device_ID_and_timerange(query_ids_list=query_ids, start_time=starttime, end_time=endtime) @app.get("/queryscadadatabydeviceidanddate/") async def fastapi_query_scada_data_by_device_id_and_date(ids: str, querydate: str): query_ids = ids.split(',') return influxdb_api.query_SCADA_data_by_device_ID_and_date(query_ids_list=query_ids, query_date=querydate) # DingZQ, 2025-03-08 # 返回所有SCADA设备在指定日期的所有记录 @app.get("/queryallscadarecordsbydate/") async def fastapi_query_all_scada_records_by_date(querydate: str): global redis_client is_today_or_future = time_api.is_today_or_future(querydate) logger.info(f"isToday or future: {is_today_or_future}") # 今天的不要去缓存 if not is_today_or_future: cache_key = f"queryallscadarecordsbydate_{querydate}" data = redis_client.get(cache_key) if data: # 使用自定义的反序列化函数 loaded_dict = msgpack.unpackb(data, object_hook=decode_datetime) logger.info(f"return from cache redis") return loaded_dict logger.info(f"query from influxdb") result_dict = influxdb_api.query_all_SCADA_records_by_date(query_date=querydate) # 今天的不要去缓存 if not is_today_or_future: logger.info(f"save to cache redis") packed = msgpack.packb(result_dict, default=encode_datetime) redis_client.set(cache_key, packed) logger.info(f"return results") return result_dict # DingZQ, 2025-03-15 # Scheme @app.get("/queryallschemeallrecords/") async def fastapi_query_all_scheme_all_records(schemetype: str, schemename: str, querydate: str) -> tuple: # 缓存查询结果提高性能 global redis_client cache_key = f"queryallschemeallrecords_{schemetype}_{schemename}_{querydate}" data = redis_client.get(cache_key) if data: # 使用自定义的反序列化函数 loaded_dict = msgpack.unpackb(data, object_hook=decode_datetime) return loaded_dict results = influxdb_api.query_scheme_all_record(scheme_Type=schemetype, scheme_Name=schemename, query_date=querydate) packed = msgpack.packb(results, default=encode_datetime) redis_client.set(cache_key, packed) return results # DingZQ, 2025-03-21 # 缓存是用的queryallschemeallrecords的缓存 @app.get("/queryschemeallrecordsproperty/") async def fastapi_query_all_scheme_all_records_property(schemetype: str, schemename: str, querydate: str, querytype: str, queryproperty: str) -> list: # 缓存查询结果提高性能 global redis_client cache_key = f"queryallschemeallrecords_{schemetype}_{schemename}_{querydate}" data = redis_client.get(cache_key) all_results = None if data: # 使用自定义的反序列化函数 all_results = msgpack.unpackb(data, object_hook=decode_datetime) else: all_results = influxdb_api.query_scheme_all_record(scheme_Type=schemetype, scheme_Name=schemename, query_date=querydate) packed = msgpack.packb(all_results, default=encode_datetime) redis_client.set(cache_key, packed) results = None if querytype == 'node': results = all_results[0] elif querytype == "link": results = all_results[1] return results @app.post("/clearrediskey/") async def fastapi_clear_redis_key(key: str): redis_client.delete(key) return True @app.post("/clearrediskeys/") async def fastapi_clear_redis_keys(keys: str): # delete keys contains the key matched_keys = redis_client.keys(f"*{keys}*") redis_client.delete(*matched_keys) return True @app.post("/clearallredis/") async def fastapi_clear_all_redis(): redis_client.flushdb() return True @app.get("/queryredis/") async def fastapi_query_redis(): return redis_client.keys("*") @app.get("/queryinfluxdbbuckets/") async def fastapi_query_influxdb_buckets(): return influxdb_api.query_buckets() @app.get("/queryinfluxdbbucketmeasurements/") async def fastapi_query_influxdb_bucket_measurements(bucket: str): return influxdb_api.query_measurements(bucket=bucket) # DingZQ, 2024-12-31, generate openapi.json def generate_openapi_json(): openapi_json_path = "openapi.json" with open(openapi_json_path, "w") as file: json.dump(app.openapi(), file, indent=4) ############################################################ # real_time api 37 # example: http://127.0.0.1:8000/runsimulation?network=beibeizone&start_time=2024-04-01T08:00:00Z ############################################################ # 必须用这个PlainTextResponse,不然每个key都有引号 # @app.get("/runsimulation/", response_class = PlainTextResponse) # async def fastapi_run_project(network: str,start_time:str,end_time=None) -> str: # filename = 'c:/lock.simulation' # filename2 = 'c:/lock.simulation2' # if os.path.exists(filename2): # print('file exists') # raise HTTPException(status_code=409, detail="is in simulation") # else: # print('file doesnt exists') # #os.rename(filename, filename2) # result = run_simulation(network,start_time,end_time) # #os.rename(filename2, filename) # return result ############################################################ # real_time api 37 # example: http://127.0.0.1:8000/runsimulation?network=beibeizone&start_time=2024-04-01T08:00:00Z ############################################################ # 必须用这个PlainTextResponse,不然每个key都有引号 @app.get("/runsimulation/", response_class = PlainTextResponse) async def fastapi_run_project(network: str,start_time:str,end_time=None) -> str: filename = 'c:/lock.simulation' filename2 = 'c:/lock.simulation2' if os.path.exists(filename2): print('file exists') raise HTTPException(status_code=409, detail="is in simulation") else: print('file doesnt exists') #os.rename(filename, filename2) result = run_simulation_ex(name=network, simulation_type='realtime', start_datetime=start_time, end_datetime=end_time) #os.rename(filename2, filename) return result ############################################################ # real_time api 37.5 # example: # response = requests.post("http://127.0.0.1:8000/runsimulation", # data=json.dumps({'network': 'bb_server', 'simulation_type': 'extended', # 'start_time': '2024-05-17T09:30:00Z', 'duration': 900, # 'pump_control': {'1#': [0, 0], '2#': [1, 1], '3#': [1, 1], '4#': [1, 0], # '5#': [45, 43], '6#': [0, 0], '7#': [0, 0]}}), # headers={'accept': 'application/json', 'Content-Type': 'application/json'}) ############################################################ # class RunSimuItem(BaseModel): # network: str # simulation_type: str # start_time: str # end_time: Optional[str] = None # duration: Optional[int] = 900 # pump_control: Optional[dict] = None # # # @app.post("/runsimulation/") # async def fastapi_run_project(item: RunSimuItem) -> str: # item = item.dict() # filename = 'c:/lock.simulation' # filename2 = 'c:/lock.simulation2' # if os.path.exists(filename2): # print('file exists') # raise HTTPException(status_code=409, detail="is in simulation") # else: # print('file doesnt exists') # #os.rename(filename, filename2) # result = run_simulation_ex(item['network'], item['simulation_type'], # item['start_time'], item['end_time'], # item['duration'], item['pump_control']) # #os.rename(filename2, filename) # return result ############################################################ # burst analysis api 38 #example:http://127.0.0.1:8000/burst_analysis?network=beibeizone&start_time=2024-04-01T08:00:00Z&burst_ID=ZBBGXSZW000001&burst_size=200&duration=1800 ############################################################ # @app.get("/burst_analysis/", response_class = PlainTextResponse) # async def fastapi_burst_analysis(network: str,start_time:str,burst_ID:str,burst_size:float,burst_flow:float=None,duration:int=None) -> str: # filename = 'c:/lock.simulation' # filename2 = 'c:/lock.simulation2' # if os.path.exists(filename2): # print('file exists') # raise HTTPException(status_code=409, detail="is in simulation") # else: # print('file doesnt exists') # #os.rename(filename, filename2) # result = burst_analysis(network,start_time,burst_ID,burst_size,burst_flow,duration) # #os.rename(filename2, filename) # return result ############################################################ # burst analysis api 38.5 # example: # response = requests.post("http://127.0.0.1:8000/burst_analysis", # data=json.dumps({'network': 'bb_server', # 'start_time': '2024-05-17T09:30:00Z', # 'burst_ID': ['ZBBGXSZW000001'], # 'burst_size': [200], # 'duration': 1800, # 'pump_control': {'1#': [0, 0, 0], '2#': [1, 1, 1], '3#': [1, 1, 1], '4#': [1, 1, 1], # '5#': [45, 45, 45], '6#': [0, 0, 0], '7#': [0, 0, 0]} # 'valve_closed': ['GSD2307192058576667FF7B41FF']), # headers={'accept': 'application/json', 'Content-Type': 'application/json'}) ############################################################ class BurstAnalysis(BaseModel): name: str modify_pattern_start_time: str burst_ID: Union[List[str], str] = None burst_size: Union[List[float], float, int] = None modify_total_duration: int = 900 modify_fixed_pump_pattern: Optional[dict[str, list]] = None modify_variable_pump_pattern: Optional[dict[str, list]] = None modify_valve_opening: Optional[dict[str, float]] = None scheme_Name: Optional[str] = None @app.post("/burst_analysis/") async def fastapi_burst_analysis(data: BurstAnalysis) -> str: item = data.dict() filename = 'c:/lock.simulation' filename2 = 'c:/lock.simulation2' if os.path.exists(filename2): print('file exists') raise HTTPException(status_code=409, detail="is in simulation") else: print('file doesnt exists') #os.rename(filename, filename2) burst_analysis(name=item['name'], modify_pattern_start_time=item['modify_pattern_start_time'], burst_ID=item['burst_ID'], burst_size=item['burst_size'], modify_total_duration=item['modify_total_duration'], modify_fixed_pump_pattern=item['modify_fixed_pump_pattern'], modify_variable_pump_pattern=item['modify_variable_pump_pattern'], modify_valve_opening=item['modify_valve_opening'], scheme_Name=item['scheme_Name'] ) #os.rename(filename2, filename) """ # 将 时间转换成日期,然后缓存这个计算结果 # 缓存key: burst_analysis__ global redis_client schemename = data.scheme_Name print(data.modify_pattern_start_time) querydate = time_api.get_date_from_time(data.modify_pattern_start_time) print(f"schemename: {schemename}, querydate: {querydate}") cache_key = f"queryallschemeallrecords_burst_Analysis_{schemename}_{querydate}" data = redis_client.get(cache_key) if not data: results = influxdb_api.query_scheme_all_record("burst_Analysis", scheme_Name=schemename, query_date=querydate) packed = msgpack.packb(results, default=encode_datetime) redis_client.set(cache_key, packed) """ return "success" ############################################################ # valve close analysis api 39 #example:http://127.0.0.1:8000/valve_close_analysis?network=beibeizone&start_time=2024-04-01T08:00:00Z&valves=GSD2307192058577780A3287D78&valves=GSD2307192058572E953B707226(S2)&duration=1800 ############################################################ @app.get("/valve_close_analysis/", response_class = PlainTextResponse) async def fastapi_valve_close_analysis(network: str,start_time:str,valves:Annotated[list[str], Query()],duration:int=None) -> str: filename = 'c:/lock.simulation' filename2 = 'c:/lock.simulation2' if os.path.exists(filename2): print('file exists') raise HTTPException(status_code=409, detail="is in simulation") else: print('file doesnt exists') #os.rename(filename, filename2) result = valve_close_analysis(network,start_time,valves,duration) #os.rename(filename2, filename) return result ############################################################ # pipe flushing analysis api 40 #example:http://127.0.0.1:8000/flushing_analysis?network=beibeizone&start_time=2024-04-01T08:00:00Z&valves=GSD230719205857733F8F5214FF&valves=GSD230719205857C0AF65B6A170&valves_k=0.5&valves_k=0.5&drainage_node_ID=GSD2307192058570DEDF28E4F73&flush_flow=0&duration=1800 ############################################################ @app.get("/flushing_analysis/", response_class = PlainTextResponse) async def fastapi_flushing_analysis(network: str,start_time:str,valves:Annotated[list[str], Query()],valves_k:Annotated[list[float], Query()],drainage_node_ID:str,flush_flow:float=0,duration:int=None) -> str: filename = 'c:/lock.simulation' filename2 = 'c:/lock.simulation2' if os.path.exists(filename2): print('file exists') raise HTTPException(status_code=409, detail="is in simulation") else: print('file doesnt exists') #os.rename(filename, filename2) result = flushing_analysis(network,start_time,valves,valves_k,drainage_node_ID,flush_flow,duration) #os.rename(filename2, filename) return result ############################################################ # contaminant_simulation api 41 #example:http://127.0.0.1:8000/contaminant_simulation?network=beibeizone&start_time=2024-04-01T08:00:00Z&source=ZBBDTZDP002677&concentration=100&duration=1800 ############################################################ @app.get("/contaminant_simulation/", response_class = PlainTextResponse) async def fastapi_contaminant_simulation(network: str,start_time:str,source:str,concentration:float,duration:int=900,pattern:str=None) -> str: filename = 'c:/lock.simulation' filename2 = 'c:/lock.simulation2' if os.path.exists(filename2): print('file exists') raise HTTPException(status_code=409, detail="is in simulation") else: print('file doesnt exists') #os.rename(filename, filename2) result = contaminant_simulation(network,start_time,source,concentration,duration,pattern) #os.rename(filename2, filename) return result ############################################################ # age analysis api 42 #example:http://127.0.0.1:8000/age_analysis/?network=bb&start_time=2024-04-01T00:00:00Z&end_time=2024-04-01T08:00:00Z&duration=28800 ############################################################ @app.get("/age_analysis/", response_class = PlainTextResponse) async def fastapi_age_analysis(network: str, start_time:str, end_time:str, duration:int) -> str: filename = 'c:/lock.simulation' filename2 = 'c:/lock.simulation2' if os.path.exists(filename2): print('file exists') raise HTTPException(status_code=409, detail="is in simulation") else: print('file doesnt exists') #os.rename(filename, filename2) result = age_analysis(network,start_time,end_time,duration) #os.rename(filename2, filename) return result ############################################################ # scheduling analysis api 43 ############################################################ class SchedulingAnalysis(BaseModel): network: str start_time: str pump_control: dict tank_id: str water_plant_output_id: str time_delta: Optional[int] = 300 @app.post("/scheduling_analysis/") async def fastapi_scheduling_analysis(data: SchedulingAnalysis) -> str: data = data.dict() filename = 'c:/lock.simulation' filename2 = 'c:/lock.simulation2' if os.path.exists(filename2): print('file exists') raise HTTPException(status_code=409, detail="is in simulation") else: print('file doesnt exists') #os.rename(filename, filename2) result = scheduling_simulation(data['network'], data['start_time'], data['pump_control'], data['tank_id'], data['water_plant_output_id'], data['time_delta']) #os.rename(filename2, filename) return result ############################################################ # pressure_regulating api 44 # example: # response = requests.post("http://127.0.0.1:8000/pressure_regulating", # data=json.dumps({'network': 'bb_server', # 'start_time': '2024-05-17T09:30:00Z', # 'pump_control': {'1#': [0, 0], '2#': [1, 1], '3#': [1, 1], '4#': [1, 1], # '5#': [45, 45], '6#': [0, 0], '7#': [0, 0]} # 'tank_init_level': {'ZBBDTJSC000002': 2, 'ZBBDTJSC000001': 2}}), # headers={'accept': 'application/json', 'Content-Type': 'application/json'}) ############################################################ class PressureRegulation(BaseModel): network: str start_time: str pump_control: dict tank_init_level: Optional[dict] = None @app.post("/pressure_regulation/") async def fastapi_pressure_regulation(data: PressureRegulation) -> str: item = data.dict() filename = 'c:/lock.simulation' filename2 = 'c:/lock.simulation2' if os.path.exists(filename2): print('file exists') raise HTTPException(status_code=409, detail="is in simulation") else: print('file doesnt exists') #os.rename(filename, filename2) result = pressure_regulation(prj_name=item['network'], start_datetime=item['start_time'], pump_control=item['pump_control'], tank_initial_level_control=item['tank_init_level']) #os.rename(filename2, filename) return result ############################################################ # project_management api 45 # example: # response = requests.post("http://127.0.0.1:8000/project_management", # data=json.dumps({'network': 'bb_server', # 'start_time': '2024-05-17T00:00:00Z', # 'pump_control': # {'1#':(list:97), '2#':(list:97), '3#':(list:97), '4#':(list:97), # '5#':(list:97), '6#':(list:97), '7#':(list:97)} # 'tank_init_level': {'ZBBDTJSC000002': 2, 'ZBBDTJSC000001': 2} # 'region_demand': {'hp': 150000, 'lp': 40000}}), # headers={'accept': 'application/json', 'Content-Type': 'application/json'}) ############################################################ class ProjectManagement(BaseModel): network: str start_time: str pump_control: dict tank_init_level: Optional[dict] = None region_demand: Optional[dict] = None @app.post("/project_management/") async def fastapi_project_management(data: ProjectManagement) -> str: item = data.dict() filename = 'c:/lock.simulation' filename2 = 'c:/lock.simulation2' if os.path.exists(filename2): print('file exists') raise HTTPException(status_code=409, detail="is in simulation") else: print('file doesnt exists') #os.rename(filename, filename2) result = project_management(prj_name=item['network'], start_datetime=item['start_time'], pump_control=item['pump_control'], tank_initial_level_control=item['tank_init_level'], region_demand_control=item['region_demand']) #os.rename(filename2, filename) return result ############################################################ # project_management api 46 # example: # with open('./inp/bb_temp.inp', 'rb') as file: # response = requests.post("http://127.0.0.1:8000/network_project", # files={'file': file}) ############################################################ @app.post("/network_project/") async def fastapi_network_project(file: UploadFile = File()) -> str: temp_file_path = './inp/' if not os.path.exists(temp_file_path): os.mkdir(temp_file_path) temp_file_name = f'network_project_{datetime.now().strftime("%Y%m%d")}' temp_file_path = f'{temp_file_path}{temp_file_name}.inp' with open(temp_file_path, "wb") as buffer: shutil.copyfileobj(file.file, buffer) buffer.close() filename = 'c:/lock.simulation' filename2 = 'c:/lock.simulation2' if os.path.exists(filename2): print('file exists') raise HTTPException(status_code=409, detail="is in simulation") else: print('file doesnt exists') result = run_inp(temp_file_name) #os.rename(filename2, filename) return result ############################################################ # daily scheduling analysis api 47 ############################################################ class DailySchedulingAnalysis(BaseModel): network: str start_time: str pump_control: dict reservoir_id: str tank_id: str water_plant_output_id: str time_delta: Optional[int] = 300 @app.post("/daily_scheduling_analysis/") async def fastapi_daily_scheduling_analysis(data: DailySchedulingAnalysis) -> str: data = data.dict() filename = 'c:/lock.simulation' filename2 = 'c:/lock.simulation2' if os.path.exists(filename2): print('file exists') raise HTTPException(status_code=409, detail="is in simulation") else: print('file doesnt exists') #os.rename(filename, filename2) result = daily_scheduling_simulation(data['network'], data['start_time'], data['pump_control'], data['reservoir_id'], data['tank_id'], data['water_plant_output_id']) #os.rename(filename2, filename) return result ############################################################ # network_update api 48 ############################################################ @app.post("/network_update/") async def fastapi_network_update(file: UploadFile = File()) -> str: # 默认文件夹 default_folder = './' # 使用当前时间生成临时文件名 temp_file_name = f'network_update_{datetime.now().strftime("%Y%m%d")}' temp_file_path = os.path.join(default_folder, temp_file_name) # 保存上传的文件到服务器 try: with open(temp_file_path, "wb") as buffer: shutil.copyfileobj(file.file, buffer) buffer.close() print(f"文件 {temp_file_name} 已成功保存。") except Exception as e: raise HTTPException(status_code=500, detail=f"文件保存失败: {e}") # 更新数据库 try: network_update(temp_file_path) return json.dumps({"message": "管网更新成功"}) except Exception as e: raise HTTPException(status_code=500, detail=f"数据库操作失败: {e}") ############################################################ # pump failure api 49 ############################################################ class PumpFailureState(BaseModel): time: str pump_status: dict @app.post("/pump_failure/") async def fastapi_pump_failure(data: PumpFailureState) -> str: item = data.dict() with open('./pump_failure_message.txt', 'a', encoding='utf-8-sig') as f1: f1.write('[{}] {}\n'.format(datetime.now().strftime('%Y-%m-%d %H:%M:%S'), item)) # save message status_info = item.copy() with open('./pump_failure_status.txt', 'r', encoding='utf-8-sig') as f2: lines = f2.readlines() first_stage_pump_status_dict = json.loads(json.dumps(eval(lines[0]))) second_stage_pump_status_dict = json.loads(json.dumps(eval(lines[-1]))) # read local file pump_status_dict = {'first': first_stage_pump_status_dict, # first-stage pump 'second': second_stage_pump_status_dict} # second-stage pump for pump_type in status_info['pump_status'].keys(): # 'first' or 'second' if pump_type in pump_status_dict.keys(): # the type of pumps exists if all(pump_id in pump_status_dict[pump_type].keys() for pump_id in status_info['pump_status'][pump_type].keys()): # all pump IDs exist for pump_id in status_info['pump_status'][pump_type].keys(): pump_status_dict[pump_type][pump_id] = int( status_info['pump_status'][pump_type][pump_id]) # modify status dict else: return json.dumps('ERROR: Wrong Pump ID') else: return json.dumps('ERROR: Wrong Pump Type') with open('./pump_failure_status.txt', 'w', encoding='utf-8-sig') as f2_: f2_.write('{}\n{}'.format(pump_status_dict['first'], pump_status_dict['second'])) # save local file return json.dumps('SUCCESS') class Item(BaseModel): str_info: str dict_info: Optional[dict] = None @app.post("/test_dict/") async def get_dict(item: Item): print(item.dict()) return item if __name__ == "__main__": #uvicorn.run(app, host="0.0.0.0", port=8000) #url='http://127.0.0.1:8000/valve_close_analysis?network=beibeizone&start_time=2024-04-01T08:00:00Z&valve_IDs=GSD2307192058577780A3287D78&valve_IDs=GSD2307192058572E953B707226(S2)&duration=1800' url='http://127.0.0.1:8000/burst_analysis?network=beibeizone&start_time=2024-04-01T08:00:00Z&burst_ID=ZBBGXSZW000001&duration=1800' Request.get(url,)