- Type: Bug
- Status: Resolved
- Priority: Major
- Resolution: Fixed
- Affects Version/s: 10.10
- Fix Version/s: 10.10-HF37, 11.1, 2021.0
- Component/s: Streams
- Release Notes Summary: Commit failures with Kafka are avoided when rebalancing.
- Tags:
- Backlog priority: 800
- Team: PLATFORM
- Sprint: nxplatform 11.1.34, nxplatform 11.1.35, nxplatform 11.1.36, nxplatform #23
- Story Points: 3
This problem may happen since Nuxeo 10.3 and NXP-25600, which upgraded the Kafka client library to 2.x.
Kafka 2.0 introduced a new consumer.poll(Duration) method that no longer guarantees that partition assignment is complete when it returns during a rebalance. This means that when the method returns because its timeout elapsed, we may be in the middle of a rebalance. This was not the case before Kafka 2.0 with the now deprecated consumer.poll(long) method.
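For illustration, here is a minimal sketch of a tail loop against the plain Kafka consumer API (the topic name and loop structure are hypothetical, not the actual KafkaLogTailer code): poll(long) always blocked until an in-progress rebalance completed, whereas poll(Duration) can return an empty batch mid-rebalance, so a commit issued right after it may target partitions that have just been revoked.
{code:java}
import java.time.Duration;
import java.util.Collections;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

public class TailLoopSketch {
    // Hypothetical tail loop, not Nuxeo's tailer implementation.
    static void tail(KafkaConsumer<String, String> consumer) {
        consumer.subscribe(Collections.singletonList("my-topic")); // placeholder topic
        while (true) {
            // Since Kafka 2.0 this may return an empty batch while a
            // rebalance is still in progress.
            ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(500));
            if (records.isEmpty()) {
                // If a position must be checkpointed at this point,
                // commitSync() can fail with CommitFailedException because
                // the partitions may already belong to another group member.
                consumer.commitSync();
                continue;
            }
            // ... process records, then checkpoint positions ...
        }
    }
}
{code}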
When poll returns during a rebalance, the tailer returns no new record. If the position then needs to be committed, a CommitFailedException is raised:
{noformat}
2020-04-25 20:31:42,850 [Nuxeo-Consumer-04-1555-Random message 999] ERROR [AbstractCallablePool] Exception catch in runner: Commit cannot be completed since the group has already rebalanced and assigned the partitions to another member. This means that the time between subsequent calls to poll() was longer than the configured max.poll.interval.ms, which typically implies that the poll loop is spending too much time message processing. You can address this either by increasing max.poll.interval.ms or by reducing the maximum size of batches returned in poll() with max.poll.records.
org.apache.kafka.clients.consumer.CommitFailedException: Commit cannot be completed since the group has already rebalanced and assigned the partitions to another member. This means that the time between subsequent calls to poll() was longer than the configured max.poll.interval.ms, which typically implies that the poll loop is spending too much time message processing. You can address this either by increasing max.poll.interval.ms or by reducing the maximum size of batches returned in poll() with max.poll.records.
	at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.sendOffsetCommitRequest(ConsumerCoordinator.java:820) ~[kafka-clients-2.3.1.jar:?]
	at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.commitOffsetsSync(ConsumerCoordinator.java:692) ~[kafka-clients-2.3.1.jar:?]
	at org.apache.kafka.clients.consumer.KafkaConsumer.commitSync(KafkaConsumer.java:1454) ~[kafka-clients-2.3.1.jar:?]
	at org.apache.kafka.clients.consumer.KafkaConsumer.commitSync(KafkaConsumer.java:1412) ~[kafka-clients-2.3.1.jar:?]
	at org.nuxeo.lib.stream.log.kafka.KafkaLogTailer.commit(KafkaLogTailer.java:346) ~[nuxeo-stream-PR-3930-11.1-SNAPSHOT.jar:?]
	at
{noformat}
The exception message is misleading in this case: the failure is not caused by an exceeded poll interval, so increasing max.poll.interval.ms will not help.
This problem is addressed in Kafka 2.5.0, which introduces a more specific exception: RebalanceInProgressException.
The consequence for Nuxeo is that during tail processing, where consumers wait for new records, a rebalance combined with a position that needs to be checkpointed causes the commit exception to terminate the consumer thread, resulting in failure and duplicate processing.
This problem has been observed with the nuxeo-stream-importer: the consumer pool terminates when there is no new record to read, which triggers a rebalance, and the result is duplicate documents being created.
The fix is to listen for partition revocation and trigger a Nuxeo rebalance exception, as sketched below.
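A minimal sketch of the approach, assuming a hypothetical listener class and flag (the actual fix lives in KafkaLogTailer): partition revocation is recorded through Kafka's ConsumerRebalanceListener callback, and the tailer raises a Nuxeo rebalance exception instead of attempting a commit that is bound to fail.
{code:java}
import java.util.Collection;
import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
import org.apache.kafka.common.TopicPartition;

// Hypothetical listener illustrating the fix; not the actual Nuxeo code.
class RebalanceAwareListener implements ConsumerRebalanceListener {
    private volatile boolean rebalanced;

    @Override
    public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
        // Our partitions are being taken away: committing positions from
        // this consumer is no longer valid.
        rebalanced = true;
    }

    @Override
    public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
        // A new assignment is known; the caller can reset its positions.
    }

    // Checked by the tailer after each poll(); when true, it throws a Nuxeo
    // rebalance exception so the caller can restart its batch cleanly.
    boolean wasRebalanced() {
        boolean flag = rebalanced;
        rebalanced = false;
        return flag;
    }
}
{code}
The listener would be registered with consumer.subscribe(topics, listener), so revocations are reported during the poll() call itself.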
Also, upgrading to the Kafka 2.5.0 client will provide a better error message.
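A sketch of what commit handling could look like with a Kafka 2.5+ client, where commitSync() throws the more specific RebalanceInProgressException mentioned above (the safeCommit helper is hypothetical, not Nuxeo code):
{code:java}
import org.apache.kafka.clients.consumer.CommitFailedException;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.errors.RebalanceInProgressException;

public class CommitSketch {
    // Hypothetical helper showing how the two cases can be told apart.
    static void safeCommit(KafkaConsumer<String, String> consumer) {
        try {
            consumer.commitSync();
        } catch (RebalanceInProgressException e) {
            // Kafka >= 2.5: a rebalance is ongoing; the caller can wait for
            // the new assignment and retry instead of dying with a
            // misleading max.poll.interval.ms message.
        } catch (CommitFailedException e) {
            // A genuine commit failure, e.g. the poll interval really was
            // exceeded; propagate it.
            throw e;
        }
    }
}
{code}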