视频看了几百小时还迷糊?关注我,几分钟让你秒懂!
在使用 RabbitMQ 实现延迟任务时,很多人用TTL + 死信队列的方案。但这种方法有个致命缺陷:
同一个队列只能设置统一的 TTL!
想要 5 秒、30 秒、1 小时三种延迟?得建三个队列!
这时候,RabbitMQ 官方延时插件(rabbitmq-delayed-message-exchange)就派上大用场了!
本文将手把手教你:
- 如何安装延时插件;
- 如何在 Spring Boot 中使用;
- 对比传统方案的优势;
- 附完整代码 + 注意事项 + 反例避坑。
一、为什么需要延时插件?
🎯 真实场景:订单多级超时
- 用户下单后:
- 5 分钟未支付→ 发提醒;
- 30 分钟未支付→ 自动取消;
- 24 小时未评价→ 发催评消息。
如果用 TTL + DLQ 方案,你需要:
order.delay.5m.queueorder.delay.30m.queueorder.delay.24h.queue
维护成本高,扩展性差!
而延时插件支持:每条消息独立设置延迟时间,一个交换机搞定所有!
二、延时插件 vs TTL+DLQ 对比
| 特性 | TTL + 死信队列 | 延时插件 |
|---|---|---|
| 延迟精度 | 队列级(所有消息相同) | 消息级(每条可不同) |
| 资源消耗 | 多队列,多绑定 | 单交换机,单队列 |
| 实现复杂度 | 高(需管理多个队列) | 低(直接发消息) |
| 消息顺序 | 可能乱序(TTL 不同) | 严格按延迟时间投递 |
| 适用场景 | 固定延迟(如统一 30 分钟) | 动态延迟(如用户自定义) |
✅结论:只要需要动态延迟时间,就用延时插件!
三、延时插件安装(Linux / Docker / Windows)
✅ 步骤 1:下载插件
插件地址:https://github.com/rabbitmq/rabbitmq-delayed-message-exchange/releases
选择与你 RabbitMQ 版本匹配的.ez文件。
例如 RabbitMQ 3.11.x → 下载rabbitmq_delayed_message_exchange-3.11.1.ez
✅ 步骤 2:放入插件目录
Linux / macOS
# 查找插件目录 rabbitmq-plugins directories # 通常路径为 /usr/lib/rabbitmq/lib/rabbitmq_server-<version>/plugins/ # 复制插件 sudo cp rabbitmq_delayed_message_exchange-*.ez /usr/lib/rabbitmq/lib/rabbitmq_server-*/plugins/Docker(推荐方式)
# Dockerfile FROM rabbitmq:3.11-management # 复制插件 COPY rabbitmq_delayed_message_exchange-3.11.1.ez /opt/rabbitmq/plugins/ # 启用插件 RUN rabbitmq-plugins enable rabbitmq_delayed_message_exchange或运行时挂载:
docker run -d \ --name rabbitmq \ -v $(pwd)/rabbitmq_delayed_message_exchange-3.11.1.ez:/opt/rabbitmq/plugins/rabbitmq_delayed_message_exchange-3.11.1.ez \ -e RABBITMQ_SERVER_ADDITIONAL_ERL_ARGS="-rabbitmq delayed_message_exchange true" \ rabbitmq:3.11-management # 进入容器启用插件 docker exec -it rabbitmq rabbitmq-plugins enable rabbitmq_delayed-message-exchangeWindows
- 插件目录通常为:
C:\Program Files\RabbitMQ Server\rabbitmq_server-<version>\plugins - 复制
.ez文件进去 - 管理员权限运行 CMD:
rabbitmq-plugins enable rabbitmq_delayed_message_exchange
✅ 步骤 3:重启 RabbitMQ
# Linux sudo systemctl restart rabbitmq-server # Docker docker restart rabbitmq✅ 步骤 4:验证安装成功
- 访问 RabbitMQ 管理界面(http://localhost:15672)
- 创建 Exchange 时,类型下拉框应出现:
x-delayed-message - 或命令行查看:
rabbitmq-plugins list | grep delayed # 应显示 [E*] rabbitmq_delayed_message_exchange
⚠️ 注意:插件名中是
_(下划线),但启用时用-(连字符)!
四、Spring Boot 实战:动态延迟任务
✅ 第一步:添加依赖
<dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-amqp</artifactId> </dependency>✅ 第二步:声明延时交换机和队列
@Configuration public class DelayPluginConfig { public static final String DELAY_EXCHANGE = "delay.exchange"; public static final String DELAY_QUEUE = "delay.queue"; // 声明延时交换机(关键!) @Bean public CustomExchange delayExchange() { Map<String, Object> args = new HashMap<>(); args.put("x-delayed-type", "direct"); // 底层使用 direct 类型 return new CustomExchange(DELAY_EXCHANGE, "x-delayed-message", true, false, args); } @Bean public Queue delayQueue() { return QueueBuilder.durable(DELAY_QUEUE).build(); } @Bean public Binding delayBinding() { return BindingBuilder.bind(delayQueue()) .to(delayExchange()) .with("delay.key") .noargs(); } }🔥 关键点:
- 交换机类型必须是
"x-delayed-message";- 必须指定
x-delayed-type(如direct、topic)。
✅ 第三步:生产者发送延迟消息
@Service public class DelayMessageProducer { @Autowired private RabbitTemplate rabbitTemplate; public void sendDelayMessage(String message, int delaySeconds) { MessageProperties props = new MessageProperties(); props.setHeader("x-delay", delaySeconds * 1000); // 👈 单位:毫秒! Message msg = new Message(message.getBytes(), props); rabbitTemplate.send(DelayPluginConfig.DELAY_EXCHANGE, "delay.key", msg); } }⚠️注意:
- 延迟时间通过Header
x-delay设置;- 单位是毫秒(不是秒!);
- 必须使用
rabbitTemplate.send(),不能用convertAndSend()(会丢失 header)。
✅ 第四步:消费者处理消息
@Component public class DelayMessageConsumer { @RabbitListener(queues = DelayPluginConfig.DELAY_QUEUE) public void handle(String message) { System.out.println("Received delayed message: " + message + " at " + new Date()); // 执行业务逻辑,如取消订单、发提醒等 } }✅ 测试:发送不同延迟的消息
@RestController public class TestController { @Autowired private DelayMessageProducer producer; @GetMapping("/test") public String test() { producer.sendDelayMessage("5秒后执行", 5); producer.sendDelayMessage("30秒后执行", 30); producer.sendDelayMessage("2分钟后执行", 120); return "Messages sent!"; } }✅效果:
三条消息分别在 5s、30s、120s 后被消费,互不影响,精准投递!
❌ 反例:这些用法会导致失败!
反例 1:用convertAndSend发送
// ❌ 错误!header 会被忽略 rabbitTemplate.convertAndSend(exchange, key, message, m -> { m.getMessageProperties().setHeader("x-delay", 5000); return m; });原因:convertAndSend内部会重新构造 Message,可能丢失自定义 header。
✅ 正确做法:使用send()+ 手动构造 Message。
反例 2:忘记声明交换机类型
// ❌ 错误!类型写成 "direct" @Bean public Exchange delayExchange() { return ExchangeBuilder.directExchange("delay.exchange").build(); // 不行! }后果:消息直接被丢弃,无任何报错!
✅ 正确做法:必须用CustomExchange+"x-delayed-message"。
⚠️ 关键注意事项
延迟时间上限
插件默认最大延迟~1 年(约 2^32 毫秒),超过会报错。消息持久化
- 交换机、队列必须
durable=true; - 消息发送时需设置
deliveryMode=2(持久化);
props.setDeliveryMode(MessageDeliveryMode.PERSISTENT);- 交换机、队列必须
性能影响
延时消息会存储在 Mnesia 数据库(内存+磁盘),大量延迟消息可能影响性能。建议:- 监控
rabbitmqctl eval 'rabbit_memory_monitor:memory_breakdown().' - 避免百万级延迟消息堆积。
- 监控
不支持集群同步(旧版本)
RabbitMQ < 3.8 时,插件在集群中可能不一致。
✅ 解决方案:升级到 3.8+,或确保所有节点安装相同插件。替代方案考虑
如果延迟时间固定,TTL+DLQ 更轻量;
如果需要复杂调度(如 cron),考虑Quartz + DB。
五、总结
| 场景 | 推荐方案 |
|---|---|
| 固定延迟(如统一 30 分钟) | TTL + 死信队列 |
| 动态延迟(每条消息不同) | 延时插件 ✅ |
| 高精度定时任务(秒级) | Redis ZSet / 时间轮 |
| 分布式复杂调度 | XXL-JOB / ElasticJob |
延时插件的核心价值:
用最简单的方式,解决“每条消息延迟时间不同”的痛点!
只要记住三点:
- 安装插件并启用;
- 交换机类型为
x-delayed-message; - 通过 Header
x-delay设置毫秒数。
你就能轻松实现精准延迟任务!
视频看了几百小时还迷糊?关注我,几分钟让你秒懂!