Ohhnews

分类导航

$ cd ..
Baeldung原文

使用 ElasticJob 实现分布式任务调度

#elasticjob#分布式系统#任务调度#java#apache shardingsphere

[LOADING...]

1. 简介

在本教程中,我们将深入了解 ElasticJob,它是 Apache ShardingSphere 项目的一部分。我们将探讨它是什么、如何使用它以及它能为我们实现哪些功能。

2. 什么是 ElasticJob?

ElasticJob 是一个分片式的分布式作业调度系统。它让我们能够专注于编写作业逻辑本身,而由 ElasticJob 处理所有其他细节。

ElasticJob 还支持多种作业类型,以满足我们的不同需求:

  • Java 作业:作为应用程序中的类存在。
  • 脚本作业:允许我们在主机上运行脚本。
  • HTTP 作业:向远程端点发起 HTTP 调用。

它将处理调度作业并将其分布到应用程序各节点所需的一切事务。ElasticJob 还能自动处理诸如某个分片故障时的故障转移(failover)以及处理错过触发(misfired)的作业等细节。

在运行作业时,我们定义若干分片来拆分工作负载。ElasticJob 会自动将这些分片分布到集群中所有可用的主机上,以确保负载均衡。如果集群中添加或移除了主机,分片会自动重新分配,以保持负载在所有主机间的平衡。

3. 依赖项

在使用 ElasticJob 之前,我们需要在构建文件中包含最新版本,撰写本文时为 3.0.5

如果使用 Maven,可以在 pom.xml 文件中添加以下依赖:

$ xml
<dependency>
    <groupId>org.apache.shardingsphere.elasticjob</groupId>
    <artifactId>elasticjob-bootstrap</artifactId>
    <version>3.0.5</version>
</dependency>

此外,运行时还需要一个 ZooKeeper 实例来管理分片之间的协调。

准备就绪后,我们就可以在应用程序中使用它了。

4. 设置 ElasticJob

配置好 ElasticJob 依赖后,我们就可以开始使用了。

首先,我们需要确保有一个可用的 ZooKeeper 安装。 目前,我们可以使用 Docker 来实现:

$ bash
$ docker run --rm -d -p 127.0.0.1:2181:2181 --name elasticjob-zookeeper zookeeper
ZooKeeper JMX enabled by default
Using config: /conf/zoo.cfg
....
2026-02-23 06:33:06,106 [myid:1] - INFO  [main:o.a.z.s.ZooKeeperServer@588] - Snapshot taken in 0 ms
2026-02-23 06:33:06,110 [myid:] - INFO  [ProcessThread(sid:0 cport:2181)::o.a.z.s.PrepRequestProcessor@138] - PrepRequestProcessor (sid:0) started, reconfigEnabled=false

然后,我们需要一个配置为指向 ZooKeeper 实例的 CoordinatorRegistryCenter 实例:

$ java
CoordinatorRegistryCenter registryCenter = 
  new ZookeeperRegistryCenter(new ZookeeperConfiguration("localhost:2181", "my-service"));
registryCenter.init();

至此,ElasticJob 已设置完毕,随时可以使用。

5. 编写作业

ElasticJob 准备就绪后,我们需要编写实际的作业来使用它。

5.1. 作业实现

所有的作业都编写为 ElasticJob 子类的实现。 在本例中,我们将继承 SimpleJob

$ java
public class MyJob implements SimpleJob {
    @Override
    public void execute(ShardingContext context) {
        // 作业实现
    }
}

这提供了一个 execute() 方法,我们可以在其中实现作业逻辑。该方法由 ElasticJob 自动调用。至于作业内部做什么,完全由我们决定。

5.2. 作业配置

一旦有了作业类,就需要对其进行配置。 我们通过构建一个 JobConfiguration 实例来完成:

$ java
JobConfiguration jobConfig = JobConfiguration.newBuilder("MyJob", 3)
  .cron("0 * * * * ?")
  .build();

newBuilder() 方法接收作业名称(不必与类名匹配)和作业运行的分片数量。然后,我们可以提供一个 Cron 表达式来描述如何调度该作业。在本例中,它被设定在每分钟的第 0 秒执行。

我们还可以使用 jobParameter() 方法配置作业参数:

$ java
JobConfiguration jobConfig = JobConfiguration.newBuilder("MyJob", 3)
  .jobParameter("Hello")
  // ... 其他配置

此处传入的任何内容都可以在作业类内部通过 getJobParameter() 方法提取出来。

此外,我们还可以使用 shardingItemParameters() 方法提供分片参数:

$ java
JobConfiguration jobConfig = JobConfiguration.newBuilder("MyJob", 3)
  .shardingItemParameters("0=a,1=b,2=c")
  // ... 其他配置

在这种情况下,提供的字符串需要采用特殊格式。它是以逗号分隔的“分片ID=值”列表。因此,这里我们将值 "a" 提供给分片 0,将值 "b" 提供给分片 1,以此类推。

在作业内部,调用 getShardingParameter() 将从该结构化字符串中获取正确的值。如果未找到值,则返回 null

5.3. 调度作业

现在我们有了作业和作业配置,就可以调度作业了。 这是通过 ScheduleJobBootstrap 类完成的:

$ java
new ScheduleJobBootstrap(registryCenter, new MyJob(), jobConfig)
  .schedule();

这里我们需要提供注册中心、作业配置和作业类的实例。ElasticJob 会将作业详情记录到注册中心,并安排它在适当的调度时间执行。

一旦该方法返回,作业就可以按照预期在整个集群中运行了。

6. 作业类型

我们已经了解了如何创建和配置作业。然而,ElasticJob 在作业运行方式上提供了灵活性,以更好地满足我们的需求。

6.1. 简单作业 (Simple Jobs)

简单作业是任何实现了 SimpleJob 接口的作业。这为我们提供了一个单一的方法——void execute(ShardingContext)——我们可以在其中实现整个作业逻辑。 只要作业被触发,它就会执行 Java 代码中的任何逻辑。

提供的 ShardingContext 实例使我们能够访问某些详细信息:

  • getShardingTotalCount() -- 该作业配置的总分片数。
  • getShardingItem() -- 此特定分片的从 0 开始的索引。
  • getJobParameter() -- 配置的作业参数(如有)。
  • getShardingParameter() -- 此特定分片的分片参数(如有)。

我们可以在 Java 代码中使用这些信息来影响作业处理过程。例如,我们可以使用 getShardingItem() 的值来获知当前运行的是哪个分片,以及需要处理哪些数据。

6.2. 数据流作业 (Dataflow Jobs)

当我们需要处理项目列表时,数据流作业提供了简单作业之外的另一种选择。 它们实现了 DataflowJob<T> 接口,其中泛型参数 T 是我们要处理的项目类型。

该接口要求我们实现两个方法:一个用于获取待处理数据,另一个用于处理这些数据:

$ java
public static class MyDataflowJob implements DataflowJob<MyItem> {
    private MyItemRepository repository;
    @Override
    public List<String> fetchData(ShardingContext shardingContext) {
        return repository.getUnprocessedItems();
    }
    @Override
    public void processData(ShardingContext shardingContext, List<MyItem> list) {
        LOG.info("Processing data {} for job {}", list, shardingContext);
    }
}

这使我们能够将数据获取与处理解耦。我们还可以配置作业以流式模式运行:

$ java
JobConfiguration jobConfig = JobConfiguration.newBuilder("MyDataflowJob", 3)
  .setProperty("DataflowJobProperties.STREAM_PROCESS_KEY", "true")
  // ... 其他配置

这将导致系统在 fetchData()processData() 之间循环,直到 fetchData() 返回 null 或空列表。

6.3. 脚本作业 (Script Jobs)

除了运行 Java 编写的作业外,我们还可以触发外部脚本来执行所需的操作。这些可以是主机上任何可执行的脚本。

对于这类作业,我们根本不需要编写 Java 作业类。相反,我们提供标记值 "SCRIPT" 以及脚本运行所需的配置:

$ java
JobConfiguration jobConfig = JobConfiguration.newBuilder("MyScriptJob", 3)
  .cron("0/5 * * * * ?")
  .setProperty(ScriptJobProperties.SCRIPT_KEY, "/script.sh")
  .build();
new ScheduleJobBootstrap(registryCenter, "SCRIPT", jobConfig)
  .schedule();

这将在每次作业运行时执行命令 /script.sh。我们的 ShardingContext 将以 JSON 字符串的形式作为脚本的第一个参数传入。

6.4. HTTP 作业 (HTTP Jobs)

HTTP 作业允许我们向已知服务器发起 HTTP 请求,从而触发远程系统上的功能。 同样,我们提供标记值 "HTTP" 以及关于 HTTP 调用的配置:

$ java
JobConfiguration jobConfig = JobConfiguration.newBuilder("MyHttpJob", 3)
  .cron("0/5 * * * * ?")
  .setProperty(HttpJobProperties.URI_KEY, "https://example.com/job")
  .setProperty(HttpJobProperties.METHOD_KEY, "POST")
  .setProperty(HttpJobProperties.DATA_KEY, "source=Baeldung")
  .build()
new ScheduleJobBootstrap(registryCenter, "HTTP", jobConfig)
  .schedule();

这将导致 ElasticJob 在每次触发作业时向该 URL 发起 HTTP POST 调用。请求将包含所提供的数据作为 HTTP 请求体,并在 HTTP 头部 ShardingContext 中提供 JSON 版本的 ShardingContext

$ http
POST /job HTTP/1.1
Content-Type: application/x-www-form-urlencoded
Content-Length: 15
Host: example.com
ShardingContext: {"jobName":"MyHttpJob","taskId":"MyHttpJob@-@0,1,2@-@READY@-@192.168.1.100@-@8253","shardingTotalCount":3,"jobParameter":"Hello","shardingItem":1,"shardingParameter":"b"}
source=Baeldung

处理此请求的服务器可以根据这些信息执行相应的操作。

7. 总结

在本文中,我们快速浏览了 ElasticJob。它还有许多强大的功能。下次当您需要为应用程序管理定时作业时,不妨尝试一下。

与往常一样,本文中的所有示例都可以在 GitHub 上找到。