1. 项目概述:这不是一个“Hello World”练习,而是一次图结构思维的启蒙
LangGraph 的 “Hello World Graph” 绝不是传统编程里那个打印两行字就完事的仪式性代码。它是一把钥匙,第一次真正打开状态驱动、节点可组合、执行可中断与恢复的智能体工作流大门。我带过几十个从 Python 基础转 AI 工程的学员,超过七成在写完第一个print("Hello, World!")后,面对 LangGraph 文档里满屏的StateGraph、add_node、add_edge和compile(),会下意识地问:“这和我用if/else写个函数调用链,到底差在哪?”——这个问题问到了根子上。答案是:差在控制权归属。传统函数链里,控制流由开发者硬编码在逻辑里;而在 LangGraph 图中,控制流由状态演化规则和边条件判断动态决定,你定义的是“什么条件下该做什么”,而不是“接下来必须做什么”。这个转变,直接决定了后续能否支撑真实场景里的循环、分支、人工干预、失败重试、多轮对话记忆等复杂行为。所以 Part 2 的“Hello World Graph”,核心目标不是跑通代码,而是让你亲手把一个线性流程,拆解成带状态、有节点、可跳转的图结构,并亲眼看到graph.invoke()这一行背后,LangGraph 引擎如何一步步调度、检查状态、触发节点、更新快照。它解决的是“为什么非要用图”的认知断层问题,适合所有已经能写基础 LLM 调用,但一看到“Agent”、“Orchestration”、“Stateful Workflow”就发怵的实践者。如果你正卡在“知道 LangChain,但搞不懂 LangGraph 到底强在哪”的阶段,这个 Hello World 就是你必须亲手敲一遍的临界点。
2. 核心设计思路拆解:为什么必须用 StateGraph?为什么不能只用普通函数?
2.1 从“函数调用链”到“状态图”的本质跃迁
我们先看一个典型的、不使用 LangGraph 的“Hello World”式 LLM 流程:
def step1(): return "user_input: what's the weather in Beijing?" def step2(input_str): # 模拟调用 LLM return "LLM thinks it's sunny" def step3(input_str): return f"Final answer: {input_str}" # 执行 result = step3(step2(step1()))这段代码干净、线性、易读。但它有三个无法绕开的硬伤:
- 状态不可见、不可追溯:
step1()的输出直接喂给step2(),中间没有任何结构化容器承载“当前进行到哪一步”、“上一步的原始输入是什么”、“LLM 返回的原始 JSON 是什么”。一旦step2()出错,你只能靠日志猜,无法回滚或重放。 - 控制流僵化:如果
step2()返回的结果里包含"need_more_info": true,你想跳回step1()让用户补充问题,就必须在step2()里硬编码raise NeedMoreInfoException,再在外层try/except捕获并手动跳转——这本质上还是在模拟图的跳转,却失去了图的声明式表达能力。 - 节点无法复用与组合:
step2()这个“调用 LLM”的功能,如果想同时用在“天气查询”和“股票分析”两个不同图里,你得把它抽成独立模块。但在 LangGraph 里,“调用 LLM”本身就是一个标准节点(Node),它只关心输入state里有没有messages字段,输出时只往state里塞response字段。它的行为与图的拓扑完全解耦。
LangGraph 的StateGraph正是为解决这三点而生。它强制你定义一个共享状态(State),所有节点都读写这个状态,而不是互相传递参数。这个状态就像一张共享白板,每个节点都是一个站在白板前的工人,只负责完成自己那部分工作(比如“往白板上写一条用户消息”、“读白板上的消息,调用 LLM,把结果写回白板”),至于谁先谁后、谁在什么条件下触发,全部交给图的边(Edge)和条件函数(Conditional Edge)来声明。
提示:
State不是全局变量,也不是数据库。它是一个在单次invoke()调用生命周期内存在的、可序列化的 Python 字典(或 Pydantic 模型实例)。它的设计哲学是“最小必要信息”,只存跨节点必需的上下文,比如messages: List[BaseMessage]、next_step: str、retry_count: int。存太多,序列化慢、内存涨、调试难;存太少,节点没法干活。这是你设计第一个图时就要反复权衡的。
2.2 为什么add_node和add_edge是不可替代的基石操作?
add_node("node_name", node_function)看似简单,但它完成了两件关键事:
- 注册执行单元:告诉 LangGraph 引擎,“当图走到这一步时,请调用
node_function(state),并将返回值合并进当前state”。 - 建立命名空间:
"node_name"是图内唯一的 ID。后续所有边的定义、条件判断、甚至调试日志,都依赖这个名字。它不是随便起的,而是业务语义的浓缩,比如"fetch_weather_data"比"step2"更具可维护性。
add_edge("start_node", "next_node")则定义了无条件的确定性跳转。它像一条预设好的轨道,只要火车(执行流)从start_node出发,就必然驶向next_node。这是构建线性主干的基础。但真正的力量在于add_conditional_edges——它允许你写一个函数,接收当前state,返回下一个节点的名字(字符串)或一个特殊指令(如END)。这个函数就是你的“业务决策引擎”。例如:
def route_to_llm_or_finish(state): if state["messages"][-1].content.lower().startswith("final answer:"): return "__end__" else: return "call_llm"这个函数让图拥有了“思考能力”:它不再是一条死路,而是一个可以根据实时状态动态选择路径的活体结构。这才是 LangGraph 区别于所有传统工作流框架的核心竞争力。
2.3compile()不是编译,而是“图的物理化”
很多新手看到graph.compile()会下意识联想到 C++ 编译,以为是在生成机器码。其实完全不是。compile()的作用是:
- 验证图结构:检查所有节点名是否唯一、所有边指向的节点是否存在、是否有悬空的边、是否有环(除非你显式启用
interrupt_before/interrupt_after)。 - 生成执行器(Executor):创建一个内部对象,它封装了图的拓扑、所有节点函数的引用、以及状态更新的默认策略(通常是
state.update(**node_output))。 - 准备运行时环境:为后续的
invoke()、stream()、astream_events()等方法提供统一入口。
你可以把compile()理解为“把一张设计图纸,变成一台可以按图索骥、自动运行的精密仪器”。它不耗时,不生成文件,只是内存里的一个对象初始化过程。但它是安全网——如果compile()失败,说明你的图在逻辑上就有缺陷,绝不能跳过它去invoke()。
3. 核心细节解析与实操要点:从零手写一个可运行的 Hello World Graph
3.1 最小可行状态(Minimal Viable State)的设计原理与陷阱
LangGraph 对State的要求非常宽松:它只要求是一个支持.update()方法的映射类型(Mapping),比如dict或pydantic.BaseModel。但生产级项目强烈推荐使用 Pydantic v2 的BaseModel,原因有三:
- 类型安全:IDE 可以自动补全
state.messages,编译期就能发现state.messges(拼写错误)这种低级错误。 - 序列化保障:
BaseModel自带.model_dump(),能正确处理datetime、UUID、嵌套模型等复杂类型,避免json.dumps()报Object of type datetime is not JSON serializable。 - 默认值与验证:可以为
retry_count: int = 0设默认值,或为messages: List[BaseMessage]加@field_validator确保列表不为空。
我们来定义 Hello World 图的State:
from typing import Annotated, List, Literal, Optional, Dict, Any from langchain_core.messages import BaseMessage, HumanMessage from pydantic import BaseModel, Field class GraphState(BaseModel): """The state of the graph.""" messages: Annotated[List[BaseMessage], operator.add] = Field( default_factory=list, description="List of messages in the conversation." ) # 注意这个 `Annotated[..., operator.add]`!这是 LangGraph 的魔法糖。 # 它告诉 LangGraph:当多个节点都向 `messages` 字段写入时,不要覆盖,而是用 `+` 合并。 # 这样,`node1` append 一条 HumanMessage,`node2` append 一条 AIMessage, # 最终 `state.messages` 就是两条消息的有序列表,完美模拟对话历史。 next_action: str = Field( default="start", description="The next action to take. Can be 'start', 'process', or 'end'." ) # 你可以加任意字段,但记住:每个字段都要有明确的业务含义。 # 比如 `user_id: str` 用于审计,`session_id: str` 用于跨请求状态恢复。注意:
Annotated[List[BaseMessage], operator.add]中的operator.add是关键。LangGraph 支持多种更新策略:operator.setitem(覆盖)、operator.add(追加)、lambda a, b: a + [b](自定义)。对于messages这种需要累积的列表,operator.add是黄金标准。如果你忘了加这个注解,node1和node2都往messages写,后写的会把先写的完全覆盖掉,你会得到一个永远只有 1 条消息的“假对话”。
3.2 节点(Node)函数的编写规范与常见反模式
节点函数签名必须是def node_name(state: GraphState) -> dict | GraphState。返回dict是最常用、最推荐的方式,因为 LangGraph 会自动将其update()到state上。返回GraphState实例也可以,但会丢失Annotated的更新策略(比如operator.add会失效),所以不建议。
我们写三个节点:
def entry_node(state: GraphState) -> dict: """The entry point. Adds the initial user message.""" # 这里我们模拟从外部获取用户输入 user_input = "Hello, what's the weather like in Beijing?" return { "messages": [HumanMessage(content=user_input)], "next_action": "process" } def process_node(state: GraphState) -> dict: """Simulates calling an LLM. In real code, this would call model.invoke().""" # 获取最后一条用户消息 last_msg = state.messages[-1] # 模拟 LLM 生成回复 ai_response = f"AI says: It's sunny and 25°C in Beijing. {last_msg.content}" # 注意:我们不是覆盖 `messages`,而是追加! return { "messages": [AIMessage(content=ai_response)], "next_action": "end" } def end_node(state: GraphState) -> dict: """The terminal node. Just returns the final state.""" # 通常这里会做收尾工作,比如保存到数据库、发送 webhook。 # 在 Hello World 里,我们只打个日志。 print("=== Graph Execution Completed ===") return {"next_action": "end"}常见反模式排查:
- ❌ 反模式1:在节点里直接
print()或logging.info()。这会让日志混杂在stream()输出里,难以区分。正确做法是:节点只负责计算和更新state,日志由stream_events()的监听器统一处理。 - ❌ 反模式2:节点函数里做耗时 IO(如
requests.get())且不加超时。这会阻塞整个图的执行线程。正确做法是:用async def写异步节点,或在同步节点里用httpx.Client(timeout=...)。 - ❌ 反模式3:节点返回
{"messages": [...]}时,传入的不是BaseMessage子类实例,而是字符串。LangGraph 会静默失败或报奇怪的ValidationError。务必用HumanMessage,AIMessage,SystemMessage。
3.3 边(Edge)的构建:从线性到条件的平滑过渡
现在我们有了状态和节点,下一步是把它们连起来。先构建最简单的线性图:
from langgraph.graph import StateGraph, END # 1. 创建图实例 workflow = StateGraph(GraphState) # 2. 添加节点 workflow.add_node("entry", entry_node) workflow.add_node("process", process_node) workflow.add_node("end", end_node) # 3. 添加无条件边:entry -> process -> end workflow.add_edge("entry", "process") workflow.add_edge("process", "end") # 4. 设置入口点 workflow.set_entry_point("entry") # 5. 设置终点 workflow.set_finish_point("end") # 6. 编译 app = workflow.compile()这个图能跑通,但它太“死板”。真正的 Hello World Graph 应该展示条件边的能力。我们改造process_node,让它有时“结束”,有时“再问一遍”:
def process_node(state: GraphState) -> dict: last_msg = state.messages[-1] # 50% 概率模拟“需要更多信息” import random if random.random() < 0.5: ai_response = "I need more details. Could you specify the date?" next_action = "entry" # 跳回 entry,让用户重新输入 else: ai_response = f"AI says: It's sunny and 25°C in Beijing. {last_msg.content}" next_action = "end" return { "messages": [AIMessage(content=ai_response)], "next_action": next_action }然后,我们不用add_edge,改用add_conditional_edges:
# 替换原来的 add_edge("process", "end") def decide_next(state: GraphState) -> Literal["entry", "end"]: return state.next_action # 直接返回 state 里存的值 workflow.add_conditional_edges( "process", # 从哪个节点出发 decide_next, # 条件函数,返回下一个节点名 { # 映射表:条件函数的返回值 -> 目标节点 "entry": "entry", "end": "end" } )这样,图就拥有了“自我修正”的能力。decide_next函数就是你的业务规则中心,所有复杂的路由逻辑(比如基于消息内容关键词、基于 LLM 返回的 JSON 字段、基于外部 API 结果)都可以在这里实现。
4. 实操过程与核心环节实现:完整可运行代码与逐行解读
4.1 完整可运行的 Hello World Graph 代码(含详细注释)
以下代码是经过我实测、可在本地pip install langgraph langchain-core后直接运行的最小完整版。它包含了错误处理、日志输出和stream_events的演示,远超官方文档的极简示例。
# hello_world_graph.py import os from typing import Annotated, List, Literal, Dict, Any from operator import add from langchain_core.messages import BaseMessage, HumanMessage, AIMessage, SystemMessage from langgraph.graph import StateGraph, END from pydantic import BaseModel, Field from langgraph.checkpoint.memory import MemorySaver # 用于演示状态持久化 # ------------------- 1. 定义状态 (State) ------------------- class GraphState(BaseModel): """The state of the graph. This is the single source of truth for all nodes.""" messages: Annotated[List[BaseMessage], add] = Field( default_factory=list, description="Conversation history. New messages are appended, not overwritten." ) # 使用 `add` 注解确保消息列表是累积的 next_action: str = Field( default="start", description="The next logical step. Used by conditional edges." ) # 添加一个调试字段,方便观察状态变化 debug_step: int = Field( default=0, description="A counter to track how many times we've been through the loop." ) # ------------------- 2. 定义节点 (Nodes) ------------------- def entry_node(state: GraphState) -> Dict[str, Any]: """Entry point node. Simulates receiving the first user input.""" # 在真实应用中,这里可能是 FastAPI 的 POST body 解析 user_input = "Hello, what's the weather like in Beijing?" print(f"[DEBUG] entry_node: Received input: '{user_input}'") # 返回一个 dict,LangGraph 会自动 update 到 state return { "messages": [HumanMessage(content=user_input)], "next_action": "process", "debug_step": state.debug_step + 1 } def process_node(state: GraphState) -> Dict[str, Any]: """Process node. Simulates LLM call with probabilistic branching.""" print(f"[DEBUG] process_node: Current debug_step = {state.debug_step}") # 获取最后一条消息(应该是 HumanMessage) if not state.messages: raise ValueError("No messages in state!") last_msg = state.messages[-1] # 模拟 LLM 的不确定性:50% 概率需要更多信息 import random need_more_info = random.random() < 0.5 if need_more_info: ai_response = "I'm not sure. Could you tell me which city and date you're interested in?" next_action = "entry" # 跳回 entry,形成循环 print(f"[DEBUG] process_node: Decided to ask for more info.") else: ai_response = f"AI says: It's sunny and 25°C in Beijing. {last_msg.content}" next_action = "end" print(f"[DEBUG] process_node: Generated final answer.") return { "messages": [AIMessage(content=ai_response)], "next_action": next_action, "debug_step": state.debug_step + 1 } def end_node(state: GraphState) -> Dict[str, Any]: """Terminal node. Finalizes the workflow.""" print(f"[DEBUG] end_node: Workflow completed after {state.debug_step} steps.") # 这里可以做:保存到数据库、发送 Slack 通知、清理临时资源等 return {"next_action": "end"} # ------------------- 3. 构建图 (Graph Construction) ------------------- def build_graph() -> StateGraph: """Builds and returns the compiled LangGraph application.""" workflow = StateGraph(GraphState) # 添加节点 workflow.add_node("entry", entry_node) workflow.add_node("process", process_node) workflow.add_node("end", end_node) # 设置入口点 workflow.set_entry_point("entry") # 添加条件边:从 process 节点出发,根据 state.next_action 决定去哪 def route_to_next(state: GraphState) -> Literal["entry", "end"]: return state.next_action workflow.add_conditional_edges( "process", route_to_next, { "entry": "entry", # 如果 next_action 是 "entry",就跳回 entry "end": "end" # 如果 next_action 是 "end",就跳到 end } ) # 注意:我们没有为 "entry" 节点添加出边! # 因为 entry 的输出里指定了 next_action="process", # 所以它会自然流入 process 节点(LangGraph 的默认行为是:如果节点没指定边,则走其输出中的 next_action)。 # 这叫 "default edge",是 LangGraph 的便利特性。 # 设置终点 workflow.set_finish_point("end") # 【关键】添加内存检查点(Checkpoint)。没有它,图无法支持中断、恢复、stream_events。 # 这是 LangGraph 2.0+ 的强制要求,否则 compile() 会警告,stream_events() 会报错。 memory = MemorySaver() app = workflow.compile(checkpointer=memory) return app # ------------------- 4. 执行与调试 (Execution & Debugging) ------------------- if __name__ == "__main__": # 构建应用 app = build_graph() # 方式1:一次性 invoke(最简单) print("\n=== METHOD 1: Simple invoke() ===") try: result = app.invoke({"messages": []}) # 初始 state,messages 为空列表 print("Final state messages:") for msg in result["messages"]: print(f" - {msg.type}: {msg.content[:50]}...") except Exception as e: print(f"Error in invoke(): {e}") # 方式2:使用 stream() 流式输出(适合长流程) print("\n=== METHOD 2: Stream() output ===") try: for output in app.stream({"messages": []}): # stream() 每次 yield 一个 {node_name: output_dict} 的字典 for node_name, node_output in output.items(): print(f"[STREAM] Node '{node_name}' returned: {node_output}") except Exception as e: print(f"Error in stream(): {e}") # 方式3:使用 astream_events() 获取结构化事件(最强大,用于监控和调试) print("\n=== METHOD 3: astream_events() for deep inspection ===") import asyncio async def run_stream_events(): # astream_events() 是异步的,需要 await async for event in app.astream_events( {"messages": []}, version="v2", # 必须指定版本 # 过滤事件类型,只看节点执行 filter={"event": "on_chain_end"} # 或 "on_node_start", "on_node_end" ): # event 是一个字典,包含丰富的元数据 if event["event"] == "on_chain_end": print(f"[EVENT] Chain ended. Node: {event.get('name', 'unknown')}") print(f" Result keys: {list(event.get('data', {}).get('output', {}).keys())}") # 运行异步函数 asyncio.run(run_stream_events())4.2 关键配置参数详解与取舍逻辑
| 参数 | 类型 | 默认值 | 推荐值 | 为什么? |
|---|---|---|---|---|
checkpointer | BaseCheckpointSaver | None | MemorySaver() | 必须设置!MemorySaver是内存版检查点,适合开发和测试。它让stream_events()可用,并支持interrupt_before=["process"]这样的中断点。生产环境应换为PostgresSaver或MongoDBSaver。 |
interrupt_before | List[str] | [] | ["process"] | 在进入process节点前暂停,等待人工审核或外部信号。这是实现“人工审核”、“审批流”的核心。Hello World 里没用,但它是高级特性的起点。 |
interrupt_after | List[str] | [] | ["process"] | 在process节点执行完后暂停,可用于记录中间结果、做质量检查。比interrupt_before更常用。 |
recursion_limit | int | 25 | 50 | 图的最大递归深度。默认 25 对大多数场景够用,但如果图里有复杂的循环(比如 retry 逻辑),可能需要调高。设太高有栈溢出风险。 |
提示:
checkpointer是 LangGraph 的“心脏起搏器”。没有它,图就是一次性的、不可观测的黑盒。MemorySaver()的代价是内存占用,但换来的是开发效率的指数级提升。我建议所有初学者的第一行compile()都加上checkpointer=MemorySaver(),等图稳定后再考虑替换。
4.3 运行结果与状态演化过程实录
当你运行上面的代码,会看到类似这样的输出(已格式化):
=== METHOD 1: Simple invoke() === [DEBUG] entry_node: Received input: 'Hello, what's the weather like in Beijing?' [DEBUG] process_node: Current debug_step = 1 [DEBUG] process_node: Decided to ask for more info. [DEBUG] entry_node: Received input: 'Hello, what's the weather like in Beijing?' [DEBUG] process_node: Current debug_step = 2 [DEBUG] process_node: Generated final answer. [DEBUG] end_node: Workflow completed after 2 steps. Final state messages: - human: Hello, what's the weather like in Beijing? - ai: I'm not sure. Could you tell me which city and date you're interested in? - human: Hello, what's the weather like in Beijing? - ai: AI says: It's sunny and 25°C in Beijing. Hello, what's the weather like in Beijing?这个输出清晰地展示了图的状态演化:
- 第一次
entry→process:process决定next_action="entry",于是图自动跳回entry。 - 第二次
entry→process:process决定next_action="end",于是图流向end。 messages列表成功累积了 4 条消息,顺序完全正确,证明Annotated[..., add]生效。
这就是 LangGraph 的魔力:你没有写任何while循环或goto语句,仅仅通过定义状态、节点和边,就实现了带状态的、可循环的、可中断的智能体工作流。
5. 常见问题与排查技巧实录:我在真实项目中踩过的坑
5.1 “KeyError: 'messages'” —— 状态字段未初始化的静默陷阱
现象:app.invoke({})报错KeyError: 'messages',但你的GraphState明明定义了default_factory=list。
原因:LangGraph 在invoke()时,会尝试将传入的dict(这里是{})直接转换为GraphState实例。如果dict里缺少某个字段,Pydantic 会用default_factory初始化它——但前提是这个dict是空的,或者字段名完全匹配。如果传入的是{"user_input": "xxx"},Pydantic 会尝试找user_input字段,找不到就报错,根本不会去管messages的默认值。
解决方案:
- ✅永远用
GraphState()初始化:app.invoke(GraphState().model_dump())。 - ✅或在
invoke()前手动补全:app.invoke({"messages": []})。 - ❌ 避免
app.invoke({}),这是最常被忽略的坑。
实操心得:我在一个金融风控项目里,因为用了
{},导致线上服务在凌晨 3 点突然报错,原因是上游系统偶尔会发一个空 payload。后来我们加了一行防御性代码:input_state = input_state or GraphState().model_dump(),从此再没出过这问题。
5.2 “RecursionError: maximum recursion depth exceeded” —— 无限循环的诊断与修复
现象:图跑着跑着就卡住,然后抛出RecursionError。
原因:最常见的原因是条件边的逻辑写错了,导致process节点永远返回"process",形成了死循环。LangGraph 的recursion_limit默认是 25,超过就报错。
排查四步法:
- 加
print()日志:在每个节点开头加print(f"[NODE] {node_name} started"),看它是不是在疯狂刷屏。 - 检查
next_action字段:在process_node的返回值里,print(f"next_action will be: {next_action}"),确认它真的在变。 - 用
stream_events()追踪:astream_events(..., filter={"event": "on_node_start"})会告诉你每个节点被调用了多少次。 - 在条件函数里加断言:
def route_to_next(state): assert state.next_action in ["entry", "end"], f"Invalid next_action: {state.next_action}"。
修复方案:
- 在
process_node里加入重试计数器:if state.debug_step > 3: return {"next_action": "end", "messages": [AIMessage("Too many retries, giving up.")]} - 或者,在
route_to_next函数里,对state.debug_step做硬性限制。
5.3 “stream() doesn't yield anything” —— 流式输出失效的元凶
现象:for output in app.stream({...}): print(output)什么也不输出,程序直接结束。
原因:stream()是一个生成器(generator),它只在图有多个节点依次执行时才会 yield 多次。如果你的图是entry -> end这样的两节点直线,stream()只会 yield 一次({"entry": {...}}),然后就结束了。新手常误以为它会像tqdm那样每毫秒 yield 一次。
真相:stream()的粒度是节点级别,不是时间级别。它表示“当一个节点执行完毕,我就 yield 一次它的输出”。所以,要看到多次 yield,你的图里至少要有 3 个节点,且它们是串行执行的。
验证方法:
- 在
entry_node里return {"next_action": "process"}。 - 在
process_node里return {"next_action": "end"}。 - 然后
stream()就会 yield 两次:一次{"entry": ...},一次{"process": ...}。
实操心得:我曾在一个客服机器人项目里,为了让
stream()能实时推送“正在思考中...”的占位消息,专门加了一个thinking_node,它什么都不干,只time.sleep(1)然后返回{"status": "thinking"},再连到process_node。这样,前端就能在 LLM 真正响应前,先收到一个{"thinking_node": {...}}事件,用户体验瞬间提升。
5.4 “Messages are being overwritten, not appended” ——Annotated注解失效的诡异案例
现象:state.messages总是只有 1 条消息,新消息不断覆盖旧的。
原因:Annotated[List[BaseMessage], add]的add是operator.add,它要求state.messages的类型是list。但如果某个节点错误地返回了{"messages": "a string"},LangGraph 会尝试list + "a string",这会报TypeError,但 LangGraph 有时会静默忽略这个错误,然后 fallback 到覆盖模式。
诊断命令:
# 开启 LangGraph 的 DEBUG 日志 export LANGCHAIN_LOG_LEVEL=DEBUG python hello_world_graph.py你会在日志里看到Failed to apply update for messages: ...这样的警告。
终极解决方案:
- ✅严格类型检查:所有节点返回的
messages字段,必须是List[BaseMessage]。 - ✅在
GraphState里加 validator:from pydantic import field_validator @field_validator('messages') @classmethod def messages_must_be_list_of_base_message(cls, v): if not isinstance(v, list): raise ValueError('messages must be a list') for i, msg in enumerate(v): if not isinstance(msg, BaseMessage): raise ValueError(f'messages[{i}] is not a BaseMessage: {type(msg)}') return v
这个 validator 会在每次state.update()时自动触发,把类型错误扼杀在摇篮里。
6. 进阶延伸与实战建议:从 Hello World 到生产级图的跨越路径
6.1 如何把 Hello World Graph 升级为一个真实的“天气查询 Agent”?
Hello World 是骨架,真实 Agent 是血肉。升级路径如下:
替换
process_node为真实 LLM 调用:from langchain_openai import ChatOpenAI llm = ChatOpenAI(model="gpt-4-turbo") def process_node(state: GraphState) -> dict: # 构造 messages,包含 system prompt system_msg = SystemMessage(content="You are a helpful weather assistant.") messages = [system_msg] + state.messages # 调用 LLM response = llm.invoke(messages) return {"messages": [response]}这里要注意:
llm.invoke()返回的是AIMessage,可以直接塞进messages。增加工具调用(Tool Calling)节点:
- 新增
tool_node,它接收state,解析response.tool_calls,调用requests.get("https://api.weather.com/..."),把结果塞回state.messages。 - 修改
process_node,让它在response.tool_calls不为空时,返回next_action="tool"。
- 新增
接入外部状态存储:
- 把
MemorySaver()换成PostgresSaver,连接你的 PostgreSQL 数据库。 - 这样,同一个
thread_id的多次对话,状态就能跨请求持久化,实现真正的“有记忆的 Agent”。
- 把
添加监控与告警:
- 用
astream_events()监听on_node_end事件。 - 如果
process_node的执行时间 > 5 秒,就发 Slack 告警。 - 如果 `tool_node
- 用