Hermes 工具是自注册函数,按 toolset(工具集)分组,并通过中央注册表/调度系统执行。
主要文件:
tools/registry.pymodel_tools.pytoolsets.pytools/terminal_tool.pytools/environments/*
工具注册模型
每个工具模块在导入时调用registry.register(...)。
model_tools.py负责导入/发现工具模块,并构建供模型使用的 schema 列表。
registry.register()的工作原理
tools/中的每个工具文件在模块级别调用registry.register()来声明自身。函数签名如下:
registry.register( name="terminal", # 唯一工具名称(用于 API schema) toolset="terminal", # 该工具所属的 toolset schema={...}, # OpenAI function-calling schema(描述、参数) handler=handle_terminal, # 工具被调用时执行的函数 check_fn=check_terminal, # 可选:返回 True/False 表示是否可用 requires_env=["SOME_VAR"], # 可选:所需的环境变量(用于 UI 显示) is_async=False, # handler 是否为异步协程 description="Run commands", # 人类可读的描述 emoji="💻", # 用于 spinner/进度显示的 emoji )每次调用都会创建一个ToolEntry,以工具名称为键存储在单例ToolRegistry._tools字典中。若不同 toolset 之间出现名称冲突,会记录警告,后注册的条目覆盖前者。
发现机制:discover_builtin_tools()
当model_tools.py被导入时,会调用tools/registry.py中的discover_builtin_tools()。该函数使用 AST 解析扫描所有tools/*.py文件,找出包含顶层registry.register()调用的模块,然后导入它们:
# tools/registry.py(简化版) def discover_builtin_tools(tools_dir=None): tools_path = Path(tools_dir) if tools_dir else Path(__file__).parent for path in sorted(tools_path.glob("*.py")): if path.name in {"__init__.py", "registry.py", "mcp_tool.py"}: continue if _module_registers_tools(path): # AST 检查顶层 registry.register() importlib.import_module(f"tools.{path.stem}")这种自动发现机制意味着新工具文件会被自动识别——无需手动维护列表。AST 检查只匹配顶层的registry.register()调用(不匹配函数内部的调用),因此tools/中的辅助模块不会被导入。
每次导入都会触发模块的registry.register()调用。可选工具中的错误(例如图像生成工具缺少fal_client)会被捕获并记录——不会阻止其他工具加载。
核心工具发现完成后,还会发现 MCP 工具和插件工具:
- MCP 工具—
tools.mcp_tool.discover_mcp_tools()读取 MCP 服务器配置,并注册来自外部服务器的工具。 - 插件工具—
hermes_cli.plugins.discover_plugins()加载用户/项目/pip 插件,这些插件可能注册额外的工具。
工具可用性检查(check_fn)
每个工具可以选择性地提供一个check_fn——一个可调用对象,在工具可用时返回True,否则返回False。典型的检查包括:
- API 密钥是否存在— 例如,
lambda: bool(os.environ.get("SERP_API_KEY"))用于网络搜索 - 服务是否运行— 例如,检查 Honcho 服务器是否已配置
- 二进制文件是否已安装— 例如,验证浏览器工具的
playwright是否可用
当registry.get_definitions()为模型构建 schema 列表时,会运行每个工具的check_fn():
# 简化自 registry.py if entry.check_fn: try: available = bool(entry.check_fn()) except Exception: available = False # 异常 = 不可用 if not available: continue # 完全跳过该工具关键行为:
- 检查结果按调用缓存——若多个工具共享同一个
check_fn,只运行一次。 check_fn()中的异常被视为"不可用"(故障安全)。is_toolset_available()方法检查某个 toolset 的check_fn是否通过,用于 UI 显示和 toolset 解析。
Toolset 解析
Toolset 是工具的命名集合。Hermes 通过以下方式解析它们:
- 显式启用/禁用的 toolset 列表
- 平台预设(
hermes-cli、hermes-telegram等) - 动态 MCP toolset
- 精选的特殊用途集合,如
hermes-acp
get_tool_definitions()如何过滤工具
主入口点为model_tools.get_tool_definitions(enabled_toolsets, disabled_toolsets, quiet_mode):
若提供了
enabled_toolsets— 仅包含这些 toolset 中的工具。每个 toolset 名称通过resolve_toolset()解析,将复合 toolset 展开为单个工具名称。若提供了
disabled_toolsets— 从所有 toolset 开始,减去已禁用的。若两者均未提供— 包含所有已知 toolset。
注册表过滤— 解析后的工具名称集合传递给
registry.get_definitions(),后者应用check_fn过滤并返回 OpenAI 格式的 schema。动态 schema 修补— 过滤后,
execute_code和browser_navigate的 schema 会被动态调整,仅引用实际通过过滤的工具(防止模型幻觉出不可用的工具)。
旧版 toolset 名称
带有_tools后缀的旧版 toolset 名称(例如web_tools、terminal_tools)通过_LEGACY_TOOLSET_MAP映射到其现代工具名称,以保持向后兼容性。
调度
运行时,工具通过中央注册表调度,但部分 agent 级别的工具(如 memory/todo/session-search 处理)由 agent 循环直接处理。
调度流程:模型 tool_call → handler 执行
当模型返回tool_call时,流程如下:
模型响应包含 tool_call ↓ run_agent.py agent 循环 ↓ model_tools.handle_function_call(name, args, task_id, user_task) ↓ [Agent 循环工具?] → 由 agent 循环直接处理(todo、memory、session_search、delegate_task) ↓ [插件 pre-hook] → invoke_hook("pre_tool_call", ...) ↓ registry.dispatch(name, args, **kwargs) ↓ 按名称查找 ToolEntry ↓ [异步 handler?] → 通过 _run_async() 桥接 [同步 handler?] → 直接调用 ↓ 返回结果字符串(或 JSON 错误) ↓ [插件 post-hook] → invoke_hook("post_tool_call", ...)错误包装
所有工具执行在两个层级进行错误处理:
registry.dispatch()— 捕获 handler 抛出的任何异常,并以 JSON 形式返回{"error": "Tool execution failed: ExceptionType: message"}。handle_function_call()— 将整个调度包裹在次级 try/except 中,返回{"error": "Error executing tool_name: message"}。
这确保模型始终收到格式正确的 JSON 字符串,而不会遇到未处理的异常。
Agent 循环工具
以下四个工具在注册表调度之前被拦截,因为它们需要 agent 级别的状态(TodoStore、MemoryStore 等):
todo— 规划/任务跟踪memory— 持久化 memory 写入session_search— 跨会话召回delegate_task— 生成子 agent 会话
这些工具的 schema 仍在注册表中注册(供get_tool_definitions使用),但若调度以某种方式直接到达它们,其 handler 会返回一个存根错误。
异步桥接
当工具 handler 为异步时,_run_async()将其桥接到同步调度路径:
- CLI 路径(无运行中的事件循环)— 使用持久化事件循环以保持缓存的异步客户端存活
- Gateway 路径(有运行中的事件循环)— 使用
asyncio.run()启动一个一次性线程 - 工作线程(并行工具)— 使用存储在线程本地存储中的每线程持久化循环
DANGEROUS_PATTERNS 审批流程
终端工具集成了定义在tools/approval.py中的危险命令审批系统:
模式检测—
DANGEROUS_PATTERNS是一个(regex, description)元组列表,涵盖破坏性操作:- 递归删除(
rm -rf) - 文件系统格式化(
mkfs、dd) - SQL 破坏性操作(
DROP TABLE、不带WHERE的DELETE FROM) - 系统配置覆写(
> /etc/) - 服务操控(
systemctl stop) - 远程代码执行(
curl | sh) - Fork bomb、进程终止等
- 递归删除(
检测— 在执行任何终端命令之前,
detect_dangerous_command(command)会对所有模式进行检查。审批提示— 若发现匹配:
- CLI 模式— 交互式提示要求用户批准、拒绝或永久允许
- Gateway 模式— 异步审批回调将请求发送至消息平台
- 智能审批— 可选地,辅助 LLM 可自动批准匹配模式但风险较低的命令(例如,
rm -rf node_modules/是安全的,但匹配"递归删除"模式)
会话状态— 审批按会话跟踪。一旦在某个会话中批准了"递归删除",后续的
rm -rf命令不会再次提示。永久允许列表— "永久允许"选项会将该模式写入
config.yaml的command_allowlist,跨会话持久化。
终端/运行时环境
终端系统支持多种后端:
- local
- docker
- ssh
- singularity
- modal
- daytona
还支持:
- 按任务的 cwd 覆盖
- 后台进程管理
- PTY 模式
- 危险命令的审批回调
并发
工具调用可以顺序执行,也可以并发执行,具体取决于工具组合和交互需求。
工具注册源码
# ------------------------------------------------------------------ # Registration # ------------------------------------------------------------------ def register( self, name: str, toolset: str, schema: dict, handler: Callable, check_fn: Callable = None, requires_env: list = None, is_async: bool = False, description: str = "", emoji: str = "", max_result_size_chars: int | float | None = None, dynamic_schema_overrides: Callable = None, override: bool = False, ): """Register a tool. Called at module-import time by each tool file. ``override=True`` is an explicit opt-in for plugins that intend to replace an existing built-in tool implementation (e.g. swap the default browser tool for a headed-Chrome CDP backend). Without it, registrations that would shadow an existing tool from a different toolset are rejected to prevent accidental overwrites. """ with self._lock: existing = self._tools.get(name) if existing and existing.toolset != toolset: # Allow MCP-to-MCP overwrites (legitimate: server refresh, # or two MCP servers with overlapping tool names). both_mcp = ( existing.toolset.startswith("mcp-") and toolset.startswith("mcp-") ) if both_mcp: logger.debug( "Tool '%s': MCP toolset '%s' overwriting MCP toolset '%s'", name, toolset, existing.toolset, ) elif override: # Explicit plugin opt-in: replace the existing tool. # Logged at INFO so the override is auditable in agent.log. logger.info( "Tool '%s': toolset '%s' overriding existing toolset '%s' " "(override=True opt-in)", name, toolset, existing.toolset, ) else: # Reject shadowing — prevent plugins/MCP from overwriting # built-in tools or vice versa. logger.error( "Tool registration REJECTED: '%s' (toolset '%s') would " "shadow existing tool from toolset '%s'. Pass " "override=True to register() if the replacement is " "intentional, or deregister the existing tool first.", name, toolset, existing.toolset, ) return self._tools[name] = ToolEntry( name=name, toolset=toolset, schema=schema, handler=handler, check_fn=check_fn, requires_env=requires_env or [], is_async=is_async, description=description or schema.get("description", ""), emoji=emoji, max_result_size_chars=max_result_size_chars, dynamic_schema_overrides=dynamic_schema_overrides, ) if check_fn and toolset not in self._toolset_checks: self._toolset_checks[toolset] = check_fn self._generation += 1工具发现源码
def _module_registers_tools(module_path: Path) -> bool: """Return True when the module contains a top-level ``registry.register(...)`` call. Only inspects module-body statements so that helper modules which happen to call ``registry.register()`` inside a function are not picked up. """ try: source = module_path.read_text(encoding="utf-8") tree = ast.parse(source, filename=str(module_path)) except (OSError, SyntaxError): return False return any(_is_registry_register_call(stmt) for stmt in tree.body) def discover_builtin_tools(tools_dir: Optional[Path] = None) -> List[str]: """Import built-in self-registering tool modules and return their module names.""" tools_path = Path(tools_dir) if tools_dir is not None else Path(__file__).resolve().parent module_names = [ f"tools.{path.stem}" for path in sorted(tools_path.glob("*.py")) if path.name not in {"__init__.py", "registry.py", "mcp_tool.py"} and _module_registers_tools(path) ] imported: List[str] = [] for mod_name in module_names: try: importlib.import_module(mod_name) imported.append(mod_name) except Exception as e: logger.warning("Could not import tool module %s: %s", mod_name, e) return imported工具执行源码
# ------------------------------------------------------------------ # Dispatch # ------------------------------------------------------------------ def dispatch(self, name: str, args: dict, **kwargs) -> str: """Execute a tool handler by name. * Async handlers are bridged automatically via ``_run_async()``. * All exceptions are caught and returned as ``{"error": "..."}`` for consistent error format. """ entry = self.get_entry(name) if not entry: return json.dumps({"error": f"Unknown tool: {name}"}) try: if entry.is_async: from model_tools import _run_async return _run_async(entry.handler(args, **kwargs)) return entry.handler(args, **kwargs) except Exception as e: logger.exception("Tool %s dispatch error: %s", name, e) # Route through the sanitizer so framing tokens / CDATA / fences # in exception strings don't reach the model as structural noise. # See model_tools._sanitize_tool_error for rationale. raw = f"Tool execution failed: {type(e).__name__}: {e}" try: from model_tools import _sanitize_tool_error sanitized = _sanitize_tool_error(raw) except Exception: sanitized = raw # defensive: never let the sanitizer block error propagation return json.dumps({"error": sanitized}) def _run_async(coro): """Run an async coroutine from a sync context. If the current thread already has a running event loop (e.g., inside the gateway's async stack or Atropos's event loop), we spin up a disposable thread so asyncio.run() can create its own loop without conflicting. For the common CLI path (no running loop), we use a persistent event loop so that cached async clients (httpx / AsyncOpenAI) remain bound to a live loop and don't trigger "Event loop is closed" on GC. When called from a worker thread (parallel tool execution), we use a per-thread persistent loop to avoid both contention with the main thread's shared loop AND the "Event loop is closed" errors caused by asyncio.run()'s create-and-destroy lifecycle. This is the single source of truth for sync->async bridging in tool handlers. Each handler is self-protecting via this function. """ try: loop = asyncio.get_running_loop() except RuntimeError: loop = None if loop and loop.is_running(): # Inside an async context (gateway, RL env) — run in a fresh thread # with its own event loop we own a reference to, so on timeout we # can cancel the task inside that loop (ThreadPoolExecutor.cancel() # only works on not-yet-started futures — it's a no-op on a running # worker, which previously leaked the thread on every 300 s timeout). import concurrent.futures worker_loop: Optional[asyncio.AbstractEventLoop] = None loop_ready = threading.Event() def _run_in_worker(): nonlocal worker_loop worker_loop = asyncio.new_event_loop() loop_ready.set() try: asyncio.set_event_loop(worker_loop) return worker_loop.run_until_complete(coro) finally: try: # Cancel anything still pending (e.g. task cancelled # externally via call_soon_threadsafe on timeout). pending = asyncio.all_tasks(worker_loop) for t in pending: t.cancel() if pending: worker_loop.run_until_complete( asyncio.gather(*pending, return_exceptions=True) ) except Exception: pass worker_loop.close() pool = concurrent.futures.ThreadPoolExecutor(max_workers=1) future = pool.submit(_run_in_worker) try: return future.result(timeout=300) except concurrent.futures.TimeoutError: # Cancel the coroutine inside its own loop so the worker thread # can wind down instead of running forever. if loop_ready.wait(timeout=1.0) and worker_loop is not None: try: for t in asyncio.all_tasks(worker_loop): worker_loop.call_soon_threadsafe(t.cancel) except RuntimeError: # Loop already closed — nothing to cancel. pass raise finally: # wait=False: don't block the caller on a stuck coroutine. We've # already requested cancellation above; the worker will exit # once the coroutine observes it (usually at the next await). pool.shutdown(wait=False) # If we're on a worker thread (e.g., parallel tool execution in # delegate_task), use a per-thread persistent loop. This avoids # contention with the main thread's shared loop while keeping cached # httpx/AsyncOpenAI clients bound to a live loop for the thread's # lifetime — preventing "Event loop is closed" on GC cleanup. if threading.current_thread() is not threading.main_thread(): worker_loop = _get_worker_loop() return worker_loop.run_until_complete(coro) tool_loop = _get_tool_loop() return tool_loop.run_until_complete(coro)