from __future__ import annotations import json from typing import Annotated, Any import click import typer from .apps import GROUP_HELP_APPS, TOP_LEVEL_COMMANDS, app from .core import CLIError from .registry import ( get_command_doc, get_group_summary, has_subcommands, is_hidden_path, list_capabilities, list_subcommands, ) def _click_root_command() -> click.Command: # Must stay lazy: the click tree is only complete after command modules import. return typer.main.get_command(app) def _normalize_command_path(tokens: list[str]) -> tuple[str, ...]: while tokens and tokens[0] not in TOP_LEVEL_COMMANDS: tokens = tokens[1:] return tuple(tokens) def context_command_path(click_ctx: click.Context | None) -> tuple[str, ...]: if click_ctx is None: return () return _normalize_command_path(click_ctx.command_path.split()) def _build_click_context(path: tuple[str, ...]) -> click.Context | None: root = _click_root_command() ctx: click.Context = click.Context(root, info_name="tjwater-cli") command: click.Command = root for token in path: if not isinstance(command, click.Group): return None next_command = command.commands.get(token) if next_command is None: return None ctx = click.Context(next_command, info_name=token, parent=ctx) command = next_command return ctx def build_usage(path: tuple[str, ...]) -> str | None: ctx = _build_click_context(path) if ctx is None: return None parts = ["tjwater-cli", *path] for parameter in ctx.command.params: if not isinstance(parameter, click.Option): continue if "--help" in parameter.opts: continue option_name = next((opt.lstrip("-") for opt in reversed(parameter.opts) if opt.startswith("--")), parameter.name or "") if parameter.is_flag: parts.append(f"--{option_name}" if parameter.required else f"[--{option_name}]") continue placeholder = option_name.upper().replace("-", "_") if parameter.required: parts.extend([f"--{option_name}", f"<{placeholder}>"]) else: parts.append(f"[--{option_name} <{placeholder}>]") return " ".join(parts) def _click_option_docs(path: tuple[str, ...]) -> list[dict[str, Any]]: ctx = _build_click_context(path) if ctx is None: return [] options: list[dict[str, Any]] = [] for parameter in ctx.command.params: if not isinstance(parameter, click.Option): continue if "--help" in parameter.opts: continue cli_name = next((opt.lstrip("-") for opt in reversed(parameter.opts) if opt.startswith("--")), parameter.name or "") options.append( { "name": cli_name, "description": parameter.help or "", "required": parameter.required, "repeated": parameter.multiple, "default": parameter.default, } ) return options def _sample_option_value(path: tuple[str, ...], option_name: str) -> str: path_specific_samples: dict[tuple[tuple[str, ...], str], str] = { (("component", "option", "schema"), "kind"): "time", (("component", "option", "get"), "kind"): "time", (("data", "timeseries", "composite"), "kind"): "scada-simulation", (("data", "scada", "get"), "kind"): "info", (("data", "scada", "list"), "kind"): "info", } if (path, option_name) in path_specific_samples: return path_specific_samples[(path, option_name)] if option_name == "start-time": return "2025-01-02T03:04:05+08:00" if option_name == "end-time": return "2025-01-02T04:04:05+08:00" if option_name == "date": return "2025-01-02" if option_name == "duration": return "30" if option_name == "kind": return "time" if option_name == "mode": return "close" if option_name == "scheme": return "baseline" if option_name == "output": return "./demo.inp" if "export-inp" in path else "./output.json" if option_name == "pump": return "PUMP-1" if option_name == "node": return "J1" if option_name == "source-node": return "J1" if option_name == "drainage-node": return "J2" if option_name in {"link", "pipe", "pipe-id", "element-id", "element"}: return "P1" if option_name == "flow": return "120.5" if option_name == "concentration": return "0.8" if option_name == "device-id": return "SCADA-001" if option_name == "burst-file": return "./burst.json" if option_name == "valve-setting-file": return "./valves.json" if option_name.endswith("-file"): return "./input.json" if option_name.endswith("-id"): return "demo-id" return "demo" def _build_example(path: tuple[str, ...], *, existing_examples: list[str] | None = None) -> str: ctx = _build_click_context(path) required_option_names: list[str] = [] if ctx is not None: required_option_names = [ next((opt.lstrip("-") for opt in reversed(parameter.opts) if opt.startswith("--")), parameter.name or "") for parameter in ctx.command.params if isinstance(parameter, click.Option) and "--help" not in parameter.opts and parameter.required ] if existing_examples: for example in existing_examples: has_required_options = all(f"--{option_name}" in example for option_name in required_option_names) if has_required_options: return example parts = ["tjwater-cli", *path] if ctx is None: return " ".join(parts) for parameter in ctx.command.params: if not isinstance(parameter, click.Option): continue if "--help" in parameter.opts or not parameter.required: continue option_name = next((opt.lstrip("-") for opt in reversed(parameter.opts) if opt.startswith("--")), parameter.name or "") parts.extend([f"--{option_name}", _sample_option_value(path, option_name)]) return " ".join(parts) def _enrich_leaf_payload(payload: dict[str, Any], path: tuple[str, ...]) -> dict[str, Any]: enriched = dict(payload) enriched["usage"] = build_usage(path) or payload.get("usage") click_options = _click_option_docs(path) if click_options: enriched["options"] = click_options enriched["examples"] = payload.get("examples") or [] if not enriched["examples"] or all("<" in example and ">" in example for example in enriched["examples"]): enriched["examples"] = [_build_example(path, existing_examples=enriched["examples"])] return enriched def _enrich_index_payload(payload: dict[str, Any]) -> dict[str, Any]: enriched = dict(payload) commands: list[dict[str, Any]] = [] for command in payload.get("commands", []): command_item = dict(command) path = tuple(command_item["command"].split()) doc = get_command_doc(path) if doc is None and has_subcommands(path): command_item["usage"] = f"tjwater-cli {' '.join(path)} help" command_item["example"] = f"tjwater-cli {' '.join(path)} help" else: existing_examples = [] if doc is None else list(doc.get("examples", [])) command_item["usage"] = build_usage(path) or command_item.get("usage") command_item["example"] = _build_example(path, existing_examples=existing_examples) commands.append(command_item) enriched["commands"] = commands return enriched def resolve_help_payload(path: tuple[str, ...]) -> tuple[dict[str, Any] | None, bool]: if not path: return list_capabilities(), True payload = get_command_doc(path) if payload is not None: return _enrich_leaf_payload(payload, path), False if has_subcommands(path): return _enrich_index_payload(list_subcommands(path, get_group_summary(path))), True return None, False def emit_help_payload(payload: dict[str, Any]) -> None: typer.echo(json.dumps(payload, ensure_ascii=False)) def merge_next_commands(*groups: list[str] | None) -> list[str]: merged: list[str] = [] seen: set[str] = set() for group in groups: for command in group or []: if command in seen: continue seen.add(command) merged.append(command) return merged def merge_error_data(primary: Any, secondary: Any) -> Any: if primary is None: return secondary if secondary is None: return primary if isinstance(primary, dict) and isinstance(secondary, dict): return {**secondary, **primary} return primary def build_error_guidance(click_ctx: click.Context | None) -> tuple[Any, list[str]]: command_path = context_command_path(click_ctx) usage = build_usage(command_path) if command_path else None if command_path: if command_path[-1] == "help": group_path = command_path[:-1] if group_path: return ( { "command_group": " ".join(group_path), "usage": f"tjwater-cli {' '.join(group_path)} help", "examples": [f"tjwater-cli {' '.join(group_path)} help", f"tjwater-cli help {' '.join(group_path)}"], }, merge_next_commands( [f"tjwater-cli {' '.join(group_path)} help", f"tjwater-cli help {' '.join(group_path)}"], ["tjwater-cli help"], ), ) payload, is_index = resolve_help_payload(command_path) if payload is not None and not is_index: return ( { "command": payload["command"], "usage": payload.get("usage") or usage, "examples": payload.get("examples", []), }, merge_next_commands([f"tjwater-cli help {' '.join(command_path)}"], ["tjwater-cli help"]), ) if payload is not None and is_index: return ( { "command_group": " ".join(command_path), "usage": f"tjwater-cli {' '.join(command_path)} help", "examples": [f"tjwater-cli {' '.join(command_path)} help", f"tjwater-cli help {' '.join(command_path)}"], }, merge_next_commands( [f"tjwater-cli {' '.join(command_path)} help", f"tjwater-cli help {' '.join(command_path)}"], ["tjwater-cli help"], ), ) return ({"usage": usage} if usage else None, ["tjwater-cli help"]) def classify_click_error(exc: click.ClickException) -> tuple[str, str]: if isinstance(exc, click.NoSuchOption): return "未知选项", "UNKNOWN_OPTION" if isinstance(exc, click.MissingParameter): return "缺少参数", "MISSING_PARAMETER" if isinstance(exc, click.BadParameter): return "参数无效", "INVALID_PARAMETER" message = exc.format_message() if "No such command" in message: return "未找到命令", "COMMAND_NOT_FOUND" return "CLI 参数错误", "USAGE_ERROR" def _build_root_help_epilog() -> str: return "\n".join( [ "\b", "Examples:", " tjwater-cli help", " tjwater-cli help simulation run", " tjwater-cli simulation run --help", ] ) def _build_leaf_help_epilog(path: tuple[str, ...], payload: dict[str, Any]) -> str: lines = ["\b"] description = payload.get("description") usage = payload.get("usage") examples = payload.get("examples", []) next_commands = payload.get("next_commands", []) if description: lines.extend([f"Description: {description}", ""]) if usage: lines.extend([f"Usage example: {usage}", ""]) if examples: lines.append("Examples:") lines.extend(f" {example}" for example in examples) lines.append("") if next_commands: lines.append("Next steps:") lines.extend(f" {command}" for command in next_commands) lines.append("") lines.extend(["Structured JSON:", f" tjwater-cli help {' '.join(path)}"]) return "\n".join(lines) def _build_group_help_epilog(path: tuple[str, ...], payload: dict[str, Any]) -> str: lines = ["\b", "Examples:", f" tjwater-cli help {' '.join(path)}"] for command in payload.get("commands", [])[:2]: example = command.get("example") if example: lines.append(f" {example}") return "\n".join(lines) def build_group_help_appendix(click_ctx: click.Context | None) -> str | None: path = context_command_path(click_ctx) if not path: return _build_root_help_epilog() payload, is_index = resolve_help_payload(path) if payload is None or not is_index: return None return _build_group_help_epilog(path, payload) def make_group_help_handler(path_prefix: tuple[str, ...]): def group_help() -> None: payload, is_index = resolve_help_payload(path_prefix) if payload is None: raise CLIError( "未找到命令", code="COMMAND_NOT_FOUND", message=f"unknown command path: {' '.join(path_prefix)}", exit_code=2, next_commands=["tjwater-cli help"], ) emit_help_payload(payload) group_help.__name__ = f"{'_'.join(path_prefix)}_help" return group_help def register_group_help_commands() -> None: for group_app, path_prefix in GROUP_HELP_APPS: group_app.command("help")(make_group_help_handler(path_prefix)) def apply_typer_help_metadata() -> None: app.help = "\n".join( [ "TJWater agent CLI", "", "Examples:", " tjwater-cli help", " tjwater-cli help simulation run", " tjwater-cli simulation run --help", ] ) app.short_help = "TJWater agent CLI" for group_app, path_prefix in GROUP_HELP_APPS: for command_info in group_app.registered_commands: command_path = (*path_prefix, command_info.name) if command_info.name == "help": command_info.help = f"输出 {' '.join(path_prefix)} 的 JSON 帮助信息。" command_info.short_help = command_info.help command_info.epilog = "\n".join(["\b", "Example:", f" tjwater-cli help {' '.join(path_prefix)}"]) command_info.hidden = False continue payload = get_command_doc(command_path) command_info.help = None if payload is None else str(payload.get("summary", "")) command_info.short_help = command_info.help command_info.epilog = None if payload is None else _build_leaf_help_epilog(command_path, payload) command_info.hidden = is_hidden_path(command_path) for group_info in group_app.registered_groups: group_path = (*path_prefix, group_info.name) summary = get_group_summary(group_path) group_info.help = summary group_info.short_help = summary group_info.hidden = is_hidden_path(group_path) for group_info in app.registered_groups: group_path = (group_info.name,) summary = get_group_summary(group_path) group_info.help = summary group_info.short_help = summary group_info.hidden = is_hidden_path(group_path)