KafkaConsumer中subscribe()与assign()方法的区别
[LOADING...]
1. 概述
Apache Kafka 提供的客户端库允许开发者使用底层的 Java API(应用程序编程接口)以及其他编程语言来生产和消费消息。该 API 中的 KafkaConsumer 类提供了两种读取消息的方法:subscribe() 和 assign()。
在本教程中,我们将讨论 Kafka Java 客户端 API 中 subscribe() 和 assign() 方法之间的区别。我们将看到它们之间的主要区别在于自动和手动 分区 分配。我们在示例中使用的 Kafka 版本是 4.1.1。
2. 使用 subscribe() 进行自动分区分配
在本节中,我们将讨论 KafkaConsumer 类的 subscribe() 方法。
2.1. KafkaConsumer.subscribe()
KafkaConsumer 类的 subscribe() 方法用于订阅一个或多个 主题。如果消费者是 消费者组 的一部分,Kafka 集群会自动为消费者分配分区。因此,当新消费者加入或现有消费者离开时,它提供了动态扩展和负载均衡。因此,管理分区分配变得更加简单。
下面是 subscribe() 方法的定义:
它订阅传递给它的主题列表。它还有其他重载形式。然而,主要思想保持不变:获取动态分配的分区。
2.2. 示例
让我们看一个使用 subscribe() 方法的示例。为了简单起见,我们将使用一个单独的未分组消费者,即一个不属于任何消费者组的消费者。下面是 Java 代码片段:
首先,我们创建一个 Kafka 消费者,然后订阅一个名为 test-topic 的单个主题。
然后,我们在一个无限的 while 循环中获取该主题的传入样本:
我们使用 KafkaConsumer 类的 poll() 方法来获取接收到的样本。如果有样本,它会立即返回。否则,它会等待超时时间,在本例中为 1000 毫秒。如果超时到期,poll() 方法将返回一个空记录。我们通过遍历记录来打印接收到的样本的值和分区。
2.3. 测试示例
现在,让我们测试消费者的行为。首先,我们需要使用 kafka-topics.sh 脚本创建主题 test-topic:
我们使用 --partitions 选项明确指定分区数为 3。实际上,默认情况下它也是 3。现在,让我们使用 kafka-console-producer.sh 脚本启动一个生产者:
箭头符号 > 表示我们已准备好向 test-topic 发送消息。我们使用 RoundRobinPartitioner 策略,通过 --producer-property 选项让生产者以轮询方式写入主题。否则,写入主题的键为 null,并且主题只会被写入到一个随机选择的分区中。
现在,让我们发送六条消息:
随后,让我们检查消费者应用程序的输出:
从输出中可以明显看出,由于该消费者不属于特定的消费者组,我们接收到了所有分区中的所有消息。正如预期的那样,有三个分区。Message1 和 Message4 位于第一个分区 Partition 0 中。同样,Message2 和 Message5 在第二个分区 Partition 1 中,而 Message3 和 Message6 在最后一个分区 Partition 2 中。
3. 使用 assign() 进行手动分区分配
在本节中,我们将讨论 KafkaConsumer 类的 assign() 方法。
3.1. KafkaConsumer.assign()
我们使用 KafkaConsumer 类的 assign() 方法来手动将分区分配给消费者。因此,它提供了对分区的完全控制。由于当新消费者加入或现有消费者离开时没有任何自动重平衡,因此由于从同一分区读取,它可能会提供更稳定的消费体验。但是,没有自动扩展或容错能力。
下面是 assign() 方法的定义:
它获取一个分区列表作为输入,并将它们分配给消费者。
3.2. 示例
让我们看一个使用 assign() 方法的示例。下面是 Java 代码片段:
创建 Kafka 消费者后,我们将 test-topic 的第二个分区分配给此消费者。正如我们在上一节的示例中所见,分区编号从 0 开始。因此,在调用 TopicPartition 的构造函数时,第二个参数即 1,对应于第二个分区。
我们使用上一节中相同的 while 循环来获取接收到的主题样本。
3.3. 测试示例
为了测试应用程序,让我们使用 kafka-console-producer.sh 脚本启动一个生产者并写入六条消息:
写入消息后,让我们检查消费者应用程序的输出:
现在,正如预期的那样,我们没有接收所有消息,而是只读取了 Partition 1 中的消息,即 Message12 和 Message15。
4. 结论
在本文中,我们讨论了 Kafka Java 客户端 API 中 subscribe() 和 assign() 方法之间的区别。首先,我们了解了 subscribe() 方法,它获取一个主题名称列表。消费者订阅列表中的主题。
然后,我们讨论了 assign() 方法,它获取一个分区列表。消费者订阅由主题名称和相应分区号组成的分区。