1. 为什么需要Redis Stream?
Redis Stream是Redis 5.0引入的一种新的数据结构,它专门为消息队列场景设计。相比传统的List、Pub/Sub等方案,Stream提供了更强大的功能:
- 消息持久化:不像Pub/Sub那样消息发送后就消失
- 消费组支持:多个消费者可以组成消费组,实现消息的负载均衡
- 消息回溯:可以重新消费历史消息
- ACK机制:确保消息被正确处理
我在实际项目中遇到过这样的场景:需要处理大量设备上报的状态数据,既要保证数据不丢失,又要能并行处理提高吞吐量。传统的RabbitMQ方案部署维护成本较高,而Redis Stream完美解决了这个问题。
2. 环境准备与基础配置
2.1 引入必要依赖
首先在pom.xml中添加Spring Data Redis和连接池依赖:
<dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-data-redis</artifactId> </dependency> <dependency> <groupId>org.apache.commons</groupId> <artifactId>commons-pool2</artifactId> </dependency>2.2 配置Redis连接
在application.yml中配置Redis连接信息:
spring: redis: host: 127.0.0.1 port: 6379 database: 0 timeout: 15000 lettuce: pool: max-idle: 50 min-idle: 10 max-active: 300 max-wait: -12.3 配置RedisTemplate
创建一个配置类来定制RedisTemplate:
@Configuration public class RedisConfig { @Bean public RedisTemplate<String, Object> redisTemplate(RedisConnectionFactory connectionFactory) { RedisTemplate<String, Object> template = new RedisTemplate<>(); template.setConnectionFactory(connectionFactory); // Key序列化 template.setKeySerializer(new StringRedisSerializer()); template.setHashKeySerializer(new StringRedisSerializer()); // Value序列化 template.setValueSerializer(new GenericJackson2JsonRedisSerializer()); template.setHashValueSerializer(new GenericJackson2JsonRedisSerializer()); return template; } }这里我推荐使用Jackson序列化而不是JdkSerializationRedisSerializer,因为后者会产生不可读的二进制数据,调试起来很不方便。
3. 消息生产与消费基础
3.1 发送简单消息
我们先从最简单的字符串消息开始:
@Autowired private RedisTemplate<String, Object> redisTemplate; public void sendSimpleMessage() { Map<String, String> message = new HashMap<>(); message.put("content", "Hello Redis Stream"); StringRecord record = StreamRecords.string(message) .withStreamKey("mystream") .withId(RecordId.autoGenerate()); RecordId recordId = redisTemplate.opsForStream().add(record); log.info("消息发送成功,ID: {}", recordId.getValue()); }3.2 发送复杂对象
实际项目中我们经常需要发送自定义对象:
@Data public class OrderEvent { private String orderId; private BigDecimal amount; private LocalDateTime createTime; } public void sendObjectMessage() { OrderEvent event = new OrderEvent(); event.setOrderId(UUID.randomUUID().toString()); event.setAmount(new BigDecimal("99.99")); event.setCreateTime(LocalDateTime.now()); ObjectRecord<String, OrderEvent> record = StreamRecords.newRecord() .in("order_stream") .ofObject(event) .withId(RecordId.autoGenerate()); redisTemplate.opsForStream().add(record); }3.3 简单消费消息
对于不需要消费组的简单场景,可以直接读取消息:
public List<MapRecord<String, String, String>> readMessages() { return redisTemplate.opsForStream() .read(StreamOffset.fromStart("mystream")); }4. 消费组实战
4.1 创建消费组
消费组是Redis Stream的核心特性,我们先创建消费组:
public void createConsumerGroup(String streamKey, String groupName) { try { redisTemplate.opsForStream().createGroup(streamKey, groupName); } catch (RedisSystemException e) { if (e.getCause() instanceof RedisBusyException) { log.info("消费组已存在: {}", groupName); } else { throw e; } } }这里有个坑要注意:如果消费组已存在会抛出RedisBusyException,需要捕获处理。
4.2 消费组消息处理
配置消费组监听容器:
@Configuration public class StreamConfig { @Autowired private RedisConnectionFactory redisConnectionFactory; @Bean public StreamMessageListenerContainer<String, MapRecord<String, String, String>> container() { StreamMessageListenerContainer.StreamMessageListenerContainerOptions<String, MapRecord<String, String, String>> options = StreamMessageListenerContainer.StreamMessageListenerContainerOptions .builder() .pollTimeout(Duration.ofSeconds(1)) .batchSize(10) .serializer(new StringRedisSerializer()) .build(); return StreamMessageListenerContainer.create(redisConnectionFactory, options); } @Bean public Subscription subscription(StreamMessageListenerContainer<String, MapRecord<String, String, String>> container) { return container.receive( Consumer.from("mygroup", "consumer1"), StreamOffset.create("mystream", ReadOffset.lastConsumed()), new MessageListener()); } }实现消息监听器:
public class MessageListener implements StreamListener<String, MapRecord<String, String, String>> { @Autowired private RedisTemplate<String, Object> redisTemplate; @Override public void onMessage(MapRecord<String, String, String> message) { try { // 业务处理 processMessage(message); // 确认消息 redisTemplate.opsForStream().acknowledge("mygroup", message); } catch (Exception e) { log.error("消息处理失败: {}", message, e); } } }4.3 处理未ACK的消息
在实际运行中,可能会因为各种原因导致消息未被确认。我们需要定期检查并处理这些消息:
@Scheduled(fixedRate = 60000) public void handlePendingMessages() { PendingMessages pending = redisTemplate.opsForStream() .pending("mystream", "mygroup", Range.unbounded(), 100); pending.forEach(message -> { // 重新处理消息 MapRecord<String, String, String> record = redisTemplate.opsForStream() .range("mystream", Range.closed(message.getIdAsString(), message.getIdAsString())) .get(0); // 再次尝试处理 try { processMessage(record); redisTemplate.opsForStream().acknowledge("mygroup", record); } catch (Exception e) { log.error("重试处理失败: {}", record, e); } }); }5. 生产环境注意事项
5.1 性能优化建议
- 批量操作:尽量使用批量读取和确认
- 合理设置pollTimeout:根据业务特点调整
- 连接池配置:根据并发量调整max-active等参数
- 序列化选择:评估性能和可读性的平衡
5.2 常见问题排查
- 消息堆积:检查消费者处理速度,考虑增加消费者
- 重复消费:确保业务逻辑幂等
- 连接泄漏:检查是否正确关闭连接
- 序列化错误:确保生产者和消费者使用相同的序列化方式
5.3 监控指标
建议监控以下关键指标:
- 消息生产速率
- 消息消费速率
- 未ACK消息数量
- 消费组延迟
6. 完整示例项目结构
一个典型的项目结构如下:
src/main/java ├── config │ ├── RedisConfig.java │ └── StreamConfig.java ├── controller │ └── MessageController.java ├── listener │ └── OrderMessageListener.java ├── model │ └── OrderEvent.java └── service ├── MessageProducer.java └── StreamMonitor.java在实际项目中,我通常会这样组织代码:
- 将Redis配置放在config包
- 消息监听器按业务领域划分
- 生产者和监控服务单独封装
- 使用DTO对象传递消息
7. 高级特性探索
7.1 消息截断
Redis Stream支持消息截断,可以控制流的大小:
public void trimStream(String streamKey, long maxLength) { redisTemplate.opsForStream().trim(streamKey, maxLength); }7.2 消费者组信息查询
可以查询消费组和消费者的状态:
public void printGroupInfo(String streamKey) { StreamInfo.XInfoGroups groups = redisTemplate.opsForStream().groups(streamKey); groups.forEach(group -> { log.info("Group: {}, Consumers: {}, Pending: {}", group.groupName(), group.consumerCount(), group.pendingCount()); }); }7.3 消费者负载均衡
多个消费者可以自动实现负载均衡:
@Bean public Subscription subscription2(StreamMessageListenerContainer<String, MapRecord<String, String, String>> container) { return container.receive( Consumer.from("mygroup", "consumer2"), StreamOffset.create("mystream", ReadOffset.lastConsumed()), new MessageListener()); }8. 与Kafka的对比
在消息中间件选型时,Redis Stream经常被拿来与Kafka比较:
- 部署复杂度:Redis更简单
- 功能丰富度:Kafka更专业
- 性能:Redis在低延迟场景表现更好
- 持久化:Kafka的持久化能力更强
根据我的经验,对于中小型项目,特别是已经使用Redis的项目,Redis Stream是一个很好的轻量级选择。但对于需要严格保证消息顺序、持久化和高吞吐的场景,还是应该考虑Kafka。