Ohhnews

分类导航

$ cd ..
Baeldung原文

KafkaConsumer中subscribe()与assign()方法的区别

#kafka#java#消息消费#分区分配#api

[LOADING...]

1. 概述

Apache Kafka 提供的客户端库允许开发者使用底层的 Java API(应用程序编程接口)以及其他编程语言来生产和消费消息。该 API 中的 KafkaConsumer 类提供了两种读取消息的方法:subscribe()assign()

在本教程中,我们将讨论 Kafka Java 客户端 API 中 subscribe()assign() 方法之间的区别。我们将看到它们之间的主要区别在于自动和手动 分区 分配。我们在示例中使用的 Kafka 版本是 4.1.1。

2. 使用 subscribe() 进行自动分区分配

在本节中,我们将讨论 KafkaConsumer 类的 subscribe() 方法。

2.1. KafkaConsumer.subscribe()

KafkaConsumer 类的 subscribe() 方法用于订阅一个或多个 主题。如果消费者是 消费者组 的一部分,Kafka 集群会自动为消费者分配分区。因此,当新消费者加入或现有消费者离开时,它提供了动态扩展和负载均衡。因此,管理分区分配变得更加简单。

下面是 subscribe() 方法的定义:

$ java
public void subscribe(Collection<String> topics)

它订阅传递给它的主题列表。它还有其他重载形式。然而,主要思想保持不变:获取动态分配的分区。

2.2. 示例

让我们看一个使用 subscribe() 方法的示例。为了简单起见,我们将使用一个单独的未分组消费者,即一个不属于任何消费者组的消费者。下面是 Java 代码片段:

$ java
// Create Kafka Consumer
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(properties);
// Subscribe Consumer to Our topics
String topics = "test-topic";
consumer.subscribe(List.of(topics));

首先,我们创建一个 Kafka 消费者,然后订阅一个名为 test-topic 的单个主题。

然后,我们在一个无限的 while 循环中获取该主题的传入样本:

$ java
logger.info("Waiting for messages...");
// Poll the data
while (true) {
    ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(1000));
    for (ConsumerRecord<String, String> record : records) {
        logger.info("Value: " + record.value() + " -- Partition: " + record.partition());
    }
}

我们使用 KafkaConsumer 类的 poll() 方法来获取接收到的样本。如果有样本,它会立即返回。否则,它会等待超时时间,在本例中为 1000 毫秒。如果超时到期,poll() 方法将返回一个空记录。我们通过遍历记录来打印接收到的样本的值和分区。

2.3. 测试示例

现在,让我们测试消费者的行为。首先,我们需要使用 kafka-topics.sh 脚本创建主题 test-topic

$ bash
$ kafka-topics.sh --bootstrap-server localhost:9092 --topic test-topic --create --partitions 3
Created topic test-topic.

我们使用 --partitions 选项明确指定分区数为 3。实际上,默认情况下它也是 3。现在,让我们使用 kafka-console-producer.sh 脚本启动一个生产者:

$ bash
$ kafka-console-producer.sh --bootstrap-server localhost:9092 --topic test-topic --producer-property partitioner.class=org.apache.kafka.clients.producer.RoundRobinPartitioner
>

箭头符号 > 表示我们已准备好向 test-topic 发送消息。我们使用 RoundRobinPartitioner 策略,通过 --producer-property 选项让生产者以轮询方式写入主题。否则,写入主题的键为 null,并且主题只会被写入到一个随机选择的分区中。

现在,让我们发送六条消息:

$ bash
$ kafka-console-producer.sh --bootstrap-server localhost:9092 --topic test-topic --producer-property partitioner.class=org.apache.kafka.clients.producer.RoundRobinPartitioner
>Message1
>Message2
>Message3
>Message4
>Message5
>Message6
>

随后,让我们检查消费者应用程序的输出:

$ bash
Waiting for messages...
Value: Message1 -- Partition: 0
Value: Message2 -- Partition: 1
Value: Message3 -- Partition: 2
Value: Message4 -- Partition: 0
Value: Message5 -- Partition: 1
Value: Message6 -- Partition: 2

从输出中可以明显看出,由于该消费者不属于特定的消费者组,我们接收到了所有分区中的所有消息。正如预期的那样,有三个分区。Message1Message4 位于第一个分区 Partition 0 中。同样,Message2Message5 在第二个分区 Partition 1 中,而 Message3Message6 在最后一个分区 Partition 2 中。

3. 使用 assign() 进行手动分区分配

在本节中,我们将讨论 KafkaConsumer 类的 assign() 方法。

3.1. KafkaConsumer.assign()

我们使用 KafkaConsumer 类的 assign() 方法来手动将分区分配给消费者。因此,它提供了对分区的完全控制。由于当新消费者加入或现有消费者离开时没有任何自动重平衡,因此由于从同一分区读取,它可能会提供更稳定的消费体验。但是,没有自动扩展或容错能力。

下面是 assign() 方法的定义:

$ java
public void assign(Collection<TopicPartition> partitions)

它获取一个分区列表作为输入,并将它们分配给消费者。

3.2. 示例

让我们看一个使用 assign() 方法的示例。下面是 Java 代码片段:

$ java
// Create Kafka Consumer
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(properties);
// Subscribe Consumer to Our topics
String topics = "test-topic";
consumer.assign(Arrays.asList(new TopicPartition(topics, 1)));

创建 Kafka 消费者后,我们将 test-topic 的第二个分区分配给此消费者。正如我们在上一节的示例中所见,分区编号从 0 开始。因此,在调用 TopicPartition 的构造函数时,第二个参数即 1,对应于第二个分区。

我们使用上一节中相同的 while 循环来获取接收到的主题样本。

3.3. 测试示例

为了测试应用程序,让我们使用 kafka-console-producer.sh 脚本启动一个生产者并写入六条消息:

$ bash
$ kafka-console-producer.sh --bootstrap-server localhost:9092 --topic test-topic --producer-property partitioner.class=org.apache.kafka.clients.producer.RoundRobinPartitioner
>Message11
>Message12
>Message13
>Message14
>Message15
>Message16
>

写入消息后,让我们检查消费者应用程序的输出:

$ bash
Waiting for messages...
Value: Message12 -- Partition: 1
Value: Message15 -- Partition: 1

现在,正如预期的那样,我们没有接收所有消息,而是只读取了 Partition 1 中的消息,即 Message12Message15

4. 结论

在本文中,我们讨论了 Kafka Java 客户端 API 中 subscribe()assign() 方法之间的区别。首先,我们了解了 subscribe() 方法,它获取一个主题名称列表。消费者订阅列表中的主题。

然后,我们讨论了 assign() 方法,它获取一个分区列表。消费者订阅由主题名称和相应分区号组成的分区。