MT5 Streamlit工具二次开发:接入LangChain实现链式文本处理流水线
1. 为什么需要把MT5改写工具“串起来”?
你有没有遇到过这样的场景:
刚用MT5工具生成了5条语义一致但表达各异的句子,想立刻把这些结果喂给另一个模型做情感分析;
或者,你希望用户输入一句话后,系统自动完成“改写→提取关键词→生成摘要→翻译成英文”这一整套动作,而不是在多个界面间来回切换;
又或者,你想让改写结果自动过滤掉带敏感词的变体,再把合规的版本存进数据库——但原版Streamlit应用只负责“生成”,其他一概不管。
这就是单点工具和链式流水线的本质区别:
前者是“手电筒”,照亮一个点;后者是“自动化产线”,让文本在多个AI模块间有序流动、层层加工。
而LangChain,正是为这种需求而生的——它不替代MT5,也不取代Streamlit,而是像一条柔性传送带,把原本孤立的NLP能力(改写、分类、抽取、检索……)按需组装、顺序调度、状态传递。
本文不讲抽象概念,只带你实操:如何在现有MT5 Streamlit应用基础上,零侵入式接入LangChain,把“单次改写”升级为“可编排、可扩展、可复用”的文本处理流水线。整个过程无需重写模型推理逻辑,不改动原有UI结构,所有新增代码控制在200行以内。
2. 理解当前MT5 Streamlit应用的“可插拔接口”
在动手改造前,先看清它的骨架。原项目核心结构非常清晰:
app.py # Streamlit主入口 ├── model_loader.py # 加载mT5模型和tokenizer(本地路径加载) ├── paraphrase.py # 核心改写函数:paraphrase_text(text, num_beams=5, temperature=0.8) └── requirements.txt关键发现:paraphrase.py中的paraphrase_text()函数就是我们的“锚点”。它接收原始文本和参数,返回一个字符串列表(如["这家餐厅口味很棒,服务也十分贴心。", "味道极佳,服务员态度热情周到。"]),完全符合LangChain中Runnable接口的输入/输出契约——即:输入str,输出List[str]。
这意味着我们不需要碰模型加载、不修改Streamlit渲染逻辑、甚至不用动app.py里st.button那一行。只需把paraphrase_text包装成LangChain组件,再在其后挂接新能力即可。
小贴士:LangChain的
Runnable不是魔法,它本质就是一个带.invoke()方法的Python对象。你写的任何函数,只要签名明确、输入输出稳定,都能被RunnableLambda轻松包裹。
3. 构建第一条链:改写 → 关键词提取 → 摘要生成
我们以“用户输入一句话,系统返回改写结果+每条结果的关键词+一句话摘要”为例,搭建首条实用流水线。
3.1 安装必要依赖
在原项目目录下执行:
pip install langchain-community langchain-openai jieba注意:这里选用开源轻量方案。
langchain-community提供中文分词与基础工具,jieba用于关键词提取,全程离线运行,不依赖OpenAI API(如需调用大模型摘要,后续再扩展)。
3.2 编写链式处理模块(chain_pipeline.py)
# chain_pipeline.py from langchain_core.runnables import RunnableParallel, RunnablePassthrough from langchain_core.output_parsers import StrOutputParser import jieba from jieba import analyse from typing import List, Dict, Any # 1. 包装原MT5改写函数为Runnable from paraphrase import paraphrase_text from langchain_core.runnables import RunnableLambda mt5_paraphraser = RunnableLambda( lambda x: paraphrase_text(x["input_text"], num_beams=x.get("num_beams", 5), temperature=x.get("temperature", 0.8)) ) # 2. 关键词提取Runnable(基于TF-IDF) def extract_keywords(text: str) -> List[str]: # 使用jieba的TF-IDF提取前3个关键词 keywords = analyse.extract_tags(text, topK=3, withWeight=False) return keywords or ["未提取到关键词"] keyword_extractor = RunnableLambda(lambda texts: [extract_keywords(t) for t in texts]) # 3. 摘要生成Runnable(规则+统计法) def simple_summary(text: str) -> str: # 简单策略:取最长句 + 最短句 + 首尾各10字拼接(真实场景可替换为tiny-llm) sentences = [s.strip() for s in text.split("。") if s.strip()] if not sentences: return text[:30] + "..." longest = max(sentences, key=len) shortest = min(sentences, key=len) return f"{longest[:15]}...{shortest}...{text[:10]}{text[-10:]}" summary_generator = RunnableLambda(lambda texts: [simple_summary(t) for t in texts]) # 4. 组装完整链:输入→改写→并行提取关键词&摘要→结构化输出 full_chain = ( {"input_text": RunnablePassthrough(), "num_beams": RunnablePassthrough(), "temperature": RunnablePassthrough()} | mt5_paraphraser | { "paraphrased": RunnablePassthrough(), "keywords": keyword_extractor, "summaries": summary_generator } )这段代码做了三件关键事:
- 用
RunnableLambda把paraphrase_text变成LangChain可调度的节点; - 用
RunnableParallel让关键词提取和摘要生成并行执行(提升响应速度); - 最终输出是字典结构:
{"paraphrased": [...], "keywords": [...], "summaries": [...]},天然适配Streamlit的st.json()或自定义表格展示。
3.3 在Streamlit中调用新链(修改app.py)
找到原app.py中点击按钮后的处理逻辑(通常是if st.button(" 开始裂变/改写"):之后),将原来的paraphrase_text(...)调用,替换成:
# 替换原调用位置 from chain_pipeline import full_chain if st.button(" 开始裂变/改写"): with st.spinner("正在构建文本处理流水线..."): try: # 构造输入字典(兼容原参数) input_data = { "input_text": user_input, "num_beams": num_beams, "temperature": temperature } # 执行整条链 result = full_chain.invoke(input_data) # 展示结构化结果 st.subheader(" 流水线处理结果") for i, (para, kw, summ) in enumerate(zip( result["paraphrased"], result["keywords"], result["summaries"] ), 1): with st.expander(f"变体 {i} —— {para[:20]}...", expanded=i==1): st.markdown(f"**原文改写**:{para}") st.markdown(f"**提取关键词**:{'、'.join(kw)}") st.markdown(f"**一句话摘要**:{summ}") except Exception as e: st.error(f"处理失败:{str(e)}")刷新页面,你会发现:原来只显示5条纯文本的界面,现在每条结果都自带关键词标签和摘要卡片——没有新增API请求,没有额外GPU开销,只是把已有能力重新组织。
4. 进阶实战:添加条件路由与错误恢复机制
真实业务中,流水线不能“一杆子捅到底”。比如:
- 当改写结果中某条含违禁词时,应跳过其后续处理,直接标记为“已过滤”;
- 若关键词提取失败(如空输入),不应中断整条链,而应返回默认值;
- 用户可能想“只做改写”或“改写+关键词”,而非固定全链。
LangChain的RouterRunnable和Try/Except模式完美解决这些问题。
4.1 实现动态路由:根据用户选择启用不同分支
在chain_pipeline.py末尾追加:
from langchain_core.runnables import RouterRunnable, RunnableBranch # 定义路由逻辑:根据用户传入的mode决定走哪条子链 def route_by_mode(input_dict: Dict[str, Any]) -> str: return input_dict.get("mode", "full") # 默认全链 mode_router = RouterRunnable({ "paraphrase_only": mt5_paraphraser, "with_keywords": mt5_paraphraser | {"paraphrased": RunnablePassthrough(), "keywords": keyword_extractor}, "full": full_chain # 前面定义的完整链 }, route_by_mode) # 新增调用入口 def run_pipeline(input_text: str, mode: str = "full", **kwargs) -> Any: return mode_router.invoke({ "input_text": input_text, "mode": mode, **kwargs })然后在app.py的UI中增加单选框:
mode_option = st.radio( "选择处理模式", ["paraphrase_only", "with_keywords", "full"], captions=["仅生成改写结果", "改写+关键词提取", "改写+关键词+摘要"] ) # 调用时传入mode result = run_pipeline(user_input, mode=mode_option, num_beams=num_beams, temperature=temperature)4.2 添加容错:当某环节失败时降级处理
修改keyword_extractor,加入异常捕获:
def safe_keyword_extractor(texts: List[str]) -> List[List[str]]: results = [] for text in texts: try: results.append(extract_keywords(text)) except Exception: results.append(["关键词提取失败"]) return results keyword_extractor = RunnableLambda(safe_keyword_extractor)这样即使jieba因特殊字符报错,整条链仍能返回其他字段,不会导致前端白屏。
5. 可持续扩展:如何添加新能力而不重构?
流水线的价值在于“可生长”。未来你想加入新能力(如:情感倾向判断、实体识别、自动纠错),只需遵循三个原则:
- 输入输出对齐:新函数必须接收
str或List[str],返回同类型数据; - 无状态设计:避免在函数内维护全局变量或缓存(LangChain会并发调用);
- 独立测试:每个新组件单独写单元测试,确保输入
"今天天气真好"能稳定输出预期结果。
例如,添加“情感分析”组件:
# sentiment_analyzer.py from textblob import TextBlob # 示例库,实际可用SnowNLP等中文库 def analyze_sentiment(text: str) -> str: # 简化版:正向/中性/负向 blob = TextBlob(text) polarity = blob.sentiment.polarity # -1~1 if polarity > 0.1: return "正向" elif polarity < -0.1: return "负向" else: return "中性" sentiment_analyzer = RunnableLambda(lambda texts: [analyze_sentiment(t) for t in texts])然后在full_chain中插入:
full_chain = ( ... | { "paraphrased": RunnablePassthrough(), "keywords": keyword_extractor, "summaries": summary_generator, "sentiment": sentiment_analyzer # ← 一行新增 } )无需修改任何已有代码,不重启服务,新能力立即生效。
6. 总结:从工具到平台的关键一跃
回顾整个过程,我们没有:
重写mT5模型加载逻辑;
修改Streamlit UI渲染方式;
引入复杂中间件或消息队列;
要求用户学习LangChain语法。
我们只做了三件事:
把原有函数包装成Runnable——赋予其“可编排”身份;
用|和{}符号声明数据流向——用声明式语法替代命令式胶水代码;
通过RunnableBranch和异常处理构建韧性——让流水线像真实产线一样可靠。
这正是LangChain在轻量级AI应用中的核心价值:它不追求替代模型,而是成为模型能力的“操作系统”。当你下次接到需求——“把改写结果同步发到企业微信”、“根据用户等级动态调整温度参数”、“把高频改写组合存为模板”——你不再需要从头写接口、搭后台、连数据库,而是在chain_pipeline.py里加几行Runnable,再在app.py里接个回调函数。
工具止于功能,平台始于连接。而连接,从来不该是高墙深垒。
获取更多AI镜像
想探索更多AI镜像和应用场景?访问 CSDN星图镜像广场,提供丰富的预置镜像,覆盖大模型推理、图像生成、视频生成、模型微调等多个领域,支持一键部署。