使用 ElasticJob 实现分布式任务调度
[LOADING...]
1. 简介
在本教程中,我们将深入了解 ElasticJob,它是 Apache ShardingSphere 项目的一部分。我们将探讨它是什么、如何使用它以及它能为我们实现哪些功能。
2. 什么是 ElasticJob?
ElasticJob 是一个分片式的分布式作业调度系统。它让我们能够专注于编写作业逻辑本身,而由 ElasticJob 处理所有其他细节。
ElasticJob 还支持多种作业类型,以满足我们的不同需求:
- Java 作业:作为应用程序中的类存在。
- 脚本作业:允许我们在主机上运行脚本。
- HTTP 作业:向远程端点发起 HTTP 调用。
它将处理调度作业并将其分布到应用程序各节点所需的一切事务。ElasticJob 还能自动处理诸如某个分片故障时的故障转移(failover)以及处理错过触发(misfired)的作业等细节。
在运行作业时,我们定义若干分片来拆分工作负载。ElasticJob 会自动将这些分片分布到集群中所有可用的主机上,以确保负载均衡。如果集群中添加或移除了主机,分片会自动重新分配,以保持负载在所有主机间的平衡。
3. 依赖项
在使用 ElasticJob 之前,我们需要在构建文件中包含最新版本,撰写本文时为 3.0.5。
如果使用 Maven,可以在 pom.xml 文件中添加以下依赖:
此外,运行时还需要一个 ZooKeeper 实例来管理分片之间的协调。
准备就绪后,我们就可以在应用程序中使用它了。
4. 设置 ElasticJob
配置好 ElasticJob 依赖后,我们就可以开始使用了。
首先,我们需要确保有一个可用的 ZooKeeper 安装。 目前,我们可以使用 Docker 来实现:
然后,我们需要一个配置为指向 ZooKeeper 实例的 CoordinatorRegistryCenter 实例:
至此,ElasticJob 已设置完毕,随时可以使用。
5. 编写作业
ElasticJob 准备就绪后,我们需要编写实际的作业来使用它。
5.1. 作业实现
所有的作业都编写为 ElasticJob 子类的实现。 在本例中,我们将继承 SimpleJob:
这提供了一个 execute() 方法,我们可以在其中实现作业逻辑。该方法由 ElasticJob 自动调用。至于作业内部做什么,完全由我们决定。
5.2. 作业配置
一旦有了作业类,就需要对其进行配置。 我们通过构建一个 JobConfiguration 实例来完成:
newBuilder() 方法接收作业名称(不必与类名匹配)和作业运行的分片数量。然后,我们可以提供一个 Cron 表达式来描述如何调度该作业。在本例中,它被设定在每分钟的第 0 秒执行。
我们还可以使用 jobParameter() 方法配置作业参数:
此处传入的任何内容都可以在作业类内部通过 getJobParameter() 方法提取出来。
此外,我们还可以使用 shardingItemParameters() 方法提供分片参数:
在这种情况下,提供的字符串需要采用特殊格式。它是以逗号分隔的“分片ID=值”列表。因此,这里我们将值 "a" 提供给分片 0,将值 "b" 提供给分片 1,以此类推。
在作业内部,调用 getShardingParameter() 将从该结构化字符串中获取正确的值。如果未找到值,则返回 null。
5.3. 调度作业
现在我们有了作业和作业配置,就可以调度作业了。 这是通过 ScheduleJobBootstrap 类完成的:
这里我们需要提供注册中心、作业配置和作业类的实例。ElasticJob 会将作业详情记录到注册中心,并安排它在适当的调度时间执行。
一旦该方法返回,作业就可以按照预期在整个集群中运行了。
6. 作业类型
我们已经了解了如何创建和配置作业。然而,ElasticJob 在作业运行方式上提供了灵活性,以更好地满足我们的需求。
6.1. 简单作业 (Simple Jobs)
简单作业是任何实现了 SimpleJob 接口的作业。这为我们提供了一个单一的方法——void execute(ShardingContext)——我们可以在其中实现整个作业逻辑。 只要作业被触发,它就会执行 Java 代码中的任何逻辑。
提供的 ShardingContext 实例使我们能够访问某些详细信息:
- getShardingTotalCount() -- 该作业配置的总分片数。
- getShardingItem() -- 此特定分片的从 0 开始的索引。
- getJobParameter() -- 配置的作业参数(如有)。
- getShardingParameter() -- 此特定分片的分片参数(如有)。
我们可以在 Java 代码中使用这些信息来影响作业处理过程。例如,我们可以使用 getShardingItem() 的值来获知当前运行的是哪个分片,以及需要处理哪些数据。
6.2. 数据流作业 (Dataflow Jobs)
当我们需要处理项目列表时,数据流作业提供了简单作业之外的另一种选择。 它们实现了 DataflowJob<T> 接口,其中泛型参数 T 是我们要处理的项目类型。
该接口要求我们实现两个方法:一个用于获取待处理数据,另一个用于处理这些数据:
这使我们能够将数据获取与处理解耦。我们还可以配置作业以流式模式运行:
这将导致系统在 fetchData() 和 processData() 之间循环,直到 fetchData() 返回 null 或空列表。
6.3. 脚本作业 (Script Jobs)
除了运行 Java 编写的作业外,我们还可以触发外部脚本来执行所需的操作。这些可以是主机上任何可执行的脚本。
对于这类作业,我们根本不需要编写 Java 作业类。相反,我们提供标记值 "SCRIPT" 以及脚本运行所需的配置:
这将在每次作业运行时执行命令 /script.sh。我们的 ShardingContext 将以 JSON 字符串的形式作为脚本的第一个参数传入。
6.4. HTTP 作业 (HTTP Jobs)
HTTP 作业允许我们向已知服务器发起 HTTP 请求,从而触发远程系统上的功能。 同样,我们提供标记值 "HTTP" 以及关于 HTTP 调用的配置:
这将导致 ElasticJob 在每次触发作业时向该 URL 发起 HTTP POST 调用。请求将包含所提供的数据作为 HTTP 请求体,并在 HTTP 头部 ShardingContext 中提供 JSON 版本的 ShardingContext:
处理此请求的服务器可以根据这些信息执行相应的操作。
7. 总结
在本文中,我们快速浏览了 ElasticJob。它还有许多强大的功能。下次当您需要为应用程序管理定时作业时,不妨尝试一下。
与往常一样,本文中的所有示例都可以在 GitHub 上找到。