Ohhnews

分类导航

$ cd ..
Baeldung原文

理解并避免Kafka中的CommitFailedException

#kafka#commitfailedexception#偏移提交#消费者组#异步处理

[LOADING...]

1. 概述

Kafka 在消费者提交已处理消息的偏移量方面提供了高度灵活性。它包含同步、异步和自动提交选项。

虽然这降低了提交过程的复杂性,但意外错误仍可能发生。

在本教程中,我们将了解 Kafka 消费者应用程序中与提交偏移量相关的故障。我们将调试导致 CommitFailedException 的根本原因,并实现几种解决方案来尽可能避免该异常。

2. 实现一个消费者服务来演示问题

让我们构建一个简单的消费者应用程序,顺序处理记录。

我们将在 KafkaConsumerService 类中实现 consume 方法:

$ java
public class KafkaConsumerService {
    private final KafkaConsumer<String, String> consumer;
    private final AtomicBoolean running = new AtomicBoolean(true);
    public void consume() {
        try {
            while (running.get()) {
                ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
                if (records.isEmpty()) {
                    continue;
                }
                records.forEach(this::simulateDBCall);
                consumer.commitSync();
            }
        } catch (WakeupException ex) {
            if (running.get()) {
                log.error("Kafka 消费者出错,异常", ex);
                throw ex;
            }
        } finally {
            consumer.close();
        }
    }
}

在上面的代码中,我们顺序处理记录,然后提交偏移量。

为了模拟数据库调用,我们将在上面的 simulateDBCall 方法中添加一个延迟:

$ java
private void simulateDBCall(ConsumerRecord<String, String> record) {
    try {
        log.info("模拟数据库调用 - 记录 key {} value {}", record.key(), record.value());
        Thread.sleep(150L);
    } catch (InterruptedException ex) {
        Thread.currentThread()
          .interrupt();
        throw new RuntimeException(ex);
    }
}

该延迟会导致消费者在下一条记录之前等待大约 150 毫秒。

3. 测试服务

接下来,让我们测试消费者服务。请注意,我们在测试框架中使用了 Testcontainers 来提供本地 Kafka 代理进行测试;完整细节可在本文末尾链接的支持 GitHub 仓库中找到。

首先,我们在测试类中包含与消费者相关的配置:

$ java
private static Properties getConsumerConfig() {
    Properties consumerProperties = new Properties();
    consumerProperties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, KAFKA_CONTAINER.getBootstrapServers());
    consumerProperties.put(ConsumerConfig.GROUP_ID_CONFIG, "consumer-app");
    consumerProperties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
    consumerProperties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
    consumerProperties.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
    consumerProperties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");
    consumerProperties.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 1);
    consumerProperties.put(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG, 50);
    
    return consumerProperties;
}

在上述配置中,我们轮询一条记录,最大轮询间隔为 50 毫秒,并手动提交偏移量。

我们将通过生产、消费一条消息并验证偏移量来实现一个测试用例:

$ java
@Test
void givenProducerMessageIsSent_whenConsumerIsRunning_thenConsumerOffsetIsCommitted() {
    KafkaConsumerService kafkaConsumerService = new KafkaConsumerService(getConsumerConfig(), "test-topic");
    Thread th = new Thread(kafkaConsumerService::consume);
    th.start();
    try (KafkaProducer<String, String> producer = new KafkaProducer<>(getProducerConfig())) {
        producer.send(new ProducerRecord<>("test-topic", "x1", "test"));
        producer.flush();
    }
    Awaitility.await()
      .atMost(30, TimeUnit.SECONDS)
      .pollInterval(2, TimeUnit.SECONDS)
      .untilAsserted(() -> {
          TopicPartition topicPartition = new TopicPartition("test-topic", 0);
          Map<TopicPartition, OffsetAndMetadata> committedOffsets;
          try (AdminClient adminClient = AdminClient.create(getAdminProps())) {
              ListConsumerGroupOffsetsResult result = adminClient.listConsumerGroupOffsets("consumer-app");
              committedOffsets = result.partitionsToOffsetAndMetadata()
                .get();
          }
          assertNotNull(committedOffsets);
          assertNotNull(committedOffsets.get(topicPartition));
          assertEquals(1L, committedOffsets.get(topicPartition)
            .offset());
      });
    kafkaConsumerService.shutdown();
}

但偏移量并未提交,我们会看到以下错误日志,测试用例也会失败:

$ java
org.apache.kafka.clients.consumer.CommitFailedException: 偏移量提交无法完成,因为消费者不是活动组的一部分以进行自动分区分配;消费者很可能已被踢出组。
	at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.sendOffsetCommitRequest(ConsumerCoordinator.java:1280)
	...
	at org.apache.kafka.clients.consumer.KafkaConsumer.commitSync(KafkaConsumer.java:918)
	at com.baeldung.kafka.commitfailure.sequential.KafkaConsumerService.consume(KafkaConsumerService.java:34)

从上面的日志可以明显看出,CommitFailedException 是在 consume 方法中提交偏移量时抛出的。同时,它指出消费者不再是组的一部分。Kafka 在其类文档中给出了可能原因的提示。

4. CommitFailedException 的根本原因

我们可以通过多种方式监控和调试根本原因,包括容器日志、CLI 和 Kafka UI 工具。

首先,检查 Kafka 容器日志中的错误:

$ bash
[2026-06-08 16:53:05,349] INFO [GroupCoordinator 1]: 准备在状态 PreparingRebalance 下重新平衡组 consumer-app,旧 generation 1 (__consumer_offsets-0) (原因: 移除成员 consumer-consumer-app-1-3792090b-be55-4e19-bdbd-a3c66b968168 通过 LeaveGroup; 客户端原因: 消费者轮询超时已过期。)
...
[2026-06-08 16:53:05,354] INFO [GroupCoordinator 1]: 成员 MemberMetadata(memberId=consumer-consumer-app-1-3792090b-be55-4e19-bdbd-a3c66b968168, groupInstanceId=None, clientId=consumer-consumer-app-1, clientHost=/172.17.0.1, sessionTimeoutMs=45000, rebalanceTimeoutMs=50, supportedProtocols=List(range, cooperative-sticky)) 已通过显式 `LeaveGroup` 离开组 consumer-app;客户端原因: 消费者轮询超时已过期...

我们还可以使用 kafka-consumer-groups 命令确认消费者组的状态:

$ bash
sh-4.4$ /usr/bin/kafka-consumer-groups --bootstrap-server localhost:9092 --describe --group consumer-app --state
Consumer group 'consumer-app' has no active members.
GROUP                     COORDINATOR (ID)          ASSIGNMENT-STRATEGY  STATE           #MEMBERS
consumer-app              8db993586dab:9092  (1)                         Empty           0

根据上述日志和状态,我们可以看到消费者因客户端特定错误被移除:消费者轮询超时已过期。

Kafka 有多种方式检查消费者是否健康并正常运行。这包括后台心跳检查,该检查应在会话超时之前运行。

此外,Kafka 期望消费者在指定的最大轮询间隔时间周期内轮询一次。 如果消费者因任何原因未轮询,它将被视为死亡,分区将重新分配给其他消费者

通过进一步调试并重新验证应用程序日志,我们观察到记录处理明显花费了更多时间,并且消费者未能在指定的 MAX_POLL_INTERVAL_MS_CONFIG 时间周期内调用 poll 方法。因此,Kafka 组协调器移除了该消费者。

因此,任何进一步提交偏移量的尝试都会失败并出现该错误。这是有道理的,因为消费者不再是分区所有者,提交可能会破坏偏移量状态。

我们将探讨几种实际的方法来防止该错误。## 5. 避免问题的方法

在理解消费者轮询后,我们将通过分析消费者代码和配置来尝试解决该问题。

我们可以考虑启用自动提交配置,但这会带来数据丢失和提交时机不规律的可能性。此外,此方法无法确保至少一次处理。

相反,我们将探讨一些相关的手动提交解决方案。

5.1. 调整轮询时长和批量大小

通过调试代码,我们发现实际处理时间显著长于指定的最大轮询间隔。

我们将增加最大轮询间隔配置:

$ java
consumerProperties.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 1);
consumerProperties.put(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG, 300);

我们还可以根据批处理时间和吞吐量需求来微调批量大小。

现在测试更新后的消费者,确认不再抛出 CommitFailedException

需要注意的是,通常将轮询间隔设置为批处理时间的3倍左右,以确保有安全的缓冲时间。

虽然调整轮询间隔有效,但在更复杂的场景下(例如涉及数据库事务或网络调用的处理)仍可能遇到问题。

5.2. 异步处理

如果我们在执行 I/O 操作的同一线程中处理记录,上述解决方案会降低吞吐量并增加延迟。

在生产环境中,无法保证数据库执行时间可预测,因此仅配置轮询时长可能不足

我们可以考虑对记录进行异步处理,这样消费者线程的阻塞时间会大大缩短。

在 Kafka 中,可以通过虚拟线程异步处理记录来实现。

首先,我们在消费者服务中包含一个工作线程池和每个分区的偏移量跟踪映射:

$ java
public class KafkaConsumerService {
    private final KafkaConsumer<String, String> consumer;
    private final AtomicBoolean running = new AtomicBoolean(true);
    private final ExecutorService workers;
    private final Map<TopicPartition, AtomicLong> committableOffsets = new ConcurrentHashMap<>();
    public KafkaConsumerService(Properties consumerProps, String topic) {
        this.consumer = new KafkaConsumer<>(consumerProps);
        consumer.subscribe(List.of(topic));
        workers = Executors.newVirtualThreadPerTaskExecutor();
    }
}

接下来,我们更新 consume 方法,使用工作线程进行异步处理:

$ java
public void consume() {
    try {
        while (running.get()) {
            ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
            if (records.isEmpty()) {
                continue;
            }
            List<CompletableFuture<Void>> futures = processAsync(records);
            try {
                CompletableFuture.allOf(futures.toArray(CompletableFuture[]::new))
                  .orTimeout(700L, TimeUnit.MILLISECONDS)
                  .join();
            } catch (CompletionException ex) {
                log.error("Batch processing timed out or failed", ex);
            }
            commitOffsets();
        }
    } catch (WakeupException ex) {
        if (running.get()) {
            log.error("Error in the Kafka Consumer with exception", ex);
            throw ex;
        }
    } finally {
        commitOffsets();
        consumer.close();
    }
}

我们实现 processAsync 方法来异步处理记录:

$ java
private List<CompletableFuture<Void>> processAsync(ConsumerRecords<String, String> records) {
    return StreamSupport.stream(records.spliterator(), false)
      .map(record -> CompletableFuture.runAsync(() -> simulateDBUpdate(record), workers)
        .whenComplete((ignored, ex) -> {
            if (ex == null) {
                markComplete(record);
            } else {
                log.error("Failed offset and send to DLQ {} {} {}", 
                  record.offset(), record.key(), ex.getMessage());
            }
        })
        .exceptionally(ex -> null))
      .toList();
}

在上述代码中,消费者等待工作线程完成处理,然后提交已处理的偏移量。如果处理过程中任何记录失败,我们会记录失败消息并将其发送到死信队列(DLQ)主题。

接下来,实现 markComplete 方法,将已处理记录的偏移量更新到 committableOffsets 映射中:

$ java
private void markComplete(ConsumerRecord<String, String> record) {
    TopicPartition tp = new TopicPartition(record.topic(), record.partition());
    committableOffsets.computeIfAbsent(tp, k -> new AtomicLong(-1L))
      .accumulateAndGet(record.offset() + 1L, Math::max);
}

最后,实现 commitOffsets 方法来提交已处理的偏移量:

$ java
private void commitOffsets() {
    Map<TopicPartition, OffsetAndMetadata> toCommit = new HashMap<>();
    committableOffsets.forEach((tp, atomicOffset) -> {
        long val = atomicOffset.get();
        if (val != -1L) {
            toCommit.put(tp, new OffsetAndMetadata(val));
        }
    });
    if (toCommit.isEmpty()) {
        return;
    }
    consumer.commitSync(toCommit);
    toCommit.forEach((tp, meta) -> {
        AtomicLong ref = committableOffsets.get(tp);
        if (ref != null) {
            ref.compareAndSet(meta.offset(), -1L);
        }
    });
}

在上述方法中,我们为所有已处理的记录构建提交偏移量的快照映射并提交。同时,以原子方式将已提交的偏移量标记为完成。

现在,在测试类中配置消费者,增加批量大小和最大轮询间隔:

$ java
consumerProperties.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 30);
consumerProperties.put(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG, 1000);

最后,我们通过生产并消费多条消息来验证偏移量,实现一个测试用例:

$ java
@Test
void givenProducerMessagesAreSent_whenConsumerIsRunning_thenConsumerOffsetIsCommitted() {
    KafkaConsumerService kafkaConsumerService = new KafkaConsumerService(getConsumerConfig(), "test-topic");
    Thread th = new Thread(kafkaConsumerService::consume);
    th.start();
    try (KafkaProducer<String, String> producer = new KafkaProducer<>(getProducerConfig())) {
        for (int num = 0; num < 100; num++) {
            producer.send(new ProducerRecord<>("test-topic", "x1" + num, "test" + num));
        }
        producer.flush();
    }
    Awaitility.await()
      .atMost(30, TimeUnit.SECONDS)
      .pollInterval(2, TimeUnit.SECONDS)
      .untilAsserted(() -> {
          TopicPartition topicPartition = new TopicPartition("test-topic", 0);
          Map<TopicPartition, OffsetAndMetadata> committedOffsets;
          try (AdminClient adminClient = AdminClient.create(getAdminProps())) {
              ListConsumerGroupOffsetsResult result = adminClient.listConsumerGroupOffsets("consumer-app");
              committedOffsets = result.partitionsToOffsetAndMetadata()
                .get();
          }
          assertNotNull(committedOffsets);
          assertNotNull(committedOffsets.get(topicPartition));
          assertEquals(100L, committedOffsets.get(topicPartition)
            .offset());
      });
    kafkaConsumerService.shutdown();
}

运行上述测试后,我们成功验证了提交的偏移量与消费的消息总数一致。

5.3. 实现 ConsumerRebalanceListener

应当注意,在消费者中实现 ConsumerRebalanceListener 始终是良好的实践,用于提交任何正在处理的偏移量并清理数据

我们在 KafkaConsumerService 类的 KafkaConsumer 对象中实现 ConsumerRebalanceListener

$ java
consumer.subscribe(List.of(topic), new ConsumerRebalanceListener() {
    @Override
    public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
        try {
            commitOffsets(partitions);
        } catch (Exception ex) {
            log.error("Commit failed during rebalance", ex);
        } finally {
            partitions.forEach(committableOffsets::remove);
        }
    }
    @Override
    public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
        partitions.forEach(committableOffsets::remove);
    }
});

还需要实现 commitOffsets 方法,为特定分区提交偏移量:

$ java
private void commitOffsets(Collection<TopicPartition> partitions) {
    Map<TopicPartition, OffsetAndMetadata> toCommit = new HashMap<>();
    partitions.forEach(tp -> {
        AtomicLong ref = committableOffsets.get(tp);
        if (ref != null) {
            long val = ref.get();
            if (val != -1L) {
                toCommit.put(tp, new OffsetAndMetadata(val));
            }
        }
    });
    if (toCommit.isEmpty()) {
        return;
    }
    consumer.commitSync(toCommit);
}

在上述代码中,我们通过从特定分区的偏移量创建快照映射来提交偏移量。

现在,运行上一节的测试方法,验证更新后的消费者实现的偏移量:

$ bash
13:01:21.571 [virtual-158] INFO  c.b.k.c.f.async.KafkaConsumerService - Simulating a db call - record key x199 value test99

从上述测试可以看出,所有消息都已处理并提交。

5.4. 最佳实践

虽然无法完全避免异常,但我们可以保护系统免受重复处理、数据丢失或静默消息丢弃的影响。

我们应该在生产者端和消费者端实现幂等性,验证数据库约束,并确保无副作用的重试处理。此外,应将失败的消息发送到重试或死信队列主题。

6. 结论

在本文中,我们学习了消费者应用中一种常见的偏移量提交相关异常。我们探讨了 CommitFailedException 发生的原因以及如何预防。我们还通过调整最大轮询间隔和切换到异步处理逻辑实现了预防方案。

与往常一样,示例代码可在 GitHub 上找到。