Files
TJWaterServerBinary/cli/tjwater_cli/commands_readonly.py
T

145 lines
4.3 KiB
Python

from __future__ import annotations
from typing import Annotated
import typer
from .apps import component_option_app, network_app
from .common import emit_api, runtime_context
from .core import CLIError, require_network
# `get-node-properties`: forwards a GET description for /getnodeproperties/
# to emit_api, carrying the network obtained via require_network and the
# --node value. Documented with comments rather than a docstring because
# Typer would surface a docstring as the command's help text.
@network_app.command("get-node-properties")
def network_get_node_properties(
    ctx: typer.Context,
    node: Annotated[str, typer.Option("--node", help="节点 ID")],
) -> None:
    # Build the query parameters up front so the emit_api call stays flat.
    query = {
        "network": require_network(runtime_context(ctx)),
        "node": node,
    }
    emit_api(
        ctx,
        summary="读取节点属性成功",
        method="GET",
        path="/getnodeproperties/",
        params=query,
        require_auth=True,
        require_network_ctx=True,
    )
# `get-link-properties`: forwards a GET description for /getlinkproperties/
# to emit_api, carrying the network obtained via require_network and the
# --link value. Documented with comments rather than a docstring because
# Typer would surface a docstring as the command's help text.
@network_app.command("get-link-properties")
def network_get_link_properties(
    ctx: typer.Context,
    link: Annotated[str, typer.Option("--link", help="管线 ID")],
) -> None:
    # Build the query parameters up front so the emit_api call stays flat.
    query = {
        "network": require_network(runtime_context(ctx)),
        "link": link,
    }
    emit_api(
        ctx,
        summary="读取管线属性成功",
        method="GET",
        path="/getlinkproperties/",
        params=query,
        require_auth=True,
        require_network_ctx=True,
    )
# `get-all-junction-properties`: forwards a GET description for
# /getalljunctionproperties/ to emit_api; the only query parameter is the
# network obtained via require_network. Documented with comments rather than
# a docstring because Typer would surface a docstring as the command's help.
@network_app.command("get-all-junction-properties")
def network_get_all_junction_properties(ctx: typer.Context) -> None:
    network = require_network(runtime_context(ctx))
    emit_api(
        ctx,
        summary="读取全部节点属性成功",
        method="GET",
        path="/getalljunctionproperties/",
        params={"network": network},
        require_auth=True,
        require_network_ctx=True,
    )
# `get-all-pipe-properties`: forwards a GET description for
# /getallpipeproperties/ to emit_api; the only query parameter is the
# network obtained via require_network. Documented with comments rather than
# a docstring because Typer would surface a docstring as the command's help.
@network_app.command("get-all-pipe-properties")
def network_get_all_pipe_properties(ctx: typer.Context) -> None:
    network = require_network(runtime_context(ctx))
    emit_api(
        ctx,
        summary="读取全部管道属性成功",
        method="GET",
        path="/getallpipeproperties/",
        params={"network": network},
        require_auth=True,
        require_network_ctx=True,
    )
# `schema`: resolves the schema endpoint for --kind via
# _component_option_path and forwards a GET description to emit_api.
# --pump is optional here: it is forwarded only for kind "pump-energy" and
# only when a non-empty value was supplied. Documented with comments rather
# than a docstring because Typer would surface a docstring as command help.
@component_option_app.command("schema")
def component_option_schema(
    ctx: typer.Context,
    kind: Annotated[str, typer.Option("--kind", help="time|energy|pump-energy|network")],
    pump: Annotated[str | None, typer.Option("--pump", help="pump-energy 时需要的泵 ID")] = None,
) -> None:
    runtime = runtime_context(ctx)
    # Resolve the endpoint before the network so an invalid --kind is
    # reported first.
    endpoint = _component_option_path(kind, schema=True)
    query = {"network": require_network(runtime)}
    if pump and kind == "pump-energy":
        query["pump"] = pump
    emit_api(
        ctx,
        summary="读取选项 schema 成功",
        method="GET",
        path=endpoint,
        params=query,
        require_auth=True,
        require_network_ctx=True,
    )
# `get`: resolves the properties endpoint for --kind via
# _component_option_path and forwards a GET description to emit_api.
# For kind "pump-energy" a non-empty --pump is mandatory; otherwise a
# CLIError with code PUMP_REQUIRED (exit code 2) is raised. Documented with
# comments rather than a docstring because Typer would surface a docstring
# as command help.
@component_option_app.command("get")
def component_option_get(
    ctx: typer.Context,
    kind: Annotated[str, typer.Option("--kind", help="time|energy|pump-energy|network")],
    pump: Annotated[str | None, typer.Option("--pump", help="pump-energy 时需要的泵 ID")] = None,
) -> None:
    runtime = runtime_context(ctx)
    # Order matters for error precedence: invalid kind, then network
    # resolution, then the missing-pump check.
    endpoint = _component_option_path(kind, schema=False)
    query = {"network": require_network(runtime)}
    pump_scoped = kind == "pump-energy"
    if pump_scoped and not pump:
        raise CLIError(
            "CLI 参数错误",
            code="PUMP_REQUIRED",
            message="--pump is required when --kind pump-energy",
            exit_code=2,
        )
    if pump_scoped:
        query["pump"] = pump
    emit_api(
        ctx,
        summary="读取选项属性成功",
        method="GET",
        path=endpoint,
        params=query,
        require_auth=True,
        require_network_ctx=True,
    )
def _component_option_path(kind: str, *, schema: bool) -> str:
    """Return the API path for a component-option kind.

    Args:
        kind: One of "time", "energy", "pump-energy" or "network".
        schema: True selects the schema endpoint, False the properties
            endpoint.

    Returns:
        The request path, always of the form "/<name>/" with exactly one
        leading and one trailing slash.

    Raises:
        CLIError: With code "INVALID_KIND" and exit code 2 when ``kind`` is
            not one of the supported values.
    """
    # Every route uses the same "/<name>/" shape. The time-schema entry was
    # previously missing its trailing slash and the pump-energy properties
    # entry had a doubled one; both are normalised here.
    routes = {
        ("time", True): "/gettimeschema/",
        ("time", False): "/gettimeproperties/",
        ("energy", True): "/getenergyschema/",
        ("energy", False): "/getenergyproperties/",
        ("pump-energy", True): "/getpumpenergyschema/",
        ("pump-energy", False): "/getpumpenergyproperties/",
        ("network", True): "/getoptionschema/",
        ("network", False): "/getoptionproperties/",
    }
    path = routes.get((kind, schema))
    if path is None:
        raise CLIError(
            "CLI 参数错误",
            code="INVALID_KIND",
            message="--kind must be one of time, energy, pump-energy, network",
            exit_code=2,
        )
    return path