import os import json import time import datetime import logging import threading import shutil import random from typing import * from typing import List, Annotated, Optional, Union from urllib.request import Request from fastapi import ( FastAPI, File, UploadFile, Response, status, Request, HTTPException, Query, Depends, Header, ) from fastapi.responses import PlainTextResponse from fastapi.middleware.gzip import GZipMiddleware from fastapi.middleware.cors import CORSMiddleware from starlette.responses import FileResponse, JSONResponse from contextlib import asynccontextmanager from pydantic import BaseModel, field_validator from multiprocessing import Value import redis import msgpack from datetime import datetime, timedelta, timezone # 第三方/自定义模块 import app.infra.db.influxdb.api as influxdb_api import app.infra.db.timescaledb as timescaledb import app.infra.db.postgresql as postgresql import py_linq import app.services.time_api as time_api import app.services.simulation as simulation import app.services.globals as globals import app.services.project_info as project_info from app.infra.db.timescaledb.database import db as tsdb from app.infra.db.postgresql.database import db as pgdb from app.algorithms.online_Analysis import * from app.services.tjnetwork import ( Any, ChangeSet, PIPE_STATUS_OPEN, VALVES_TYPE_PRV, add_curve, add_district_metering_area, add_junction, add_label, add_mixing, add_pattern, add_pipe, add_pump, add_region, add_reservoir, add_scada_device, add_scada_device_data, add_scada_element, add_service_area, add_source, add_tank, add_valve, add_vertex, add_virtual_district, api, calculate_demand_to_network, calculate_demand_to_nodes, calculate_demand_to_region, calculate_district_metering_area_for_network, calculate_district_metering_area_for_nodes, calculate_district_metering_area_for_region, calculate_service_area, calculate_virtual_district, clean_scada_device, clean_scada_device_data, clean_scada_element, close_project, convert_inp_v3_to_v2, copy_project, create_project, delete_curve, delete_district_metering_area, delete_junction, delete_label, delete_mixing, delete_pattern, delete_pipe, delete_project, delete_pump, delete_region, delete_reservoir, delete_scada_device, delete_scada_device_data, delete_scada_element, delete_service_area, delete_source, delete_tank, delete_valve, delete_virtual_district, dump_inp, dump_output, execute_batch_command, execute_batch_commands, execute_redo, execute_undo, export_inp, generate_district_metering_area, generate_service_area, generate_sub_district_metering_area, generate_virtual_district, get_all_burst_locate_results, get_all_district_metering_area_ids, get_all_district_metering_areas, get_all_extension_data, get_all_extension_data_keys, get_all_junctions, get_all_pipes, get_all_pumps, get_all_reservoirs, get_all_scada_device_ids, get_all_scada_devices, get_all_scada_elements, get_all_scada_info, get_all_schemes, get_all_sensor_placements, get_all_service_areas, get_all_tanks, get_all_users, get_all_valves, get_all_vertex_links, get_all_vertices, get_all_virtual_districts, get_backdrop, get_backdrop_schema, get_control, get_control_schema, get_current_operation, get_curve, get_curve_schema, get_curves, get_demand, get_demand_schema, get_district_metering_area, get_district_metering_area_schema, get_element_properties, get_element_properties_with_type, get_element_type, get_element_type_value, get_emitter, get_emitter_schema, get_energy, get_energy_schema, get_extension_data, get_junction, get_junction_schema, get_label, get_label_schema, get_link_properties, get_link_type, get_links, get_major_node_coords, get_major_pipe_nodes, get_mixing, get_mixing_schema, get_network_link_nodes, get_network_node_coords, get_network_pipe_risk_probability_now, get_node_coord, get_node_links, get_node_properties, get_node_type, get_nodes, get_option_v3, get_option_v3_schema, get_pattern, get_pattern_schema, get_patterns, get_pipe, get_pipe_reaction, get_pipe_reaction_schema, get_pipe_risk_probability, get_pipe_risk_probability_geometries, get_pipe_risk_probability_now, get_pipe_schema, get_pipes_risk_probability, get_pump, get_pump_energy, get_pump_energy_schema, get_pump_schema, get_quality, get_quality_schema, get_reaction, get_reaction_schema, get_region, get_region_schema, get_reservoir, get_reservoir_schema, get_restore_operation, get_rule, get_rule_schema, get_scada_device, get_scada_device_data, get_scada_device_data_schema, get_scada_device_schema, get_scada_element, get_scada_element_schema, get_scada_info, get_scada_info_schema, get_scheme, get_scheme_schema, get_service_area, get_service_area_schema, get_source, get_source_schema, get_status, get_status_schema, get_tag, get_tag_schema, get_tags, get_tank, get_tank_reaction, get_tank_reaction_schema, get_tank_schema, get_time, get_time_schema, get_title, get_title_schema, get_user, get_user_schema, get_valve, get_valve_schema, get_vertex, get_vertex_schema, get_virtual_district, get_virtual_district_schema, have_project, have_snapshot, have_snapshot_for_current_operation, have_snapshot_for_operation, import_inp, is_curve, is_junction, is_link, is_node, is_pattern, is_pipe, is_project_open, is_pump, is_reservoir, is_tank, is_valve, list_project, list_snapshot, open_project, pick_operation, pick_snapshot, read_inp, run_inp, run_project, run_project_return_dict, set_backdrop, set_control, set_curve, set_demand, set_district_metering_area, set_emitter, set_energy, set_extension_data, set_junction, set_label, set_option_v3, set_pattern, set_pipe, set_pipe_reaction, set_pump, set_pump_energy, set_quality, set_reaction, set_region, set_reservoir, set_restore_operation, set_rule, set_scada_device, set_scada_device_data, set_scada_element, set_service_area, set_source, set_status, set_tag, set_tank, set_tank_reaction, set_time, set_title, set_valve, set_vertex, set_virtual_district, sync_with_server, take_snapshot, take_snapshot_for_current_operation, take_snapshot_for_operation, ) JUNCTION = 0 RESERVOIR = 1 TANK = 2 PIPE = 1 NODE_COUNT = 0 LINK_COUNT = 2 prjs = [] # inpDir = "C:/inpfiles/" # tmpDir = "C:/tmpfiles/" # proj_name = project_info.name # lockedPrjs = {} # if not os.path.exists(inpDir): # os.mkdir(inpDir) # if not os.path.exists(tmpDir): # os.mkdir(tmpDir) # 全局依赖项 async def global_auth(request: Request): # 白名单跳过 # if request.url.path in WHITE_LIST: # return # 验证 token = request.headers.get("Authorization") if token != "Bearer 567e33c876a2" and token != "Bearer 38b3be72b8af": raise HTTPException(status_code=401, detail="Invalid token") # 简易令牌验证(实际项目中应替换为 JWT/OAuth2 等) AUTH_TOKEN = "567e33c876a2" # 预设的有效令牌 async def verify_token(authorization: Annotated[str, Header()] = None): # 检查请求头是否存在 if not authorization: raise HTTPException(status_code=401, detail="Authorization header missing") # 提取 Bearer 后的令牌 (格式: Bearer ) try: token_type, token = authorization.split(" ", 1) if token_type.lower() != "bearer": raise ValueError except ValueError: raise HTTPException( status_code=401, detail="Invalid authorization format. Use: Bearer " ) # 验证令牌 if token != AUTH_TOKEN: raise HTTPException(status_code=403, detail="Invalid authentication token") return True # 全局依赖项 # app = FastAPI(dependencies=[Depends(global_auth)]) # app = FastAPI() # 生命周期管理器 @asynccontextmanager async def lifespan(app: FastAPI): # 初始化数据库连接池 tsdb.init_pool() pgdb.init_pool() await tsdb.open() await pgdb.open() open_project(project_info.name) yield # 清理资源 tsdb.close() pgdb.close() app = FastAPI(lifespan=lifespan) app.include_router(timescaledb.router) app.include_router(postgresql.router) access_tokens = [] def generate_access_token(username: str, password: str) -> str: """ 根据用户名和密码生成JWT access token 参数: username: 用户名 password: 密码 返回: JWT access token字符串 """ if username != "tjwater" or password != "tjwater@123": raise ValueError("用户名或密码错误") token = "567e33c876a2" return token # 将 Query的信息 序列号到 redis/json, 默认不支持datetime,需要自定义 # 自定义序列化函数 # 序列化处理器 def encode_datetime(obj): """将datetime转换为可序列化的字典结构""" if isinstance(obj, datetime): return {"__datetime__": True, "as_str": obj.strftime("%Y%m%dT%H:%M:%S.%f")} return obj # 反序列化处理器 def decode_datetime(obj): """将字典还原为datetime对象""" if "__datetime__" in obj: return datetime.strptime(obj["as_str"], "%Y%m%dT%H:%M:%S.%f") return obj # 初始化 Redis 连接 # 用redis 限制并发访u redis_client = redis.Redis(host="127.0.0.1", port=6379, db=0) # influxdb数据库连接信息 # influx_url = influxdb_info.url # influx_token = influxdb_info.token # influx_org_name = influxdb_info.org # influx_client = InfluxDBClient(url=influx_url, token=influx_token, org=influx_org_name, timeout=100*1000) # 100 seconds # 配置 CORS 中间件 app.add_middleware( CORSMiddleware, allow_origins=["*"], # 允许所有来源 allow_credentials=True, # 允许传递凭证(Cookie、HTTP 头等) allow_methods=["*"], # 允许所有 HTTP 方法 allow_headers=["*"], # 允许所有 HTTP 头 ) # 定义一个共享变量 lock_simulation = Value("i", 0) app.add_middleware(GZipMiddleware, minimum_size=1000) logger = logging.getLogger() logger.setLevel(logging.INFO) @app.on_event("startup") async def startup_db(): logger.info("**********************************************************") logger.info(str(datetime.now())) logger.info("TJWater CloudService is starting...") logger.info("**********************************************************") # open proj_name by default print(project_info.name) open_project(project_info.name) ############################################################ # auth ############################################################ @app.post("/login/") async def fastapi_login(username: str, password: str) -> str: return generate_access_token(username, password) ############################################################ # extension_data ############################################################ @app.get("/getallextensiondatakeys/") async def fastapi_get_all_extension_data_keys(network: str) -> list[str]: return get_all_extension_data_keys(network) @app.get("/getallextensiondata/") async def fastapi_get_all_extension_data(network: str) -> dict[str, Any]: return get_all_extension_data(network) @app.get("/getextensiondata/") async def fastapi_get_extension_data(network: str, key: str) -> str | None: return get_extension_data(network, key) @app.post("/setextensiondata", response_model=None) async def fastapi_set_extension_data(network: str, req: Request) -> ChangeSet: props = await req.json() print(props) cs = set_extension_data(network, ChangeSet(props)) print(cs.operations[0]) return cs ############################################################ # project ############################################################ @app.get("/listprojects/") async def fastapi_list_projects() -> list[str]: return list_project() @app.get("/haveproject/") async def fastapi_have_project(network: str): return have_project(network) @app.post("/createproject/") async def fastapi_create_project(network: str): create_project(network) return network @app.post("/deleteproject/") async def fastapi_delete_project(network: str): delete_project(network) return True @app.get("/isprojectopen/") async def fastapi_is_project_open(network: str): return is_project_open(network) @app.post("/openproject/") async def fastapi_open_project(network: str): open_project(network) return network @app.post("/closeproject/") async def fastapi_close_project(network: str): close_project(network) return True @app.post("/copyproject/") async def fastapi_copy_project(source: str, target: str): copy_project(source, target) return True @app.post("/importinp/") async def fastapi_import_inp(network: str, req: Request): jo_root = await req.json() inp_text = jo_root["inp"] ps = {"inp": inp_text} ret = import_inp(network, ChangeSet(ps)) print(ret) return ret @app.get("/exportinp/", response_model=None) async def fastapi_export_inp(network: str, version: str) -> ChangeSet: cs = export_inp(network, version) op = cs.operations[0] open_project(network) op["vertex"] = json.dumps(get_all_vertices(network)) op["scada"] = json.dumps(get_all_scada_elements(network)) op["dma"] = json.dumps(get_all_district_metering_areas(network)) op["sa"] = json.dumps(get_all_service_areas(network)) op["vd"] = json.dumps(get_all_virtual_districts(network)) op["legend"] = get_extension_data(network, "legend") db = get_extension_data(network, "scada_db") print(db) scada_db = "" if db: scada_db = db print(scada_db) op["scada_db"] = scada_db close_project(network) return cs @app.post("/readinp/") async def fastapi_read_inp(network: str, inp: str) -> bool: read_inp(network, inp) return True @app.get("/dumpinp/") async def fastapi_dump_inp(network: str, inp: str) -> bool: dump_inp(network, inp) return True # 必须用这个PlainTextResponse,不然每个key都有引号 @app.get("/runproject/", response_class=PlainTextResponse) async def fastapi_run_project(network: str) -> str: lock_key = "exclusive_api_lock" timeout = 120 # 锁自动过期时间(秒) # 尝试获取锁(NX=True: 不存在时设置,EX=timeout: 过期时间) acquired = redis_client.set(lock_key, "locked", nx=True, ex=timeout) if not acquired: raise HTTPException(status_code=409, detail="is in simulation") else: try: return run_project(network) finally: # 手动释放锁(可选,依赖过期时间自动释放更安全) redis_client.delete(lock_key) # DingZQ, 2025-02-04, 返回dict[str, Any] # output 和 report # output 是 json # report 是 text @app.get("/runprojectreturndict/") async def fastapi_run_project_return_dict(network: str) -> dict[str, Any]: lock_key = "exclusive_api_lock" timeout = 120 # 锁自动过期时间(秒) # 尝试获取锁(NX=True: 不存在时设置,EX=timeout: 过期时间) acquired = redis_client.set(lock_key, "locked", nx=True, ex=timeout) if not acquired: raise HTTPException(status_code=409, detail="is in simulation") else: try: return run_project_return_dict(network) finally: # 手动释放锁(可选,依赖过期时间自动释放更安全) redis_client.delete(lock_key) # put in inp folder, name without extension @app.get("/runinp/") async def fastapi_run_inp(network: str) -> str: return run_inp(network) # path is absolute path @app.get("/dumpoutput/") async def fastapi_dump_output(output: str) -> str: return dump_output(output) @app.get("/isprojectlocked/") async def fastapi_is_locked(network: str, req: Request): return str in lockedPrjs.keys() @app.get("/isprojectlockedbyme/") async def fastapi_is_locked_by_me(network: str, req: Request): client_host = req.client.host return lockedPrjs.get(network) == client_host # 0 successfully locked # 1 already locked by you # 2 locked by others @app.post("/lockproject/") async def fastapi_lock_project(network: str, req: Request): client_host = req.client.host if not network in lockedPrjs.keys(): lockedPrjs[network] = client_host return 0 else: if lockedPrjs.get(network) == client_host: return 1 else: return 2 @app.post("/unlockproject/") def fastapi_unlock_project(network: str, req: Request): client_host = req.client.host if lockedPrjs[network] == client_host: print("delete key") del lockedPrjs[network] return True return False ### operations @app.get("/getcurrentoperationid/") async def fastapi_get_current_operaiton_id(network: str) -> int: return get_current_operation(network) @app.post("/undo/") async def fastapi_undo(network: str): return execute_undo(network) @app.post("/redo/") async def fastapi_redo(network: str): return execute_redo(network) @app.get("/getsnapshots/") def fastapi_list_snapshot(network: str) -> list[tuple[int, str]]: return list_snapshot(network) @app.get("/havesnapshot/") async def fastapi_have_snapshot(network: str, tag: str) -> bool: return have_snapshot(network, tag) @app.get("/havesnapshotforoperation/") async def fastapi_have_snapshot_for_operation(network: str, operation: int) -> bool: return have_snapshot_for_operation(network, operation) @app.get("/havesnapshotforcurrentoperation/") async def fastapi_have_snapshot_for_current_operation(network: str) -> bool: return have_snapshot_for_current_operation(network) @app.post("/takesnapshotforoperation/") async def fastapi_take_snapshot_for_operation( network: str, operation: int, tag: str ) -> None: return take_snapshot_for_operation(network, operation, tag) @app.post("takenapshotforcurrentoperation") async def fastapi_take_snapshot_for_current_operation(network: str, tag: str) -> None: return take_snapshot_for_current_operation(network, tag) @app.post("/takesnapshot/") def fastapi_take_snapshot(network: str, tag: str) -> None: return take_snapshot(network, tag) @app.post("/picksnapshot/", response_model=None) def fastapi_pick_snapshot(network: str, tag: str, discard: bool = False) -> ChangeSet: return pick_snapshot(network, tag, discard) @app.post("/pickoperation/", response_model=None) async def fastapi_pick_operation( network: str, operation: int, discard: bool = False ) -> ChangeSet: return pick_operation(network, operation, discard) @app.get("/syncwithserver/", response_model=None) async def fastapi_sync_with_server(network: str, operation: int) -> ChangeSet: return sync_with_server(network, operation) @app.post("/batch/", response_model=None) async def fastapi_execute_batch_commands(network: str, req: Request) -> ChangeSet: jo_root = await req.json() cs: ChangeSet = ChangeSet() cs.operations = jo_root["operations"] rcs = execute_batch_commands(network, cs) return rcs @app.post("/compressedbatch/", response_model=None) async def fastapi_execute_compressed_batch_commands( network: str, req: Request ) -> ChangeSet: jo_root = await req.json() cs: ChangeSet = ChangeSet() cs.operations = jo_root["operations"] return execute_batch_command(network, cs) @app.get("/getrestoreoperation/") async def fastapi_get_restore_operation(network: str) -> int: return get_restore_operation(network) @app.post("/setrestoreoperation/") async def fastapi_set_restore_operation(network: str, operation: int) -> None: return set_restore_operation(network, operation) ############################################################ # type ############################################################ @app.get("/isnode/") async def fastapi_is_node(network: str, node: str) -> bool: return is_node(network, node) @app.get("/isjunction/") async def fastapi_is_junction(network: str, node: str) -> bool: return is_junction(network, node) @app.get("/isreservoir/") async def fastapi_is_reservoir(network: str, node: str) -> bool: return is_reservoir(network, node) @app.get("/istank/") async def fastapi_is_tank(network: str, node: str) -> bool: return is_tank(network, node) @app.get("/islink/") async def fastapi_is_link(network: str, link: str) -> bool: return is_link(network, link) @app.get("/ispipe/") async def fastapi_is_pipe(network: str, link: str) -> bool: return is_pipe(network, link) @app.get("/ispump/") async def fastapi_is_pump(network: str, link: str) -> bool: return is_pump(network, link) @app.get("/isvalve/") async def fastapi_is_valve(network: str, link: str) -> bool: return is_valve(network, link) # DingZQ, 2025-02-05 @app.get("/getnodetype/") async def fastapi_get_node_type(network: str, node: str) -> str: return get_node_type(network, node) @app.get("/getlinktype/") async def fastapi_get_link_type(network: str, link: str) -> str: return get_link_type(network, link) @app.get("/getelementtype/") async def fastapi_get_element_type(network: str, element: str) -> str: return get_element_type(network, element) @app.get("/getelementtypevalue/") async def fastapi_get_element_type_value(network: str, element: str) -> int: return get_element_type_value(network, element) @app.get("/iscurve/") async def fastapi_is_curve(network: str, curve: str) -> bool: return is_curve(network, curve) @app.get("/ispattern/") async def fastapi_is_pattern(network: str, pattern: str) -> bool: return is_pattern(network, pattern) @app.get("/getnodes/") async def fastapi_get_nodes(network: str) -> list[str]: return get_nodes(network) @app.get("/getlinks/") async def fastapi_get_links(network: str) -> list[str]: return get_links(network) @app.get("/getcurves/") async def fastapi_get_curves(network: str) -> list[str]: return get_curves(network) @app.get("/getpatterns/") async def fastapi_get_patterns(network: str) -> list[str]: return get_patterns(network) @app.get("/getnodelinks/") def get_node_links(network: str, node: str) -> list[str]: return get_node_links(network, node) ############################################################ # DingZQ, 2025-02-05 # 用统一的接口来获取 Node & Link properties, Node和Link的Id可以一样,不能进一步统一成获取Element 的 properties # Node & Link properties ############################################################ @app.get("/getnodeproperties/") async def fast_get_node_properties(network: str, node: str) -> dict[str, Any]: return get_node_properties(network, node) @app.get("/getlinkproperties/") async def fast_get_link_properties(network: str, link: str) -> dict[str, Any]: return get_link_properties(network, link) @app.get("/getscadaproperties/") async def fast_get_scada_properties(network: str, scada: str) -> dict[str, Any]: return get_scada_info(network, scada) @app.get("/getallscadaproperties/") async def fast_get_all_scada_properties(network: str) -> list[dict[str, Any]]: return get_all_scada_info(network) # elementtype can be 'node' or 'link' or 'scada' @app.get("/getelementpropertieswithtype/") async def fast_get_element_properties_with_type( network: str, elementtype: str, element: str ) -> dict[str, Any]: return get_element_properties_with_type(network, elementtype, element) # type can be 'node' or 'link' or 'scada' @app.get("/getelementproperties/") async def fast_get_element_properties(network: str, element: str) -> dict[str, Any]: return get_element_properties(network, element) ############################################################ # title 1.[TITLE] ############################################################ @app.get("/gettitleschema/") async def fast_get_title_schema(network: str) -> dict[str, dict[str, Any]]: return get_title_schema(network) @app.get("/gettitle/") async def fast_get_title(network: str) -> dict[str, Any]: return get_title(network) @app.get("/settitle/", response_model=None) async def fastapi_set_title(network: str, req: Request) -> ChangeSet: props = await req.json() return set_title(network, ChangeSet(props)) ############################################################ # junction 2.[JUNCTIONS] ############################################################ @app.get("/getjunctionschema") async def fast_get_junction_schema(network: str) -> dict[str, dict[str, Any]]: return get_junction_schema(network) @app.post("/addjunction/", response_model=None) async def fastapi_add_junction( network: str, junction: str, x: float, y: float, z: float ) -> ChangeSet: ps = {"id": junction, "x": x, "y": y, "elevation": z} return add_junction(network, ChangeSet(ps)) @app.post("/deletejunction/", response_model=None) async def fastapi_delete_junction(network: str, junction: str) -> ChangeSet: ps = {"id": junction} return delete_junction(network, ChangeSet(ps)) @app.get("/getjunctionelevation/") async def fastapi_get_junction_elevation(network: str, junction: str) -> float: ps = get_junction(network, junction) return ps["elevation"] @app.get("/getjunctionx/") async def fastapi_get_junction_x(network: str, junction: str) -> float: ps = get_junction(network, junction) return ps["x"] @app.get("/getjunctiony/") async def fastapi_get_junction_x(network: str, junction: str) -> float: ps = get_junction(network, junction) return ps["y"] @app.get("/getjunctioncoord/") async def fastapi_get_junction_coord(network: str, junction: str) -> dict[str, float]: ps = get_junction(network, junction) coord = {"x": ps["x"], "y": ps["y"]} return coord @app.get("/getjunctiondemand/") async def fastapi_get_junction_demand(network: str, junction: str) -> float: ps = get_junction(network, junction) return ps["demand"] @app.get("/getjunctionpattern/") async def fastapi_get_junction_pattern(network: str, junction: str) -> str: ps = get_junction(network, junction) return ps["pattern"] @app.post("/setjunctionelevation/", response_model=None) async def fastapi_set_junction_elevation( network: str, junction: str, elevation: float ) -> ChangeSet: ps = {"id": junction, "elevation": elevation} return set_junction(network, ChangeSet(ps)) @app.post("/setjunctionx/", response_model=None) async def fastapi_set_junction_x(network: str, junction: str, x: float) -> ChangeSet: ps = {"id": junction, "x": x} return set_junction(network, ChangeSet(ps)) @app.post("/setjunctiony/", response_model=None) async def fastapi_set_junction_y(network: str, junction: str, y: float) -> ChangeSet: ps = {"id": junction, "y": y} return set_junction(network, ChangeSet(ps)) @app.post("/setjunctioncoord/", response_model=None) async def fastapi_set_junction_coord( network: str, junction: str, x: float, y: float ) -> ChangeSet: ps = {"id": junction, "x": x, "y": y} return set_junction(network, ChangeSet(ps)) @app.post("/setjunctiondemand/", response_model=None) async def fastapi_set_junction_demand( network: str, junction: str, demand: float ) -> ChangeSet: ps = {"id": junction, "demand": demand} return set_junction(network, ChangeSet(ps)) @app.post("/setjunctionpattern/", response_model=None) async def fastapi_set_junction_pattern( network: str, junction: str, pattern: str ) -> ChangeSet: ps = {"id": junction, "pattern": pattern} return set_junction(network, ChangeSet(ps)) @app.get("/getjunctionproperties/") async def fastapi_get_junction_properties( network: str, junction: str ) -> dict[str, Any]: return get_junction(network, junction) # DingZQ, 2025-03-29 @app.get("/getalljunctionproperties/") async def fastapi_get_all_junction_properties(network: str) -> list[dict[str, Any]]: # 缓存查询结果提高性能 global redis_client cache_key = f"getalljunctionproperties_{network}" data = redis_client.get(cache_key) if data: # 使用自定义的反序列化函数 loaded_dict = msgpack.unpackb(data, object_hook=decode_datetime) return loaded_dict results = get_all_junctions(network) redis_client.set(cache_key, msgpack.packb(results, default=encode_datetime)) return results @app.post("/setjunctionproperties/", response_model=None) async def fastapi_set_junction_properties( network: str, junction: str, req: Request ) -> ChangeSet: props = await req.json() ps = {"id": junction} | props return set_junction(network, ChangeSet(ps)) ############################################################ # reservoir 3.[RESERVOIRS] ############################################################ @app.get("/getreservoirschema") async def fast_get_reservoir_schema(network: str) -> dict[str, dict[str, Any]]: return get_reservoir_schema(network) @app.post("/addreservoir/", response_model=None) async def fastapi_add_reservoir( network: str, reservoir: str, x: float, y: float, head: float ) -> ChangeSet: ps = {"id": reservoir, "x": x, "y": y, "head": head} return add_reservoir(network, ChangeSet(ps)) @app.post("/deletereservoir/", response_model=None) async def fastapi_delete_reservoir(network: str, reservoir: str) -> ChangeSet: ps = {"id": reservoir} return delete_reservoir(network, ChangeSet(ps)) @app.get("/getreservoirhead/") async def fastapi_get_reservoir_head(network: str, reservoir: str) -> float | None: ps = get_reservoir(network, reservoir) return ps["head"] @app.get("/getreservoirpattern/") async def fastapi_get_reservoir_pattern(network: str, reservoir: str) -> str | None: ps = get_reservoir(network, reservoir) return ps["pattern"] @app.get("/getreservoirx/") async def fastapi_get_reservoir_x( network: str, reservoir: str ) -> dict[str, float] | None: ps = get_reservoir(network, reservoir) return ps["x"] @app.get("/getreservoiry/") async def fastapi_get_reservoir_y( network: str, reservoir: str ) -> dict[str, float] | None: ps = get_reservoir(network, reservoir) return ps["y"] @app.get("/getreservoircoord/") async def fastapi_get_reservoir_y( network: str, reservoir: str ) -> dict[str, float] | None: ps = get_reservoir(network, reservoir) coord = {"id": reservoir, "x": ps["x"], "y": ps["y"]} return coord @app.post("/setreservoirhead/", response_model=None) async def fastapi_set_reservoir_head( network: str, reservoir: str, head: float ) -> ChangeSet: ps = {"id": reservoir, "head": head} return set_reservoir(network, ChangeSet(ps)) @app.post("/setreservoirpattern/", response_model=None) async def fastapi_set_reservoir_pattern( network: str, reservoir: str, pattern: str ) -> ChangeSet: ps = {"id": reservoir, "pattern": pattern} return set_reservoir(network, ChangeSet(ps)) @app.post("/setreservoirx/", response_model=None) async def fastapi_set_reservoir_x(network: str, reservoir: str, x: float) -> ChangeSet: ps = {"id": reservoir, "x": x} return set_reservoir(network, ChangeSet(ps)) @app.post("/setreservoirx/", response_model=None) async def fastapi_set_reservoir_y(network: str, reservoir: str, y: float) -> ChangeSet: ps = {"id": reservoir, "y": y} return set_reservoir(network, ChangeSet(ps)) @app.post("/setreservoircoord/", response_model=None) async def fastapi_set_reservoir_y( network: str, reservoir: str, x: float, y: float ) -> ChangeSet: ps = {"id": reservoir, "x": x, "y": y} return set_reservoir(network, ChangeSet(ps)) @app.get("/getreservoirproperties/") async def fastapi_get_reservoir_properties( network: str, reservoir: str ) -> dict[str, Any]: return get_reservoir(network, reservoir) # DingZQ, 2025-03-29 @app.get("/getallreservoirproperties/") async def fastapi_get_all_reservoir_properties(network: str) -> list[dict[str, Any]]: # 缓存查询结果提高性能 global redis_client cache_key = f"getallreservoirproperties_{network}" data = redis_client.get(cache_key) if data: # 使用自定义的反序列化函数 loaded_dict = msgpack.unpackb(data, object_hook=decode_datetime) return loaded_dict results = get_all_reservoirs(network) redis_client.set(cache_key, msgpack.packb(results, default=encode_datetime)) return results @app.post("/setreservoirproperties/", response_model=None) async def fastapi_set_reservoir_properties( network: str, reservoir: str, req: Request ) -> ChangeSet: props = await req.json() ps = {"id": reservoir} | props return set_reservoir(network, ChangeSet(ps)) ############################################################ # tank 4.[TANKS] ############################################################ @app.get("/gettankschema") async def fast_get_tank_schema(network: str) -> dict[str, dict[str, Any]]: return get_tank_schema(network) @app.post("/addtank/", response_model=None) async def fastapi_add_tank( network: str, tank: str, x: float, y: float, elevation: float, init_level: float = 0, min_level: float = 0, max_level: float = 0, diameter: float = 0, min_vol: float = 0, ) -> ChangeSet: ps = { "id": tank, "x": x, "y": y, "elevation": elevation, "init_level": init_level, "min_level": min_level, "max_level": max_level, "diameter": diameter, "min_vol": min_vol, } return add_tank(network, ChangeSet(ps)) @app.post("/deletetank/", response_model=None) async def fastapi_delete_tank(network: str, tank: str) -> ChangeSet: ps = {"id": tank} return delete_tank(network, ChangeSet(ps)) @app.get("/gettankelevation/") async def fastapi_get_tank_elevation(network: str, tank: str) -> float | None: ps = get_tank(network, tank) return ps["elevation"] @app.get("/gettankinitlevel/") async def fastapi_get_tank_init_level(network: str, tank: str) -> float | None: ps = get_tank(network, tank) return ps["init_level"] @app.get("/gettankminlevel/") async def fastapi_get_tank_min_level(network: str, tank: str) -> float | None: ps = get_tank(network, tank) return ps["min_level"] @app.get("/gettankmaxlevel/") async def fastapi_get_tank_max_level(network: str, tank: str) -> float | None: ps = get_tank(network, tank) return ps["max_level"] @app.get("/gettankdiameter/") async def fastapi_get_tank_diameter(network: str, tank: str) -> float | None: ps = get_tank(network, tank) return ps["diameter"] @app.get("/gettankminvol/") async def fastapi_get_tank_min_vol(network: str, tank: str) -> float | None: ps = get_tank(network, tank) return ps["min_vol"] @app.get("/gettankvolcurve/") async def fastapi_get_tank_vol_curve(network: str, tank: str) -> str | None: ps = get_tank(network, tank) return ps["vol_curve"] @app.get("/gettankoverflow/") async def fastapi_get_tank_overflow(network: str, tank: str) -> str | None: ps = get_tank(network, tank) return ps["overflow"] @app.get("/gettankx/") async def fastapi_get_tank_x(network: str, tank: str) -> float: ps = get_tank(network, tank) return ps["x"] @app.get("/gettanky/") async def fastapi_get_tank_x(network: str, tank: str) -> float: ps = get_tank(network, tank) return ps["y"] @app.get("/gettankcoord/") async def fastapi_get_tank_coord(network: str, tank: str) -> dict[str, float]: ps = get_tank(network, tank) coord = {"x": ps["x"], "y": ps["y"]} return coord @app.post("/settankelevation/", response_model=None) async def fastapi_set_tank_elevation( network: str, tank: str, elevation: float ) -> ChangeSet: ps = {"id": tank, "elevation": elevation} return set_tank(network, ChangeSet(ps)) @app.post("/settankinitlevel/", response_model=None) async def fastapi_set_tank_init_level( network: str, tank: str, init_level: float ) -> ChangeSet: ps = {"id": tank, "init_level": init_level} return set_tank(network, ChangeSet(ps)) @app.post("/settankminlevel/", response_model=None) async def fastapi_set_tank_min_level( network: str, tank: str, min_level: float ) -> ChangeSet: ps = {"id": tank, "min_level": min_level} return set_tank(network, ChangeSet(ps)) @app.post("/settankmaxlevel/", response_model=None) async def fastapi_set_tank_max_level( network: str, tank: str, max_level: float ) -> ChangeSet: ps = {"id": tank, "max_level": max_level} return set_tank(network, ChangeSet(ps)) @app.post("settankdiameter//", response_model=None) async def fastapi_set_tank_diameter( network: str, tank: str, diameter: float ) -> ChangeSet: ps = {"id": tank, "diameter": diameter} return set_tank(network, ChangeSet(ps)) @app.post("/settankminvol/", response_model=None) async def fastapi_set_tank_min_vol( network: str, tank: str, min_vol: float ) -> ChangeSet: ps = {"id": tank, "min_vol": min_vol} return set_tank(network, ChangeSet(ps)) @app.post("/settankvolcurve/", response_model=None) async def fastapi_set_tank_vol_curve( network: str, tank: str, vol_curve: str ) -> ChangeSet: ps = {"id": tank, "vol_curve": vol_curve} return set_tank(network, ChangeSet(ps)) @app.post("/settankoverflow/", response_model=None) async def fastapi_set_tank_overflow( network: str, tank: str, overflow: str ) -> ChangeSet: ps = {"id": tank, "overflow": overflow} return set_tank(network, ChangeSet(ps)) @app.post("/settankx/", response_model=None) async def fastapi_set_tank_x(network: str, tank: str, x: float) -> ChangeSet: ps = {"id": tank, "x": x} return set_tank(network, ChangeSet(ps)) @app.post("/settanky/", response_model=None) async def fastapi_set_tank_y(network: str, tank: str, y: float) -> ChangeSet: ps = {"id": tank, "y": y} return set_tank(network, ChangeSet(ps)) @app.post("/settankcoord/", response_model=None) async def fastapi_set_tank_coord( network: str, tank: str, x: float, y: float ) -> ChangeSet: ps = {"id": tank, "x": x, "y": y} return set_tank(network, ChangeSet(ps)) @app.get("/gettankproperties/") async def fastapi_get_tank_properties(network: str, tank: str) -> dict[str, Any]: return get_tank(network, tank) # DingZQ, 2025-03-29 @app.get("/getalltankproperties/") async def fastapi_get_all_tank_properties(network: str) -> list[dict[str, Any]]: # 缓存查询结果提高性能 global redis_client cache_key = f"getalltankproperties_{network}" data = redis_client.get(cache_key) if data: # 使用自定义的反序列化函数 loaded_dict = msgpack.unpackb(data, object_hook=decode_datetime) return loaded_dict results = get_all_tanks(network) redis_client.set(cache_key, msgpack.packb(results, default=encode_datetime)) return results @app.post("/settankproperties/", response_model=None) async def fastapi_set_tank_properties( network: str, tank: str, req: Request ) -> ChangeSet: props = await req.json() ps = {"id": tank} | props return set_tank(network, ChangeSet(ps)) ############################################################ # pipe 4.[PIPES] ############################################################ @app.get("/getpipeschema") async def fastapi_get_pipe_schema(network: str) -> dict[str, dict[str, Any]]: return get_pipe_schema(network) @app.post("/addpipe/", response_model=None) async def fastapi_add_pipe( network: str, pipe: str, node1: str, node2: str, length: float = 0, diameter: float = 0, roughness: float = 0, minor_loss: float = 0, status: str = PIPE_STATUS_OPEN, ) -> ChangeSet: ps = { "id": pipe, "node1": node1, "node2": node2, "length": length, "diameter": diameter, "roughness": roughness, "minor_loss": minor_loss, "status": status, } return add_pipe(network, ChangeSet(ps)) @app.post("/deletepipe/", response_model=None) async def fastapi_delete_pipe(network: str, pipe: str) -> ChangeSet: ps = {"id": pipe} return delete_pipe(network, ChangeSet(ps)) @app.get("/getpipenode1/") async def fastapi_get_pipe_node1(network: str, pipe: str) -> str | None: ps = get_pipe(network, pipe) return ps["node1"] @app.get("/getpipenode2/") async def fastapi_get_pipe_node2(network: str, pipe: str) -> str | None: ps = get_pipe(network, pipe) return ps["node2"] @app.get("/getpipelength/") async def fastapi_get_pipe_length(network: str, pipe: str) -> float | None: ps = get_pipe(network, pipe) return ps["length"] @app.get("/getpipediameter/") async def fastapi_get_pipe_diameter(network: str, pipe: str) -> float | None: ps = get_pipe(network, pipe) return ps["diameter"] @app.get("/getpiperoughness/") async def fastapi_get_pipe_roughness(network: str, pipe: str) -> float | None: ps = get_pipe(network, pipe) return ps["roughness"] @app.get("/getpipeminorloss/") async def fastapi_get_pipe_minor_loss(network: str, pipe: str) -> float | None: ps = get_pipe(network, pipe) return ps["minor_loss"] @app.get("/getpipestatus/") async def fastapi_get_pipe_status(network: str, pipe: str) -> str | None: ps = get_pipe(network, pipe) return ps["status"] @app.post("/setpipenode1/", response_model=None) async def fastapi_set_pipe_node1(network: str, pipe: str, node1: str) -> ChangeSet: ps = {"id": pipe, "node1": node1} return set_pipe(network, ChangeSet(ps)) @app.post("/setpipenode2/", response_model=None) async def fastapi_set_pipe_node2(network: str, pipe: str, node2: str) -> ChangeSet: ps = {"id": pipe, "node2": node2} return set_pipe(network, ChangeSet(ps)) @app.post("/setpipelength/", response_model=None) async def fastapi_set_pipe_length(network: str, pipe: str, length: float) -> ChangeSet: ps = {"id": pipe, "length": length} return set_pipe(network, ChangeSet(ps)) @app.post("/setpipediameter/", response_model=None) async def fastapi_set_pipe_diameter( network: str, pipe: str, diameter: float ) -> ChangeSet: ps = {"id": pipe, "diameter": diameter} return set_pipe(network, ChangeSet(ps)) @app.post("/setpiperoughness/", response_model=None) async def fastapi_set_pipe_roughness( network: str, pipe: str, roughness: float ) -> ChangeSet: ps = {"id": pipe, "roughness": roughness} return set_pipe(network, ChangeSet(ps)) @app.post("/setpipeminorloss/", response_model=None) async def fastapi_set_pipe_minor_loss( network: str, pipe: str, minor_loss: float ) -> ChangeSet: ps = {"id": pipe, "minor_loss": minor_loss} return set_pipe(network, ChangeSet(ps)) @app.post("/setpipestatus/", response_model=None) async def fastapi_set_pipe_status(network: str, pipe: str, status: str) -> ChangeSet: ps = {"id": pipe, "status": status} print(status) print(ps) ret = set_pipe(network, ChangeSet(ps)) print(ret) return ret @app.get("/getpipeproperties/") async def fastapi_get_pipe_properties(network: str, pipe: str) -> dict[str, Any]: return get_pipe(network, pipe) # DingZQ, 2025-03-29 @app.get("/getallpipeproperties/") async def fastapi_get_all_pipe_properties(network: str) -> list[dict[str, Any]]: # 缓存查询结果提高性能 global redis_client cache_key = f"getallpipeproperties_{network}" data = redis_client.get(cache_key) if data: # 使用自定义的反序列化函数 loaded_dict = msgpack.unpackb(data, object_hook=decode_datetime) return loaded_dict results = get_all_pipes(network) redis_client.set(cache_key, msgpack.packb(results, default=encode_datetime)) return results @app.post("/setpipeproperties/", response_model=None) async def fastapi_set_pipe_properties( network: str, pipe: str, req: Request ) -> ChangeSet: props = await req.json() ps = {"id": pipe} | props return set_pipe(network, ChangeSet(ps)) ############################################################ # pump 4.[PUMPS] ############################################################ @app.get("/getpumpschema") async def fastapi_get_pump_schema(network: str) -> dict[str, dict[str, Any]]: return get_pump_schema(network) @app.post("/addpump/", response_model=None) async def fastapi_add_pump( network: str, pump: str, node1: str, node2: str, power: float = 0.0 ) -> ChangeSet: ps = {"id": pump, "node1": node1, "node2": node2, "power": power} return add_pump(network, ChangeSet(ps)) @app.post("/deletepump/", response_model=None) async def fastapi_delete_pump(network: str, pump: str) -> ChangeSet: ps = {"id": pump} return delete_pump(network, ChangeSet(ps)) @app.get("/getpumpnode1/") async def fastapi_get_pump_node1(network: str, pump: str) -> str | None: ps = get_pump(network, pump) return ps["node1"] @app.get("/getpumpnode2/") async def fastapi_get_pump_node2(network: str, pump: str) -> str | None: ps = get_pump(network, pump) return ps["node2"] @app.post("/setpumpnode1/", response_model=None) async def fastapi_set_pump_node1(network: str, pump: str, node1: str) -> ChangeSet: ps = {"id": pump, "node1": node1} return set_pump(network, ChangeSet(ps)) @app.post("/setpumpnode2/", response_model=None) async def fastapi_set_pump_node2(network: str, pump: str, node2: str) -> ChangeSet: ps = {"id": pump, "node2": node2} return set_pump(network, ChangeSet(ps)) @app.get("/getpumpproperties/") async def fastapi_get_pump_properties(network: str, pump: str) -> dict[str, Any]: return get_pump(network, pump) # DingZQ, 2025-03-29 @app.get("/getallpumpproperties/") async def fastapi_get_all_pump_properties(network: str) -> list[dict[str, Any]]: # 缓存查询结果提高性能 global redis_client cache_key = f"getallpumpproperties_{network}" data = redis_client.get(cache_key) if data: # 使用自定义的反序列化函数 loaded_dict = msgpack.unpackb(data, object_hook=decode_datetime) return loaded_dict results = get_all_pumps(network) redis_client.set(cache_key, msgpack.packb(results, default=encode_datetime)) return results @app.post("/setpumpproperties/", response_model=None) async def fastapi_set_pump_properties( network: str, pump: str, req: Request ) -> ChangeSet: props = await req.json() ps = {"id": pump} | props return set_pump(network, ChangeSet(ps)) ############################################################ # valve 4.[VALVES] ############################################################ @app.get("/getvalveschema") async def fastapi_get_valve_schema(network: str) -> dict[str, dict[str, Any]]: return get_valve_schema(network) @app.post("/addvalve/", response_model=None) async def fastapi_add_valve( network: str, valve: str, node1: str, node2: str, diameter: float = 0, v_type: str = VALVES_TYPE_PRV, setting: float = 0, minor_loss: float = 0, ) -> ChangeSet: ps = { "id": valve, "node1": node1, "node2": node2, "diameter": diameter, "v_type": v_type, "setting": setting, "minor_loss": minor_loss, } return add_valve(network, ChangeSet(ps)) @app.post("/deletevalve/", response_model=None) async def fastapi_delete_valve(network: str, valve: str) -> ChangeSet: ps = {"id": valve} return delete_valve(network, ChangeSet(ps)) @app.get("/getvalvenode1/") async def fastapi_get_valve_node1(network: str, valve: str) -> str | None: ps = get_valve(network, valve) return ps["node1"] @app.get("/getvalvenode2/") async def fastapi_get_valve_node2(network: str, valve: str) -> str | None: ps = get_valve(network, valve) return ps["node2"] @app.get("/getvalvediameter/") async def fastapi_get_valve_diameter(network: str, valve: str) -> float | None: ps = get_valve(network, valve) return ps["diameter"] @app.get("/getvalvetype/") async def fastapi_get_valve_type(network: str, valve: str) -> str | None: ps = get_valve(network, valve) return ps["type"] @app.get("/getvalvesetting/") async def fastapi_get_valve_setting(network: str, valve: str) -> float | None: ps = get_valve(network, valve) return ps["setting"] @app.get("/getvalveminorloss/") async def fastapi_get_valve_minor_loss(network: str, valve: str) -> float | None: ps = get_valve(network, valve) return ps["minor_loss"] @app.post("/setvalvenode1/", response_model=None) async def fastapi_set_valve_node1(network: str, valve: str, node1: str) -> ChangeSet: ps = {"id": valve, "node1": node1} return set_valve(network, ChangeSet(ps)) @app.post("/setvalvenode2/", response_model=None) async def fastapi_set_valve_node2(network: str, valve: str, node2: str) -> ChangeSet: ps = {"id": valve, "node2": node2} return set_valve(network, ChangeSet(ps)) @app.post("/setvalvenodediameter/", response_model=None) async def fastapi_set_valve_diameter( network: str, valve: str, diameter: float ) -> ChangeSet: ps = {"id": valve, "diameter": diameter} return set_valve(network, ChangeSet(ps)) @app.post("/setvalvetype/", response_model=None) async def fastapi_set_valve_type(network: str, valve: str, type: str) -> ChangeSet: ps = {"id": valve, "type": type} return set_valve(network, ChangeSet(ps)) @app.post("/setvalvesetting/", response_model=None) async def fastapi_set_valve_setting( network: str, valve: str, setting: float ) -> ChangeSet: ps = {"id": valve, "setting": setting} return set_valve(network, ChangeSet(ps)) @app.get("/getvalveproperties/") async def fastapi_get_valve_properties(network: str, valve: str) -> dict[str, Any]: return get_valve(network, valve) # DingZQ, 2025-03-29 @app.get("/getallvalveproperties/") async def fastapi_get_all_valve_properties(network: str) -> list[dict[str, Any]]: # 缓存查询结果提高性能 global redis_client cache_key = f"getallvalveproperties_{network}" data = redis_client.get(cache_key) if data: # 使用自定义的反序列化函数 loaded_dict = msgpack.unpackb(data, object_hook=decode_datetime) return loaded_dict results = get_all_valves(network) redis_client.set(cache_key, msgpack.packb(results, default=encode_datetime)) return results @app.post("/setvalveproperties/", response_model=None) async def fastapi_set_valve_properties( network: str, valve: str, req: Request ) -> ChangeSet: props = await req.json() ps = {"id": valve} | props return set_valve(network, ChangeSet(ps)) # node & link @app.post("/deletenode/", response_model=None) async def fastapi_delete_node(network: str, node: str) -> ChangeSet: ps = {"id": node} if is_junction(network, node): return delete_junction(network, ChangeSet(ps)) elif is_reservoir(network, node): return delete_reservoir(network, ChangeSet(ps)) elif is_tank(network, node): return delete_tank(network, ChangeSet(ps)) @app.post("/deletelink/", response_model=None) async def fastapi_delete_link(network: str, link: str) -> ChangeSet: ps = {"id": link} if is_pipe(network, link): return delete_pipe(network, ChangeSet(ps)) elif is_pump(network, link): return delete_pump(network, ChangeSet(ps)) elif is_valve(network, link): return delete_valve(network, ChangeSet(ps)) ############################################################ # tag 8.[TAGS] ############################################################ # # TAG_TYPE_NODE = api.TAG_TYPE_NODE # TAG_TYPE_LINK = api.TAG_TYPE_LINK # @app.get("/gettagschema/") async def fastapi_get_tag_schema(network: str) -> dict[str, dict[str, Any]]: return get_tag_schema(network) @app.get("/gettag/") async def fastapi_get_tag(network: str, t_type: str, id: str) -> dict[str, Any]: return get_tag(network, t_type, id) @app.get("/gettags/") async def fastapi_get_tags(network: str) -> list[dict[str, Any]]: tags = get_tags(network) print(tags) return tags # example: # set_tag(p, ChangeSet({'t_type': TAG_TYPE_NODE, 'id': 'j1', 'tag': 'j1t' })) # set_tag(p, ChangeSet({'t_type': TAG_TYPE_LINK, 'id': 'p0', 'tag': 'p0t' })) @app.post("/settag/", response_model=None) async def fastapi_set_tag(network: str, req: Request) -> ChangeSet: props = await req.json() return set_tag(network, ChangeSet(props)) ############################################################ # demand 9.[DEMANDS] ############################################################ @app.get("/getdemandschema") async def fastapi_get_demand_schema(network: str) -> dict[str, dict[str, Any]]: return get_demand_schema(network) @app.get("/getdemandproperties/") async def fastapi_get_demand_properties(network: str, junction: str) -> dict[str, Any]: return get_demand(network, junction) # example: set_demand(p, ChangeSet({'junction': 'j1', 'demands': [{'demand': 10.0, 'pattern': None, 'category': 'x'}, {'demand': 20.0, 'pattern': None, 'category': None}]})) @app.post("/setdemandproperties/", response_model=None) async def fastapi_set_demand_properties( network: str, junction: str, req: Request ) -> ChangeSet: props = await req.json() ps = {"junction": junction} | props return set_demand(network, ChangeSet(ps)) ############################################################ # status 10.[STATUS] init_status ############################################################ @app.get("/getstatusschema") async def fastapi_get_status_schema(network: str) -> dict[str, dict[str, Any]]: return get_status_schema(network) @app.get("/getstatus/") async def fastapi_get_status(network: str, link: str) -> dict[str, Any]: return get_status(network, link) # example: set_status(p, ChangeSet({'link': 'p0', 'status': LINK_STATUS_OPEN, 'setting': 10.0})) @app.post("/setstatus/", response_model=None) async def fastapi_set_status_properties( network: str, link: str, req: Request ) -> ChangeSet: props = await req.json() ps = {"link": link} | props return set_status(network, ChangeSet(ps)) ############################################################ # pattern 11.[PATTERNS] ############################################################ @app.get("/getpatternschema") async def fastapi_get_pattern_schema(network: str) -> dict[str, dict[str, Any]]: return get_pattern_schema(network) @app.post("/addpattern/", response_model=None) async def fastapi_add_pattern(network: str, pattern: str, req: Request) -> ChangeSet: props = await req.json() ps = { "id": pattern, } | props return add_pattern(network, ChangeSet(ps)) @app.post("/deletepattern/", response_model=None) async def fastapi_delete_pattern(network: str, pattern: str) -> ChangeSet: ps = {"id": pattern} return delete_pattern(network, ChangeSet(ps)) @app.get("/getpatternproperties/") async def fastapi_get_pattern_properties(network: str, pattern: str) -> dict[str, Any]: return get_pattern(network, pattern) # example: set_pattern(p, ChangeSet({'id' : 'p0', 'factors': [1.0, 2.0, 3.0]})) @app.post("/setpatternproperties/", response_model=None) async def fastapi_set_pattern_properties( network: str, pattern: str, req: Request ) -> ChangeSet: props = await req.json() ps = {"id": pattern} | props return set_pattern(network, ChangeSet(ps)) ############################################################ # curve 12.[CURVES] ############################################################ @app.get("/getcurveschema") async def fastapi_get_curve_schema(network: str) -> dict[str, dict[str, Any]]: return get_curve_schema(network) @app.post("/addcurve/", response_model=None) async def fastapi_add_curve(network: str, curve: str, req: Request) -> ChangeSet: props = await req.json() print(props) ps = { "id": curve, } | props print(ps) return add_curve(network, ChangeSet(ps)) @app.post("/deletecurve/", response_model=None) async def fastapi_delete_curve(network: str, curve: str) -> ChangeSet: ps = {"id": curve} return delete_curve(network, ChangeSet(ps)) @app.get("/getcurveproperties/") async def fastapi_get_curve_properties(network: str, curve: str) -> dict[str, Any]: return get_curve(network, curve) # example: set_curve(p, ChangeSet({'id' : 'c0', 'c_type' : CURVE_TYPE_PUMP, 'coords': [{'x': 1.0, 'y': 2.0}, {'x': 2.0, 'y': 1.0}]})) @app.post("/setcurveproperties/", response_model=None) async def fastapi_set_curve_properties( network: str, curve: str, req: Request ) -> ChangeSet: props = await req.json() # c_type放到request中 ps = {"id": curve} | props return set_curve(network, ChangeSet(ps)) ############################################################ # control 13.[CONTROLS] ############################################################ @app.get("/getcontrolschema/") async def fastapi_get_control_schema(network: str) -> dict[str, dict[str, Any]]: return get_control_schema(network) @app.get("/getcontrolproperties/") async def fastapi_get_control_properties(network: str) -> dict[str, Any]: return get_control(network) # example: set_control(p, ChangeSet({'control': 'x'})) @app.post("/setcontrolproperties/", response_model=None) async def fastapi_set_control_properties(network: str, req: Request) -> ChangeSet: props = await req.json() return set_control(network, ChangeSet(props)) ############################################################ # rule 14.[RULES] ############################################################ @app.get("/getruleschema/") async def fastapi_get_rule_schema(network: str) -> dict[str, dict[str, Any]]: return get_rule_schema(network) @app.get("/getruleproperties/") async def fastapi_get_rule_properties(network: str) -> dict[str, Any]: return get_rule(network) # example: set_rule(p, ChangeSet({'rule': 'x'})) @app.post("/setruleproperties/", response_model=None) async def fastapi_set_rule_properties(network: str, req: Request) -> ChangeSet: props = await req.json() return set_rule(network, ChangeSet(props)) ############################################################ # energy 15.[ENERGY] ############################################################ @app.get("/getenergyschema/") async def fastapi_get_energy_schema(network: str) -> dict[str, dict[str, Any]]: return get_energy_schema(network) @app.get("/getenergyproperties/") async def fastapi_get_energy_properties(network: str) -> dict[str, Any]: return get_energy(network) @app.post("/setenergyproperties/", response_model=None) async def fastapi_set_energy_properties(network: str, req: Request) -> ChangeSet: props = await req.json() return set_energy(network, ChangeSet(props)) @app.get("/getpumpenergyschema/") async def fastapi_get_pump_energy_schema(network: str) -> dict[str, dict[str, Any]]: return get_pump_energy_schema(network) @app.get("/getpumpenergyproperties//") async def fastapi_get_pump_energy_proeprties(network: str, pump: str) -> dict[str, Any]: return get_pump_energy(network, pump) @app.get("/setpumpenergyproperties//", response_model=None) async def fastapi_set_pump_energy_properties( network: str, pump: str, req: Request ) -> ChangeSet: props = await req.json() ps = {"id": pump} | props return set_pump_energy(network, ChangeSet(ps)) ############################################################ # emitter 16.[EMITTERS] ############################################################ @app.get("/getemitterschema") async def fastapi_get_emitter_schema(network: str) -> dict[str, dict[str, Any]]: return get_emitter_schema(network) @app.get("/getemitterproperties/") async def fastapi_get_emitter_properties(network: str, junction: str) -> dict[str, Any]: return get_emitter(network, junction) # example: set_emitter(p, ChangeSet({'junction': 'j1', 'coefficient': 10.0})) @app.post("/setemitterproperties/", response_model=None) async def fastapi_set_emitter_properties( network: str, junction: str, req: Request ) -> ChangeSet: props = await req.json() ps = {"junction": junction} | props return set_emitter(network, ChangeSet(ps)) ############################################################ # quality 17.[QUALITY] ############################################################ @app.get("/getqualityschema/") async def fastapi_get_quality_schema(network: str) -> dict[str, dict[str, Any]]: return get_quality_schema(network) @app.get("/getqualityproperties/") async def fastapi_get_quality_properties(network: str, node: str) -> dict[str, Any]: return get_quality(network, node) # example: set_quality(p, ChangeSet({'node': 'j1', 'quality': 10.0})) @app.post("/setqualityproperties/", response_model=None) async def fastapi_set_quality_properties(network: str, req: Request) -> ChangeSet: props = await req.json() return set_quality(network, ChangeSet(props)) ############################################################ # source 18.[SOURCES] ############################################################ @app.get("/getsourcechema/") async def fastapi_get_source_schema(network: str) -> dict[str, dict[str, Any]]: return get_source_schema(network) @app.get("/getsource/") async def fastapi_get_source(network: str, node: str) -> dict[str, Any]: return get_source(network, node) @app.post("/setsource/", response_model=None) async def fastapi_set_source(network: str, req: Request) -> ChangeSet: props = await req.json() return set_source(network, ChangeSet(props)) # example: add_source(p, ChangeSet({'node': 'j0', 's_type': SOURCE_TYPE_CONCEN, 'strength': 10.0, 'pattern': 'p0'})) @app.post("/addsource/", response_model=None) async def fastapi_add_source(network: str, req: Request) -> ChangeSet: props = await req.json() return add_source(network, ChangeSet(props)) @app.post("/deletesource/", response_model=None) async def fastapi_delete_source(network: str, node: str) -> ChangeSet: props = {"node": node} return delete_source(network, ChangeSet(props)) ############################################################ # reaction 19.[REACTIONS] ############################################################ @app.get("/getreactionschema/") async def fastapi_get_reaction_schema(network: str) -> dict[str, dict[str, Any]]: return get_reaction_schema(network) @app.get("/getreaction/") async def fastapi_get_reaction(network: str) -> dict[str, Any]: return get_reaction(network) @app.post("/setreaction/", response_model=None) # set_reaction(p, ChangeSet({ 'ORDER BULK' : '10' })) async def fastapi_set_reaction(network: str, req: Request) -> ChangeSet: props = await req.json() return set_reaction(network, ChangeSet(props)) @app.get("/getpipereactionschema/") async def fastapi_get_pipe_reaction_schema(network: str) -> dict[str, dict[str, Any]]: return get_pipe_reaction_schema(network) @app.get("/getpipereaction/") async def fastapi_get_pipe_reaction(network: str, pipe: str) -> dict[str, Any]: return get_pipe_reaction(network, pipe) @app.post("/setpipereaction/", response_model=None) async def fastapi_set_pipe_reaction(network: str, req: Request) -> ChangeSet: props = await req.json() return set_pipe_reaction(network, ChangeSet(props)) @app.get("/gettankreactionschema/") async def fastapi_get_tank_reaction_schema(network: str) -> dict[str, dict[str, Any]]: return get_tank_reaction_schema(network) @app.get("/gettankreaction/") async def fastapi_get_tank_reaction(network: str, tank: str) -> dict[str, Any]: return get_tank_reaction(network, tank) @app.post("/settankreaction/", response_model=None) async def fastapi_set_tank_reaction(network: str, req: Request) -> ChangeSet: props = await req.json() return set_tank_reaction(network, ChangeSet(props)) ############################################################ # mixing 20.[MIXING] ############################################################ @app.get("/getmixingschema/") async def fastapi_get_mixing_schema(network: str) -> dict[str, dict[str, Any]]: return get_mixing_schema(network) @app.get("/getmixing/") async def fastapi_get_mixing(network: str, tank: str) -> dict[str, Any]: return get_mixing(network, tank) @app.post("/setmixing/", response_model=None) async def fastapi_set_mixing(network: str, req: Request) -> ChangeSet: props = await req.json() return api.set_mixing(network, ChangeSet(props)) # example: add_mixing(p, ChangeSet({'tank': 't0', 'model': MIXING_MODEL_MIXED, 'value': 10.0})) @app.post("/addmixing/", response_model=None) async def fastapi_add_mixing(network: str, req: Request) -> ChangeSet: props = await req.json() return add_mixing(network, ChangeSet(props)) @app.post("/deletemixing/", response_model=None) async def fastapi_delete_mixing(network: str, req: Request) -> ChangeSet: props = await req.json() return delete_mixing(network, ChangeSet(props)) ############################################################ # time 21.[TIME] ############################################################ @app.get("/gettimeschema") async def fastapi_get_time_schema(network: str) -> dict[str, dict[str, Any]]: return get_time_schema(network) @app.get("/gettimeproperties/") async def fastapi_get_time_properties(network: str) -> dict[str, Any]: return get_time(network) @app.post("/settimeproperties/", response_model=None) async def fastapi_set_time_properties(network: str, req: Request) -> ChangeSet: props = await req.json() return set_time(network, ChangeSet(props)) ############################################################ # option 23.[OPTIONS] ############################################################ @app.get("/getoptionschema/") async def fastapi_get_option_schema(network: str) -> dict[str, dict[str, Any]]: return get_option_v3_schema(network) @app.get("/getoptionproperties/") async def fastapi_get_option_properties(network: str) -> dict[str, Any]: return get_option_v3(network) @app.post("/setoptionproperties/", response_model=None) async def fastapi_set_option_properties(network: str, req: Request) -> ChangeSet: props = await req.json() return set_option_v3(network, ChangeSet(props)) ############################################################ # coord 24.[COORDINATES] ############################################################ @app.get("/getnodecoord/") async def fastapi_get_node_coord(network: str, node: str) -> dict[str, float] | None: return get_node_coord(network, node) # DingZQ, 2025-01-27, get all node coord/links # nodes: id:type:x:y # links: id:type:node1:node2 # node type: junction, reservoir, tank # link type: pipe, pump, valve @app.get("/getnetworkgeometries/", dependencies=[Depends(verify_token)]) async def fastapi_get_network_geometries(network: str) -> dict[str, Any] | None: # 获取所有节点坐标# 缓存查询结果提高性能 global redis_client cache_key = f"getnetworkgeometries_{network}" data = redis_client.get(cache_key) if data: # 使用自定义的反序列化函数 loaded_dict = msgpack.unpackb(data, object_hook=decode_datetime) return loaded_dict coords = get_network_node_coords(network) nodes = [] for node_id, coord in coords.items(): nodes.append(f"{node_id}:{coord['type']}:{coord['x']}:{coord['y']}") links = get_network_link_nodes(network) # return list of scadas. scada : id, x, y # scadas = get_all_scada_elements(network) # data from WMH's scada info scadas = get_all_scada_info(network) results = {"nodes": nodes, "links": links, "scadas": scadas} # 缓存查询结果提高性能 redis_client.set(cache_key, msgpack.packb(results, default=encode_datetime)) return results # DingZQ, 2024-12-31, get major node coord # id:type:x:y # type: junction, reservoir, tank @app.get("/getmajornodecoords/") async def fastapi_get_major_node_coords( network: str, diameter: int ) -> list[str] | None: start_time = time.time() coords = get_major_node_coords(network, diameter) end_time = time.time() logger.info("get_major_node_coords: %s, time: %s", coords, end_time - start_time) result = [] for node_id, coord in coords.items(): result.append(f"{node_id}:{coord['type']}:{coord['x']}:{coord['y']}") return result # DingZQ, 2025-01-03, get network in extent @app.get("/getnetworkinextent/") async def fastapi_get_network_in_extent( network: str, x1: float, y1: float, x2: float, y2: float ) -> dict[str, Any] | None: nodes = api.get_nodes_in_extent(network, x1, y1, x2, y2) links = api.get_links_in_extent(network, x1, y1, x2, y2) return {"nodes": nodes, "links": links} # DingZQ, 2024-12-08, get all links' start and end node # link_id:link_type:node_id1:node_id2 @app.get("/getnetworklinknodes/") async def fastapi_get_network_link_nodes(network: str) -> list[str] | None: return get_network_link_nodes(network) # DingZQ 2024-12-31 # 获取直径大于800的管道 @app.get("/getmajorpipenodes/") async def fastapi_get_major_pipe_nodes(network: str, diameter: int) -> list[str] | None: start_time = time.time() result = get_major_pipe_nodes(network, diameter) end_time = time.time() logger.info("get_major_pipe_nodes: %s, time: %s", result, end_time - start_time) return result ############################################################ # vertex 25.[VERTICES] ############################################################ @app.get("/getvertexschema/") async def fastapi_get_vertex_schema(network: str) -> dict[str, dict[str, Any]]: return get_vertex_schema(network) @app.get("/getvertexproperties/") async def fastapi_get_vertex_properties(network: str, link: str) -> dict[str, Any]: return get_vertex(network, link) # set_vertex(p, ChangeSet({'link' : 'p0', 'coords': [{'x': 1.0, 'y': 2.0}, {'x': 2.0, 'y': 1.0}]})) @app.post("/setvertexproperties/", response_model=None) async def fastapi_set_vertex_properties(network: str, req: Request) -> ChangeSet: props = await req.json() return set_vertex(network, ChangeSet(props)) @app.post("/addvertex/", response_model=None) async def fastapi_add_vertex(network: str, req: Request) -> ChangeSet: props = await req.json() return add_vertex(network, ChangeSet(props)) @app.post("/deletevertex/", response_model=None) async def fastapi_delete_vertex(network: str, req: Request) -> ChangeSet: props = await req.json() return api.delete_vertex(network, ChangeSet(props)) @app.get("/getallvertexlinks/", response_class=PlainTextResponse) async def fastapi_get_all_vertex_links(network: str) -> list[str]: return json.dumps(get_all_vertex_links(network)) @app.get("/getallvertices/", response_class=PlainTextResponse) async def fastapi_get_all_vertices(network: str) -> list[dict[str, Any]]: return json.dumps(get_all_vertices(network)) ############################################################ # label 26.[LABELS] ############################################################ @app.get("/getlabelschema/") async def fastapi_get_label_schema(network: str) -> dict[str, dict[str, Any]]: return get_label_schema(network) @app.get("/getlabelproperties/") async def fastapi_get_label_properties( network: str, x: float, y: float ) -> dict[str, Any]: return get_label(network, x, y) @app.post("/setlabelproperties/", response_model=None) async def fastapi_set_label_properties(network: str, req: Request) -> ChangeSet: props = await req.json() return set_label(network, ChangeSet(props)) @app.post("/addlabel/", response_model=None) async def fastapi_add_label(network: str, req: Request) -> ChangeSet: props = await req.json() return add_label(network, ChangeSet(props)) @app.post("/deletelabel/", response_model=None) async def fastapi_delete_label(network: str, req: Request) -> ChangeSet: props = await req.json() return delete_label(network, ChangeSet(props)) ############################################################ # backdrop 27.[BACKDROP] ############################################################ @app.get("/getbackdropschema/") async def fastapi_get_backdrop_schema(network: str) -> dict[str, dict[str, Any]]: return get_backdrop_schema(network) @app.get("/getbackdropproperties/") async def fastapi_get_backdrop_properties(network: str) -> dict[str, Any]: return get_backdrop(network) @app.post("/setbackdropproperties/", response_model=None) async def fastapi_set_backdrop_properties(network: str, req: Request) -> ChangeSet: props = await req.json() return set_backdrop(network, ChangeSet(props)) ############################################################ # scada_device 29 ############################################################ @app.get("/getscadadeviceschema/") async def fastapi_get_scada_device_schema(network: str) -> dict[str, dict[str, Any]]: return get_scada_device_schema(network) @app.get("/getscadadevice/") async def fastapi_get_scada_device(network: str, id: str) -> dict[str, Any]: return get_scada_device(network, id) @app.post("/setscadadevice/", response_model=None) async def fastapi_set_scada_device(network: str, req: Request) -> ChangeSet: props = await req.json() return set_scada_device(network, ChangeSet(props)) @app.post("/addscadadevice/", response_model=None) async def fastapi_add_scada_device(network: str, req: Request) -> ChangeSet: props = await req.json() return add_scada_device(network, ChangeSet(props)) @app.post("/deletescadadevice/", response_model=None) async def fastapi_delete_scada_device(network: str, req: Request) -> ChangeSet: props = await req.json() return delete_scada_device(network, ChangeSet(props)) @app.post("/cleanscadadevice/", response_model=None) async def fastapi_clean_scada_device(network: str) -> ChangeSet: return clean_scada_device(network) @app.get("/getallscadadeviceids/") async def fastapi_get_all_scada_device_ids(network: str) -> list[str]: return get_all_scada_device_ids(network) @app.get("/getallscadadevices/", response_class=PlainTextResponse) async def fastapi_get_all_scada_devices(network: str) -> list[dict[str, Any]]: return json.dumps(get_all_scada_devices(network)) ############################################################ # scada_device_data 30 ############################################################ @app.get("/getscadadevicedataschema/") async def fastapi_get_scada_device_data_schema( network: str, ) -> dict[str, dict[str, Any]]: return get_scada_device_data_schema(network) @app.get("/getscadadevicedata/") async def fastapi_get_scada_device_data(network: str, id: str) -> dict[str, Any]: return get_scada_device_data(network, id) # example: set_scada_device_data(p, ChangeSet({'device_id': 'sm_device', 'data': [{ 'time': '2023-02-10 00:02:22', 'value': 100.0 }, { 'time': '2023-02-10 00:03:22', 'value': 200.0 }]})) # time format must be 'YYYY-MM-DD HH:MM:SS' @app.post("/setscadadevicedata/", response_model=None) async def fastapi_set_scada_device_data(network: str, req: Request) -> ChangeSet: props = await req.json() return set_scada_device_data(network, ChangeSet(props)) # example: add_scada_device_data(p, ChangeSet({'device_id': 'sm_device', 'time': '2023-02-10 00:02:22', 'value': 100.0})) @app.post("/addscadadevicedata/", response_model=None) async def fastapi_add_scada_device_data(network: str, req: Request) -> ChangeSet: props = await req.json() return add_scada_device_data(network, ChangeSet(props)) # example: delete_scada_device_data(p, ChangeSet({'device_id': 'sm_device', 'time': '2023-02-12 00:02:22'})) @app.post("/deletescadadevicedata/", response_model=None) async def fastapi_delete_scada_device_data(network: str, req: Request) -> ChangeSet: props = await req.json() return delete_scada_device_data(network, ChangeSet(props)) @app.post("/cleanscadadevicedata/", response_model=None) async def fastapi_clean_scada_device_data(network: str) -> ChangeSet: return clean_scada_device_data(network) ############################################################ # scada_element 31 ############################################################ @app.get("/getscadaelementschema/") async def fastapi_get_scada_element_schema(network: str) -> dict[str, dict[str, Any]]: return get_scada_element_schema(network) @app.get("/getscadaelements/") async def fastapi_get_scada_elements(network: str) -> list[str]: return get_all_scada_elements(network) @app.get("/getscadaelement/") async def fastapi_get_scada_element(network: str, id: str) -> dict[str, Any]: return get_scada_element(network, id) @app.post("/setscadaelement/", response_model=None) async def fastapi_set_scada_element(network: str, req: Request) -> ChangeSet: props = await req.json() return set_scada_element(network, ChangeSet(props)) @app.post("/addscadaelement/", response_model=None) async def fastapi_add_scada_element(network: str, req: Request) -> ChangeSet: props = await req.json() return add_scada_element(network, ChangeSet(props)) @app.post("/deletescadaelement/", response_model=None) async def fastapi_delete_scada_element(network: str, req: Request) -> ChangeSet: props = await req.json() return delete_scada_element(network, ChangeSet(props)) @app.post("/cleanscadaelement/", response_model=None) async def fastapi_clean_scada_element(network: str) -> ChangeSet: return clean_scada_element(network) ############################################################ # general_region 32 ############################################################ @app.get("/getregionschema/") async def fastapi_get_region_schema(network: str) -> dict[str, dict[str, Any]]: return get_region_schema(network) @app.get("/getregion/") async def fastapi_get_region(network: str, id: str) -> dict[str, Any]: return get_region(network, id) @app.post("/setregion/", response_model=None) async def fastapi_set_region(network: str, req: Request) -> ChangeSet: props = await req.json() return set_region(network, ChangeSet(props)) # example: add_region(p, ChangeSet({'id': 'r', 'boundary': [(0.0, 0.0), (1.0, 0.0), (1.0, 1.0), (0.0, 0.0)]})) @app.post("/addregion/", response_model=None) async def fastapi_add_region(network: str, req: Request) -> ChangeSet: props = await req.json() return add_region(network, ChangeSet(props)) @app.post("/deleteregion/", response_model=None) async def fastapi_delete_region(network: str, req: Request) -> ChangeSet: props = await req.json() return delete_region(network, ChangeSet(props)) ############################################################ # district_metering_area 33 ############################################################ @app.get("/calculatedistrictmeteringareafornodes/") async def fastapi_calculate_district_metering_area_for_nodes( network: str, req: Request ) -> list[list[str]]: props = await req.json() nodes = props["nodes"] part_count = props["part_count"] part_type = props["part_type"] return calculate_district_metering_area_for_nodes( network, nodes, part_count, part_type ) @app.get("/calculatedistrictmeteringareaforregion/") async def fastapi_calculate_district_metering_area_for_region( network: str, req: Request ) -> list[list[str]]: props = await req.json() region = props["region"] part_count = props["part_count"] part_type = props["part_type"] return calculate_district_metering_area_for_region( network, region, part_count, part_type ) @app.get("/calculatedistrictmeteringareafornetwork/") async def fastapi_calculate_district_metering_area_for_network( network: str, req: Request ) -> list[list[str]]: props = await req.json() part_count = props["part_count"] part_type = props["part_type"] return calculate_district_metering_area_for_network(network, part_count, part_type) @app.get("/getdistrictmeteringareaschema/") async def fastapi_get_district_metering_area_schema( network: str, ) -> dict[str, dict[str, Any]]: return get_district_metering_area_schema(network) @app.get("/getdistrictmeteringarea/") async def fastapi_get_district_metering_area(network: str, id: str) -> dict[str, Any]: return get_district_metering_area(network, id) @app.post("/setdistrictmeteringarea/", response_model=None) async def fastapi_set_district_metering_area(network: str, req: Request) -> ChangeSet: props = await req.json() return set_district_metering_area(network, ChangeSet(props)) @app.post("/adddistrictmeteringarea/", response_model=None) async def fastapi_add_district_metering_area(network: str, req: Request) -> ChangeSet: props = await req.json() # boundary should be [(x,y), (x,y)] boundary = props["boundary"] newBoundary = [] for pt in boundary: newBoundary.append((pt[0], pt[1])) props["boundary"] = newBoundary return add_district_metering_area(network, ChangeSet(props)) @app.post("/deletedistrictmeteringarea/", response_model=None) async def fastapi_delete_district_metering_area( network: str, req: Request ) -> ChangeSet: props = await req.json() return delete_district_metering_area(network, ChangeSet(props)) @app.get("/getalldistrictmeteringareaids/") async def fastapi_get_all_district_metering_area_ids(network: str) -> list[str]: return get_all_district_metering_area_ids(network) @app.get("/getalldistrictmeteringareas/") async def getalldistrictmeteringareas(network: str) -> list[dict[str, Any]]: return get_all_district_metering_areas(network) @app.post("/generatedistrictmeteringarea/", response_model=None) async def fastapi_generate_district_metering_area( network: str, part_count: int, part_type: int, inflate_delta: float ) -> ChangeSet: return generate_district_metering_area( network, part_count, part_type, inflate_delta ) @app.post("/generatesubdistrictmeteringarea/", response_model=None) async def fastapi_generate_sub_district_metering_area( network: str, dma: str, part_count: int, part_type: int, inflate_delta: float ) -> ChangeSet: print(network) print(dma) print(part_count) print(part_type) print(inflate_delta) return generate_sub_district_metering_area( network, dma, part_count, part_type, inflate_delta ) ############################################################ # service_area 34 ############################################################ @app.get("/calculateservicearea/") async def fastapi_calculate_service_area( network: str, time_index: int ) -> dict[str, Any]: return calculate_service_area(network, time_index) @app.get("/getserviceareaschema/") async def fastapi_get_service_area_schema(network: str) -> dict[str, dict[str, Any]]: return get_service_area_schema(network) @app.get("/getservicearea/") async def fastapi_get_service_area(network: str, id: str) -> dict[str, Any]: return get_service_area(network, id) @app.post("/setservicearea/", response_model=None) async def fastapi_set_service_area(network: str, req: Request) -> ChangeSet: props = await req.json() return set_service_area(network, ChangeSet(props)) @app.post("/addservicearea/", response_model=None) async def fastapi_add_service_area(network: str, req: Request) -> ChangeSet: props = await req.json() return add_service_area(network, ChangeSet(props)) @app.post("/deleteservicearea/", response_model=None) async def fastapi_delete_service_area(network: str, req: Request) -> ChangeSet: props = await req.json() return delete_service_area(network, ChangeSet(props)) @app.get("/getallserviceareas/") async def fastapi_get_all_service_areas(network: str) -> list[dict[str, Any]]: return get_all_service_areas(network) @app.post("/generateservicearea/", response_model=None) async def fastapi_generate_service_area( network: str, inflate_delta: float ) -> ChangeSet: return generate_service_area(network, inflate_delta) ############################################################ # virtual_district 35 ############################################################ @app.get("/calculatevirtualdistrict/") async def fastapi_calculate_virtual_district( network: str, centers: list[str] ) -> dict[str, list[Any]]: return calculate_virtual_district(network, centers) @app.get("/getvirtualdistrictschema/") async def fastapi_get_virtual_district_schema( network: str, ) -> dict[str, dict[str, Any]]: return get_virtual_district_schema(network) @app.get("/getvirtualdistrict/") async def fastapi_get_virtual_district(network: str, id: str) -> dict[str, Any]: return get_virtual_district(network, id) @app.post("/setvirtualdistrict/", response_model=None) async def fastapi_set_virtual_district(network: str, req: Request) -> ChangeSet: props = await req.json() return set_virtual_district(network, ChangeSet(props)) @app.post("/addvirtualdistrict/", response_model=None) async def fastapi_add_virtual_district(network: str, req: Request) -> ChangeSet: props = await req.json() return add_virtual_district(network, ChangeSet(props)) @app.post("/deletevirtualdistrict/", response_model=None) async def fastapi_delete_virtual_district(network: str, req: Request) -> ChangeSet: props = await req.json() return delete_virtual_district(network, ChangeSet(props)) @app.get("/getallvirtualdistrict/") async def fastapi_get_all_virtual_district(network: str) -> list[dict[str, Any]]: return get_all_virtual_districts(network) @app.post("/generatevirtualdistrict/", response_model=None) async def fastapi_generate_virtual_district( network: str, inflate_delta: float, req: Request ) -> ChangeSet: props = await req.json() return generate_virtual_district(network, props["centers"], inflate_delta) ############################################################ # water_distribution_area 36 ############################################################ @app.get("/calculatedemandtonodes/") async def fastapi_calculate_demand_to_nodes( network: str, req: Request ) -> dict[str, float]: props = await req.json() demand = props["demand"] nodes = props["nodes"] return calculate_demand_to_nodes(network, demand, nodes) @app.get("/calculatedemandtoregion/") async def fastapi_calculate_demand_to_region( network: str, req: Request ) -> dict[str, float]: props = await req.json() demand = props["demand"] region = props["region"] return calculate_demand_to_region(network, demand, region) @app.get("/calculatedemandtonetwork/") async def fastapi_calculate_demand_to_network( network: str, demand: float ) -> dict[str, float]: return calculate_demand_to_network(network, demand) ########################################################### # scada_info 38 || written by WMH ############################################################ @app.get("/getscadainfoschema/") async def fastapi_get_scada_info_schema(network: str) -> dict[str, dict[str, Any]]: return get_scada_info_schema(network) @app.get("/getscadainfo/") async def fastapi_get_scada_info(network: str, id: str) -> dict[str, float]: return get_scada_info(network, id) @app.get("/getallscadainfo/") async def fastapi_get_all_scada_info(network: str) -> list[dict[str, float]]: return get_all_scada_info(network) ########################################################### # user 39 ########################################################### @app.get("/getuserschema/") async def fastapi_get_user_schema(network: str) -> dict[str, dict[Any, Any]]: return get_user_schema(network) @app.get("/getuser/") async def fastapi_get_user(network: str, user_name: str) -> dict[Any, Any]: return get_user(network, user_name) @app.get("/getallusers/") async def fastapi_get_all_users(network: str) -> list[dict[Any, Any]]: return get_all_users(network) ############################################################ # scheme 40 ############################################################ @app.get("/getschemeschema/") async def fastapi_get_scheme_schema(network: str) -> dict[str, dict[Any, Any]]: return get_scheme_schema(network) @app.get("/getscheme/") async def fastapi_get_scheme(network: str, schema_name: str) -> dict[Any, Any]: return get_scheme(network, schema_name) @app.get("/getallschemes/") async def fastapi_get_all_schemes(network: str) -> list[dict[Any, Any]]: return get_all_schemes(network) ############################################################ # pipe_risk_probability 41 ############################################################ @app.get("/getpiperiskprobabilitynow/") async def fastapi_get_pipe_risk_probability_now( network: str, pipe_id: str ) -> dict[str, Any]: return get_pipe_risk_probability_now(network, pipe_id) @app.get("/getpiperiskprobability/") async def fastapi_get_pipe_risk_probability( network: str, pipe_id: str ) -> dict[str, Any]: return get_pipe_risk_probability(network, pipe_id) @app.get("/getpipesriskprobability/") async def fastapi_get_pipes_risk_probability( network: str, pipe_ids: str ) -> list[dict[str, Any]]: pipeids = pipe_ids.split(",") return get_pipes_risk_probability(network, pipeids) @app.get("/getnetworkpiperiskprobabilitynow/") async def fastapi_get_network_pipe_risk_probability_now( network: str, ) -> list[dict[str, Any]]: return get_network_pipe_risk_probability_now(network) # 返回一个字典,key 是管道的 id,value 是管道的几何信息 # 几何信息是一个字典,包含 start 和 end 两个 key,value 是管道的起点和终点的坐标 # 例如: # "GSD240730154246A51D2C324D1A": { # "start": [ # 106.424759007, # 29.815104642 # ], # "end": [ # 106.424824186, # 29.814950582 # ] # }, @app.get("/getpiperiskprobabilitygeometries/") async def fastapi_get_pipe_risk_probability_geometries(network: str) -> dict[str, Any]: return get_pipe_risk_probability_geometries(network) ############################################################ # sensor_placement 42 ############################################################ @app.get("/getallsensorplacements/") async def fastapi_get_all_sensor_placements(network: str) -> list[dict[Any, Any]]: return get_all_sensor_placements(network) ############################################################ # burst_locate_result 43 ############################################################ @app.get("/getallburstlocateresults/") async def fastapi_get_all_burst_locate_results(network: str) -> list[dict[Any, Any]]: return get_all_burst_locate_results(network) # inp file @app.post("/uploadinp/", status_code=status.HTTP_200_OK) async def fastapi_upload_inp(afile: bytes, name: str): filePath = inpDir + str(name) f = open(filePath, "wb") f.write(afile) f.close() return True @app.get("/downloadinp/", status_code=status.HTTP_200_OK) async def fastapi_download_inp(name: str, response: Response): filePath = inpDir + name if os.path.exists(filePath): return FileResponse( filePath, media_type="application/octet-stream", filename="inp.inp" ) else: response.status_code = status.HTTP_400_BAD_REQUEST return True # DingZQ, 2024-12-28, convert v3 to v2 @app.get("/convertv3tov2/", response_model=None) async def fastapi_convert_v3_to_v2(req: Request) -> ChangeSet: network = "v3Tov2" jo_root = await req.json() inp = jo_root["inp"] cs = convert_inp_v3_to_v2(inp) op = cs.operations[0] open_project(network) op["vertex"] = json.dumps(get_all_vertices(network)) op["scada"] = json.dumps(get_all_scada_elements(network)) op["dma"] = json.dumps(get_all_district_metering_areas(network)) op["sa"] = json.dumps(get_all_service_areas(network)) op["vd"] = json.dumps(get_all_virtual_districts(network)) op["legend"] = get_extension_data(network, "legend") db = get_extension_data(network, "scada_db") print(db) scada_db = "" if db: scada_db = db print(scada_db) op["scada_db"] = scada_db close_project(network) return cs @app.get("/getjson/") async def fastapi_get_json(): return JSONResponse( status_code=status.HTTP_400_BAD_REQUEST, content={ "code": 400, "message": "this is message", "data": 123, }, ) ############################################################ # DingZQ, 2024-12-09, Add sample API to return real time data/simulation result # influx db operation ############################################################ @app.get("/getrealtimedata/") async def fastapi_get_realtimedata(): data = [random.randint(0, 100) for _ in range(100)] return data @app.get("/getsimulationresult/") async def fastapi_get_simulationresult(): data = [random.randint(0, 100) for _ in range(100)] return data # 下面几个query 函数,都是从 influxdb 中查询的,不与 network 绑定,用固定的network 名字 # DingZQ 2025-01-31 # def query_latest_record_by_ID(ID: str, type: str, bucket: str="realtime_data", client: InfluxDBClient=client) -> dict: @app.get("/querynodelatestrecordbyid/") async def fastapi_query_node_latest_record_by_id(id: str): return influxdb_api.query_latest_record_by_ID(id, type="node") @app.get("/querylinklatestrecordbyid/") async def fastapi_query_link_latest_record_by_id(id: str): return influxdb_api.query_latest_record_by_ID(id, type="link") # query scada @app.get("/queryscadalatestrecordbyid/") async def fastapi_query_scada_latest_record_by_id(id: str): return influxdb_api.query_latest_record_by_ID(id, type="scada") # def query_all_record_by_time(query_time: str, bucket: str="realtime_data", client: InfluxDBClient=client) -> tuple: @app.get("/queryallrecordsbytime/") async def fastapi_query_all_records_by_time(querytime: str) -> dict[str, list]: results: tuple = influxdb_api.query_all_records_by_time(query_time=querytime) return {"nodes": results[0], "links": results[1]} # def query_all_record_by_time_property(querytime: str, type: str, property: str, bucket: str = "realtime_simulation_result") -> tuple: @app.get("/queryallrecordsbytimeproperty/") async def fastapi_query_all_record_by_time_property( querytime: str, type: str, property: str, bucket: str = "realtime_simulation_result" ) -> dict[str, list]: results: tuple = influxdb_api.query_all_record_by_time_property( query_time=querytime, type=type, property=property, bucket=bucket ) return {"results": results} @app.get("/queryallschemerecordsbytimeproperty/") async def fastapi_query_all_scheme_record_by_time_property( querytime: str, type: str, property: str, schemename: str, bucket: str = "scheme_simulation_result", ) -> dict[str, list]: """ 查询指定方案某一时刻的所有记录,查询 'node' 或 'link' 的某一属性值 :param querytime: 查询时间,格式为 '2024-11-24T17:30:00+08:00' :param type: 查询类型 'node' 或 'link' :param property: 查询的属性字段名 :param schemename: 方案名称,如 "FANGAN1761124840355" :param bucket: 数据存储的bucket名称 :return: 包含查询结果的字典 """ results: list = influxdb_api.query_all_scheme_record_by_time_property( query_time=querytime, type=type, property=property, scheme_name=schemename, bucket=bucket, ) return {"results": results} @app.get("/querysimulationrecordsbyidtime/") async def fastapi_query_simulation_record_by_ids_time( id: str, querytime: str, type: str, bucket: str = "realtime_simulation_result" ) -> dict[str, list]: results: tuple = influxdb_api.query_simulation_result_by_ID_time( ID=id, type=type, query_time=querytime, bucket=bucket ) return {"results": results} @app.get("/queryschemesimulationrecordsbyidtime/") async def fastapi_query_scheme_simulation_record_by_ids_time( scheme_name: str, id: str, querytime: str, type: str, bucket: str = "scheme_simulation_result", ) -> dict[str, list]: results: tuple = influxdb_api.query_scheme_simulation_result_by_ID_time( scheme_name=scheme_name, ID=id, type=type, query_time=querytime, bucket=bucket ) return {"results": results} @app.get("/queryallrecordsbydate/") async def fastapi_query_all_records_by_date(querydate: str) -> dict: # 缓存查询结果提高性能 global redis_client is_today_or_future = time_api.is_today_or_future(querydate) logger.info(f"isToday or future: {is_today_or_future}") # 今天的不要去缓存 if not is_today_or_future: cache_key = f"queryallrecordsbydate_{querydate}" logger.info(f"cache_key: {cache_key}") data = redis_client.get(cache_key) if data: # 使用自定义的反序列化函数 results = msgpack.unpackb(data, object_hook=decode_datetime) logger.info(f"return from cache redis") return results logger.info(f"query from influxdb") nodes_links: tuple = influxdb_api.query_all_records_by_date(query_date=querydate) results = {"nodes": nodes_links[0], "links": nodes_links[1]} # 今天的不要去缓存 if not is_today_or_future: logger.info(f"save to cache redis") redis_client.set(cache_key, msgpack.packb(results, default=encode_datetime)) logger.info(f"return results") return results @app.get("/queryallrecordsbytimerange/") async def fastapi_query_all_records_by_time_range( starttime: str, endtime: str ) -> dict[str, list]: # 缓存查询结果提高性能 global redis_client # 今天的不要去缓存 if not time_api.is_today_or_future(starttime): cache_key = f"queryallrecordsbytimerange_{starttime}_{endtime}" data = redis_client.get(cache_key) if data: # 使用自定义的反序列化函数 loaded_dict = msgpack.unpackb(data, object_hook=decode_datetime) return loaded_dict nodes_links: tuple = influxdb_api.query_all_records_by_time_range( starttime=starttime, endtime=endtime ) results = {"nodes": nodes_links[0], "links": nodes_links[1]} # 今天的不要去缓存 if not time_api.is_today_or_future(starttime): redis_client.set(cache_key, msgpack.packb(results, default=encode_datetime)) return results # 2025-03-15, DingZQ @app.get("/queryallrecordsbydatewithtype/") async def fastapi_query_all_records_by_date_with_type( querydate: str, querytype: str ) -> list: # 缓存查询结果提高性能 global redis_client cache_key = f"queryallrecordsbydatewithtype_{querydate}_{querytype}" data = redis_client.get(cache_key) if data: # 使用自定义的反序列化函数 loaded_dict = msgpack.unpackb(data, object_hook=decode_datetime) return loaded_dict results = influxdb_api.query_all_records_by_date_with_type( query_date=querydate, query_type=querytype ) packed = msgpack.packb(results, default=encode_datetime) redis_client.set(cache_key, packed) return results @app.get("/queryallrecordsbyidsdatetype/") async def fastapi_query_all_records_by_ids_date_type( ids: str, querydate: str, querytype: str ) -> list: # 缓存查询结果提高性能 global redis_client cache_key = f"queryallrecordsbydatewithtype_{querydate}_{querytype}" data = redis_client.get(cache_key) results = [] if data: # 使用自定义的反序列化函数 results = msgpack.unpackb(data, object_hook=decode_datetime) else: results = influxdb_api.query_all_records_by_date_with_type( query_date=querydate, query_type=querytype ) packed = msgpack.packb(results, default=encode_datetime) redis_client.set(cache_key, packed) query_ids = ids.split(",") e_results = py_linq.Enumerable(results) lst_results = e_results.where(lambda x: x["ID"] in query_ids).to_list() return lst_results # 查询指定日期、类型、属性的所有记录 # 返回 [{'time': '2024-01-01T00:00:00Z', 'ID': '1', 'value': 1.0}, {'time': '2024-01-01T00:00:00Z', 'ID': '2', 'value': 2.0}] @app.get("/queryallrecordsbydateproperty/") async def fastapi_query_all_records_by_date_property( querydate: str, querytype: str, property: str ) -> list[dict]: # 缓存查询结果提高性能 global redis_client cache_key = f"queryallrecordsbydateproperty_{querydate}_{querytype}_{property}" data = redis_client.get(cache_key) if data: # 使用自定义的反序列化函数 loaded_dict = msgpack.unpackb(data, object_hook=decode_datetime) return loaded_dict result_dict = influxdb_api.query_all_record_by_date_property( query_date=querydate, type=querytype, property=property ) packed = msgpack.packb(result_dict, default=encode_datetime) redis_client.set(cache_key, packed) return result_dict # def query_curve_by_ID_property_daterange(ID: str, type: str, property: str, start_date: str, end_date: str, bucket: str="realtime_data", client: InfluxDBClient=client) -> list: @app.get("/querynodecurvebyidpropertydaterange/") async def fastapi_query_node_curve_by_id_property_daterange( id: str, prop: str, startdate: str, enddate: str ): return influxdb_api.query_curve_by_ID_property_daterange( id, type="node", property=prop, start_date=startdate, end_date=enddate ) @app.get("/querylinkcurvebyidpropertydaterange/") async def fastapi_query_link_curve_by_id_property_daterange( id: str, prop: str, startdate: str, enddate: str ): return influxdb_api.query_curve_by_ID_property_daterange( id, type="link", property=prop, start_date=startdate, end_date=enddate ) # ids 用,隔开 # 返回 { 'id': value1, 'id2': value2 } # def query_SCADA_data_by_device_ID_and_time(query_ids_list: List[str], query_time: str, bucket: str="SCADA_data", client: InfluxDBClient=client) -> Dict[str, float]: @app.get("/queryscadadatabydeviceidandtime/") async def fastapi_query_scada_data_by_device_id_and_time(ids: str, querytime: str): query_ids = ids.split(",") logger.info(querytime) return influxdb_api.query_SCADA_data_by_device_ID_and_time( query_ids_list=query_ids, query_time=querytime ) # 2025/05/04 DingZQ # 对于SCAD的曲线数据,我们需要有4 套数据值 # 1. 原始数据 # 2. 补充的数据 (补充前面第一步缺失的数据) # 3. 清洗后的数据点 (用五角星表示) # 4. 模拟曲线 # 查询到的SCADA原始数据 # 数据1 @app.get("/queryscadadatabydeviceidandtimerange/") async def fastapi_query_scada_data_by_device_id_and_time_range( ids: str, starttime: str, endtime: str ): print(f"query_ids: {ids}, starttime: {starttime}, endtime: {endtime}") query_ids = ids.split(",") return influxdb_api.query_SCADA_data_by_device_ID_and_timerange( query_ids_list=query_ids, start_time=starttime, end_time=endtime ) # 查询到的SCADA补充的数据 # 数据2 # 注意: 这里的id是 scada_info中的 api_query_id @app.get("/queryfillingscadadatabydeviceidandtimerange/") async def fastapi_query_filling_scada_data_by_device_id_and_time_range( ids: str, starttime: str, endtime: str ): print(f"query_ids: {ids}, starttime: {starttime}, endtime: {endtime}") query_ids = ids.split(",") return influxdb_api.query_filling_SCADA_data_by_device_ID_and_timerange( query_ids_list=query_ids, start_time=starttime, end_time=endtime ) # 查询到的SCADA清洗后的数据点 # 数据3 # 注意: 这里的id是 scada_info中的 api_query_id @app.get("/querycleaningscadadatabydeviceidandtimerange/") async def fastapi_query_cleaning_scada_data_by_device_id_and_time_range( ids: str, starttime: str, endtime: str ): print(f"query_ids: {ids}, starttime: {starttime}, endtime: {endtime}") query_ids = ids.split(",") return influxdb_api.query_cleaning_SCADA_data_by_device_ID_and_timerange( query_ids_list=query_ids, start_time=starttime, end_time=endtime ) # 查询到的SCADA模拟数据(从 realtime_simulation bucket 中查找) @app.get("/querysimulationscadadatabydeviceidandtimerange/") async def fastapi_query_simulation_scada_data_by_device_id_and_time_range( ids: str, starttime: str, endtime: str ): print(f"query_ids: {ids}, starttime: {starttime}, endtime: {endtime}") query_ids = ids.split(",") return influxdb_api.query_simulation_SCADA_data_by_device_ID_and_timerange( query_ids_list=query_ids, start_time=starttime, end_time=endtime ) # 查询指定时间范围内,多个SCADA设备的清洗后的数据 # DingZQ, 2025-04-19 # 2025/05/04 DingZQ 这个是将原始数据跟清洗后的数据合并到一起,暂时不需要用这个API @app.get("/querycleanedscadadatabydeviceidandtimerange/") async def fastapi_query_cleaned_scada_data_by_device_id_and_time_range( ids: str, starttime: str, endtime: str ): print(f"query_ids: {ids}, starttime: {starttime}, endtime: {endtime}") query_ids = ids.split(",") return influxdb_api.query_cleaned_SCADA_data_by_device_ID_and_timerange( query_ids_list=query_ids, start_time=starttime, end_time=endtime ) @app.get("/queryscadadatabydeviceidanddate/") async def fastapi_query_scada_data_by_device_id_and_date(ids: str, querydate: str): query_ids = ids.split(",") return influxdb_api.query_SCADA_data_by_device_ID_and_date( query_ids_list=query_ids, query_date=querydate ) # DingZQ, 2025-03-08 # 返回所有SCADA设备在指定日期的所有记录 @app.get("/queryallscadarecordsbydate/") async def fastapi_query_all_scada_records_by_date(querydate: str): global redis_client is_today_or_future = time_api.is_today_or_future(querydate) logger.info(f"isToday or future: {is_today_or_future}") # 今天的不要去缓存 if not is_today_or_future: cache_key = f"queryallscadarecordsbydate_{querydate}" data = redis_client.get(cache_key) if data: # 使用自定义的反序列化函数 loaded_dict = msgpack.unpackb(data, object_hook=decode_datetime) logger.info(f"return from cache redis") return loaded_dict logger.info(f"query from influxdb") result_dict = influxdb_api.query_all_SCADA_records_by_date(query_date=querydate) # 今天的不要去缓存 if not is_today_or_future: logger.info(f"save to cache redis") packed = msgpack.packb(result_dict, default=encode_datetime) redis_client.set(cache_key, packed) logger.info(f"return results") return result_dict # DingZQ, 2025-03-15 # Scheme @app.get("/queryallschemeallrecords/") async def fastapi_query_all_scheme_all_records( schemetype: str, schemename: str, querydate: str ) -> tuple: # 缓存查询结果提高性能 global redis_client cache_key = f"queryallschemeallrecords_{schemetype}_{schemename}_{querydate}" data = redis_client.get(cache_key) if data: # 使用自定义的反序列化函数 loaded_dict = msgpack.unpackb(data, object_hook=decode_datetime) return loaded_dict results = influxdb_api.query_scheme_all_record( scheme_type=schemetype, scheme_name=schemename, query_date=querydate ) packed = msgpack.packb(results, default=encode_datetime) redis_client.set(cache_key, packed) return results # DingZQ, 2025-03-21 # 缓存是用的queryallschemeallrecords的缓存 @app.get("/queryschemeallrecordsproperty/") async def fastapi_query_all_scheme_all_records_property( schemetype: str, schemename: str, querydate: str, querytype: str, queryproperty: str ) -> list: # 缓存查询结果提高性能 global redis_client cache_key = f"queryallschemeallrecords_{schemetype}_{schemename}_{querydate}" data = redis_client.get(cache_key) all_results = None if data: # 使用自定义的反序列化函数 all_results = msgpack.unpackb(data, object_hook=decode_datetime) else: all_results = influxdb_api.query_scheme_all_record( scheme_type=schemetype, scheme_name=schemename, query_date=querydate ) packed = msgpack.packb(all_results, default=encode_datetime) redis_client.set(cache_key, packed) results = None if querytype == "node": results = all_results[0] elif querytype == "link": results = all_results[1] return results @app.post("/clearrediskey/") async def fastapi_clear_redis_key(key: str): redis_client.delete(key) return True @app.post("/clearrediskeys/") async def fastapi_clear_redis_keys(keys: str): # delete keys contains the key matched_keys = redis_client.keys(f"*{keys}*") redis_client.delete(*matched_keys) return True @app.post("/clearallredis/") async def fastapi_clear_all_redis(): redis_client.flushdb() return True @app.get("/queryredis/") async def fastapi_query_redis(): return redis_client.keys("*") @app.get("/queryinfluxdbbuckets/") async def fastapi_query_influxdb_buckets(): return influxdb_api.query_buckets() @app.get("/queryinfluxdbbucketmeasurements/") async def fastapi_query_influxdb_bucket_measurements(bucket: str): return influxdb_api.query_measurements(bucket=bucket) # DingZQ, 2024-12-31, generate openapi.json def generate_openapi_json(): openapi_json_path = "openapi.json" with open(openapi_json_path, "w") as file: json.dump(app.openapi(), file, indent=4) ############################################################ # real_time api 37 # example: http://127.0.0.1:8000/runsimulation?network=beibeizone&start_time=2024-04-01T08:00:00Z ############################################################ # 必须用这个PlainTextResponse,不然每个key都有引号 # @app.get("/runsimulation/", response_class = PlainTextResponse) # async def fastapi_run_project(network: str,start_time:str,end_time=None) -> str: # filename = 'c:/lock.simulation' # filename2 = 'c:/lock.simulation2' # if os.path.exists(filename2): # print('file exists') # raise HTTPException(status_code=409, detail="is in simulation") # else: # print('file doesnt exists') # #os.rename(filename, filename2) # result = run_simulation(network,start_time,end_time) # #os.rename(filename2, filename) # return result ############################################################ # real_time api 37 # example: http://127.0.0.1:8000/runsimulation?network=beibeizone&start_time=2024-04-01T08:00:00Z ############################################################ # 必须用这个PlainTextResponse,不然每个key都有引号 # @app.get("/runsimulation/", response_class = PlainTextResponse) # async def fastapi_run_project(network: str,start_time:str,end_time=None) -> str: # filename = 'c:/lock.simulation' # filename2 = 'c:/lock.simulation2' # if os.path.exists(filename2): # print('file exists') # raise HTTPException(status_code=409, detail="is in simulation") # else: # print('file doesnt exists') # #os.rename(filename, filename2) # result = run_simulation_ex(name=network, simulation_type='realtime', start_datetime=start_time, end_datetime=end_time) # #os.rename(filename2, filename) # return result # DingZQ, 2025-05-17 class Download_History_Data_Manually(BaseModel): """ download_date:样式如 datetime(2025, 5, 4) """ download_date: datetime # DingZQ, 2025-05-17 @app.post("/download_history_data_manually/") async def fastapi_download_history_data_manually( data: Download_History_Data_Manually, ) -> None: item = data.dict() # 创建东八区时区对象 tz = timezone(timedelta(hours=8)) begin_dt = datetime.combine(item["download_date"].date(), time.min).replace( tzinfo=tz ) end_dt = datetime.combine(item["download_date"].date(), time(23, 59, 59)).replace( tzinfo=tz ) # 2. 转为字符串 begin_time = begin_dt.isoformat() end_time = end_dt.isoformat() influxdb_api.download_history_data_manually( begin_time=begin_time, end_time=end_time ) # DingZQ, 2025-05-17 # 新增开始时间和持续时间参数 class Run_Simulation_Manually_by_Date(BaseModel): """ name:数据库名称 start_time:开始时间,样式如 2025-05-04T08:00:00+08:00 duration:持续时间,单位为分钟 """ name: str start_time: str duration: int @field_validator("start_time") @classmethod def validate_start_time_timezone(cls, value: str) -> str: time_api.parse_aware_time(value, field_name="start_time") return value def run_simulation_manually_by_date( network_name: str, start_time: datetime, duration: int ) -> None: # 计算结束时间 end_datetime = start_time + timedelta(minutes=duration) # 生成时间点,每15分钟一个 current_time = start_time while current_time < end_datetime: ## 执行函数调用 simulation.run_simulation( name=network_name, simulation_type="realtime", modify_pattern_start_time=current_time.isoformat(timespec="seconds"), ) # 增加15分钟 current_time += timedelta(minutes=15) @app.post("/runsimulationmanuallybydate/") async def fastapi_run_simulation_manually_by_date( data: Run_Simulation_Manually_by_Date, ) -> dict[str, str]: item = data.model_dump() print(f"item: {item}") filename = "c:/lock.simulation" filename2 = "c:/lock.simulation2" if os.path.exists(filename2): print("file exists") raise HTTPException(status_code=409, detail="is in simulation") else: print("file doesnt exists") try: simulation.query_corresponding_element_id_and_query_id(item["name"]) simulation.query_corresponding_pattern_id_and_query_id(item["name"]) region_result = simulation.query_non_realtime_region(item["name"]) globals.source_outflow_region_id = simulation.get_source_outflow_region_id( item["name"], region_result ) globals.realtime_region_pipe_flow_and_demand_id = ( simulation.query_realtime_region_pipe_flow_and_demand_id( item["name"], region_result ) ) globals.pipe_flow_region_patterns = ( simulation.query_pipe_flow_region_patterns(item["name"]) ) globals.non_realtime_region_patterns = ( simulation.query_non_realtime_region_patterns( item["name"], region_result ) ) ( globals.source_outflow_region_patterns, globals.realtime_region_pipe_flow_and_demand_patterns, ) = simulation.get_realtime_region_patterns( item["name"], globals.source_outflow_region_id, globals.realtime_region_pipe_flow_and_demand_id, ) start_time = time_api.parse_utc_time( item["start_time"], field_name="start_time" ) thread = threading.Thread( target=lambda: run_simulation_manually_by_date( item["name"], start_time, item["duration"] ) ) thread.start() thread.join() # 等待线程完成 return {"status": "success"} except Exception as e: raise HTTPException(status_code=500, detail=str(e)) from e # thread.join() # DingZQ 08152025 # matched_keys = redis_client.keys(...) # redis_client.delete(*matched_keys) ############################################################ # real_Time api 37.5 # example: # response = requests.post("http://127.0.0.1:8000/runsimulation", # data=json.dumps({'network': 'bb_server', 'simulation_type': 'extended', # 'start_time': '2024-05-17T09:30:00Z', 'duration': 900, # 'pump_control': {'1#': [0, 0], '2#': [1, 1], '3#': [1, 1], '4#': [1, 0], # '5#': [45, 43], '6#': [0, 0], '7#': [0, 0]}}), # headers={'accept': 'application/json', 'Content-Type': 'application/json'}) ############################################################ # class RunSimuItem(BaseModel): # network: str # simulation_type: str # start_time: str # end_time: Optional[str] = None # duration: Optional[int] = 900 # pump_control: Optional[dict] = None # # # @app.post("/runsimulation/") # async def fastapi_run_project(item: RunSimuItem) -> str: # item = item.dict() # filename = 'c:/lock.simulation' # filename2 = 'c:/lock.simulation2' # if os.path.exists(filename2): # print('file exists') # raise HTTPException(status_code=409, detail="is in simulation") # else: # print('file doesnt exists') # #os.rename(filename, filename2) # result = run_simulation_ex(item['network'], item['simulation_type'], # item['start_time'], item['end_time'], # item['duration'], item['pump_control']) # #os.rename(filename2, filename) # return result ############################################################ # burst analysis api 38 # example:http://127.0.0.1:8000/burst_analysis?network=beibeizone&start_time=2024-04-01T08:00:00Z&burst_ID=ZBBGXSZW000001&burst_size=200&duration=1800 ############################################################ # @app.get("/burst_analysis/", response_class = PlainTextResponse) # async def fastapi_burst_analysis(network: str,start_time:str,burst_ID:str,burst_size:float,burst_flow:float=None,duration:int=None) -> str: # filename = 'c:/lock.simulation' # filename2 = 'c:/lock.simulation2' # if os.path.exists(filename2): # print('file exists') # raise HTTPException(status_code=409, detail="is in simulation") # else: # print('file doesnt exists') # #os.rename(filename, filename2) # result = burst_analysis(network,start_time,burst_ID,burst_size,burst_flow,duration) # #os.rename(filename2, filename) # return result ############################################################ # burst analysis api 38.5 # example: # response = requests.post("http://127.0.0.1:8000/burst_analysis", # data=json.dumps({'network': 'bb_server', # 'start_time': '2024-05-17T09:30:00Z', # 'burst_ID': ['ZBBGXSZW000001'], # 'burst_size': [200], # 'duration': 1800, # 'pump_control': {'1#': [0, 0, 0], '2#': [1, 1, 1], '3#': [1, 1, 1], '4#': [1, 1, 1], # '5#': [45, 45, 45], '6#': [0, 0, 0], '7#': [0, 0, 0]} # 'valve_closed': ['GSD2307192058576667FF7B41FF']), # headers={'accept': 'application/json', 'Content-Type': 'application/json'}) ############################################################ class BurstAnalysis(BaseModel): name: str modify_pattern_start_time: str burst_ID: Union[List[str], str] = None burst_size: Union[List[float], float, int] = None modify_total_duration: int = 900 modify_fixed_pump_pattern: Optional[dict[str, list]] = None modify_variable_pump_pattern: Optional[dict[str, list]] = None modify_valve_opening: Optional[dict[str, float]] = None scheme_name: Optional[str] = None @app.post("/burst_analysis/") async def fastapi_burst_analysis(data: BurstAnalysis) -> str: item = data.dict() filename = "c:/lock.simulation" filename2 = "c:/lock.simulation2" if os.path.exists(filename2): print("file exists") raise HTTPException(status_code=409, detail="is in simulation") else: print("file doesnt exists") # os.rename(filename, filename2) burst_analysis( name=item["name"], modify_pattern_start_time=item["modify_pattern_start_time"], burst_ID=item["burst_ID"], burst_size=item["burst_size"], modify_total_duration=item["modify_total_duration"], modify_fixed_pump_pattern=item["modify_fixed_pump_pattern"], modify_variable_pump_pattern=item["modify_variable_pump_pattern"], modify_valve_opening=item["modify_valve_opening"], scheme_name=item["scheme_name"], ) # os.rename(filename2, filename) """ # 将 时间转换成日期,然后缓存这个计算结果 # 缓存key: burst_analysis__ global redis_client schemename = data.scheme_name print(data.modify_pattern_start_time) querydate = time_api.get_date_from_time(data.modify_pattern_start_time) print(f"schemename: {schemename}, querydate: {querydate}") cache_key = f"queryallschemeallrecords_burst_Analysis_{schemename}_{querydate}" data = redis_client.get(cache_key) if not data: results = influxdb_api.query_scheme_all_record("burst_Analysis", scheme_name=schemename, query_date=querydate) packed = msgpack.packb(results, default=encode_datetime) redis_client.set(cache_key, packed) """ return "success" ############################################################ # valve close analysis api 39 # example:http://127.0.0.1:8000/valve_close_analysis?network=beibeizone&start_time=2024-04-01T08:00:00Z&valves=GSD2307192058577780A3287D78&valves=GSD2307192058572E953B707226(S2)&duration=1800 ############################################################ @app.get("/valve_close_analysis/", response_class=PlainTextResponse) async def fastapi_valve_close_analysis( network: str, start_time: str, valves: Annotated[list[str], Query()], duration: int = None, ) -> str: filename = "c:/lock.simulation" filename2 = "c:/lock.simulation2" if os.path.exists(filename2): print("file exists") raise HTTPException(status_code=409, detail="is in simulation") else: print("file doesnt exists") # os.rename(filename, filename2) result = valve_close_analysis(network, start_time, valves, duration) # os.rename(filename2, filename) return result ############################################################ # pipe flushing analysis api 40 # example:http://127.0.0.1:8000/flushing_analysis?network=beibeizone&start_time=2024-04-01T08:00:00Z&valves=GSD230719205857733F8F5214FF&valves=GSD230719205857C0AF65B6A170&valves_k=0.5&valves_k=0.5&drainage_node_ID=GSD2307192058570DEDF28E4F73&flush_flow=0&duration=1800 ############################################################ @app.get("/flushing_analysis/", response_class=PlainTextResponse) async def fastapi_flushing_analysis( network: str, start_time: str, valves: Annotated[list[str], Query()], valves_k: Annotated[list[float], Query()], drainage_node_ID: str, flush_flow: float = 0, duration: int = None, ) -> str: filename = "c:/lock.simulation" filename2 = "c:/lock.simulation2" if os.path.exists(filename2): print("file exists") raise HTTPException(status_code=409, detail="is in simulation") else: print("file doesnt exists") # os.rename(filename, filename2) result = flushing_analysis( network, start_time, valves, valves_k, drainage_node_ID, flush_flow, duration, ) # os.rename(filename2, filename) return result ############################################################ # contaminant_simulation api 41 # example:http://127.0.0.1:8000/contaminant_simulation?network=beibeizone&start_time=2024-04-01T08:00:00Z&source=ZBBDTZDP002677&concentration=100&duration=1800 ############################################################ @app.get("/contaminant_simulation/", response_class=PlainTextResponse) async def fastapi_contaminant_simulation( network: str, start_time: str, source: str, concentration: float, duration: int, pattern: str = None, scheme_name: str = None, ) -> str: filename = "c:/lock.simulation" filename2 = "c:/lock.simulation2" if os.path.exists(filename2): print("file exists") raise HTTPException(status_code=409, detail="is in simulation") else: print("file doesnt exists") # os.rename(filename, filename2) result = contaminant_simulation( network, start_time, source, concentration, duration, pattern ) # os.rename(filename2, filename) return result ############################################################ # age analysis api 42 # example:http://127.0.0.1:8000/age_analysis/?network=bb&start_time=2024-04-01T00:00:00Z&end_time=2024-04-01T08:00:00Z&duration=28800 ############################################################ @app.get("/age_analysis/", response_class=PlainTextResponse) async def fastapi_age_analysis( network: str, start_time: str, end_time: str, duration: int ) -> str: filename = "c:/lock.simulation" filename2 = "c:/lock.simulation2" if os.path.exists(filename2): print("file exists") raise HTTPException(status_code=409, detail="is in simulation") else: print("file doesnt exists") # os.rename(filename, filename2) result = age_analysis(network, start_time, end_time, duration) # os.rename(filename2, filename) return result ############################################################ # scheduling analysis api 43 ############################################################ class SchedulingAnalysis(BaseModel): network: str start_time: str pump_control: dict tank_id: str water_plant_output_id: str time_delta: Optional[int] = 300 @app.post("/scheduling_analysis/") async def fastapi_scheduling_analysis(data: SchedulingAnalysis) -> str: data = data.dict() filename = "c:/lock.simulation" filename2 = "c:/lock.simulation2" if os.path.exists(filename2): print("file exists") raise HTTPException(status_code=409, detail="is in simulation") else: print("file doesnt exists") # os.rename(filename, filename2) result = scheduling_simulation( data["network"], data["start_time"], data["pump_control"], data["tank_id"], data["water_plant_output_id"], data["time_delta"], ) # os.rename(filename2, filename) return result ############################################################ # pressure_regulating api 44 # example: # response = requests.post("http://127.0.0.1:8000/pressure_regulating", # data=json.dumps({'network': 'bb_server', # 'start_time': '2024-05-17T09:30:00Z', # 'pump_control': {'1#': [0, 0], '2#': [1, 1], '3#': [1, 1], '4#': [1, 1], # '5#': [45, 45], '6#': [0, 0], '7#': [0, 0]} # 'tank_init_level': {'ZBBDTJSC000002': 2, 'ZBBDTJSC000001': 2}}), # headers={'accept': 'application/json', 'Content-Type': 'application/json'}) ############################################################ class PressureRegulation(BaseModel): network: str start_time: str pump_control: dict tank_init_level: Optional[dict] = None @app.post("/pressure_regulation/") async def fastapi_pressure_regulation(data: PressureRegulation) -> str: item = data.dict() filename = "c:/lock.simulation" filename2 = "c:/lock.simulation2" if os.path.exists(filename2): print("file exists") raise HTTPException(status_code=409, detail="is in simulation") else: print("file doesnt exists") # os.rename(filename, filename2) result = pressure_regulation( prj_name=item["network"], start_datetime=item["start_time"], pump_control=item["pump_control"], tank_initial_level_control=item["tank_init_level"], ) # os.rename(filename2, filename) return result ############################################################ # project_management api 45 # example: # response = requests.post("http://127.0.0.1:8000/project_management", # data=json.dumps({'network': 'bb_server', # 'start_time': '2024-05-17T00:00:00Z', # 'pump_control': # {'1#':(list:97), '2#':(list:97), '3#':(list:97), '4#':(list:97), # '5#':(list:97), '6#':(list:97), '7#':(list:97)} # 'tank_init_level': {'ZBBDTJSC000002': 2, 'ZBBDTJSC000001': 2} # 'region_demand': {'hp': 150000, 'lp': 40000}}), # headers={'accept': 'application/json', 'Content-Type': 'application/json'}) ############################################################ class ProjectManagement(BaseModel): network: str start_time: str pump_control: dict tank_init_level: Optional[dict] = None region_demand: Optional[dict] = None @app.post("/project_management/") async def fastapi_project_management(data: ProjectManagement) -> str: item = data.dict() filename = "c:/lock.simulation" filename2 = "c:/lock.simulation2" if os.path.exists(filename2): print("file exists") raise HTTPException(status_code=409, detail="is in simulation") else: print("file doesnt exists") # os.rename(filename, filename2) result = project_management( prj_name=item["network"], start_datetime=item["start_time"], pump_control=item["pump_control"], tank_initial_level_control=item["tank_init_level"], region_demand_control=item["region_demand"], ) # os.rename(filename2, filename) return result ############################################################ # project_management api 46 # example: # with open('./inp/bb_temp.inp', 'rb') as file: # response = requests.post("http://127.0.0.1:8000/network_project", # files={'file': file}) ############################################################ @app.post("/network_project/") async def fastapi_network_project(file: UploadFile = File()) -> str: temp_file_path = "./inp/" if not os.path.exists(temp_file_path): os.mkdir(temp_file_path) temp_file_name = f'network_project_{datetime.now().strftime("%Y%m%d")}' temp_file_path = f"{temp_file_path}{temp_file_name}.inp" with open(temp_file_path, "wb") as buffer: shutil.copyfileobj(file.file, buffer) buffer.close() filename = "c:/lock.simulation" filename2 = "c:/lock.simulation2" if os.path.exists(filename2): print("file exists") raise HTTPException(status_code=409, detail="is in simulation") else: print("file doesnt exists") result = run_inp(temp_file_name) # os.rename(filename2, filename) return result ############################################################ # daily scheduling analysis api 47 ############################################################ class DailySchedulingAnalysis(BaseModel): network: str start_time: str pump_control: dict reservoir_id: str tank_id: str water_plant_output_id: str time_delta: Optional[int] = 300 @app.post("/daily_scheduling_analysis/") async def fastapi_daily_scheduling_analysis(data: DailySchedulingAnalysis) -> str: data = data.dict() filename = "c:/lock.simulation" filename2 = "c:/lock.simulation2" if os.path.exists(filename2): print("file exists") raise HTTPException(status_code=409, detail="is in simulation") else: print("file doesnt exists") # os.rename(filename, filename2) result = daily_scheduling_simulation( data["network"], data["start_time"], data["pump_control"], data["reservoir_id"], data["tank_id"], data["water_plant_output_id"], ) # os.rename(filename2, filename) return result ############################################################ # network_update api 48 ############################################################ @app.post("/network_update/") async def fastapi_network_update(file: UploadFile = File()) -> str: # 默认文件夹 default_folder = "./" # 使用当前时间生成临时文件名 temp_file_name = f'network_update_{datetime.now().strftime("%Y%m%d")}' temp_file_path = os.path.join(default_folder, temp_file_name) # 保存上传的文件到服务器 try: with open(temp_file_path, "wb") as buffer: shutil.copyfileobj(file.file, buffer) buffer.close() print(f"文件 {temp_file_name} 已成功保存。") except Exception as e: raise HTTPException(status_code=500, detail=f"文件保存失败: {e}") # 更新数据库 try: network_update(temp_file_path) return json.dumps({"message": "管网更新成功"}) except Exception as e: raise HTTPException(status_code=500, detail=f"数据库操作失败: {e}") ############################################################ # pump failure api 49 ############################################################ class PumpFailureState(BaseModel): time: str pump_status: dict @app.post("/pump_failure/") async def fastapi_pump_failure(data: PumpFailureState) -> str: item = data.dict() with open("./pump_failure_message.txt", "a", encoding="utf-8-sig") as f1: f1.write( "[{}] {}\n".format(datetime.now().strftime("%Y-%m-%d %H:%M:%S"), item) ) # save message status_info = item.copy() with open("./pump_failure_status.txt", "r", encoding="utf-8-sig") as f2: lines = f2.readlines() first_stage_pump_status_dict = json.loads(json.dumps(eval(lines[0]))) second_stage_pump_status_dict = json.loads( json.dumps(eval(lines[-1])) ) # read local file pump_status_dict = { "first": first_stage_pump_status_dict, # first-stage pump "second": second_stage_pump_status_dict, } # second-stage pump for pump_type in status_info["pump_status"].keys(): # 'first' or 'second' if pump_type in pump_status_dict.keys(): # the type of pumps exists if all( pump_id in pump_status_dict[pump_type].keys() for pump_id in status_info["pump_status"][pump_type].keys() ): # all pump IDs exist for pump_id in status_info["pump_status"][pump_type].keys(): pump_status_dict[pump_type][pump_id] = int( status_info["pump_status"][pump_type][pump_id] ) # modify status dict else: return json.dumps("ERROR: Wrong Pump ID") else: return json.dumps("ERROR: Wrong Pump Type") with open("./pump_failure_status.txt", "w", encoding="utf-8-sig") as f2_: f2_.write( "{}\n{}".format(pump_status_dict["first"], pump_status_dict["second"]) ) # save local file return json.dumps("SUCCESS") ############################################################ # pressure_sensor_placement_sensitivity api 50 ############################################################ # 2025/05/17 class Pressure_Sensor_Placement(BaseModel): name: str scheme_name: str sensor_number: int min_diameter: int = 0 username: str @app.post("/pressure_sensor_placement_sensitivity/") async def fastapi_pressure_sensor_placement_sensitivity( data: Pressure_Sensor_Placement, ) -> None: item = data.dict() pressure_sensor_placement_sensitivity( name=item["name"], scheme_name=item["scheme_name"], sensor_number=item["sensor_number"], min_diameter=item["min_diameter"], username=item["username"], ) @app.post("/pressure_sensor_placement_kmeans/") async def fastapi_pressure_sensor_placement_kmeans( data: Pressure_Sensor_Placement, ) -> None: item = data.dict() pressure_sensor_placement_kmeans( name=item["name"], scheme_name=item["scheme_name"], sensor_number=item["sensor_number"], min_diameter=item["min_diameter"], username=item["username"], ) # 后续改进:合并两个接口为一个,增加method、sensor_type参数选择方法 @app.post("/sensorplacementscheme/create") async def fastapi_pressure_sensor_placement( network: str = Query(...), scheme_name: str = Query(...), sensor_type: str = Query(...), method: str = Query(...), sensor_count: int = Query(...), min_diameter: int = Query(0), user_name: str = Query(...), ) -> str: item = { "network": network, "scheme_name": scheme_name, "sensor_type": sensor_type, "method": method, "sensor_count": sensor_count, "min_diameter": min_diameter, "user_name": user_name, } # 验证方法参数 if item["method"] not in ["sensitivity", "kmeans"]: raise HTTPException( status_code=400, detail="Invalid method. Must be 'sensitivity' or 'kmeans'" ) try: if item["method"] == "sensitivity": pressure_sensor_placement_sensitivity( name=item["network"], scheme_name=item["scheme_name"], sensor_number=item["sensor_count"], min_diameter=item["min_diameter"], username=item["user_name"], ) elif item["method"] == "kmeans": pressure_sensor_placement_kmeans( name=item["network"], scheme_name=item["scheme_name"], sensor_number=item["sensor_count"], min_diameter=item["min_diameter"], username=item["user_name"], ) return "success" except Exception as e: raise HTTPException(status_code=500, detail=f"执行失败: {str(e)}") # 新增 SCADA 设备清洗接口 @app.post("/scadadevicedatacleaning/") async def fastapi_scada_device_data_cleaning( network: str = Query(...), ids_list: List[str] = Query(...), start_time: str = Query(...), end_time: str = Query(...), user_name: str = Query(...), ) -> str: import pandas as pd # 假设可以使用 pandas 处理表格数据 item = { "network": network, "ids": ids_list, "start_time": start_time, "end_time": end_time, "user_name": user_name, } query_ids_list = item["ids"][0].split(",") # 先调用 query_SCADA_data_by_device_ID_and_timerange 获取原始数据 scada_data = influxdb_api.query_SCADA_data_by_device_ID_and_timerange( query_ids_list=query_ids_list, start_time=item["start_time"], end_time=item["end_time"], ) # 获取对应管网的所有 SCADA 设备信息 scada_device_info = influxdb_api.query_pg_scada_info(item["network"]) # 将列表转换为字典,以 device_id 为键 scada_device_info_dict = {info["id"]: info for info in scada_device_info} # 按设备类型分组设备 type_groups = {} for device_id in query_ids_list: device_info = scada_device_info_dict.get(device_id, {}) device_type = device_info.get("type", "unknown") if device_type not in type_groups: type_groups[device_type] = [] type_groups[device_type].append(device_id) # 批量处理每种类型的设备 for device_type, device_ids in type_groups.items(): if device_type not in ["pressure", "pipe_flow"]: continue # 跳过未知类型 # 过滤该类型的设备数据 type_scada_data = { device_id: scada_data[device_id] for device_id in device_ids if device_id in scada_data } if not type_scada_data: continue # 假设所有设备的时间点相同,提取 time 列表 time_list = [record["time"] for record in next(iter(type_scada_data.values()))] # 创建 DataFrame,第一列是 time,然后是每个设备的 value 列 df = pd.DataFrame({"time": time_list}) for device_id in device_ids: if device_id in type_scada_data: values = [record["value"] for record in type_scada_data[device_id]] df[device_id] = values # 移除 time 列,准备输入给清洗方法(清洗方法期望 value 表格) value_df = df.drop(columns=["time"]) # 调用清洗方法 if device_type == "pressure": cleaned_value_df = api_ex.Pdataclean.clean_pressure_data_dict_km(value_df) elif device_type == "pipe_flow": cleaned_value_df = api_ex.Fdataclean.clean_flow_data_dict(value_df) # 添加 time 列到首列 cleaned_value_df = pd.DataFrame(cleaned_value_df) # # 只选择以 '_cleaned' 结尾的清洗数据列 # cleaned_columns = [ # col for col in cleaned_value_df.columns if col.endswith("_cleaned") # ] # cleaned_value_df = cleaned_value_df[cleaned_columns] # # 重命名列,移除 '_cleaned' 后缀 # cleaned_value_df = cleaned_value_df.rename( # columns={ # col: col.replace("_cleaned", "") for col in cleaned_value_df.columns # } # ) cleaned_df = pd.concat([df["time"], cleaned_value_df], axis=1) # 调试输出,确认列名 print(f"清洗后的列名: {cleaned_df.columns.tolist()}") # 将清洗后的数据写回数据库 influxdb_api.import_multicolumn_data_from_dict( data_dict=cleaned_df.to_dict("list"), # 转换为 {column_name: [values]} 格式 raw=False, ) return "success" class Item(BaseModel): str_info: str dict_info: Optional[dict] = None @app.post("/test_dict/") async def get_dict(item: Item): print(item.dict()) return item if __name__ == "__main__": # uvicorn.run(app, host="0.0.0.0", port=8000) # url='http://127.0.0.1:8000/valve_close_analysis?network=beibeizone&start_time=2024-04-01T08:00:00Z&valve_IDs=GSD2307192058577780A3287D78&valve_IDs=GSD2307192058572E953B707226(S2)&duration=1800' url = "http://127.0.0.1:8000/burst_analysis?network=beibeizone&start_time=2024-04-01T08:00:00Z&burst_ID=ZBBGXSZW000001&duration=1800" # url = "http://192.168.1.36:8000/queryallschemeallrecords/?schemename=Fangan0817114448&querydate=2025-08-13&schemetype=burst_Analysis" # response = Request.get(url) import requests response = requests.get(url)