Ohhnews

分类导航

$ cd ..
DZone Java原文

用事件溯源和CQRS模式重新思考Java CRUD

#事件溯源#cqrs#java#crud#软件架构

传统的增删改查(CRUD)系统只存储实体的当前状态。当记录更新时,旧值会被覆盖并永久丢失。事件溯源(Event Sourcing)颠覆了这一模型:系统不持久化状态,而是持久化导致每次状态转换的事件序列。当前状态永远不会被直接存储,而是通过重放事件历史来推导得出。命令查询职责分离(CQRS)将写入模型与读取模型分离。命令表达更改状态的意图,例如 PlaceOrderAddItemShipOrder。查询则读取状态而不修改它。两侧使用独立的模型、独立的逻辑,在完整实现中,还使用独立的存储。CQRS 与事件溯源相辅相成:事件流是写入侧的真实来源,而一个或多个投影(读取模型)从这些事件中派生,用于快速查询。

本文旨在展示如何实际应用这些概念,并以 Markus Eisele 的文章(2025年12月27日发布于 Substack)的修改版作为说明示例。Markus 在文章中展示了一个基于 Quarkus 的简化订单管理系统项目。在此,我介绍该系统的 Spring Boot 实现,作为变体。你可以在这里找到它。

术语

经典订单管理系统中,通过分析关联的数据模型,我们可以收集大量关于订单及其在组织中流转的信息。但虽然我们能够了解任何订单的当前状态,数据和数据模型分析却无法让我们重建每个订单如何到达当前状态的完整故事。

事件溯源

事件溯源模式将时间维度引入数据模型。基于事件溯源的系统不存储反映订单当前状态的模式,而是持久化记录订单生命周期中每次变更的事件。然后,通过查询这些事件,我们可以重建给定订单(或任何其他通用聚合)从初始创建到当前状态的完整历史。

CQRS

唯一的问题是,一次查询单个聚合实例的事件历史,无法让我们检索和整合数据模型中其他聚合的相关数据。因此,CQRS 模式与事件溯源模式紧密相关,旨在提供将投影模型物化为逻辑数据结构的能力,这些结构足够可靠以支持灵活的查询选项。

命令

CQRS 将命令专门用于执行修改系统状态的操作。因此,基于命令的执行模型是唯一能够实现业务逻辑、验证规则并强制执行不变量的模型。

投影

系统可以根据需要定义任意数量的模型,以向用户或其他系统提供数据。因此,读取模型是一个快速、反规范化且预缓存的投影,包含应用程序回答查询所需的只读数据。系统将命令执行模型的变更投射到所有读取模型中。投影的概念类似于关系数据库中的物化视图,即每当源表更新时,变更必须反映在所有读取模型视图中。

模型分离

在 CQRS 架构中,系统模型的职责根据其类型进行分离。命令只能操作自己的执行模型,而查询不能直接修改系统的任何持久化状态。

一个用例

这里展示的用例是一个真正的 CQRS 实现(而不仅仅是命名约定),因为:

  1. 写入路径从不读取读取模型。CommandHandler 仅通过 EventProjection.replayEvents() 重放事件存储中的事件来重建状态。它从不接触 OrderReadModelOrderRepository
  2. 读取路径从不接触事件存储。OrderResource.getOrderReadModel() 直接从反规范化的 ORDERS 表中读取。这是纯粹的查询,没有任何业务逻辑。
  3. 存在两个物理上独立的存储表:EVENT_STORE(写入侧)和 ORDERS(读取侧)。
  4. 读取模型是投影,而不是视图。OrderProjection 监听领域事件并增量重建读取模型。可以丢弃 ORDERS 表并通过重放事件存储从头重建。
  5. 命令返回 CommandResult,这是一个密封类型,传达成功或失败而不泄漏状态。调用者如果需要当前状态,必须单独查询读取模型。

现在让我们看看项目的关键实现细节:

使用记录建模状态

OrderState 是一个 Java 记录,本质上是不可变的。没有设置器,没有突变。每个命令都会产生一个的状态对象:

$ java
public record OrderState(
    UUID orderId,
    String customerEmail,
    List<OrderLine> items,
    OrderStatus status,
    BigDecimal total
) {
    public static OrderState initial(UUID orderId, String email) { ... }
    public static OrderState empty() { ... }
}

OrderLine 同样是一个带有派生字段的记录:

$ java
public record OrderLine(String productName, int quantity, BigDecimal price) {
    public BigDecimal lineTotal() {
        return price.multiply(BigDecimal.valueOf(quantity));
    }
}

lineTotal() 是一个派生的记录组件:它是计算出来的,而不是存储的,演示了记录可以同时携带行为。

事件作为密封类型层次结构

OrderEvent 是一个密封接口,将所有允许的实现限制在一个已知的封闭集合中:

$ java
public sealed interface OrderEvent permits 
    OrderEvent.OrderPlaced, OrderEvent.ItemAdded,
    OrderEvent.ItemRemoved, OrderEvent.OrderCancelled, 
    OrderEvent.OrderShipped {

    UUID orderId();
    OrderState applyTo(OrderState current);

    record OrderPlaced(UUID orderId, String customerEmail) implements OrderEvent {
        public OrderState applyTo(OrderState s) {
            return OrderState.initial(orderId, customerEmail);
        }
    }
    // ... 其他事件类型
}

使用密封接口意味着编译器在 switch 表达式中强制执行穷举性。添加新事件类型而不处理它是一个编译错误,而不是运行时意外。每个事件只携带所需的数据,并知道如何通过 applyTo(OrderState) 应用到当前状态。这是自描述事件模式。

折叠(事件重放)

折叠,也称为左规约,是通过事件流中的事件列表重建状态的过程:

$ java
// EventProjection.java
public OrderState replayEvents(List<OrderEvent> events) {
    return events.stream()
        .reduce(OrderState.empty(), this::apply, (a, b) -> b);
}

private OrderState apply(OrderState state, OrderEvent event) {
    return event.applyTo(state);
}

OrderState.empty() 是恒等元素或种子。每个事件都是一个步骤函数,将一个不可变状态转换为下一个。这是纯函数式编程:无副作用,无共享可变状态,完全确定且可独立测试。

命令作为密封记录

命令是密封记录,分组在一个容器接口中:

$ java
public sealed interface Command permits 
    Command.PlaceOrderCommand, Command.AddItemCommand,
    Command.ShipOrderCommand, Command.CancelOrderCommand {

    record PlaceOrderCommand(String customerEmail) implements Command {}
    record AddItemCommand(UUID orderId, String productName, int quantity, BigDecimal price) implements Command {}
    // ...
}

密封记录赋予命令值语义(按内容相等,免费获得 toString)和类型安全(处理器中的穷举模式匹配)。

命令结果作为密封类型

CommandResult 是一个密封接口,表达了所有可能的结果,无需异常:

$ java
public sealed interface CommandResult permits 
    CommandResult.Success, CommandResult.InvalidState,
    CommandResult.NotFound, CommandResult.ValidationError {

    record Success(UUID aggregateId) implements CommandResult {}
    record InvalidState(String message) implements CommandResult {}
    record NotFound(String message) implements CommandResult {}
    record ValidationError(String message) implements CommandResult {}
}

调用者可以穷举地 switch 结果。没有受检异常,没有可空返回,类型系统记录了所有可能的失败模式。

事件存储

EventStore 是写入侧基础设施。它原子性地做两件事:

  1. 将事件持久化到 EVENT_STORE(通过 EventRepository 的 JPA)。
  2. 将事件发布到 Spring 应用程序事件总线。
$ java
public void append(UUID aggregateId, String aggregateType, OrderEvent event) {
    int version = nextVersion(aggregateId);
    String json = objectMapper.writeValueAsString(event);
    StoredEvent entity = new StoredEvent(aggregateId, aggregateType, version, eventType, json);
    eventRepository.save(entity);
    applicationEventPublisher.publishEvent(event);
}

版本控制提供了一种轻量级的乐观并发保护,通过唯一值 aggregateId + version 防止并发写入损坏流。

读取侧投影

OrderProjection 是一个 Spring 组件,监听领域事件并更新读取模型:

$ java
@TransactionalEventListener(phase = TransactionPhase.AFTER_COMMIT)
public void on(OrderEvent event) {
    OrderReadModel model = orderRepository.findByOrderId(event.orderId())
        .orElse(new OrderReadModel());
    // 根据事件更新字段 ...
    orderRepository.save(model);
}

@TransactionalEventListener(phase = AFTER_COMMIT) 确保仅在事件存储事务成功提交后才更新读取模型,从而防止写入侧事务回滚时出现幻影更新。

运行应用程序

前提条件:Java 21、Maven、Docker(测试中用于 PostgreSQL 的 TestContainers)。

$ bash
# 构建并运行所有测试(需要 Docker)
./mvnw clean package

# 运行应用程序(需要运行中的 PostgreSQL 实例)
./mvnw spring-boot:run

# 跳过测试
./mvnw clean package -DskipTests

API 参考

方法路径描述
POST/orders下新订单
POST/orders/{id}/items为订单添加商品
POST/orders/{id}/ship发货
POST/orders/{id}/cancel取消订单
GET/orders/{id}从事件重建当前状态
GET/orders/{id}/events获取完整事件流
GET/orders/{id}/read-model获取反规范化的读取模型

下单:

$ cat
POST /orders
{
  "customerEmail": "alice@example.com"
}

添加商品:

$ cat
POST /orders/{id}/items
{
  "productName": "Widget",
  "quantity": 3,
  "price": 9.99
}

发货:

$ cat
POST /orders/{id}/ship
{
  "trackingNumber": "TRACK-001"
}

DZone 贡献者表达的观点是他们自己的。