news 2026/4/21 15:19:55

SpringBoot集成Redis Stream:从基础配置到消费组实战

作者头像

张小明

前端开发工程师

1.2k 24
文章封面图
SpringBoot集成Redis Stream:从基础配置到消费组实战

1. 为什么需要Redis Stream?

Redis Stream是Redis 5.0引入的一种新的数据结构,它专门为消息队列场景设计。相比传统的List、Pub/Sub等方案,Stream提供了更强大的功能:

  • 消息持久化:不像Pub/Sub那样消息发送后就消失
  • 消费组支持:多个消费者可以组成消费组,实现消息的负载均衡
  • 消息回溯:可以重新消费历史消息
  • ACK机制:确保消息被正确处理

我在实际项目中遇到过这样的场景:需要处理大量设备上报的状态数据,既要保证数据不丢失,又要能并行处理提高吞吐量。传统的RabbitMQ方案部署维护成本较高,而Redis Stream完美解决了这个问题。

2. 环境准备与基础配置

2.1 引入必要依赖

首先在pom.xml中添加Spring Data Redis和连接池依赖:

<dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-data-redis</artifactId> </dependency> <dependency> <groupId>org.apache.commons</groupId> <artifactId>commons-pool2</artifactId> </dependency>

2.2 配置Redis连接

在application.yml中配置Redis连接信息:

spring: redis: host: 127.0.0.1 port: 6379 database: 0 timeout: 15000 lettuce: pool: max-idle: 50 min-idle: 10 max-active: 300 max-wait: -1

2.3 配置RedisTemplate

创建一个配置类来定制RedisTemplate:

@Configuration public class RedisConfig { @Bean public RedisTemplate<String, Object> redisTemplate(RedisConnectionFactory connectionFactory) { RedisTemplate<String, Object> template = new RedisTemplate<>(); template.setConnectionFactory(connectionFactory); // Key序列化 template.setKeySerializer(new StringRedisSerializer()); template.setHashKeySerializer(new StringRedisSerializer()); // Value序列化 template.setValueSerializer(new GenericJackson2JsonRedisSerializer()); template.setHashValueSerializer(new GenericJackson2JsonRedisSerializer()); return template; } }

这里我推荐使用Jackson序列化而不是JdkSerializationRedisSerializer,因为后者会产生不可读的二进制数据,调试起来很不方便。

3. 消息生产与消费基础

3.1 发送简单消息

我们先从最简单的字符串消息开始:

@Autowired private RedisTemplate<String, Object> redisTemplate; public void sendSimpleMessage() { Map<String, String> message = new HashMap<>(); message.put("content", "Hello Redis Stream"); StringRecord record = StreamRecords.string(message) .withStreamKey("mystream") .withId(RecordId.autoGenerate()); RecordId recordId = redisTemplate.opsForStream().add(record); log.info("消息发送成功,ID: {}", recordId.getValue()); }

3.2 发送复杂对象

实际项目中我们经常需要发送自定义对象:

@Data public class OrderEvent { private String orderId; private BigDecimal amount; private LocalDateTime createTime; } public void sendObjectMessage() { OrderEvent event = new OrderEvent(); event.setOrderId(UUID.randomUUID().toString()); event.setAmount(new BigDecimal("99.99")); event.setCreateTime(LocalDateTime.now()); ObjectRecord<String, OrderEvent> record = StreamRecords.newRecord() .in("order_stream") .ofObject(event) .withId(RecordId.autoGenerate()); redisTemplate.opsForStream().add(record); }

3.3 简单消费消息

对于不需要消费组的简单场景,可以直接读取消息:

public List<MapRecord<String, String, String>> readMessages() { return redisTemplate.opsForStream() .read(StreamOffset.fromStart("mystream")); }

4. 消费组实战

4.1 创建消费组

消费组是Redis Stream的核心特性,我们先创建消费组:

public void createConsumerGroup(String streamKey, String groupName) { try { redisTemplate.opsForStream().createGroup(streamKey, groupName); } catch (RedisSystemException e) { if (e.getCause() instanceof RedisBusyException) { log.info("消费组已存在: {}", groupName); } else { throw e; } } }

这里有个坑要注意:如果消费组已存在会抛出RedisBusyException,需要捕获处理。

4.2 消费组消息处理

配置消费组监听容器:

@Configuration public class StreamConfig { @Autowired private RedisConnectionFactory redisConnectionFactory; @Bean public StreamMessageListenerContainer<String, MapRecord<String, String, String>> container() { StreamMessageListenerContainer.StreamMessageListenerContainerOptions<String, MapRecord<String, String, String>> options = StreamMessageListenerContainer.StreamMessageListenerContainerOptions .builder() .pollTimeout(Duration.ofSeconds(1)) .batchSize(10) .serializer(new StringRedisSerializer()) .build(); return StreamMessageListenerContainer.create(redisConnectionFactory, options); } @Bean public Subscription subscription(StreamMessageListenerContainer<String, MapRecord<String, String, String>> container) { return container.receive( Consumer.from("mygroup", "consumer1"), StreamOffset.create("mystream", ReadOffset.lastConsumed()), new MessageListener()); } }

实现消息监听器:

public class MessageListener implements StreamListener<String, MapRecord<String, String, String>> { @Autowired private RedisTemplate<String, Object> redisTemplate; @Override public void onMessage(MapRecord<String, String, String> message) { try { // 业务处理 processMessage(message); // 确认消息 redisTemplate.opsForStream().acknowledge("mygroup", message); } catch (Exception e) { log.error("消息处理失败: {}", message, e); } } }

4.3 处理未ACK的消息

在实际运行中,可能会因为各种原因导致消息未被确认。我们需要定期检查并处理这些消息:

@Scheduled(fixedRate = 60000) public void handlePendingMessages() { PendingMessages pending = redisTemplate.opsForStream() .pending("mystream", "mygroup", Range.unbounded(), 100); pending.forEach(message -> { // 重新处理消息 MapRecord<String, String, String> record = redisTemplate.opsForStream() .range("mystream", Range.closed(message.getIdAsString(), message.getIdAsString())) .get(0); // 再次尝试处理 try { processMessage(record); redisTemplate.opsForStream().acknowledge("mygroup", record); } catch (Exception e) { log.error("重试处理失败: {}", record, e); } }); }

5. 生产环境注意事项

5.1 性能优化建议

  • 批量操作:尽量使用批量读取和确认
  • 合理设置pollTimeout:根据业务特点调整
  • 连接池配置:根据并发量调整max-active等参数
  • 序列化选择:评估性能和可读性的平衡

5.2 常见问题排查

  1. 消息堆积:检查消费者处理速度,考虑增加消费者
  2. 重复消费:确保业务逻辑幂等
  3. 连接泄漏:检查是否正确关闭连接
  4. 序列化错误:确保生产者和消费者使用相同的序列化方式

5.3 监控指标

建议监控以下关键指标:

  • 消息生产速率
  • 消息消费速率
  • 未ACK消息数量
  • 消费组延迟

6. 完整示例项目结构

一个典型的项目结构如下:

src/main/java ├── config │ ├── RedisConfig.java │ └── StreamConfig.java ├── controller │ └── MessageController.java ├── listener │ └── OrderMessageListener.java ├── model │ └── OrderEvent.java └── service ├── MessageProducer.java └── StreamMonitor.java

在实际项目中,我通常会这样组织代码:

  • 将Redis配置放在config包
  • 消息监听器按业务领域划分
  • 生产者和监控服务单独封装
  • 使用DTO对象传递消息

7. 高级特性探索

7.1 消息截断

Redis Stream支持消息截断,可以控制流的大小:

public void trimStream(String streamKey, long maxLength) { redisTemplate.opsForStream().trim(streamKey, maxLength); }

7.2 消费者组信息查询

可以查询消费组和消费者的状态:

public void printGroupInfo(String streamKey) { StreamInfo.XInfoGroups groups = redisTemplate.opsForStream().groups(streamKey); groups.forEach(group -> { log.info("Group: {}, Consumers: {}, Pending: {}", group.groupName(), group.consumerCount(), group.pendingCount()); }); }

7.3 消费者负载均衡

多个消费者可以自动实现负载均衡:

@Bean public Subscription subscription2(StreamMessageListenerContainer<String, MapRecord<String, String, String>> container) { return container.receive( Consumer.from("mygroup", "consumer2"), StreamOffset.create("mystream", ReadOffset.lastConsumed()), new MessageListener()); }

8. 与Kafka的对比

在消息中间件选型时,Redis Stream经常被拿来与Kafka比较:

  • 部署复杂度:Redis更简单
  • 功能丰富度:Kafka更专业
  • 性能:Redis在低延迟场景表现更好
  • 持久化:Kafka的持久化能力更强

根据我的经验,对于中小型项目,特别是已经使用Redis的项目,Redis Stream是一个很好的轻量级选择。但对于需要严格保证消息顺序、持久化和高吞吐的场景,还是应该考虑Kafka。

版权声明: 本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若内容造成侵权/违法违规/事实不符,请联系邮箱:809451989@qq.com进行投诉反馈,一经查实,立即删除!
网站建设 2026/4/21 15:17:38

别再纠结选哪种了!立体视觉、结构光、TOF深度相机,看完这篇保姆级对比就知道你的项目该用谁

深度相机技术选型实战指南&#xff1a;立体视觉、结构光与TOF的黄金分割点 当你的机器人项目需要一双"慧眼"来感知三维世界时&#xff0c;摆在面前的技术选项往往令人眼花缭乱。市面上主流的深度感知方案——立体视觉、结构光和TOF&#xff0c;就像三种不同性格的助手…

作者头像 李华
网站建设 2026/4/21 15:16:39

如何用RyTuneX优化Windows系统性能:从基础配置到高级调优

如何用RyTuneX优化Windows系统性能&#xff1a;从基础配置到高级调优 【免费下载链接】RyTuneX RyTuneX is a cutting-edge optimizer built with the WinUI 3 framework, designed to amplify the performance of Windows devices. Crafted for both Windows 10 and 11. 项目…

作者头像 李华
网站建设 2026/4/21 15:08:35

GLM-4.1V-9B-Base企业落地:HR招聘简历中证书/成绩单图像结构化提取

GLM-4.1V-9B-Base企业落地&#xff1a;HR招聘简历中证书/成绩单图像结构化提取 1. 企业招聘场景中的痛点 在HR日常招聘工作中&#xff0c;处理大量候选人简历中的证书和成绩单是一项耗时费力的工作。传统方式需要人工逐页查看PDF或图片格式的附件&#xff0c;手动记录关键信息…

作者头像 李华
网站建设 2026/4/21 15:06:58

如何解决ComfyUI_TensorRT节点安装的安全级别限制问题:终极指南

如何解决ComfyUI_TensorRT节点安装的安全级别限制问题&#xff1a;终极指南 【免费下载链接】ComfyUI_TensorRT 项目地址: https://gitcode.com/gh_mirrors/co/ComfyUI_TensorRT 当你尝试安装ComfyUI_TensorRT节点时&#xff0c;可能会遇到一个令人困惑的错误提示&…

作者头像 李华