DDD-027:事件溯源(Event Sourcing)
本章导读
事件溯源(Event Sourcing)是一种革命性的数据持久化范式,它不再存储对象的当前状态,而是存储导致当前状态的所有事件。每一个业务操作都以事件的形式被记录,通过回放这些事件可以重建任意时刻的系统状态。本章将深入探讨事件溯源的核心原理、实现方式以及与 CQRS 的结合应用。
学习目标
- 理解事件溯源的核心思想及其与传统 CRUD 的本质区别
- 掌握事件溯源的关键要素:事件存储、聚合重建、快照机制
- 学会在 Java 项目中实现事件溯源模式
前置知识
- DDD 聚合与领域事件基础
- CQRS 架构模式
- 数据库事务与并发控制基础
阅读时长
约 60-75 分钟
【原理】事件溯源核心思想与设计原理
一、事件溯源的本质与定义
1.1 什么是事件溯源
【原理】
Event Sourcing(事件溯源)由 Martin Fowler 提出,是一种将系统状态变更记录为不可变事件序列的架构模式。
核心思想:不存储对象的当前状态,而是存储导致该状态的所有事件。通过回放事件流,可以重建任意时刻的状态。
传统 CRUD 模式: ┌─────────────────────────────────────────────────────────┐ │ 状态存储 │ │ │ │ ┌─────────────────────────────────────────────────┐ │ │ │ orders 表 │ │ │ ├─────────────────────────────────────────────────┤ │ │ │ id | customer_id | status | total | updated_at │ │ │ │-----|-------------|--------|-------|------------│ │ │ │ 001 | CUST-001 | PAID | 1000 | 2025-01-15 │ │ │ └─────────────────────────────────────────────────┘ │ │ │ │ 问题:历史状态丢失,只保留最终状态 │ │ 无法回答:订单经历过哪些状态?谁修改了?为什么修改? │ └─────────────────────────────────────────────────────────┘ 事件溯源模式: ┌─────────────────────────────────────────────────────────┐ │ 事件存储 │ │ │ │ ┌─────────────────────────────────────────────────┐ │ │ │ events 表 │ │ │ ├─────────────────────────────────────────────────┤ │ │ │ aggregate_id | version | event_type | data │ │ │ │--------------|---------|---------------|---------│ │ │ │ ORDER-001 | 1 | OrderCreated | {...} │ │ │ │ ORDER-001 | 2 | ItemAdded | {...} │ │ │ │ ORDER-001 | 3 | OrderPaid | {...} │ │ │ │ ORDER-001 | 4 | OrderShipped | {...} │ │ │ └─────────────────────────────────────────────────┘ │ │ │ │ 优势:完整保留历史,可重建任意时刻状态 │ │ 可以回答:订单完整生命周期、每次变更的原因和时间 │ └─────────────────────────────────────────────────────────┘核心原则:
| 原则 | 说明 |
|---|---|
| 事件不可变 | 已发生的事件不能修改或删除 |
| 事件有序 | 事件按发生顺序组成事件流 |
| 状态即事件流 | 当前状态 = 初始状态 + 所有事件的应用结果 |
| 追加写入 | 新事件只能追加,不能覆盖 |
【历史架构问题】
问题 1:状态丢失,无法追溯历史
// ❌ 传统 CRUD:只存储最终状态,历史信息丢失@Entity@Table(name="orders")publicclassOrderEntity{@IdprivateStringid;privateStringstatus;// 只有当前状态:CREATED/PAID/SHIPPEDprivateBigDecimaltotal;// 只有当前金额// 问题:// 1. 订单什么时候创建的?被谁创建的?// 2. 订单状态何时变更为 PAID?// 3. 订单金额是否被修改过?// 4. 为什么订单从 CREATED 变成 PAID 又变回 CREATED?// 这些问题都无法从当前状态中得知}问题 2:审计日志分散且不完整
// ❌ 传统审计方案:日志分散,难以关联// 方案 1:数据库字段记录@EntitypublicclassOrderEntity{privateStringcreatedBy;privateLocalDateTimecreatedAt;privateStringupdatedBy;privateLocalDateTimeupdatedAt;// 问题:只能记录最后一次修改,之前的修改历史丢失}// 方案 2:独立的审计日志表@EntitypublicclassAuditLogEntity{privateStringtableName;privateStringrecordId;privateStringoperation;// INSERT/UPDATE/DELETEprivateStringoldValue;// JSONprivateStringnewValue;// JSONprivateStringoperator;privateLocalDateTimeoperatedAt;}// 问题:// 1. 审计逻辑与业务逻辑分离,容易遗漏// 2. oldValue/newValue 记录的是完整对象,难以看出具体哪个字段变化// 3. 业务含义不清晰,只有技术层面的 CRUD 记录// 4. 查询和重建历史状态非常复杂问题 3:复杂的业务规则难以表达
// ❌ 传统状态管理:业务规则分散在各个 Service 中@ServicepublicclassOrderService{@AutowiredprivateOrderRepositoryorderRepository;publicvoidpayOrder(StringorderId,Paymentpayment){Orderorder=orderRepository.findById(orderId);// 业务规则分散:什么条件下可以支付?if(order.getStatus()!=OrderStatus.CREATED){thrownewBusinessException("只有待支付订单才能支付");}// 状态变更逻辑分散:支付后应该做什么?order.setStatus(OrderStatus.PAID);order.setPaymentId(payment.getId());order.setPaidAt(LocalDateTime.now());// 问题:// 1. 业务规则分散在各个方法中,难以统一管理// 2. 状态变更的"原因"没有被记录// 3. 复杂的状态机逻辑难以维护}}问题 4:时间旅行查询困难
// ❌ 传统方案:无法查询历史状态// 需求:查询订单在某时刻的状态publicOrdergetOrderAtTime(StringorderId,LocalDateTimeatTime){// 传统方案无法实现,因为没有存储历史状态// 只能返回当前状态thrownewUnsupportedOperationException("不支持历史状态查询");}// 需求:统计某段时间内状态变更情况publicList<StateChange>getStateChanges(StringorderId,LocalDateTimefrom,LocalDateTimeto){// 需要额外的审计表,且数据可能不完整// ...}【DDD 如何解决】
事件溯源的核心思路:
┌──────────────────────────────────────────────────────────────────┐ │ 事件溯源架构 │ │ │ │ ┌────────────────────────────────────────────────────────────┐ │ │ │ 命令处理 │ │ │ │ │ │ │ │ Command ──▶ CommandHandler ──▶ Aggregate ──▶ Event │ │ │ │ │ │ │ │ 1. 加载聚合(从事件流重建) │ │ │ │ 2. 执行业务方法 │ │ │ │ 3. 生成事件 │ │ │ │ 4. 保存事件 │ │ │ └────────────────────────────────────────────────────────────┘ │ │ │ │ │ ▼ │ │ ┌────────────────────────────────────────────────────────────┐ │ │ │ 事件存储 │ │ │ │ │ │ │ │ ┌──────────────────────────────────────────────────────┐ │ │ │ │ │ Event Store │ │ │ │ │ ├──────────────────────────────────────────────────────┤ │ │ │ │ │ aggregate_id | version | event_type | event_data │ │ │ │ │ │ ORDER-001 | 1 | OrderCreated | {...} │ │ │ │ │ │ ORDER-001 | 2 | ItemAdded | {...} │ │ │ │ │ │ ORDER-001 | 3 | OrderPaid | {...} │ │ │ │ │ └──────────────────────────────────────────────────────┘ │ │ │ │ │ │ │ │ 特点: │ │ │ │ - 只追加,不修改 │ │ │ │ - 事件不可变 │ │ │ │ - 有序存储 │ │ │ └────────────────────────────────────────────────────────────┘ │ │ │ │ │ ▼ │ │ ┌────────────────────────────────────────────────────────────┐ │ │ │ 事件发布 │ │ │ │ │ │ │ │ Event ──▶ EventPublisher ──▶ Subscribers │ │ │ │ │ │ │ │ 1. 更新读模型(CQRS) │ │ │ │ 2. 触发下游业务 │ │ │ │ 3. 发送通知 │ │ │ └────────────────────────────────────────────────────────────┘ │ └──────────────────────────────────────────────────────────────────┘关键设计:
| 设计点 | 传统 CRUD | 事件溯源 |
|---|---|---|
| 存储内容 | 当前状态 | 状态变更事件 |
| 写入方式 | 覆盖更新 | 追加写入 |
| 数据完整性 | 只保留最终状态 | 完整历史 |
| 审计能力 | 需要额外实现 | 天然支持 |
| 历史查询 | 困难或不可能 | 轻松支持 |
| 并发控制 | 乐观锁/悲观锁 | 乐观锁(版本号) |
二、事件溯源的基本要素
2.1 领域事件
【原理】
事件溯源中的事件是已发生事实的记录,具有以下特征:
| 特征 | 说明 |
|---|---|
| 不可变 | 事件代表已发生的事实,不能被修改或删除 |
| 过去时态 | 命名使用过去时:OrderCreated、PaymentProcessed |
| 自描述 | 事件包含完整的信息,不需要额外查询 |
| 有序性 | 每个事件有版本号,保证顺序 |
事件结构: ┌─────────────────────────────────────────────────────┐ │ DomainEvent │ ├─────────────────────────────────────────────────────┤ │ + eventId: String // 事件唯一标识 │ │ + aggregateId: String // 聚合根 ID │ │ + aggregateType: String // 聚合类型 │ │ + version: int // 版本号(乐观锁) │ │ + eventType: String // 事件类型 │ │ + eventData: JSON // 事件数据 │ │ + occurredOn: DateTime // 发生时间 │ │ + metadata: Map // 元数据(操作人等) │ └─────────────────────────────────────────────────────┘【代码示例】
// ✅ 事件溯源中的领域事件设计// 事件基类publicabstractclassDomainEvent{privatefinalStringeventId;privatefinalStringaggregateId;privatefinalStringaggregateType;privatefinalintversion;privatefinalInstantoccurredOn;privatefinalMap<String,Object>metadata;protectedDomainEvent(StringaggregateId,StringaggregateType,intversion){this.eventId=UUID.randomUUID().toString();this.aggregateId=aggregateId;this.aggregateType=aggregateType;this.version=version;this.occurredOn=Instant.now();this.metadata=newHashMap<>();}// Getters...publicabstractStringgetEventType();publicDomainEventwithMetadata(Stringkey,Objectvalue){this.metadata.put(key,value);returnthis;}}// 订单创建事件publicclassOrderCreatedEventextendsDomainEvent{privatefinalStringcustomerId;privatefinalList<OrderItemData>items;privatefinalBigDecimaltotalAmount;privatefinalStringshippingAddress;publicOrderCreatedEvent(StringorderId,StringcustomerId,List<OrderItemData>items,BigDecimaltotalAmount,StringshippingAddress){super(orderId,"Order",1);this.customerId=customerId;this.items=items;this.totalAmount=totalAmount;this.shippingAddress=shippingAddress;}@OverridepublicStringgetEventType(){return"OrderCreated";}// 自描述:包含完整信息,不需要额外查询publicrecordOrderItemData(StringproductId,StringproductName,intquantity,BigDecimalunitPrice){}}// 订单支付事件publicclassOrderPaidEventextendsDomainEvent{privatefinalStringpaymentId;privatefinalBigDecimalpaidAmount;privatefinalStringpaymentMethod;publicOrderPaidEvent(StringorderId,StringpaymentId,BigDecimalpaidAmount,StringpaymentMethod){super(orderId,"Order",0);// 版本在聚合中设置this.paymentId=paymentId;this.paidAmount=paidAmount;this.paymentMethod=paymentMethod;}@OverridepublicStringgetEventType(){return"OrderPaid";}}// 订单取消事件publicclassOrderCancelledEventextendsDomainEvent{privatefinalStringreason;privatefinalStringcancelledBy;publicOrderCancelledEvent(StringorderId,Stringreason,StringcancelledBy){super(orderId,"Order",0);this.reason=reason;this.cancelledBy=cancelledBy;}@OverridepublicStringgetEventType(){return"OrderCancelled";}}2.2 事件存储
【原理】
事件存储是事件溯源的核心组件,负责持久化事件流。
事件存储表设计: ┌─────────────────────────────────────────────────────────────┐ │ events 表 │ ├─────────────────────────────────────────────────────────────┤ │ 列名 │ 类型 │ 说明 │ │───────────────────│───────────────│────────────────────────│ │ event_id │ VARCHAR(36) │ 事件唯一 ID(主键) │ │ aggregate_id │ VARCHAR(36) │ 聚合根 ID │ │ aggregate_type │ VARCHAR(50) │ 聚合类型(Order等) │ │ version │ INT │ 版本号(乐观锁) │ │ event_type │ VARCHAR(100) │ 事件类型 │ │ event_data │ JSON │ 事件数据 │ │ occurred_on │ TIMESTAMP │ 事件发生时间 │ │ metadata │ JSON │ 元数据 │ ├─────────────────────────────────────────────────────────────┤ │ 索引: │ │ - PRIMARY KEY (event_id) │ │ - UNIQUE KEY (aggregate_id, version) -- 防止版本冲突 │ │ - INDEX (aggregate_type, aggregate_id) │ │ - INDEX (occurred_on) │ └─────────────────────────────────────────────────────────────┘ 关键约束: - (aggregate_id, version) 必须唯一:保证事件顺序 - version 必须连续:保证事件流完整性 - 事件只能追加:不能 UPDATE 或 DELETE【代码示例】
// ✅ 事件存储实现// 事件存储接口publicinterfaceEventStore{// 保存事件(带乐观锁)voidsaveEvents(StringaggregateId,List<DomainEvent>events,intexpectedVersion);// 加载事件流List<DomainEvent>loadEvents(StringaggregateId);// 加载事件流(从指定版本开始)List<DomainEvent>loadEvents(StringaggregateId,