from __future__ import annotations from typing import Annotated import typer from .apps import component_option_app, network_app from .common import emit_api, runtime_context from .core import CLIError, require_network from .option_types import ComponentOptionKind @network_app.command("get-junction-properties") def network_get_junction_properties( ctx: typer.Context, junction: Annotated[str, typer.Option("--junction", help="节点 ID")], ) -> None: runtime = runtime_context(ctx) emit_api( ctx, summary="读取节点属性成功", method="GET", path="/getjunctionproperties/", params={"network": require_network(runtime), "junction": junction}, require_auth=True, require_network_ctx=True, ) @network_app.command("get-pipe-properties") def network_get_pipe_properties( ctx: typer.Context, pipe: Annotated[str, typer.Option("--pipe", help="管道 ID")], ) -> None: runtime = runtime_context(ctx) emit_api( ctx, summary="读取管道属性成功", method="GET", path="/getpipeproperties/", params={"network": require_network(runtime), "pipe": pipe}, require_auth=True, require_network_ctx=True, ) @network_app.command("get-all-pipes-properties") def network_get_all_pipes_properties(ctx: typer.Context) -> None: runtime = runtime_context(ctx) emit_api( ctx, summary="读取全部管道属性成功", method="GET", path="/getallpipeproperties/", params={"network": require_network(runtime)}, require_auth=True, require_network_ctx=True, ) @network_app.command("get-reservoir-properties") def network_get_reservoir_properties( ctx: typer.Context, reservoir: Annotated[str, typer.Option("--reservoir", help="水库 ID")], ) -> None: runtime = runtime_context(ctx) emit_api( ctx, summary="读取水库属性成功", method="GET", path="/getreservoirproperties/", params={"network": require_network(runtime), "reservoir": reservoir}, require_auth=True, require_network_ctx=True, ) @network_app.command("get-all-reservoirs-properties") def network_get_all_reservoir_properties(ctx: typer.Context) -> None: runtime = runtime_context(ctx) emit_api( ctx, summary="读取全部水库属性成功", method="GET", path="/getallreservoirproperties/", params={"network": require_network(runtime)}, require_auth=True, require_network_ctx=True, ) @network_app.command("get-tank-properties") def network_get_tank_properties( ctx: typer.Context, tank: Annotated[str, typer.Option("--tank", help="水箱 ID")], ) -> None: runtime = runtime_context(ctx) emit_api( ctx, summary="读取水箱属性成功", method="GET", path="/gettankproperties/", params={"network": require_network(runtime), "tank": tank}, require_auth=True, require_network_ctx=True, ) @network_app.command("get-all-tanks-properties") def network_get_all_tank_properties(ctx: typer.Context) -> None: runtime = runtime_context(ctx) emit_api( ctx, summary="读取全部水箱属性成功", method="GET", path="/getalltankproperties/", params={"network": require_network(runtime)}, require_auth=True, require_network_ctx=True, ) @network_app.command("get-pump-properties") def network_get_pump_properties( ctx: typer.Context, pump: Annotated[str, typer.Option("--pump", help="水泵 ID")], ) -> None: runtime = runtime_context(ctx) emit_api( ctx, summary="读取水泵属性成功", method="GET", path="/getpumpproperties/", params={"network": require_network(runtime), "pump": pump}, require_auth=True, require_network_ctx=True, ) @network_app.command("get-all-pumps-properties") def network_get_all_pump_properties(ctx: typer.Context) -> None: runtime = runtime_context(ctx) emit_api( ctx, summary="读取全部水泵属性成功", method="GET", path="/getallpumpproperties/", params={"network": require_network(runtime)}, require_auth=True, require_network_ctx=True, ) @network_app.command("get-valve-properties") def network_get_valve_properties( ctx: typer.Context, valve: Annotated[str, typer.Option("--valve", help="阀门 ID")], ) -> None: runtime = runtime_context(ctx) emit_api( ctx, summary="读取阀门属性成功", method="GET", path="/getvalveproperties/", params={"network": require_network(runtime), "valve": valve}, require_auth=True, require_network_ctx=True, ) @network_app.command("get-all-valves-properties") def network_get_all_valve_properties(ctx: typer.Context) -> None: runtime = runtime_context(ctx) emit_api( ctx, summary="读取全部阀门属性成功", method="GET", path="/getallvalveproperties/", params={"network": require_network(runtime)}, require_auth=True, require_network_ctx=True, ) @component_option_app.command("schema") def component_option_schema( ctx: typer.Context, kind: Annotated[ComponentOptionKind, typer.Option("--kind", help="选项类型,仅支持 time|energy|pump-energy|network")], pump: Annotated[str | None, typer.Option("--pump", help="pump-energy 时需要的泵 ID")] = None, ) -> None: runtime = runtime_context(ctx) path = _component_option_path(kind.value, schema=True) params = {"network": require_network(runtime)} if kind == ComponentOptionKind.PUMP_ENERGY and pump: params["pump"] = pump emit_api( ctx, summary="读取选项 schema 成功", method="GET", path=path, params=params, require_auth=True, require_network_ctx=True, ) @component_option_app.command("get") def component_option_get( ctx: typer.Context, kind: Annotated[ComponentOptionKind, typer.Option("--kind", help="选项类型,仅支持 time|energy|pump-energy|network")], pump: Annotated[str | None, typer.Option("--pump", help="pump-energy 时需要的泵 ID")] = None, ) -> None: runtime = runtime_context(ctx) path = _component_option_path(kind.value, schema=False) params = {"network": require_network(runtime)} if kind == ComponentOptionKind.PUMP_ENERGY: if not pump: raise CLIError( "CLI 参数错误", code="PUMP_REQUIRED", message="--pump is required when --kind pump-energy", exit_code=2, ) params["pump"] = pump emit_api( ctx, summary="读取选项属性成功", method="GET", path=path, params=params, require_auth=True, require_network_ctx=True, ) def _component_option_path(kind: str, *, schema: bool) -> str: routes = { ("time", True): "/gettimeschema", ("time", False): "/gettimeproperties/", ("energy", True): "/getenergyschema/", ("energy", False): "/getenergyproperties/", ("pump-energy", True): "/getpumpenergyschema/", ("pump-energy", False): "/getpumpenergyproperties//", ("network", True): "/getoptionschema/", ("network", False): "/getoptionproperties/", } path = routes.get((kind, schema)) if path is None: raise CLIError( "CLI 参数错误", code="INVALID_KIND", message="--kind must be one of time, energy, pump-energy, network", exit_code=2, ) return path