最近在做一个智能客服系统的升级项目,老系统用的是纯规则引擎,后来试过直接调用大模型API,效果都不太理想。要么回答死板,要么响应慢,知识更新还得停机维护,业务部门意见很大。痛定思痛,我们决定用 LangGraph 和 RAG(检索增强生成)技术来重构,目标是打造一个既聪明又高效的客服大脑。折腾了几个月,效果还不错,响应速度提升了40%以上,这里把我们的实践和思考记录下来,希望能给有类似需求的同学一些参考。
1. 为什么传统方案不够用?
在深入技术细节前,我们先聊聊为什么之前的方案会“卡脖子”。
- 基于规则的客服系统:这是我们最初用的。优点是稳定、可控,但缺点太明显了。规则是死的,用户的问题是活的。每增加一个新业务或新话术,就得写一堆 if-else 规则,维护成本指数级上升。而且,它完全无法理解语义相似的问题,比如“怎么付款”和“支付方式有哪些”在它看来就是两个问题。
- 纯LLM(大语言模型)客服系统:后来我们想,直接用大模型总行了吧?确实,它的理解和生成能力很强。但问题也来了:第一,响应速度慢,每次都要让模型“从头思考”,复杂问题可能要等好几秒;第二,容易“胡说八道”,模型会基于其训练数据生成答案,但我们的产品细节、最新政策它并不知道,导致回答不准确;第三,成本高,每次对话都消耗大量Token。
核心痛点就两个:知识更新滞后和响应延迟。RAG 技术正好能解决知识新鲜度问题,而 LangGraph 的任务编排能力,则能优化整个问答流程的效率。
2. 为什么是 LangGraph?技术选型对比
构建一个智能客服,本质上是在编排一个工作流:接收问题 -> 检索知识 -> 组织答案 -> 可能还要反问澄清。我们需要一个灵活的工作流引擎。
我们对比过 Airflow、LangChain 和 LangGraph。
- Apache Airflow:功能强大,但它是为定时批处理任务设计的,调度粒度大(分钟级),不适合需要毫秒级响应的实时对话场景。它的 DAG(有向无环图)定义也略显繁重。
- LangChain:我们很熟悉,它的 Chain 和 Agent 概念很棒。但在处理复杂的、带状态的多轮对话时,用 Chain 来手动维护状态和跳转逻辑,代码会变得很绕,可读性和可维护性下降。
- LangGraph:它是基于 LangChain 构建的,但引入了状态图(StateGraph)的概念。你可以把对话的每个步骤(如检索、生成、确认)定义为一个节点(Node),节点之间通过边(Edge)连接,并且可以基于当前对话状态(State)决定下一步走哪条边。这完美契合了多轮对话的“状态机”特性。它的异步支持也更好,方便我们做并发优化。
所以,LangGraph 在对话式AI应用场景下,在灵活性和开发效率上优势明显。
3. 核心架构:RAG 与 LangGraph 如何协同工作?
整个系统的核心流程可以用下面这个图来概括:
graph TD A[用户提问] --> B[LangGraph: 路由节点] B -- 新问题/需检索 --> C[检索节点] C --> D[知识库<br/>向量检索] D --> E[获取相关文档片段] E --> F[生成节点] B -- 简单问候/上下文延续 --> F F --> G[LLM合成最终答案] G --> H[返回答案并更新对话状态] H --> I[结束] F -- 需要澄清 --> J[澄清节点] J --> H这个流程的关键在于「状态」的流转。我们定义了一个AgentState类来承载所有信息:
from typing import TypedDict, List, Annotated import operator class AgentState(TypedDict): # 用户当前输入的问题 question: str # 对话历史 chat_history: List[str] # 从知识库检索到的上下文 context: str # 模型生成的最终答案 answer: str # 一个标志位,用于控制流程跳转,例如是否需要澄清 needs_clarification: bool有了状态容器,我们就可以定义各个节点了。其中,检索节点和生成节点的异步通信是性能关键。
- 检索节点:它不直接调用LLM,只负责“找资料”。接收到用户问题后,它会将问题转换成向量,去向量数据库(如 FAISS)里搜索最相关的几个文档片段,并把它们拼接到
state['context']里。这个过程可以完全异步化。 - 生成节点:这是消耗计算资源的大户。它拿到
state['context']和state['question']后,会构造一个包含系统指令和上下文的 Prompt,发送给 LLM(如 GPT-4、文心一言等),生成最终答案。为了不让用户等待,这个节点我们也要做异步优化。
两个节点通过共享的AgentState进行通信,解耦了检索和生成,为并行处理提供了可能。
4. 代码实现:从知识库到对话流
理论说再多不如看代码。我们分两步走:先准备知识库,再搭建对话图。
第一步:知识库向量化处理
我们选用 FAISS 作为本地向量存储,轻量且高效。
from langchain_community.document_loaders import DirectoryLoader, TextLoader from langchain_text_splitters import RecursiveCharacterTextSplitter from langchain_huggingface import HuggingFaceEmbeddings from langchain_community.vectorstores import FAISS import logging logging.basicConfig(level=logging.INFO) logger = logging.getLogger(__name__) def create_vector_store(knowledge_dir: str, persist_path: str): """ 将知识库文档转化为FAISS向量存储 :param knowledge_dir: 存放知识文档的目录 :param persist_path: 向量存储持久化路径 """ try: # 1. 加载文档 loader = DirectoryLoader(knowledge_dir, glob="**/*.txt", loader_cls=TextLoader) documents = loader.load() logger.info(f"成功加载 {len(documents)} 个文档") # 2. 分割文本为适合检索的片段 text_splitter = RecursiveCharacterTextSplitter( chunk_size=500, # 每个片段约500字符 chunk_overlap=50 # 片段间重叠50字符,保证上下文连贯 ) splits = text_splitter.split_documents(documents) logger.info(f"文档被分割为 {len(splits)} 个片段") # 3. 使用嵌入模型生成向量 # 这里选用开源模型,平衡效果与速度 embeddings = HuggingFaceEmbeddings(model_name="BAAI/bge-small-zh-v1.5") # 4. 构建向量存储并持久化 vectorstore = FAISS.from_documents(splits, embeddings) vectorstore.save_local(persist_path) logger.info(f"向量存储已创建并保存至 {persist_path}") except Exception as e: logger.error(f"创建向量存储时发生错误: {e}") raise # 使用示例 if __name__ == "__main__": create_vector_store("./knowledge_base", "./vector_store/faiss_index")第二步:构建 LangGraph 对话流程
这是核心部分,我们定义图、节点和流转逻辑。
from langgraph.graph import StateGraph, END from langchain_community.vectorstores import FAISS from langchain_huggingface import HuggingFaceEmbeddings from langchain.prompts import ChatPromptTemplate from langchain_openai import ChatOpenAI # 示例使用OpenAI,可替换为其他LLM import asyncio # 初始化组件(实际项目中应使用依赖注入或配置中心) embeddings = HuggingFaceEmbeddings(model_name="BAAI/bge-small-zh-v1.5") vectorstore = FAISS.load_local("./vector_store/faiss_index", embeddings, allow_dangerous_deserialization=True) llm = ChatOpenAI(model="gpt-4-turbo-preview", temperature=0.1) # 低温度保证答案稳定 # 定义节点函数 def retrieve_node(state: AgentState): """检索节点:从知识库中查找相关信息""" logger.info(f"检索节点处理问题: {state['question']}") try: # 执行相似度搜索,获取最相关的3个片段 docs = vectorstore.similarity_search(state['question'], k=3) context = "\n\n".join([doc.page_content for doc in docs]) # 更新状态中的上下文 return {"context": context} except Exception as e: logger.error(f"检索过程中出错: {e}") # 出错时返回空上下文,避免流程中断 return {"context": "检索服务暂时不可用。"} def generate_node(state: AgentState): """生成节点:基于检索到的上下文和问题,生成答案""" logger.info(f"生成节点开始合成答案,上下文长度: {len(state.get('context', ''))}") try: # 构建Prompt模板,明确指令和上下文 prompt_template = ChatPromptTemplate.from_messages([ ("system", "你是一个专业的客服助手。请严格根据以下已知信息来回答用户的问题。如果信息不足以回答问题,请礼貌地告知用户你不知道,不要编造信息。\n已知信息:{context}"), ("human", "{question}") ]) # 格式化Prompt formatted_prompt = prompt_template.format_messages( context=state.get('context', ''), question=state['question'] ) # 调用LLM生成答案 response = llm.invoke(formatted_prompt) answer = response.content # 一个简单的逻辑:如果答案中包含“请问您是指”等短语,则认为需要澄清 needs_clarification = "请问您是指" in answer or "您能具体说明一下吗" in answer return {"answer": answer, "needs_clarification": needs_clarification} except Exception as e: logger.error(f"生成答案过程中出错: {e}") return {"answer": "抱歉,思考过程出了点问题,请稍后再试。", "needs_clarification": False} def route_question(state: AgentState): """路由逻辑:决定流程走向""" # 规则1:如果是简单的问候语,直接跳转到生成节点(无需检索) simple_greetings = ["你好", "嗨", "在吗", "hello", "hi"] if state['question'].lower().strip() in simple_greetings: logger.info("检测到简单问候,路由至生成节点") return "generate" # 规则2:否则,先进行检索 return "retrieve" # 构建图 workflow = StateGraph(AgentState) # 添加节点 workflow.add_node("retrieve", retrieve_node) workflow.add_node("generate", generate_node) # 设置入口点 workflow.set_entry_point("retrieve") # 添加条件边 workflow.add_conditional_edges( "retrieve", # 路由函数,根据state决定下一个节点 lambda state: "generate", # 可能的下一个节点映射 { "generate": "generate", } ) workflow.add_conditional_edges( "generate", # 根据是否需要澄清来决定是否结束 lambda state: "clarify" if state.get('needs_clarification', False) else END, { "clarify": "generate", # 如果需要澄清,可以指向一个专门的澄清节点,这里简化为再次生成 END: END } ) # 编译图 app = workflow.compile()5. 性能优化:让客服“快”起来
架构搭好了,但要达到“高效”的目标,必须进行性能优化。我们主要做了两件事:缓存和异步化。
1. 缓存策略设计:用 Redis 记住“答案”
我们发现,用户问的很多问题是重复的,比如“运费多少”、“怎么退货”。每次都对相同问题进行向量检索和LLM生成,是巨大的浪费。
我们在检索节点和生成节点之前,加入了一个两级缓存:
- 一级缓存(内存缓存):使用
functools.lru_cache缓存最近会话的热点问题。速度快,但重启即失效。 - 二级缓存(Redis缓存):缓存问题的“检索结果”和“最终答案”。我们以问题的文本作为 Key,Value 是序列化后的答案或上下文。
import redis import json import hashlib class QueryCache: def __init__(self, redis_url='redis://localhost:6379', ttl=3600): self.redis_client = redis.from_url(redis_url) self.ttl = ttl # 缓存生存时间,单位秒 def get_key(self, question: str): """生成缓存键,使用MD5哈希避免长Key问题""" return f"rag_cache:{hashlib.md5(question.encode()).hexdigest()}" def get(self, question: str): """获取缓存""" key = self.get_key(question) cached = self.redis_client.get(key) return json.loads(cached) if cached else None def set(self, question: str, data: dict): """设置缓存""" key = self.get_key(question) self.redis_client.setex(key, self.ttl, json.dumps(data)) # 在检索节点中集成缓存 def retrieve_node_with_cache(state: AgentState, cache: QueryCache): cached_result = cache.get(state['question']) if cached_result: logger.info(f"缓存命中: {state['question'][:50]}...") return cached_result # 直接返回缓存的上下文 # 缓存未命中,执行正常检索流程... result = do_retrieve(state['question']) cache.set(state['question'], result) return result2. 负载测试与效果
优化后,我们使用 Locust 进行了压力测试。模拟了从简单问候到复杂业务咨询的多种问题。
- 优化前(无缓存,同步调用):平均响应时间约 2.8 秒,QPS(每秒查询率)在 15 左右达到瓶颈。
- 优化后(Redis缓存 + 异步节点):对于缓存命中率高的问题(约占70%),响应时间降至 200-500 毫秒。整体平均响应时间降至约 1.6 秒,提升超过40%。QPS 在同样资源下可以稳定支持 35+。
6. 避坑指南:我们踩过的那些“坑”
坑一:对话状态管理混乱初期我们把所有历史对话都塞进state['chat_history'],导致后面 Prompt 越来越长,不仅拖慢速度,还可能超出模型上下文窗口。解决方案:只保留最近3-5轮对话,或者使用 LangGraph 的checkpointer功能来管理更长的记忆,将历史摘要化存储。
坑二:知识库冷启动效果差刚上线时,向量数据库里文档少,检索到的内容可能不相关,导致LLM“瞎编”。最佳实践:
- 数据清洗:上线前对知识文档做仔细的清洗和格式化,确保没有乱码和无关信息。
- 混合检索:不要只依赖向量检索。可以结合关键词检索(如 BM25),当向量检索置信度低时,用关键词结果作为补充。
- 人工反馈闭环:初期设置一个后台,将低置信度的问答对推送给运营人员审核,纠正后直接补充进知识库,快速迭代。
坑三:LLM生成答案的稳定性即使给了上下文,LLM有时还是会忽略它,或者格式混乱。解决方案:在系统 Prompt 里使用更强烈的指令,比如“你必须且只能根据以下信息回答”,并设置较低的temperature参数(如0.1)来减少随机性。
7. 延伸思考:未来还能怎么优化?
目前这个系统已经能稳定运行,但技术永远有优化空间。我们接下来想探索两个方向:
动态知识更新:现在更新知识库,需要重新跑一遍向量化脚本,服务有短暂中断。我们计划实现一个“增量更新”管道。监控知识源(如Confluence、Git)的变更,通过消息队列触发,自动将变更的文档切片、向量化并增量更新到 FAISS 或更高级的向量数据库(如 Weaviate、Pinecone)中,实现近乎实时的知识同步。
多租户隔离:我们公司有多个产品线,希望共用一套客服系统,但数据要隔离。这需要在架构层面进行改造。可能的方案是:为每个租户创建独立的向量存储索引,在 LangGraph 的入口节点根据请求头或用户信息动态选择对应的向量库和LLM配置。状态管理也需要区分租户上下文。
写在最后
从规则引擎到 LangGraph + RAG,这次重构让我们深刻体会到,选择合适的工具和架构对系统性能和维护性至关重要。LangGraph 让我们能清晰地描绘和掌控复杂的对话流程,RAG 则保证了答案的准确性和时效性。加上缓存、异步等工程优化,最终让智能客服真正做到了“又快又准”。
当然,没有银弹。这个架构在应对非常复杂的、逻辑严密的业务咨询时仍有局限,可能需要结合业务规则引擎。但就解决“海量知识快速问答”这个核心诉求而言,它已经是一个非常得力的方案了。希望我们的实践能为你带来一些启发。
(上图:智能客服系统架构简化示意图,展示了从用户输入到答案输出的核心数据流)
整个搭建过程就像搭积木,LangGraph 提供了稳固的框架,RAG 是核心的知识处理引擎,而缓存、异步等优化则是让这个机器跑得更快的润滑剂。如果你也在考虑构建或升级智能客服,不妨试试这个组合拳。