背景痛点:轮询式客服的“高并发噩梦”
去年双十一,我们团队的客服系统被瞬间流量打爆。用户消息像雪片一样飞来,后端却还在用“请求-响应”轮询:每来一条消息就新建线程、查库、调模型、写日志,CPU 飙到 90%,P99 延迟直奔 3 s。最惨的是上下文丢失——用户刚说“我要退货”,下一秒机器人反问“您想咨询什么?”体验瞬间翻车。
痛点总结:
- 线程模型粗暴:一个对话一条线程,1 k 并发就 1 k 线程,切换成本 O(n²)。
- 状态无持久化:Tomcat 重启后 HashMap 里的会话灰飞烟灭。
- 意图识别重复计算:同一句“退款怎么走”被反复扔进 BERT,GPU 80% 时间花在重复推理。
一句话:高并发下,无状态 + 同步 + 粗粒度锁 = 延迟爆炸 + 上下文蒸发。
技术选型:规则、深度、混合三路对比
我们做了 3 组 A/B,数据如下:
| 方案 | 意图准确率 | 单次延迟 | 热更新成本 | 结论 |
|---|---|---|---|---|
| 规则引擎(Drools) | 87% | 5 ms | 低 | 适合固定流程,难泛化 |
| 深度学习(BERT+CRF) | 94% | 120 ms | 高 | 准但慢,GPU 贵 |
| 混合模式(规则兜底+模型) | 92% | 18 ms | 中 | 准确率可接受,延迟降 85% |
最终拍板:混合模式。高频问题用规则“秒杀”,长尾丢给模型,兜底话术走默认分支,既省钱又保命。
核心实现:Flask+Redis+状态机
整体架构
- 网关层:Nginx + Lua 做 WAF 与限流。
- 接入层:Flask 跑在 Gunicorn Gevent 上,单进程 4k 协程无压力。
- 状态层:Redis 存储对话状态,TTL 15 min,支持会话恢复。
- 意图层:本地规则库 → 远程模型服务 → 默认回答三级流水线。
- 消息队列:Celery+Redis 做异步埋点,削峰填谷。
状态机设计
我们把一次客服对话抽象成 5 个状态:
- GREETING
- COLLECT_INFO
- CONFIRM
- EXECUTE
- END
状态机负责两件事:超时回收 & 断线恢复。核心代码如下(PEP8 通过 black 格式化):
# state_machine.py import json import time from enum import Enum, auto from redis import Redis from typing import Dict, Optional redis_client = Redis(host='127.0.0.1', port=6379, db=0, decode_responses=True) class State(Enum): GREETING = auto() COLLECT_INFO = auto() CONFIRM = auto() EXECUTE = auto() END = auto() class Session: def __init__(self, uid: str): self.uid = uid self.state = State.GREETING self.data: Dict = {} # 收集的槽位 self.updated_at = int(time.time()) def to_json(self) -> str: return json.dumps({ 'state': self.state.name, 'data': self.data, 'updated_at': self.updated_at }, ensure_ascii=False) @staticmethod def from_json(uid: str, raw: str) -> Optional['Session']: if not raw: return None try: d = json.loads(raw) s = Session(uid) s.state = State[d['state']] s.data = d['data'] s.updated_at = d['updated_at'] return s except (json.JSONDecodeError, KeyError): return None class StateMachine: TIMEOUT = 900 # 15 min @staticmethod def load(uid: str) -> Session: raw = redis_client.get(f"session:{uid}") if raw: return Session.from_json(uid, raw) return Session(uid) @staticmethod def save(session: Session): session.updated_at = int(time.time()) redis_client.setex(f"session:{uid}", StateMachine.TIMEOUT, session.to_json()) @staticmethod def transition(session: Session, intent: str) -> str: """根据意图推动状态机,返回回复文本""" if intent == "超时": session.state = State.END return "会话已超时,请重新咨询~" if session.state == State.GREETING: if intent == "退货": session.state = State.COLLECT_INFO session.data["intent"] = "退货" return "请问订单号是多少?" return "您好,请问想咨询什么?" if session.state == State.COLLECT_INFO: if intent == "提供订单号": session.data["order_id"] = intent session.state = State.CONFIRM return f"收到订单 {intent},确认退货吗?" return "需要订单号才能继续哦~" if session.state == State.CONFIRM: if intent == "肯定": session.state = State.EXECUTE return "已提交退货申请,预计 1-3 个工作日退款。" if intent == "否定": session.state = State.GREETING return "已取消,还有其他可以帮您的吗?" if session.state == State.EXECUTE: session.state = State.END return "感谢您的使用,再见!" return "默认兜底回复"异步处理框架
Flask 只负责“接包→塞队列→立即返回”,耗时操作全扔给 Celery Worker,代码如下:
# app.py from flask import Flask, request, jsonify from celery import Celery from state_machine import StateMachine, Session app = Flask(__name__) celery = Celery('smart_agent', broker='redis://127.0.0.1:6379/1') @celery.task def async_reply(uid: str, text: str): session = StateMachine.load(uid) intent = fast_rule_intent(text) or model_intent(text) reply = StateMachine.transition(session, intent) StateMachine.save(session) push_to_gateway(uid, reply) # 通过 WebSocket 推回前端 @app.route("/chat", methods=["POST"]) def chat(): uid = request.json["uid"] text = request.json["text"] async_reply.delay(uid, text) return jsonify({"code": 0, "msg": "received"}) def fast_rule_intent(text: str) -> str: # O(1) 哈希匹配,1 ms 内返回 rules = {"退货": "退货", "订单号": "提供订单号", "是的": "肯定"} return rules.get(text, "") def model_intent(text: str) -> str: # 远程 gRPC 调用 BERT 服务,平均 80 ms return bert_stub.predict(text).label关键点:
- Flask 接口只负责校验+发任务,平均 RT 8 ms。
- Celery Worker 可水平扩展,无状态,秒级扩容。
- Redis 既当队列又当状态存储,减少组件碎片化。
性能优化三板斧
1. 连接池管理
Redis、MySQL、BERT 服务全部走池化:
- redis-py 默认
connection_pool,最大 50 连接,阻塞 2 s 超时。 - gRPC 通道复用,全局单例程,避免重复 TLS 握手,延迟降 30%。
2. 对话树压缩算法
客服对话树高度重复,比如“退货→订单号→地址→确认”出现频率 42%。我们把完整路径做前缀树合并,用 DAG 存储,节点复用率 68%,内存节省 2.3 GB,序列化耗时从 12 ms → 3 ms。
时间复杂度:建树 O(n·L),n 为独立会话数,L 为平均深度;查询复用节点 O(1)。
3. 热点问题缓存预热
每天 9:55 统一推送“发货时间”“优惠券使用”等 Top 100 问题到 Redis,缓存 value 是拼好模板。用户提问直接命中,QPS 提升 3 倍,BERT 集群 CPU 从 70% 降到 25%。
避坑指南:生产环境 3 大深坑
歧义语句死循环
现象:用户说“我不是不满意”,规则同时命中“满意→结束”和“不满意→转人工”,状态机来回横跳。
解法:给每条规则加优先级 + 互斥标签;命中多条时取最高优先,仍冲突则降级给模型。会话状态序列化时区问题
现象:服务器 UTC,前端东八区,重启后把“updated_at”误读,直接判超时。
解法:统一存 Unix 时间戳,展示层再本地化;禁止存字符串型“2023-11-11 12:00:00”。异步日志丢失
现象:Celery 异常崩溃,FileHandler 缓存没刷,导致客诉回溯无日志。
解法:日志先写本地 tmp,再让 Filebeat 收集;关键埋点额外写 Redis List,双保险。
延伸思考:大模型时代,客服 Agent 下一步?
混合模式虽香,仍要人工标注规则。随着 ChatGLM、Qwen 等中文大模型开源,我们正试点“规则→小模型→大模型”三级漏斗:
- 规则 1 ms 挡 60% 高频;
- 6B 小模型 30 ms 挡 30% 长尾;
- 175B 大模型留给 10% 复杂语义,必要时调用,延迟 500 ms 可接受。
通过 LoRA 微调 + 强化学习,把企业私有知识灌进去,两周就能把退货政策、订单条款“背”下来,准确率再提 3%,同时减少 40% 人工标注。未来客服 Agent 的核心竞争力不再是“谁家的规则写得多”,而是“谁能用最少标注把大模型驯化成业务专家”。
把系统从“轮询”改到“事件驱动+状态机”后,我们双十一峰值 QPS 从 2 k 涨到 1.2 万,P99 延迟稳定在 280 ms,服务器数量还缩了 30%。代码已开源在内部 Git,如果你也在踩高并发客服的坑,不妨把状态机模板拿去改两行业务规则,先让机器人别再“失忆”,再慢慢上大模型,迭代会更踏实。