Ohhnews

分类导航

$ cd ..
Spring Blog原文

Spring Boot 4.1 新特性:MongoDB 驱动的 Spring Batch 任务及其他

#spring boot#spring batch#mongodb#批处理#etl

Spring Batch 在 MongoDB 诞生前许多年就已问世,其设计默认依赖 SQL 数据库来存储 Spring Batch 作业的状态。

但那是几十年前的事了,每个刚接触 Spring Batch 的人都会问:“为什么这东西需要一个 SQL 数据库?”答案自然是:Spring Batch 会在 JobRepository 中详尽记录每一个作业、步骤和执行的元数据,而多年来,这个仓库只会说一种方言:SQL。如果你愉快地沉浸在 MongoDB 的世界里,还得拖着 Postgres 或 MySQL 实例,只为让 Batch 记下它上周二做了什么。

在最近的 Spring Batch 版本中,其 JobRepository 与 JDBC 解耦了,Spring Boot 4.1 更是通过一个完善的 spring-boot-starter-batch-data-mongodb 自动配置为这种体验画上了点睛之笔。你可以获得与 JDBC 用户从一开始就享受到的零配置 Boot 体验,来管理你的批处理元数据。

有趣的事实:Spring Boot 的联合创始人 Dave Syer 博士曾是 Spring Batch 的创始人和长期负责人。自然,他为 Spring Boot 编写的第一个自动配置就是针对 Spring Batch 的!所以当我说 Spring Boot 用户从最初就开始享受 JDBC 支持的 Spring Batch 时,我确实是字面意思。

本文通过一个短小但完整的示例进行讲解。它将:

  • 把 Spring Batch 的 JobRepository 存储在 MongoDB 中(通过新的 4.1 启动器)。
  • 从类路径读取 customers.csv
  • 将行写入 PostgreSQLcustomers 表。
  • 所有操作均针对项目根目录中 compose.yaml 启动的服务运行。

启动基础设施

在接触任何 Java 代码之前,先启动两个后端服务:

$ bash
docker compose up

compose.yaml 启动一个配置为单节点副本集的 MongoDB 实例(Batch 的 MongoDB 支持需要事务,而事务需要副本集)、一个用于目标表的 PostgreSQL 实例,以及一个 Grafana LGTM 容器(如果你稍后需要可观测性的话)。

我们要定义的作业是一个简单的 ETL(提取、转换、加载)作业,它从 customers.csv 文件中读取数据,并写入 PostgreSQL 数据库中的 customers 表。

我们需要初始化 Postgres 的 customers 表;这项工作由 src/main/resources/schema.sql 负责:

$ query
create table if not exists customers (
    id    serial primary key,
    name  varchar(255),
    email varchar(255)
);

Spring Boot 的 SQL 初始化(spring.sql.init.mode=always)会在启动时运行它。MongoDB 侧也是类似的自助方式——spring.batch.data.mongodb.schema.initialize=true 告诉新的启动器创建 JobRepository 所需的集合。

application.properties 中的相关配置:

$ properties
spring.mongodb.host=localhost
spring.mongodb.port=27017
spring.mongodb.database=mydatabase

spring.batch.data.mongodb.schema.initialize=true

spring.datasource.url=jdbc:postgresql://localhost/mydatabase
spring.datasource.username=myuser
spring.datasource.password=secret

我们同时有 JDBC 和 MongoDB 连接。由于我们希望使用 MongoDB 支持的 JobRepository 而不是 JDBC 版本,我们需要让 Spring Boot 的自动配置退让:

$ java
@SpringBootApplication(exclude = {BatchJdbcAutoConfiguration.class})
public class BatchApplication { ... }

这一行排除代码就是让仓库从 JDBC 切换到 MongoDB 的关键。其他所有东西——Mongo 客户端、集合、事务管理器——都来自新的启动器。

作业

我们的作业名为 etl,它按顺序执行两个步骤:

$ java
@Bean
Job job(@Qualifier(STEP_RESET) Step stepReset,
        @Qualifier(STEP_FILES_TO_DB) Step stepFilesToDb) {
    return new JobBuilder("etl", this.repository)
            .start(stepReset)
            .next(stepFilesToDb)
            .incrementer(new RunIdIncrementer())
            .build();
}

首先是 reset(一个清空目标表的 tasklet),然后是 files-to-db(实际移动数据的 reader → processor → writer)。RunIdIncrementer 是一个虽小但重要的细节:它会在每次启动时自增一个 run.id 参数,使得 Spring Batch 将每次调用视为新的作业实例,而不是拒绝重新运行一个“已完成”的作业。

第一步:一个 tasklet

Spring Batch 中最简单的一类步骤是 Tasklet——一个没有读取或写入项概念的工作单元。当你只需要在步骤之间某件事时,它就是正确的工具。这里它用来清理数据:

$ java
@Bean(STEP_RESET)
Step cleanTableStep(JdbcClient db, JobRepository repository) {
    return new StepBuilder("reset", repository)
            .tasklet((contribution, chunkContext) -> {
                db.sql("delete from customers").update();
                return RepeatStatus.FINISHED;
            })
            .build();
}

tasklet 运行一次,返回 RepeatStatus.FINISHED,然后我们继续。

第二步:reader、processor、writer

有意思的步骤是分块步骤。Spring Batch 的核心模式是:读取一个项,处理它,累积一个块,写入这个块。reader 从 customers.csv 中提取行:

$ java
@Bean
FlatFileItemReader<Customer> customerFlatFileItemReader(
        @Value("classpath:/customers.csv") Resource csv) {
    return new FlatFileItemReaderBuilder<Customer>()
            .name("customer-reader")
            .resource(csv)
            .delimited(c -> c.delimiter(",").names("id", "name", "email"))
            .fieldSetMapper(fs -> new Customer(
                    fs.readInt("id"),
                    fs.readString("name"),
                    fs.readString("email")))
            .build();
}

CSV 内容如下:

$ csv
id,name,email
1,josh,josh@joshlong.com
2,dashaun,dashaun@dashaun.com
3,james,james@jamesward.dev

你懂的。

writer 将每个块推入 Postgres,使用 ON CONFLICT DO NOTHING 使得重跑时不会因为主键冲突而爆掉:

$ java
@Bean
JdbcBatchItemWriter<Customer> customerJdbcBatchItemWriter(DataSource dataSource) {
    return new JdbcBatchItemWriterBuilder<Customer>()
            .dataSource(dataSource)
            .sql("INSERT INTO customers(id, name, email) VALUES (:id, :name, :email) on conflict do nothing")
            .itemSqlParameterSourceProvider(item -> new MapSqlParameterSource(
                    Map.of("id", item.id(), "name", item.name(), "email", item.email())))
            .build();
}

而步骤本身通过一个简单的透传 processor(以后可以方便地添加转换、丰富或过滤逻辑)将它们绑定在一起,块大小设为 10:

$ java
@Bean(STEP_FILES_TO_DB)
Step step(FlatFileItemReader<Customer> reader,
          JdbcBatchItemWriter<Customer> writer) {
    return new StepBuilder("files-to-db", this.repository)
            .<Customer, Customer>chunk(10)
            .reader(reader)
            .processor(customer -> {
                IO.println("processing " + customer);
                return customer;
            })
            .writer(writer)
            .faultTolerant()
            .retryLimit(10)
            .retry(IllegalArgumentException.class)
            .build();
}

注意 faultTolerant() 开关和重试策略——Spring Batch 会安静地重试抛出 IllegalArgumentException 的项最多十次,然后再让整个块失败。这只是一行代码的事,因为框架在替你进行所有的记账工作,而这份记账现在正好存储在 MongoDB 中。

运行它

启动 Docker Compose:

$ bash
./mvnw spring-boot:run

作业启动,reset 清空 Postgres,files-to-db 将 CSV 通过块管道流式写入 Postgres,每一个步骤转换、项计数、退出状态和执行时间戳都会写入 MongoDB。打开 mongosh,你会看到熟悉的 Batch 集合——BATCH_JOB_INSTANCEBATCH_JOB_EXECUTIONBATCH_STEP_EXECUTION——但它们现在是文档,而不是表。

可观测性

Spring Batch 作业会触发很多有趣的 Spring ApplicationEvent!我监听其中一个 JobExecutionEvent,它在作业完成(无论成功与否)时发布。

$ java
    @EventListener
    void after(JobExecutionEvent event) {
        IO.println("Job execution #" + event.getJobExecution() + " finished");
    }

当我在 Spring Initializr 中初始化程序时,我确保添加了 OpenTelemetry Spring Boot 启动器。Spring Boot 和 Micrometer 早就支持 OpenTelemetry,但以前总需要一些调整。现在,如果你将 OpenTelemetry 启动器放在类路径上,它会自动将指标发布到任何 OpenTelemetry 端点(默认假设是端口 3000)。如果你在 Spring Initializr 上选择了 Docker Compose 支持,它还会提供一个 Grafana 配置,该配置也会在端口 3000 上监听 OpenTelemetry 信息!

所以,运行应用程序,然后访问 localhost:3000,点击 Drilldown,再点击 Metrics,然后在搜索字段中搜索 spring_batch

或者,你也可以访问 http://localhost:8080/actuator/metrics 查看同样的指标。但我喜欢鲜艳多彩的界面,所以 Grafana 页面对我来说更棒。

奖励:使用 GraalVM 构建原生镜像

GraalVM 原生镜像技术有潜力降低总体内存使用量。Spring Batch 已经基本能与 GraalVM 原生镜像配合工作,但有一些我需要额外处理的新类。以及一些新的 schema 文件。

$ java
    static class Hints implements RuntimeHintsRegistrar {

        @Override
        public void registerHints(@NonNull RuntimeHints hints, @Nullable ClassLoader classLoader) {
            for (var c : new Class[]{
                    org.springframework.batch.core.repository.persistence.JobInstance.class,
                    org.springframework.batch.core.repository.persistence.ExecutionContext.class,
                    org.springframework.batch.core.repository.persistence.ExitStatus.class,
                    org.springframework.batch.core.repository.persistence.StepExecution.class,
                    org.springframework.batch.core.repository.persistence.JobExecution.class,
                    org.springframework.batch.core.repository.persistence.JobParameter.class,
            }) {
                hints.reflection().registerType(c, MemberCategory.values());
            }

            var prefix = "org/springframework/batch/core/";
            for (var r : new String[]{
                    "schema-mongodb", //
                    "schema-drop-mongodb"}) {
                for (var suffix : "jsonl,js".split(",")) {
                    var path = prefix + r + "." + suffix;
                    var resource = new ClassPathResource(path);
                    if (resource.exists()) {
                        hints.resources().registerResource(resource);
                    }
                }
            }
        }
    }

我们还需要告诉 GraalVM 关于 customers.csv 的信息。

$ java

    static class ResourceHints implements RuntimeHintsRegistrar {

        @Override
        public void registerHints(RuntimeHints hints, @Nullable ClassLoader classLoader) {
            hints.resources().registerResource(new ClassPathResource("/customers.csv"));
        }
    }

以通常的方式在 BatchConfiguration 类(或任何带有 @Configuration 的类)上添加这两者:

$ java
@ImportRuntimeHints({BatchConfiguration.ResourceHints.class, BatchConfiguration.Hints.class})

完成后,你就可以像往常一样构建 GraalVM 原生镜像了。我已经将步骤写在仓库根目录的 native.sh 脚本中:

$ shell
#!/usr/bin/env bash
ls -la target && rm -rf target
./mvnw -DskipTests -Pnative native:compile
./target/batch

运行应用程序:./target/batch,并观察它启动迅速,占用的 RAM 远小于在 JVM 上运行时。在我的机器上——一台搭载 Apple M5 芯片的 macOS 设备——它在大约十分之一秒内启动,使用约 150 MB 的 RAM。长时间运行的批处理作业通常并不需要快速启动,但节省 RAM 是件好事,而启动时间的减少也不赖!

懒加载数据源连接

另一个优化(虽然对于这种特定工作负载来说,整体上变化不大,但它非常棒)是:在 Spring Boot 4.1 中,我们现在支持懒加载连接获取。记住,默认情况下 Spring Boot 会初始化 DataSource,并在每次启动事务时创建一个连接,即使不能保证你会使用该连接。你可以通过新的 Spring Boot 配置属性 spring.datasource.connection-fetch=lazy 来避免这种开销。

获取源码

和往常一样,本示例的完整代码可以在这里找到。

为什么这很重要

Spring Batch 与关系型数据库在历史上的耦合始终是一种务实的妥协,而不是设计理想。框架需要某个持久化位置来记住它的所作所为,而 SQL 是阻力最小的路径。现在 JobRepository 抽象已经正确解耦——加上 Spring Boot 4.1 为 MongoDB 提供的一流自动配置——运行在文档存储上的团队不再需要仅仅为了迎合批处理层而保留一个 JDBC 数据库。

选择适合你数据的数据库。Spring Batch 会围绕你的选择自我适配。