Files
TJWaterServerBinary/cli/tjwater_agent_cli/helping.py
T

404 lines
15 KiB
Python

from __future__ import annotations
import json
from typing import Annotated, Any
import click
import typer
from .apps import GROUP_HELP_APPS, TOP_LEVEL_COMMANDS, app
from .core import CLIError
from .registry import (
get_command_doc,
get_group_summary,
has_subcommands,
is_hidden_path,
list_capabilities,
list_subcommands,
)
def _click_root_command() -> click.Command:
# Must stay lazy: the click tree is only complete after command modules import.
return typer.main.get_command(app)
def _normalize_command_path(tokens: list[str]) -> tuple[str, ...]:
while tokens and tokens[0] not in TOP_LEVEL_COMMANDS:
tokens = tokens[1:]
return tuple(tokens)
def context_command_path(click_ctx: click.Context | None) -> tuple[str, ...]:
if click_ctx is None:
return ()
return _normalize_command_path(click_ctx.command_path.split())
def _build_click_context(path: tuple[str, ...]) -> click.Context | None:
root = _click_root_command()
ctx: click.Context = click.Context(root, info_name="tjwater")
command: click.Command = root
for token in path:
if not isinstance(command, click.Group):
return None
next_command = command.commands.get(token)
if next_command is None:
return None
ctx = click.Context(next_command, info_name=token, parent=ctx)
command = next_command
return ctx
def build_usage(path: tuple[str, ...]) -> str | None:
ctx = _build_click_context(path)
if ctx is None:
return None
parts = ["tjwater", *path]
for parameter in ctx.command.params:
if not isinstance(parameter, click.Option):
continue
if "--help" in parameter.opts:
continue
option_name = next((opt.lstrip("-") for opt in reversed(parameter.opts) if opt.startswith("--")), parameter.name or "")
if parameter.is_flag:
parts.append(f"--{option_name}" if parameter.required else f"[--{option_name}]")
continue
placeholder = option_name.upper().replace("-", "_")
if parameter.required:
parts.extend([f"--{option_name}", f"<{placeholder}>"])
else:
parts.append(f"[--{option_name} <{placeholder}>]")
return " ".join(parts)
def _click_option_docs(path: tuple[str, ...]) -> list[dict[str, Any]]:
ctx = _build_click_context(path)
if ctx is None:
return []
options: list[dict[str, Any]] = []
for parameter in ctx.command.params:
if not isinstance(parameter, click.Option):
continue
if "--help" in parameter.opts:
continue
cli_name = next((opt.lstrip("-") for opt in reversed(parameter.opts) if opt.startswith("--")), parameter.name or "")
options.append(
{
"name": cli_name,
"description": parameter.help or "",
"required": parameter.required,
"repeated": parameter.multiple,
"default": parameter.default,
}
)
return options
def _sample_option_value(path: tuple[str, ...], option_name: str) -> str:
path_specific_samples: dict[tuple[tuple[str, ...], str], str] = {
(("project", "data"), "kind"): "scada-info",
(("component", "option", "schema"), "kind"): "time",
(("component", "option", "get"), "kind"): "time",
(("data", "timeseries", "composite"), "kind"): "scada-simulation",
(("data", "scada", "schema"), "kind"): "device",
(("data", "scada", "get"), "kind"): "device",
(("data", "scada", "list"), "kind"): "device",
}
if (path, option_name) in path_specific_samples:
return path_specific_samples[(path, option_name)]
if option_name == "start-time":
return "2025-01-02T03:04:05+08:00"
if option_name == "end-time":
return "2025-01-02T04:04:05+08:00"
if option_name == "date":
return "2025-01-02"
if option_name == "duration":
return "30"
if option_name == "kind":
return "time"
if option_name == "mode":
return "close"
if option_name == "scheme":
return "baseline"
if option_name == "output":
return "./demo.inp" if "export-inp" in path else "./output.json"
if option_name == "pump":
return "PUMP-1"
if option_name == "node":
return "J1"
if option_name == "source-node":
return "J1"
if option_name == "drainage-node":
return "J2"
if option_name in {"link", "pipe", "pipe-id", "element-id", "element"}:
return "P1"
if option_name == "flow":
return "120.5"
if option_name == "concentration":
return "0.8"
if option_name == "device-id":
return "SCADA-001"
if option_name == "burst-file":
return "./burst.json"
if option_name == "valve-setting-file":
return "./valves.json"
if option_name.endswith("-file"):
return "./input.json"
if option_name.endswith("-id"):
return "demo-id"
return "demo"
def _build_example(path: tuple[str, ...], *, existing_examples: list[str] | None = None) -> str:
ctx = _build_click_context(path)
required_option_names: list[str] = []
if ctx is not None:
required_option_names = [
next((opt.lstrip("-") for opt in reversed(parameter.opts) if opt.startswith("--")), parameter.name or "")
for parameter in ctx.command.params
if isinstance(parameter, click.Option) and "--help" not in parameter.opts and parameter.required
]
if existing_examples:
for example in existing_examples:
has_auth = "--auth-context" in example
has_required_options = all(f"--{option_name}" in example for option_name in required_option_names)
if has_auth and has_required_options:
return example
parts = ["tjwater", "--auth-context", "auth.json", *path]
if ctx is None:
return " ".join(parts)
for parameter in ctx.command.params:
if not isinstance(parameter, click.Option):
continue
if "--help" in parameter.opts or not parameter.required:
continue
option_name = next((opt.lstrip("-") for opt in reversed(parameter.opts) if opt.startswith("--")), parameter.name or "")
parts.extend([f"--{option_name}", _sample_option_value(path, option_name)])
return " ".join(parts)
def _enrich_leaf_payload(payload: dict[str, Any], path: tuple[str, ...]) -> dict[str, Any]:
enriched = dict(payload)
enriched["usage"] = build_usage(path) or payload.get("usage")
click_options = _click_option_docs(path)
if click_options:
enriched["options"] = click_options
enriched["examples"] = payload.get("examples") or []
if not enriched["examples"] or all("<" in example and ">" in example for example in enriched["examples"]):
enriched["examples"] = [_build_example(path, existing_examples=enriched["examples"])]
return enriched
def _enrich_index_payload(payload: dict[str, Any]) -> dict[str, Any]:
enriched = dict(payload)
commands: list[dict[str, Any]] = []
for command in payload.get("commands", []):
command_item = dict(command)
path = tuple(command_item["command"].split())
doc = get_command_doc(path)
if doc is None and has_subcommands(path):
command_item["usage"] = f"tjwater {' '.join(path)} help"
command_item["example"] = f"tjwater {' '.join(path)} help"
else:
existing_examples = [] if doc is None else list(doc.get("examples", []))
command_item["usage"] = build_usage(path) or command_item.get("usage")
command_item["example"] = _build_example(path, existing_examples=existing_examples)
commands.append(command_item)
enriched["commands"] = commands
return enriched
def resolve_help_payload(path: tuple[str, ...]) -> tuple[dict[str, Any] | None, bool]:
if not path:
return list_capabilities(), True
payload = get_command_doc(path)
if payload is not None:
return _enrich_leaf_payload(payload, path), False
if has_subcommands(path):
return _enrich_index_payload(list_subcommands(path, get_group_summary(path))), True
return None, False
def emit_help_payload(payload: dict[str, Any], *, json_output: bool, is_index: bool) -> None:
if json_output:
typer.echo(json.dumps(payload, ensure_ascii=False))
else:
typer.echo(render_help_text(payload, is_index=is_index))
def merge_next_commands(*groups: list[str] | None) -> list[str]:
merged: list[str] = []
seen: set[str] = set()
for group in groups:
for command in group or []:
if command in seen:
continue
seen.add(command)
merged.append(command)
return merged
def merge_error_data(primary: Any, secondary: Any) -> Any:
if primary is None:
return secondary
if secondary is None:
return primary
if isinstance(primary, dict) and isinstance(secondary, dict):
return {**secondary, **primary}
return primary
def build_error_guidance(click_ctx: click.Context | None) -> tuple[Any, list[str]]:
command_path = context_command_path(click_ctx)
usage = build_usage(command_path) if command_path else None
if command_path:
if command_path[-1] == "help":
group_path = command_path[:-1]
if group_path:
return (
{
"command_group": " ".join(group_path),
"usage": f"tjwater {' '.join(group_path)} help",
"examples": [f"tjwater {' '.join(group_path)} help", f"tjwater help {' '.join(group_path)}"],
},
merge_next_commands(
[f"tjwater {' '.join(group_path)} help", f"tjwater help {' '.join(group_path)}"],
["tjwater help"],
),
)
payload, is_index = resolve_help_payload(command_path)
if payload is not None and not is_index:
return (
{
"command": payload["command"],
"usage": payload.get("usage") or usage,
"examples": payload.get("examples", []),
},
merge_next_commands([f"tjwater help {' '.join(command_path)}"], ["tjwater help"]),
)
if payload is not None and is_index:
return (
{
"command_group": " ".join(command_path),
"usage": f"tjwater {' '.join(command_path)} help",
"examples": [f"tjwater {' '.join(command_path)} help", f"tjwater help {' '.join(command_path)}"],
},
merge_next_commands(
[f"tjwater {' '.join(command_path)} help", f"tjwater help {' '.join(command_path)}"],
["tjwater help"],
),
)
return ({"usage": usage} if usage else None, ["tjwater help"])
def classify_click_error(exc: click.ClickException) -> tuple[str, str]:
if isinstance(exc, click.NoSuchOption):
return "未知选项", "UNKNOWN_OPTION"
if isinstance(exc, click.MissingParameter):
return "缺少参数", "MISSING_PARAMETER"
if isinstance(exc, click.BadParameter):
return "参数无效", "INVALID_PARAMETER"
message = exc.format_message()
if "No such command" in message:
return "未找到命令", "COMMAND_NOT_FOUND"
return "CLI 参数错误", "USAGE_ERROR"
def render_help_text(payload: dict[str, Any], *, is_index: bool) -> str:
lines: list[str] = [str(payload.get("summary", ""))]
if is_index:
is_top_level = payload.get("menu_level") == 1
lines.append("")
lines.append("Commands:")
for command in payload.get("commands", []):
lines.append(f" {command['command']}: {command['summary']}")
if not is_top_level and command.get("usage"):
lines.append(f" usage: {command['usage']}")
if not is_top_level and command.get("example"):
lines.append(f" example: {command['example']}")
lines.append("")
if is_top_level:
lines.append("Use `tjwater <menu> help` to see subcommands.")
else:
lines.append("Use `tjwater help --json` for structured output.")
return "\n".join(lines)
lines.append("")
lines.append(f"Command: {payload['command']}")
lines.append(f"Description: {payload['description']}")
if payload.get("usage"):
lines.append(f"Usage: {payload['usage']}")
options = payload.get("options", [])
if options:
lines.append("")
lines.append("Options:")
for option in options:
suffix = " (required)" if option.get("required") else ""
lines.append(f" --{option['name']}{suffix}: {option['description']}")
examples = payload.get("examples", [])
if examples:
lines.append("")
lines.append("Examples:")
for example in examples:
lines.append(f" {example}")
lines.append("")
lines.append("Use `tjwater help --json` for structured output.")
return "\n".join(lines)
def make_group_help_handler(path_prefix: tuple[str, ...]):
def group_help(
json_output: Annotated[bool, typer.Option("--json", help="输出 JSON")] = False,
) -> None:
payload, is_index = resolve_help_payload(path_prefix)
if payload is None:
raise CLIError(
"未找到命令",
code="COMMAND_NOT_FOUND",
message=f"unknown command path: {' '.join(path_prefix)}",
exit_code=2,
next_commands=["tjwater help"],
)
emit_help_payload(payload, json_output=json_output, is_index=is_index)
group_help.__name__ = f"{'_'.join(path_prefix)}_help"
return group_help
def register_group_help_commands() -> None:
for group_app, path_prefix in GROUP_HELP_APPS:
group_app.command("help")(make_group_help_handler(path_prefix))
def apply_typer_help_metadata() -> None:
app.help = "TJWater agent CLI"
app.short_help = "TJWater agent CLI"
for group_app, path_prefix in GROUP_HELP_APPS:
for command_info in group_app.registered_commands:
command_path = (*path_prefix, command_info.name)
if command_info.name == "help":
command_info.help = f"显示 {' '.join(path_prefix)} 的帮助信息。"
command_info.short_help = command_info.help
command_info.hidden = False
continue
payload = get_command_doc(command_path)
command_info.help = None if payload is None else str(payload.get("summary", ""))
command_info.short_help = command_info.help
command_info.hidden = is_hidden_path(command_path)
for group_info in group_app.registered_groups:
group_path = (*path_prefix, group_info.name)
summary = get_group_summary(group_path)
group_info.help = summary
group_info.short_help = summary
group_info.hidden = is_hidden_path(group_path)
for group_info in app.registered_groups:
group_path = (group_info.name,)
summary = get_group_summary(group_path)
group_info.help = summary
group_info.short_help = summary
group_info.hidden = is_hidden_path(group_path)