背景痛点:规则引擎的“三板斧”失灵了
第一次做客服系统时,我把所有 FAQ 写成正则,上线当天就被用户“教做人”:
- 冷启动难:新领域没数据,规则写一条漏十条
- 泛化能力差:“我要退货”和“想退掉昨天买的衣服”被当成两句话
- 维护噩梦:运营每周加 30 个新问法,开发在 Git 里玩“找不同”
当并发量一上来,正则引擎直接把 CPU 跑成“电热器”。痛定思痛,决定用 NLP 重做。
技术选型:Rasa、Dialogflow 还是自研?
| 维度 | Rasa | Dialogflow | BERT+FastAPI+Redis(自研) | |----| 评价 | 评价 | 评价 | | 中文效果 | 需要大量语料调 CRF,意图 F1≈0.85 | 中文支持一般,且强制云端 | 微调 BERT 轻松 0.93+ | | 定制深度 | 源码开放,但 DSL 学习曲线陡 | 黑盒,只能插件式 webhook | 全链路可控,可插拔 | | 并发成本 | Python 多进程吃机器 | 按调用计费,高并发肉疼 | 容器水平扩容,2000 TPS 成本 < 5 台 4C8G | | 数据隐私 | 本地部署 | 必须上云 | 本地 GPU/CPU 皆可 |
结论:团队有算法工程化能力,选“自研”最划算。
核心实现:三步搭出能跑的 MVP
1. 微调 BERT 做多意图分类
数据格式:一行一条 JSON,字段包括text与label(支持多标签,用|分隔)。
训练脚本(PyTorch 1.13,含类型注解):
# train_intent.py from datasets import load_dataset from transformers import BertTokenizerFast, BertForSequenceClassification import torch, torch.nn as nn from torch.utils.data import DataLoader from typing import List, Dict MODEL_NAME = "bert-base-chinese" MAX_LEN = 128 BATCH = 32 EPOCHS = 3 LR = 2e-5 tokenizer = BertTokenizerFast.from_pretrained(MODEL_NAME) def collate(batch: List[Dict]) -> Dict[str, torch.Tensor]: texts = [b["text"] for b in batch] ] labels = [list(map(int, b["label"].split("|"))) for b in batch] encoded = tokenizer(texts, padding=True, truncation=True, max_length=MAX_LEN, return_tensors="pt") # 多标签 0/1 向量 num_labels = 37 # 按自己业务改 target = torch.zeros(len(batch), num_labels) for idx, ls in enumerate(labels): for l in ls: target[idx][l] = 1. return {**encoded, "labels": target} train_ds = load_dataset("json", data_files="train.json")["train"] loader = DataLoader(train_ds.shuffle(), batch_size=BATCH, collate_fn=collate) model = BertForSequenceClassification.from_pretrained( MODEL_NAME, num_labels=37, problem_type="multi_label_classification" ) model.cuda() opt = torch.optim.AdamW(model.parameters(), lr=LR) for epoch in range(EPOCHS): for batch in loader: batch = {k: v.cuda() for k, v in batch.items()} out = model(**batch) loss = out.loss loss.backward() opt.step(); opt.zero_grad() torch.save(model.state_dict(), f"epoch{epoch}.pt")训练 30 min(T4 GPU)即可把线上意图准确率从 0.78 提到 0.93。
2. 用 RedisGraph 做对话状态机
传统 if/else 写状态容易爆炸,这里把“状态=图节点,事件=边”搬到 RedisGraph,查询一条 Cypher 就能决定下一步。
状态图示例(部分):
关键代码(Python,redisgraph-py):
# state_machine.py import redis from redisgraph import Graph, Node, Edge r = redis.Redis(host="rds", port=6379, decode_responses=True) g = Graph(g, r) def transit(session_id: str, intent: str) -> str: # 查当前状态节点 rs = g.query( f"MATCH (s:State)-[:CURRENT]->(p:Session {{id:'{session_id}'}}) RETURN s.name" ).result_set if not rs: # 新会话,默认 Welcome g.query( "CREATE (p:Session {id:$sid}), (s:State {name:'Welcome'}) " "CREATE (s)-[:CURRENT]->(p)", {"sid": session_id} ) current = "Welcome" else: current = rs[0][0] # 找边 es = g.query( f"MATCH (s:State {{name:'{current}'}})-[:ON {{intent:'{intent}'}}]->(next:State) " "RETURN next.name" ).result_set if es: nxt = es[0][0] g.query( f"MATCH (s:State)-[c:CURRENT]->(p:Session {{id:'{session_id}'}}) DELETE c " f"CREATE (s2:State {{name:'{nxt}'}})-[:CURRENT]->(p)" ) return nxt return current # 未命中边,保持原状态好处:状态逻辑可视化,运营可直接改图,无需发版。
3. 异步消息队列扛高并发
用户请求先进 Redis Stream,再由 SpringBoot 消费,返回走 WebSocket,整体解耦。
SpringBoot 关键片段(Kotlin,与 Java 同理):
@Service class StreamConsumer( private val redis: ReactiveRedisTemplate<String, String> ) { @Scheduled(fixedDelay = 100) fun consume() { redis.opsForStream().read( Consumer.from("grp", "c1"), StreamOffset.latest("incoming") ) .flatMap { msg -> // 调用 Python 意图服务 webClient.post() .uri("http://intent-svc:8000/predict") .bodyValue(msg.value["text"]!!) .retrieve() .bodyToMono<IntentResp>() .map { it to msg } } .flatMap { (intentResp, msg) -> // 回写 Websocket wsTemplate.convertAndSend( "/queue/${msg.value["uid"]}", mapOf("reply" to buildReply(intentResp)) ) redis.opsForStream().acknowledge("incoming", "grp", msg.id) } .subscribe() } }压测 2000 TPS 时,Stream+异步保证 99 百分位延迟 < 200 ms。
生产考量:上线前必须补的功课
1. 压力测试
JMeter 配置要点:
- 线程组:2000 并发,Ramp-up 60 s,循环 300 次
- HTTP 请求默认值:连接超时 5 s,响应超时 10 s
- 后端监听器:Grafana+Prometheus,盯紧 GPU Util、Redis 连接数、Pod CPU
瓶颈常出现在 RedisGraph 查询,开启SLOWLOG get 10可秒级定位。
2. 敏感词过滤与数据脱敏
- 敏感词:把 3 万条词库编译成 DFA,一次性加载到 Redis BloomFilter,O(1) 拦截
- 脱敏:用基于正则的“命名实体识别”,手机、身份证、银行卡号统一替换成
****,日志落盘前先过一遍desensitize()
避坑指南:血泪踩出来的细节
对话超时重试的幂等性
每条消息带 UUID,下游做INSERT … ON CONFLICT DO NOTHING,保证重复请求不重复建单。模型热更新导致内存泄漏
旧经验:Torch 模型直接load_state_dict会残留计算图。
解决:- 先
deepcopy(model).cpu()到共享内存 - 再让 gunicorn
USR2平滑重启 Worker,老进程处理完手头请求后退出,显存瞬间降回基线。
- 先
日志一定打 Request-ID
分布式排障时,把 UID+TraceID 注入 MDC,SpringBoot + Python 统一格式,方便 Zipkin 串联。
代码规范小结
- Python 侧全程 PEP8,Black 自动格式化,类型注解覆盖率 100 %
- 所有函数写
Raises:注释,异常向上抛,最外层由 FastAPIexception_handler统一转 JSON - Java/Kotlin 侧遵循 Sonar A 级规则,Stream 消费逻辑必须加
@Retryable做兜底
开放问题
当用户先在小程序咨询,又跳去 App 时,如何设计“跨渠道会话粘性保持”?
欢迎交流,也推荐一篇参考论文:《Cross-domain Dialogue State Tracking via Multi-level Matching》
目前我们打算把匿名 UID 与手机号映射存成 Redis 二级索引,Web 与小程序共用同一套 WebSocket Gateway,理论上能续上上下文,但落地还在灰度。如果你有更好的思路,留言一起头脑风暴。