Ohhnews

分类导航

$ cd ..
Baeldung原文

使用 Apache Seata 实现分布式事务管理

#分布式事务#apache seata#微服务#spring boot#数据一致性

[LOADING...]

1. 简介

在本教程中,我们将探讨 Apache Seata。它最初源自阿里巴巴,现已成为 Apache 孵化器项目。我们将了解它是什么、如何使用它以及它能为我们实现哪些功能。

2. 为什么需要分布式事务?

为了编写稳健的应用程序,我们通常会利用数据库事务来确保数据变更的原子性。 也就是说,要么所有变更都成功执行,要么全部不执行。这有助于确保我们的数据始终处于有效状态。

当使用单个服务管理数据时,这很容易实现。当请求进入系统时,我们开启一个新的事务。所有数据变更都在此事务内进行,只有在整个请求成功时,我们才会提交。

在这里,如果记录用户账单时出现问题,订单和库存的变更将会回滚,系统保持在正确的状态。

如果我们将其转变为多个分布式服务,我们的事务也随之变得分散:

流程本质上是一样的,但由于我们将库存、订单和账单服务拆分为独立的应用程序,我们也随之将它们拆分为独立的事务。 现在,如果记录账单失败,库存和订单的变更已经提交,无法轻易撤销。

这就是分布式事务的用武之地。如果我们有一种方法能够在多个应用程序之间维护数据库事务,那么我们既能获得拆分系统带来的好处,又能享受单个事务为整个用户操作带来的保障。

3. 什么是 Apache Seata?

Apache Seata 是一个开源项目(最初来自阿里巴巴集团),旨在帮助我们管理 Java 微服务应用中的分布式事务。

使用 Seata 时,我们运行一个额外的服务作为“事务协调者”(Transaction Coordinator)。当请求进入我们的应用程序时,发起请求的服务作为“事务管理器”(Transaction Manager),会在事务协调者中启动一个新的分布式事务。其他所有服务都会参与到同一个事务中,直到该事务被提交或回滚。

在这里,我们的流程基本相同,但我们增加了事务协调者,并将所有操作包裹在一个统一的分布式事务中。 这将确保所有三个数据库要么一起提交,要么一起回滚,从而保证整个系统处于有效状态。

4. Seata Server

在使用 Seata 之前,我们需要确保运行了 Seata Server。它在我们的整个系统中充当事务协调者的角色。

最简单的方法是将其作为 Docker 容器运行。例如,我们可以将其包含在 Docker Compose 文件中:

$ config
services:
  seata-server:
    image: apache/seata-server:2.6.0

默认情况下,它监听 8091 端口,并使用容器内的本地文件系统来跟踪分布式事务。

接下来,我们准备配置应用程序以与 Seata 协同工作。

5. 使用 Spring Boot

Seata 提供了 Spring Boot Starter,我们可以用它来进行配置。 如果使用 Maven,可以在 pom.xml 中添加以下依赖:

$ xml
<dependency>
    <groupId>org.apache.seata</groupId>
    <artifactId>seata-spring-boot-starter</artifactId>
    <version>2.6.0</version>
</dependency>

5.1. 配置 Seata

我们需要提供一个 Seata 的配置文件。 该文件需要位于类路径(classpath)下,因此我们创建 src/main/resources/seata.conf

transport {
  type = "TCP"
  server = "NIO"
  heartbeat = true
  thread-factory {
    boss-thread-prefix = "NettyBoss"
    worker-thread-prefix = "NettyServerNIOWorker"
    server-executor-thread-size = 100
    share-boss-worker = false
    client-selector-thread-size = 1
    client-selector-thread-prefix = "NettyClientSelector"
    client-worker-thread-prefix = "NettyClientWorkerThread"
  }
  shutdown {
    wait = 3
  }
  serialization = "seata"
  compressor = "none"
}
service {
  vgroupMapping.my_tx_group = "default"
  default.grouplist = "seata-server:8091"
  enableDegrade = false
  disableGlobalTransaction = false
}
client {
  rm {
    asyncCommitBufferLimit = 10000
    lock {
      retryInterval = 10
      retryTimes = 30
      retryPolicyBranchRollbackOnConflict = true
    }
    reportRetryCount = 5
    tableMetaCheckEnable = false
    reportSuccessEnable = false
    sagaBranchRegisterEnable = false
  }
  tm {
    commitRetryCount = 5
    rollbackRetryCount = 5
    defaultGlobalTransactionTimeout = 60000
    degradeCheck = false
  }
  undo {
    dataValidation = true
    logSerialization = "jackson"
    logTable = "undo_log"
    compress {
      enable = true
      type = "zip"
      threshold = "64k"
    }
  }
  log {
    exceptionRate = 100
  }
}

其中大部分是标准配置,但请注意,我们需要在 service.default.grouplist 字段中配置 Seata Server 的主机名和端口。

我们还需要在 Spring 中添加一些配置以使其与 Seata 协同工作。 我们在 application.properties 文件中完成此操作:

$ properties
seata.enabled=true
seata.application-id=${spring.application.name}
seata.tx-service-group=my_tx_group
seata.registry.type=file
seata.registry.file.name=seata.conf
seata.config.type=file
seata.config.file.name=seata.conf
seata.service.vgroup-mapping.my_tx_group=default
seata.service.grouplist.default=seata-server:8091
seata.data-source-proxy-mode=AT
seata.enable-auto-data-source-proxy=true

这里同样包含了 seata.service.grouplist.default 属性中的 Seata Server 地址。我们还需要确保多个属性与 Seata 配置文件保持一致,并且 seata.registry.file.nameseata.config.file.name 指向我们的 seata.conf 文件。

最后,如果我们使用 AT 模式(如上配置),则需要在服务数据库中创建一个特殊的 undo_log 表:

$ query
CREATE TABLE IF NOT EXISTS undo_log (
    id            BIGSERIAL    NOT NULL,
    branch_id     BIGINT       NOT NULL,
    xid           VARCHAR(128) NOT NULL,
    context       VARCHAR(128) NOT NULL,
    rollback_info BYTEA        NOT NULL,
    log_status    INT          NOT NULL,
    log_created   TIMESTAMP(0) NOT NULL,
    log_modified  TIMESTAMP(0) NOT NULL,
    CONSTRAINT pk_undo_log PRIMARY KEY (id),
    CONSTRAINT ux_undo_log UNIQUE (xid, branch_id)
);

我们在 seata.conf 文件中配置确切的表名。此时,Seata 已集成到我们的服务中。启动项目后,将看到相关的日志信息。

5.2. 全局事务

一旦 Spring 与 Seata 完成集成,我们就可以开始使用它了。我们通过 @GlobalTransactional 注解来标记一个需要跨服务分布的事务的开始:

$ java
@PostMapping("/a/{mode}")
@GlobalTransactional
public void handle() {
    // 控制器逻辑
}

我们可以在任何通常使用 @Transactional 注解的地方使用它。该事务会向 Seata 注册,并可以跨越多个服务,而不是仅限于本地。

请注意,我们仅在全局事务的入口处添加此注解。 同一事务中的后续服务不需要包含它。我们将通过其他方式管理它们,如下所述。

如果需要,我们还可以按照与标准 @Transactional 注解类似的方式为事务提供配置:

$ java
@GlobalTransactional(rollbackFor = MyException.class, timeoutMills = 10000)

这里我们指定当出现 MyException 的任何子类时事务应回滚,并设置了 10 秒的超时时间。

5.3. 事务传播

如果我们现在尝试运行,会发现事务无法正确传播。尽管服务日志显示已向 Seata 注册,但后续服务并不会参与事务。

Seata 通过在服务之间传递一个特殊的 XID 值来管理这一点。通常,它被包含在服务间调用的 HTTP 请求头 TX_XID 中。

如果使用标准 Spring,我们需要自行管理。这包括将其添加到所有外发 HTTP 调用中,并在所有传入调用中接收它。

如果我们使用 Spring RestClient,则可以编写一个 ClientHttpRequestInterceptor 实现来自动完成此操作:

$ java
public class SeataXidClientInterceptor implements ClientHttpRequestInterceptor {
    @Override
    public ClientHttpResponse intercept(HttpRequest request, byte[] body, ClientHttpRequestExecution execution)
        throws IOException {
        String xid = RootContext.getXID();
        if (StringUtils.hasText(xid)) {
            request.getHeaders().add(RootContext.KEY_XID, xid);
        }
        return execution.execute(request, body);
    }
}

这会将 XID 值添加到外发 HTTP 请求中。然后,我们必须确保 RestClient 使用它:

$ java
@Bean
public RestClient restClient() {
    return RestClient.builder()
        .requestInterceptor(new SeataXidClientInterceptor())
        .build();
}

我们也可以对其他 HTTP 客户端(如 WebClientRestTemplate)执行完全相同的操作。

此时,我们所有的出站调用都会指示全局事务的 XID。但是,我们仍然需要在下游服务中消费它。我们可以通过 Servlet 过滤器来实现:

$ java
@Component
@Order(Ordered.HIGHEST_PRECEDENCE)
public class SeataXidFilter implements Filter {
    @Override
    public void doFilter(ServletRequest req, ServletResponse res, FilterChain chain)
        throws IOException, ServletException {
        HttpServletRequest httpRequest = (HttpServletRequest) req;
        String xid = httpRequest.getHeader(RootContext.KEY_XID);
        boolean bound = false;
        if (StringUtils.hasText(xid) && !xid.equals(RootContext.getXID())) {
            RootContext.bind(xid);
            bound = true;
        }
        try {
            chain.doFilter(req, res);
        } finally {
            if (bound) {
                RootContext.unbind();
            }
        }
    }
}

这执行了完全相反的操作——如果传入的 HTTP 请求中存在 XID,则在继续处理请求之前将其绑定到本地服务,并确保在最后将其解绑。

至此,我们的事务现在可以跨越多个服务,并且整个操作集合将作为一个整体提交或回滚。## 6. 使用 Spring Cloud

与 Spring Boot 不同,Spring Cloud 可以自动处理部分流程。

在 Spring Cloud 环境中,我们需要在项目中使用不同的依赖。同时,需要特别注意版本匹配问题:最新的 2025.1.0.0 版本 仅适用于 Spring Boot 4,而 2025.0.0.0 版本 则要求使用 Spring Boot 3。

该依赖以 BOM 的形式提供,我们可以将其导入到 dependencyManagement 部分来管理版本,然后再引入实际的 Starter 依赖:

$ xml
<dependencyManagement>
    <dependencies>
        <dependency>
            <groupId>com.alibaba.cloud</groupId>
            <artifactId>spring-cloud-alibaba-dependencies</artifactId>
            <version>2025.0.0.0</version>
            <type>pom</type>
            <scope>import</scope>
        </dependency>
    </dependencies>
</dependencyManagement>
...
<dependencies>
    <dependency>
        <groupId>com.alibaba.cloud</groupId>
        <artifactId>spring-cloud-starter-alibaba-seata</artifactId>
    </dependency>
</dependencies>

我们仍然需要像之前一样使用 seata.confapplication.properties 文件进行配置。不过,框架已经为我们处理了大部分事务传播工作。

Spring Cloud Starter 会自动设置服务,使得任何传入的 HTTP 请求在有需要时都能加入全局事务。这省去了我们手动配置 Servlet 过滤器的麻烦。

该 Starter 还配置了 RestTemplate Bean,以便自动将 XID 值转发给下游服务。因此,如果我们使用 RestTemplate,则无需进行额外设置。遗憾的是,它不支持 RestClientWebClient,如果使用这些组件,仍需手动进行配置。

7. 总结

在本文中,我们简要了解了 Apache Seata。我们探讨了它是什么,以及如何在应用程序中使用它。下次编写事务性服务时,不妨尝试一下?

和往常一样,本文中的所有示例代码均可在 GitHub 上找到。