# Files
#
# 415 lines
# 15 KiB
# Python

from __future__ import annotations
import json
from typing import Annotated, Any
import click
import typer
from .apps import GROUP_HELP_APPS, TOP_LEVEL_COMMANDS, app
from .core import CLIError
from .registry import (
get_command_doc,
get_group_summary,
has_subcommands,
is_hidden_path,
list_capabilities,
list_subcommands,
)
def _click_root_command() -> click.Command:
    """Return the click command that Typer builds for the root ``app``.

    The conversion is performed on every call instead of once at import
    time: the click tree is only complete after the command modules have
    been imported, so this must stay lazy.
    """
    root_command = typer.main.get_command(app)
    return root_command
def _normalize_command_path(tokens: list[str]) -> tuple[str, ...]:
while tokens and tokens[0] not in TOP_LEVEL_COMMANDS:
tokens = tokens[1:]
return tuple(tokens)
def context_command_path(click_ctx: click.Context | None) -> tuple[str, ...]:
    """Return the normalized command path for a click context.

    The context's ``command_path`` string is split on whitespace and passed
    through ``_normalize_command_path``. A missing context yields an empty
    tuple.
    """
    if click_ctx is not None:
        raw_tokens = click_ctx.command_path.split()
        return _normalize_command_path(raw_tokens)
    return ()
def _build_click_context(path: tuple[str, ...]) -> click.Context | None:
    """Build a chain of click contexts that leads to the command at ``path``.

    Starts from a root context named ``tjwater-cli`` and creates one child
    context per path token, each parented to the previous one.

    Returns:
        The context of the final command, the root context for an empty
        path, or ``None`` when a token cannot be resolved (the current
        command is not a group, or the group has no such subcommand).
    """
    current_ctx: click.Context = click.Context(_click_root_command(), info_name="tjwater-cli")
    for segment in path:
        owner = current_ctx.command
        if not isinstance(owner, click.Group):
            return None
        child = owner.commands.get(segment)
        if child is None:
            return None
        current_ctx = click.Context(child, info_name=segment, parent=current_ctx)
    return current_ctx
def build_usage(path: tuple[str, ...]) -> str | None:
    """Render a one-line usage string for the command at ``path``.

    Only click options are listed; ``--help`` is skipped. Each option is
    shown under its last ``--long`` name (falling back to the parameter
    name). Optional entries are wrapped in square brackets and non-flag
    options get an upper-case ``<PLACEHOLDER>`` derived from their name.

    Returns:
        The usage string, or ``None`` when the path cannot be resolved.
    """
    ctx = _build_click_context(path)
    if ctx is None:
        return None

    listed_options = (
        candidate
        for candidate in ctx.command.params
        if isinstance(candidate, click.Option) and "--help" not in candidate.opts
    )

    parts = ["tjwater-cli", *path]
    for parameter in listed_options:
        long_opts = [opt for opt in parameter.opts if opt.startswith("--")]
        option_name = long_opts[-1].lstrip("-") if long_opts else (parameter.name or "")
        if parameter.is_flag:
            # Flags take no value, so no placeholder is rendered.
            parts.append(f"--{option_name}" if parameter.required else f"[--{option_name}]")
        else:
            placeholder = option_name.upper().replace("-", "_")
            if parameter.required:
                parts.extend([f"--{option_name}", f"<{placeholder}>"])
            else:
                parts.append(f"[--{option_name} <{placeholder}>]")
    return " ".join(parts)
def _click_option_docs(path: tuple[str, ...]) -> list[dict[str, Any]]:
    """Describe the click options of the command at ``path``.

    Returns one dict per option (``--help`` excluded) with the keys
    ``name``, ``description``, ``required``, ``repeated`` and ``default``,
    in declaration order. An unresolvable path yields an empty list.
    """
    ctx = _build_click_context(path)
    if ctx is None:
        return []

    described: list[dict[str, Any]] = []
    for parameter in ctx.command.params:
        if isinstance(parameter, click.Option) and "--help" not in parameter.opts:
            long_opts = [opt for opt in parameter.opts if opt.startswith("--")]
            cli_name = long_opts[-1].lstrip("-") if long_opts else (parameter.name or "")
            # NOTE(review): ``default`` is passed through unchanged; confirm it
            # is always JSON-serializable, since help payloads are emitted
            # with json.dumps.
            described.append(
                {
                    "name": cli_name,
                    "description": parameter.help or "",
                    "required": parameter.required,
                    "repeated": parameter.multiple,
                    "default": parameter.default,
                }
            )
    return described
def _sample_option_value(path: tuple[str, ...], option_name: str) -> str:
path_specific_samples: dict[tuple[tuple[str, ...], str], str] = {
(("component", "option", "schema"), "kind"): "time",
(("component", "option", "get"), "kind"): "time",
(("data", "timeseries", "composite"), "kind"): "scada-simulation",
(("data", "scada", "get"), "kind"): "info",
(("data", "scada", "list"), "kind"): "info",
}
if (path, option_name) in path_specific_samples:
return path_specific_samples[(path, option_name)]
if option_name == "start-time":
return "2025-01-02T03:04:05+08:00"
if option_name == "end-time":
return "2025-01-02T04:04:05+08:00"
if option_name == "date":
return "2025-01-02"
if option_name == "duration":
return "30"
if option_name == "kind":
return "time"
if option_name == "mode":
return "close"
if option_name == "scheme":
return "baseline"
if option_name == "output":
return "./demo.inp" if "export-inp" in path else "./output.json"
if option_name == "pump":
return "PUMP-1"
if option_name == "node":
return "J1"
if option_name == "source-node":
return "J1"
if option_name == "drainage-node":
return "J2"
if option_name in {"link", "pipe", "pipe-id", "element-id", "element"}:
return "P1"
if option_name == "flow":
return "120.5"
if option_name == "concentration":
return "0.8"
if option_name == "device-id":
return "SCADA-001"
if option_name == "burst-file":
return "./burst.json"
if option_name == "valve-setting-file":
return "./valves.json"
if option_name.endswith("-file"):
return "./input.json"
if option_name.endswith("-id"):
return "demo-id"
return "demo"
def _build_example(path: tuple[str, ...], *, existing_examples: list[str] | None = None) -> str:
    """Return one example invocation for the command at ``path``.

    An entry of ``existing_examples`` is reused when it mentions every
    required option of the command; otherwise an example is synthesized from
    the required options, using ``_sample_option_value`` for the values.

    Required options are matched against whole whitespace-separated tokens
    (``--name`` or ``--name=value``) rather than by substring, so a required
    ``--start`` is not considered satisfied by ``--start-time``.

    Args:
        path: Command path below the program name, e.g. ``("simulation", "run")``.
        existing_examples: Candidate examples, tried in order.

    Returns:
        The first suitable existing example, or a generated command line.
        When the path cannot be resolved in the click tree, no required
        options are known: the first existing example (if any) is returned,
        otherwise just the program name followed by the path.
    """
    ctx = _build_click_context(path)
    required_option_names: list[str] = []
    if ctx is not None:
        required_option_names = [
            next((opt.lstrip("-") for opt in reversed(parameter.opts) if opt.startswith("--")), parameter.name or "")
            for parameter in ctx.command.params
            if isinstance(parameter, click.Option) and "--help" not in parameter.opts and parameter.required
        ]

    def mentions_option(tokens: list[str], option_name: str) -> bool:
        # Accept both "--name value" and "--name=value" spellings.
        flag = f"--{option_name}"
        return any(token == flag or token.startswith(f"{flag}=") for token in tokens)

    for example in existing_examples or []:
        tokens = example.split()
        if all(mentions_option(tokens, option_name) for option_name in required_option_names):
            return example

    # No reusable example: synthesize one from the required options only.
    parts = ["tjwater-cli", *path]
    for option_name in required_option_names:
        parts.extend([f"--{option_name}", _sample_option_value(path, option_name)])
    return " ".join(parts)
def _enrich_leaf_payload(payload: dict[str, Any], path: tuple[str, ...]) -> dict[str, Any]:
    """Return a copy of a leaf-command doc enriched from the click tree.

    * ``usage`` is regenerated via ``build_usage``; the documented usage is
      kept only when generation yields nothing.
    * ``options`` is replaced when click reports any options.
    * ``examples`` defaults to an empty list; when it is empty or every
      example still contains ``<`` and ``>`` placeholders, it is replaced by
      a single example from ``_build_example``.

    The input ``payload`` is not modified.
    """
    result = dict(payload)
    result["usage"] = build_usage(path) or payload.get("usage")

    option_docs = _click_option_docs(path)
    if option_docs:
        result["options"] = option_docs

    examples = payload.get("examples") or []
    result["examples"] = examples
    # all() over an empty list is True, so this also covers "no examples".
    only_placeholders = all("<" in example and ">" in example for example in examples)
    if only_placeholders:
        result["examples"] = [_build_example(path, existing_examples=examples)]
    return result
def _enrich_index_payload(payload: dict[str, Any]) -> dict[str, Any]:
    """Return a copy of a group index payload with usage/example per command.

    Each entry of ``payload["commands"]`` is copied and given ``usage`` and
    ``example`` keys. Entries that have no command doc but do have
    subcommands point at their own ``help`` subcommand; all others get a
    generated usage line and an example built from their documented
    examples.
    """

    def enrich_entry(entry: dict[str, Any]) -> dict[str, Any]:
        item = dict(entry)
        path = tuple(item["command"].split())
        doc = get_command_doc(path)
        if doc is None and has_subcommands(path):
            # Nested group: direct the reader to the group's help command.
            item["usage"] = f"tjwater-cli {' '.join(path)} help"
            item["example"] = f"tjwater-cli {' '.join(path)} help"
            return item
        documented_examples = list(doc.get("examples", [])) if doc is not None else []
        item["usage"] = build_usage(path) or item.get("usage")
        item["example"] = _build_example(path, existing_examples=documented_examples)
        return item

    result = dict(payload)
    result["commands"] = [enrich_entry(entry) for entry in payload.get("commands", [])]
    return result
def resolve_help_payload(path: tuple[str, ...]) -> tuple[dict[str, Any] | None, bool]:
    """Resolve the help payload for ``path``.

    Returns:
        A ``(payload, is_index)`` pair:

        * empty path: the capability listing, ``True``;
        * path with a command doc: the enriched leaf payload, ``False``;
        * path with subcommands: the enriched subcommand index, ``True``;
        * anything else: ``(None, False)``.
    """
    if not path:
        return list_capabilities(), True

    leaf_doc = get_command_doc(path)
    if leaf_doc is not None:
        return _enrich_leaf_payload(leaf_doc, path), False

    if not has_subcommands(path):
        return None, False

    group_summary = get_group_summary(path)
    index_payload = list_subcommands(path, group_summary)
    return _enrich_index_payload(index_payload), True
def emit_help_payload(payload: dict[str, Any]) -> None:
    """Write ``payload`` as a single JSON document via ``typer.echo``.

    Non-ASCII characters are emitted as-is rather than ``\\u`` escaped.
    """
    serialized = json.dumps(payload, ensure_ascii=False)
    typer.echo(serialized)
def merge_next_commands(*groups: list[str] | None) -> list[str]:
    """Concatenate command lists, dropping duplicates.

    Commands keep the position of their first occurrence. ``None`` and empty
    groups are ignored.
    """
    # dict.fromkeys gives first-seen ordering with de-duplication.
    first_seen = dict.fromkeys(command for group in groups for command in group or [])
    return list(first_seen)
def merge_error_data(primary: Any, secondary: Any) -> Any:
    """Combine two error-data values, giving ``primary`` precedence.

    * If either value is ``None`` the other one is returned.
    * Two dicts are merged into a new dict; on key conflicts the value from
      ``primary`` wins.
    * Any other combination returns ``primary`` unchanged.
    """
    if primary is None:
        return secondary
    both_are_dicts = isinstance(primary, dict) and isinstance(secondary, dict)
    if not both_are_dicts:
        # Covers secondary being None as well as mismatched types.
        return primary
    return {**secondary, **primary}
def build_error_guidance(click_ctx: click.Context | None) -> tuple[Any, list[str]]:
    """Build error data and follow-up commands for a failed invocation.

    Returns:
        A ``(data, next_commands)`` pair chosen from the command path of
        ``click_ctx``:

        * ``<group...> help``: guidance for the enclosing group;
        * a leaf command: its name, usage and examples;
        * a group: guidance pointing at the group's help;
        * otherwise: the generated usage (or ``None``) with only the root
          help command as the next step.
    """

    def group_guidance(group_path: tuple[str, ...]) -> tuple[Any, list[str]]:
        # Both spellings of the group help command are offered.
        help_invocations = [
            f"tjwater-cli {' '.join(group_path)} help",
            f"tjwater-cli help {' '.join(group_path)}",
        ]
        data = {
            "command_group": " ".join(group_path),
            "usage": f"tjwater-cli {' '.join(group_path)} help",
            "examples": help_invocations,
        }
        return data, merge_next_commands(list(help_invocations), ["tjwater-cli help"])

    command_path = context_command_path(click_ctx)
    if not command_path:
        return None, ["tjwater-cli help"]

    usage = build_usage(command_path)

    enclosing_group = command_path[:-1]
    if command_path[-1] == "help" and enclosing_group:
        return group_guidance(enclosing_group)

    payload, is_index = resolve_help_payload(command_path)
    if payload is not None:
        if is_index:
            return group_guidance(command_path)
        leaf_data = {
            "command": payload["command"],
            "usage": payload.get("usage") or usage,
            "examples": payload.get("examples", []),
        }
        return (
            leaf_data,
            merge_next_commands([f"tjwater-cli help {' '.join(command_path)}"], ["tjwater-cli help"]),
        )

    return ({"usage": usage} if usage else None, ["tjwater-cli help"])
def classify_click_error(exc: click.ClickException) -> tuple[str, str]:
    """Map a click exception to a ``(title, code)`` pair.

    Exception types are checked in a fixed order (``NoSuchOption``, then
    ``MissingParameter``, then ``BadParameter``). Other exceptions are
    classified as "command not found" when their formatted message contains
    ``"No such command"``, and as a generic usage error otherwise.
    """
    ordered_matches = (
        (click.NoSuchOption, ("未知选项", "UNKNOWN_OPTION")),
        (click.MissingParameter, ("缺少参数", "MISSING_PARAMETER")),
        (click.BadParameter, ("参数无效", "INVALID_PARAMETER")),
    )
    for error_type, classification in ordered_matches:
        if isinstance(exc, error_type):
            return classification

    if "No such command" in exc.format_message():
        return "未找到命令", "COMMAND_NOT_FOUND"
    return "CLI 参数错误", "USAGE_ERROR"
def _build_root_help_epilog() -> str:
return "\n".join(
[
"\b",
"Examples:",
" tjwater-cli help",
" tjwater-cli help simulation run",
" tjwater-cli simulation run --help",
]
)
def _build_leaf_help_epilog(path: tuple[str, ...], payload: dict[str, Any]) -> str:
lines = ["\b"]
description = payload.get("description")
usage = payload.get("usage")
examples = payload.get("examples", [])
next_commands = payload.get("next_commands", [])
if description:
lines.extend([f"Description: {description}", ""])
if usage:
lines.extend([f"Usage example: {usage}", ""])
if examples:
lines.append("Examples:")
lines.extend(f" {example}" for example in examples)
lines.append("")
if next_commands:
lines.append("Next steps:")
lines.extend(f" {command}" for command in next_commands)
lines.append("")
lines.extend(["Structured JSON:", f" tjwater-cli help {' '.join(path)}"])
return "\n".join(lines)
def _build_group_help_epilog(path: tuple[str, ...], payload: dict[str, Any]) -> str:
lines = ["\b", "Examples:", f" tjwater-cli help {' '.join(path)}"]
for command in payload.get("commands", [])[:2]:
example = command.get("example")
if example:
lines.append(f" {example}")
return "\n".join(lines)
def build_group_help_appendix(click_ctx: click.Context | None) -> str | None:
    """Return the examples appendix for the group behind ``click_ctx``.

    An empty command path yields the root epilog. A path that resolves to a
    subcommand index yields the group epilog. Any other path yields
    ``None``.
    """
    path = context_command_path(click_ctx)
    if not path:
        return _build_root_help_epilog()

    payload, is_index = resolve_help_payload(path)
    if payload is not None and is_index:
        return _build_group_help_epilog(path, payload)
    return None
def make_group_help_handler(path_prefix: tuple[str, ...]):
    """Create the callback for a group's ``help`` subcommand.

    The returned function resolves the help payload for ``path_prefix`` and
    emits it as JSON, raising ``CLIError`` (code ``COMMAND_NOT_FOUND``, exit
    code 2) when nothing can be resolved. Its ``__name__`` is set to the
    path tokens joined by underscores plus a ``_help`` suffix.
    """

    def group_help() -> None:
        payload, _is_index = resolve_help_payload(path_prefix)
        if payload is not None:
            emit_help_payload(payload)
            return
        raise CLIError(
            "未找到命令",
            code="COMMAND_NOT_FOUND",
            message=f"unknown command path: {' '.join(path_prefix)}",
            exit_code=2,
            next_commands=["tjwater-cli help"],
        )

    group_help.__name__ = f"{'_'.join(path_prefix)}_help"
    return group_help
def register_group_help_commands() -> None:
    """Register a ``help`` command on every app listed in ``GROUP_HELP_APPS``.

    Each entry is a ``(group_app, path_prefix)`` pair; the handler is built
    from the entry's path prefix.
    """
    for group_app, path_prefix in GROUP_HELP_APPS:
        register_help = group_app.command("help")
        register_help(make_group_help_handler(path_prefix))
def apply_typer_help_metadata() -> None:
    """Fill in help text, epilogs and hidden flags on the Typer apps.

    * The root ``app`` gets a fixed help text and short help.
    * For every ``(group_app, path_prefix)`` in ``GROUP_HELP_APPS``:
      ``help`` commands get a fixed description and example; other commands
      take summary and epilog from their command doc (both ``None`` when no
      doc exists) and their hidden flag from ``is_hidden_path``; nested
      groups take summary and hidden flag from the registry.
    * Groups registered directly on the root ``app`` are handled the same
      way, using a one-element path.
    """

    def describe_group(group_info, group_path) -> None:
        summary = get_group_summary(group_path)
        group_info.help = summary
        group_info.short_help = summary
        group_info.hidden = is_hidden_path(group_path)

    app.help = "\n".join(
        [
            "TJWater agent CLI",
            "",
            "Examples:",
            " tjwater-cli help",
            " tjwater-cli help simulation run",
            " tjwater-cli simulation run --help",
        ]
    )
    app.short_help = "TJWater agent CLI"

    for group_app, path_prefix in GROUP_HELP_APPS:
        for command_info in group_app.registered_commands:
            # NOTE(review): assumes every command was registered with an
            # explicit name; confirm ``command_info.name`` is never None here.
            command_path = (*path_prefix, command_info.name)
            if command_info.name == "help":
                help_text = f"输出 {' '.join(path_prefix)} 的 JSON 帮助信息。"
                command_info.help = help_text
                command_info.short_help = help_text
                command_info.epilog = "\n".join(["\b", "Example:", f" tjwater-cli help {' '.join(path_prefix)}"])
                # Help commands are always shown, whatever the registry says.
                command_info.hidden = False
            else:
                payload = get_command_doc(command_path)
                summary_text = None if payload is None else str(payload.get("summary", ""))
                command_info.help = summary_text
                command_info.short_help = summary_text
                command_info.epilog = None if payload is None else _build_leaf_help_epilog(command_path, payload)
                command_info.hidden = is_hidden_path(command_path)

        for group_info in group_app.registered_groups:
            describe_group(group_info, (*path_prefix, group_info.name))

    for group_info in app.registered_groups:
        describe_group(group_info, (group_info.name,))