415 lines
15 KiB
Python
415 lines
15 KiB
Python
from __future__ import annotations
|
|
|
|
import json
|
|
from typing import Annotated, Any
|
|
|
|
import click
|
|
import typer
|
|
|
|
from .apps import GROUP_HELP_APPS, TOP_LEVEL_COMMANDS, app
|
|
from .core import CLIError
|
|
from .registry import (
|
|
get_command_doc,
|
|
get_group_summary,
|
|
has_subcommands,
|
|
is_hidden_path,
|
|
list_capabilities,
|
|
list_subcommands,
|
|
)
|
|
|
|
|
|
def _click_root_command() -> click.Command:
    """Return the click command that Typer builds from the root ``app``.

    The conversion is performed on every call instead of being cached at
    import time.
    """
    # Must stay lazy: the click tree is only complete after command modules import.
    root_command = typer.main.get_command(app)
    return root_command
|
|
|
|
|
|
def _normalize_command_path(tokens: list[str]) -> tuple[str, ...]:
|
|
while tokens and tokens[0] not in TOP_LEVEL_COMMANDS:
|
|
tokens = tokens[1:]
|
|
return tuple(tokens)
|
|
|
|
|
|
def context_command_path(click_ctx: click.Context | None) -> tuple[str, ...]:
    """Return the normalized command path for a click context.

    ``click_ctx.command_path`` is split on whitespace and passed through
    ``_normalize_command_path``. A missing context yields an empty tuple.
    """
    return () if click_ctx is None else _normalize_command_path(click_ctx.command_path.split())
|
|
|
|
|
|
def _build_click_context(path: tuple[str, ...]) -> click.Context | None:
    """Build a chain of click contexts that ends at the command named by ``path``.

    The chain starts at the root command (``info_name="tjwater-cli"``) and
    adds one child context per path token. Returns ``None`` as soon as a
    token cannot be resolved, either because the current command is not a
    ``click.Group`` or because the group has no such subcommand. An empty
    path returns the root context.
    """
    context = click.Context(_click_root_command(), info_name="tjwater-cli")
    for segment in path:
        current = context.command
        # Only groups can have children; anything else ends the walk.
        child = current.commands.get(segment) if isinstance(current, click.Group) else None
        if child is None:
            return None
        context = click.Context(child, info_name=segment, parent=context)
    return context
|
|
|
|
|
|
def build_usage(path: tuple[str, ...]) -> str | None:
    """Compose a one-line usage string for the command at ``path``.

    The string starts with ``tjwater-cli`` and the path tokens, followed by
    one fragment per click option (``--help`` excluded). Flags render as
    ``--name``; value options render as ``--name <NAME>`` with the
    placeholder upper-cased and dashes turned into underscores. Optional
    fragments are wrapped in square brackets. Returns ``None`` when the
    path does not resolve to a command.
    """
    context = _build_click_context(path)
    if context is None:
        return None

    segments = ["tjwater-cli", *path]
    for param in context.command.params:
        if not isinstance(param, click.Option) or "--help" in param.opts:
            continue
        long_names = [opt.lstrip("-") for opt in param.opts if opt.startswith("--")]
        # Prefer the last long spelling; fall back to the parameter name.
        name = long_names[-1] if long_names else (param.name or "")
        fragment = f"--{name}"
        if not param.is_flag:
            placeholder = name.upper().replace("-", "_")
            fragment = f"{fragment} <{placeholder}>"
        segments.append(fragment if param.required else f"[{fragment}]")
    return " ".join(segments)
|
|
|
|
|
|
def _click_option_docs(path: tuple[str, ...]) -> list[dict[str, Any]]:
|
|
ctx = _build_click_context(path)
|
|
if ctx is None:
|
|
return []
|
|
options: list[dict[str, Any]] = []
|
|
for parameter in ctx.command.params:
|
|
if not isinstance(parameter, click.Option):
|
|
continue
|
|
if "--help" in parameter.opts:
|
|
continue
|
|
cli_name = next((opt.lstrip("-") for opt in reversed(parameter.opts) if opt.startswith("--")), parameter.name or "")
|
|
options.append(
|
|
{
|
|
"name": cli_name,
|
|
"description": parameter.help or "",
|
|
"required": parameter.required,
|
|
"repeated": parameter.multiple,
|
|
"default": parameter.default,
|
|
}
|
|
)
|
|
return options
|
|
|
|
|
|
def _sample_option_value(path: tuple[str, ...], option_name: str) -> str:
|
|
path_specific_samples: dict[tuple[tuple[str, ...], str], str] = {
|
|
(("component", "option", "schema"), "kind"): "time",
|
|
(("component", "option", "get"), "kind"): "time",
|
|
(("data", "timeseries", "composite"), "kind"): "scada-simulation",
|
|
(("data", "scada", "get"), "kind"): "info",
|
|
(("data", "scada", "list"), "kind"): "info",
|
|
}
|
|
if (path, option_name) in path_specific_samples:
|
|
return path_specific_samples[(path, option_name)]
|
|
if option_name == "start-time":
|
|
return "2025-01-02T03:04:05+08:00"
|
|
if option_name == "end-time":
|
|
return "2025-01-02T04:04:05+08:00"
|
|
if option_name == "date":
|
|
return "2025-01-02"
|
|
if option_name == "duration":
|
|
return "30"
|
|
if option_name == "kind":
|
|
return "time"
|
|
if option_name == "mode":
|
|
return "close"
|
|
if option_name == "scheme":
|
|
return "baseline"
|
|
if option_name == "output":
|
|
return "./demo.inp" if "export-inp" in path else "./output.json"
|
|
if option_name == "pump":
|
|
return "PUMP-1"
|
|
if option_name == "node":
|
|
return "J1"
|
|
if option_name == "source-node":
|
|
return "J1"
|
|
if option_name == "drainage-node":
|
|
return "J2"
|
|
if option_name in {"link", "pipe", "pipe-id", "element-id", "element"}:
|
|
return "P1"
|
|
if option_name == "flow":
|
|
return "120.5"
|
|
if option_name == "concentration":
|
|
return "0.8"
|
|
if option_name == "device-id":
|
|
return "SCADA-001"
|
|
if option_name == "burst-file":
|
|
return "./burst.json"
|
|
if option_name == "valve-setting-file":
|
|
return "./valves.json"
|
|
if option_name.endswith("-file"):
|
|
return "./input.json"
|
|
if option_name.endswith("-id"):
|
|
return "demo-id"
|
|
return "demo"
|
|
|
|
|
|
def _build_example(path: tuple[str, ...], *, existing_examples: list[str] | None = None) -> str:
|
|
ctx = _build_click_context(path)
|
|
required_option_names: list[str] = []
|
|
if ctx is not None:
|
|
required_option_names = [
|
|
next((opt.lstrip("-") for opt in reversed(parameter.opts) if opt.startswith("--")), parameter.name or "")
|
|
for parameter in ctx.command.params
|
|
if isinstance(parameter, click.Option) and "--help" not in parameter.opts and parameter.required
|
|
]
|
|
if existing_examples:
|
|
for example in existing_examples:
|
|
has_required_options = all(f"--{option_name}" in example for option_name in required_option_names)
|
|
if has_required_options:
|
|
return example
|
|
parts = ["tjwater-cli", *path]
|
|
if ctx is None:
|
|
return " ".join(parts)
|
|
for parameter in ctx.command.params:
|
|
if not isinstance(parameter, click.Option):
|
|
continue
|
|
if "--help" in parameter.opts or not parameter.required:
|
|
continue
|
|
option_name = next((opt.lstrip("-") for opt in reversed(parameter.opts) if opt.startswith("--")), parameter.name or "")
|
|
parts.extend([f"--{option_name}", _sample_option_value(path, option_name)])
|
|
return " ".join(parts)
|
|
|
|
|
|
def _enrich_leaf_payload(payload: dict[str, Any], path: tuple[str, ...]) -> dict[str, Any]:
    """Return a copy of a leaf command doc enriched from the click tree.

    ``usage`` prefers the generated usage string and falls back to the
    documented one. ``options`` is replaced only when click reports at
    least one option. ``examples`` is replaced by a single generated
    example when the documented list is empty or every entry contains both
    ``<`` and ``>`` (i.e. looks like a placeholder template).
    """
    result = dict(payload)
    result["usage"] = build_usage(path) or payload.get("usage")

    option_docs = _click_option_docs(path)
    if option_docs:
        result["options"] = option_docs

    examples = payload.get("examples") or []
    # all() over an empty list is True, which also covers "no examples".
    only_placeholders = all("<" in text and ">" in text for text in examples)
    if only_placeholders:
        examples = [_build_example(path, existing_examples=examples)]
    result["examples"] = examples
    return result
|
|
|
|
|
|
def _enrich_index_payload(payload: dict[str, Any]) -> dict[str, Any]:
|
|
enriched = dict(payload)
|
|
commands: list[dict[str, Any]] = []
|
|
for command in payload.get("commands", []):
|
|
command_item = dict(command)
|
|
path = tuple(command_item["command"].split())
|
|
doc = get_command_doc(path)
|
|
if doc is None and has_subcommands(path):
|
|
command_item["usage"] = f"tjwater-cli {' '.join(path)} help"
|
|
command_item["example"] = f"tjwater-cli {' '.join(path)} help"
|
|
else:
|
|
existing_examples = [] if doc is None else list(doc.get("examples", []))
|
|
command_item["usage"] = build_usage(path) or command_item.get("usage")
|
|
command_item["example"] = _build_example(path, existing_examples=existing_examples)
|
|
commands.append(command_item)
|
|
enriched["commands"] = commands
|
|
return enriched
|
|
|
|
|
|
def resolve_help_payload(path: tuple[str, ...]) -> tuple[dict[str, Any] | None, bool]:
    """Resolve the help payload for ``path``.

    Returns ``(payload, is_index)``:

    * empty path: the capability listing, ``is_index=True``;
    * documented command: the enriched leaf payload, ``is_index=False``;
    * path with subcommands: the enriched subcommand index, ``is_index=True``;
    * anything else: ``(None, False)``.
    """
    if not path:
        return list_capabilities(), True

    leaf_doc = get_command_doc(path)
    if leaf_doc is not None:
        return _enrich_leaf_payload(leaf_doc, path), False

    if not has_subcommands(path):
        return None, False

    index = list_subcommands(path, get_group_summary(path))
    return _enrich_index_payload(index), True
|
|
|
|
|
|
def emit_help_payload(payload: dict[str, Any]) -> None:
    """Write ``payload`` as a single JSON document via ``typer.echo``.

    Non-ASCII characters are emitted as-is rather than ``\\u`` escaped.
    """
    serialized = json.dumps(payload, ensure_ascii=False)
    typer.echo(serialized)
|
|
|
|
|
|
def merge_next_commands(*groups: list[str] | None) -> list[str]:
    """Concatenate command lists, dropping duplicates.

    Groups are processed in argument order and ``None`` groups are treated
    as empty. The first occurrence of each command determines its position
    in the result.
    """
    # dict keys keep insertion order, which gives order-preserving de-duplication.
    unique = dict.fromkeys(command for group in groups for command in group or [])
    return list(unique)
|
|
|
|
|
|
def merge_error_data(primary: Any, secondary: Any) -> Any:
    """Combine two error-data values, giving precedence to ``primary``.

    If either value is ``None`` the other one is returned. Two dicts are
    merged into a new dict in which ``primary`` wins on key conflicts. In
    every other combination ``primary`` is returned unchanged.
    """
    if primary is None or secondary is None:
        return secondary if primary is None else primary
    if not (isinstance(primary, dict) and isinstance(secondary, dict)):
        return primary
    combined = dict(secondary)
    combined.update(primary)
    return combined
|
|
|
|
|
|
def build_error_guidance(click_ctx: click.Context | None) -> tuple[Any, list[str]]:
    """Derive error data and follow-up commands from the failing click context.

    Returns ``(data, next_commands)``:

    * ``<group> help`` paths and paths that resolve to a subcommand index
      yield group guidance pointing at both help spellings;
    * paths that resolve to a documented command yield its ``command``,
      ``usage`` and ``examples``;
    * otherwise ``data`` is ``{"usage": ...}`` when a usage string could be
      generated, else ``None``, and only ``tjwater-cli help`` is suggested.
    """

    def _group_guidance(group: tuple[str, ...]) -> tuple[Any, list[str]]:
        joined = " ".join(group)
        via_group = f"tjwater-cli {joined} help"
        via_root = f"tjwater-cli help {joined}"
        data = {
            "command_group": joined,
            "usage": via_group,
            "examples": [via_group, via_root],
        }
        return data, merge_next_commands([via_group, via_root], ["tjwater-cli help"])

    command_path = context_command_path(click_ctx)
    if not command_path:
        return None, ["tjwater-cli help"]

    usage = build_usage(command_path)

    # "<group> help" failed: guide the caller back to that group's help.
    if command_path[-1] == "help" and command_path[:-1]:
        return _group_guidance(command_path[:-1])

    payload, is_index = resolve_help_payload(command_path)
    if payload is not None:
        if is_index:
            return _group_guidance(command_path)
        leaf_data = {
            "command": payload["command"],
            "usage": payload.get("usage") or usage,
            "examples": payload.get("examples", []),
        }
        leaf_next = merge_next_commands(
            [f"tjwater-cli help {' '.join(command_path)}"],
            ["tjwater-cli help"],
        )
        return leaf_data, leaf_next

    return ({"usage": usage} if usage else None, ["tjwater-cli help"])
|
|
|
|
|
|
def classify_click_error(exc: click.ClickException) -> tuple[str, str]:
    """Map a click exception to a ``(title, code)`` pair.

    Exception types are checked in a fixed order, then the formatted
    message is searched for ``"No such command"``; anything left over is
    classified as a generic usage error.
    """
    # Order matters: the checks run top to bottom and the first match wins.
    typed_errors = (
        (click.NoSuchOption, ("未知选项", "UNKNOWN_OPTION")),
        (click.MissingParameter, ("缺少参数", "MISSING_PARAMETER")),
        (click.BadParameter, ("参数无效", "INVALID_PARAMETER")),
    )
    for error_type, classification in typed_errors:
        if isinstance(exc, error_type):
            return classification

    if "No such command" in exc.format_message():
        return "未找到命令", "COMMAND_NOT_FOUND"
    return "CLI 参数错误", "USAGE_ERROR"
|
|
|
|
|
|
def _build_root_help_epilog() -> str:
|
|
return "\n".join(
|
|
[
|
|
"\b",
|
|
"Examples:",
|
|
" tjwater-cli help",
|
|
" tjwater-cli help simulation run",
|
|
" tjwater-cli simulation run --help",
|
|
]
|
|
)
|
|
|
|
|
|
def _build_leaf_help_epilog(path: tuple[str, ...], payload: dict[str, Any]) -> str:
|
|
lines = ["\b"]
|
|
description = payload.get("description")
|
|
usage = payload.get("usage")
|
|
examples = payload.get("examples", [])
|
|
next_commands = payload.get("next_commands", [])
|
|
if description:
|
|
lines.extend([f"Description: {description}", ""])
|
|
if usage:
|
|
lines.extend([f"Usage example: {usage}", ""])
|
|
if examples:
|
|
lines.append("Examples:")
|
|
lines.extend(f" {example}" for example in examples)
|
|
lines.append("")
|
|
if next_commands:
|
|
lines.append("Next steps:")
|
|
lines.extend(f" {command}" for command in next_commands)
|
|
lines.append("")
|
|
lines.extend(["Structured JSON:", f" tjwater-cli help {' '.join(path)}"])
|
|
return "\n".join(lines)
|
|
|
|
|
|
def _build_group_help_epilog(path: tuple[str, ...], payload: dict[str, Any]) -> str:
|
|
lines = ["\b", "Examples:", f" tjwater-cli help {' '.join(path)}"]
|
|
for command in payload.get("commands", [])[:2]:
|
|
example = command.get("example")
|
|
if example:
|
|
lines.append(f" {example}")
|
|
return "\n".join(lines)
|
|
|
|
|
|
def build_group_help_appendix(click_ctx: click.Context | None) -> str | None:
    """Return the help appendix for the group that ``click_ctx`` points at.

    An empty command path yields the root epilog. A path that resolves to
    a subcommand index yields the group epilog. Any other path (unknown,
    or a leaf command) yields ``None``.
    """
    path = context_command_path(click_ctx)
    if not path:
        return _build_root_help_epilog()

    payload, is_index = resolve_help_payload(path)
    if is_index and payload is not None:
        return _build_group_help_epilog(path, payload)
    return None
|
|
|
|
|
|
def make_group_help_handler(path_prefix: tuple[str, ...]):
    """Create the callback behind ``<path_prefix> help``.

    The returned function takes no arguments. When called it emits the
    help payload for ``path_prefix``, or raises ``CLIError`` with code
    ``COMMAND_NOT_FOUND`` (exit code 2) if the path cannot be resolved. Its
    ``__name__`` is the path tokens joined by ``_`` plus a ``_help`` suffix.
    """

    # Deliberately no docstring on the callback itself.
    def group_help() -> None:
        payload, _ = resolve_help_payload(path_prefix)
        if payload is not None:
            emit_help_payload(payload)
            return
        raise CLIError(
            "未找到命令",
            code="COMMAND_NOT_FOUND",
            message=f"unknown command path: {' '.join(path_prefix)}",
            exit_code=2,
            next_commands=["tjwater-cli help"],
        )

    group_help.__name__ = "_".join(path_prefix) + "_help"
    return group_help
|
|
|
|
|
|
def register_group_help_commands() -> None:
    """Register a ``help`` command on every app listed in ``GROUP_HELP_APPS``.

    Each entry is an ``(app, path_prefix)`` pair; the registered callback is
    the one produced by ``make_group_help_handler(path_prefix)``.
    """
    for sub_app, prefix in GROUP_HELP_APPS:
        register = sub_app.command("help")
        handler = make_group_help_handler(prefix)
        register(handler)
|
|
|
|
|
|
def apply_typer_help_metadata() -> None:
    """Populate help metadata on the root app and every registered group app.

    * Sets ``help`` / ``short_help`` on the root ``app``.
    * For each ``(app, path_prefix)`` in ``GROUP_HELP_APPS``: the ``help``
      command gets a fixed description and example and is never hidden;
      every other command takes its help, epilog and hidden flag from the
      command registry (help and epilog are cleared when the registry has
      no doc for it); nested groups take their summary and hidden flag
      from the registry.
    * Groups registered directly on the root ``app`` are handled the same
      way as nested groups.
    """

    def _describe_groups(group_infos: Any, prefix: tuple[str, ...]) -> None:
        # Copy the registry summary and hidden flag onto each group info.
        for info in group_infos:
            nested_path = (*prefix, info.name)
            text = get_group_summary(nested_path)
            info.help = text
            info.short_help = text
            info.hidden = is_hidden_path(nested_path)

    title = "TJWater agent CLI"
    # NOTE(review): these assign attributes directly on ``app``; confirm the
    # app object actually reads ``help`` / ``short_help`` from here.
    app.help = "\n".join(
        [
            title,
            "",
            "Examples:",
            " tjwater-cli help",
            " tjwater-cli help simulation run",
            " tjwater-cli simulation run --help",
        ]
    )
    app.short_help = title

    for sub_app, prefix in GROUP_HELP_APPS:
        for info in sub_app.registered_commands:
            if info.name == "help":
                joined_prefix = " ".join(prefix)
                info.help = f"输出 {joined_prefix} 的 JSON 帮助信息。"
                info.short_help = info.help
                info.epilog = "\n".join(["\b", "Example:", f" tjwater-cli help {joined_prefix}"])
                info.hidden = False
                continue

            # NOTE(review): assumes ``info.name`` is the registered command
            # name used as the registry key; verify for commands registered
            # without an explicit name.
            leaf_path = (*prefix, info.name)
            doc = get_command_doc(leaf_path)
            if doc is None:
                info.help = None
                info.short_help = None
                info.epilog = None
            else:
                info.help = str(doc.get("summary", ""))
                info.short_help = info.help
                info.epilog = _build_leaf_help_epilog(leaf_path, doc)
            info.hidden = is_hidden_path(leaf_path)

        _describe_groups(sub_app.registered_groups, prefix)

    _describe_groups(app.registered_groups, ())
|