- Type: Bug
- Status: Resolved
- Priority: Critical
- Resolution: Fixed
- Affects Version/s: None
- Fix Version/s: 10.10-HF57, 2021.15
- Component/s: Streams
- Release Notes Summary: Avoid Stream failures during Kafka rolling upgrade
- Tags:
- Team: PLATFORM
- Sprint: nxplatform #50, nxplatform #51, nxplatform #52
- Story Points: 3
A rolling upgrade on Kafka MSK generates stream failures during checkpoint.
The consumer cannot commit its position because the coordinator responsible for the topic partition has restarted and is no longer aware of the consumer.
The exception is too generic and points to a poll interval problem:
org.apache.kafka.clients.consumer.CommitFailedException: Commit cannot be completed since the group has already rebalanced and assigned the partitions to another member. This means that the time between subsequent calls to poll() was longer than the configured max.poll.interval.ms, which typically implies that the poll loop is spending too much time message processing. You can address this either by increasing max.poll.interval.ms or by reducing the maximum size of batches returned in poll() with max.poll.records.
at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator$OffsetCommitResponseHandler.handle(ConsumerCoordinator.java:840) ~[kafka-clients-2.1.1.jar:?]
at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator$OffsetCommitResponseHandler.handle(ConsumerCoordinator.java:790) ~[kafka-clients-2.1.1.jar:?]
at org.apache.kafka.clients.consumer.internals.AbstractCoordinator$CoordinatorResponseHandler.onSuccess(AbstractCoordinator.java:870) ~[kafka-clients-2.1.1.jar:?]
at org.apache.kafka.clients.consumer.internals.AbstractCoordinator$CoordinatorResponseHandler.onSuccess(AbstractCoordinator.java:850) ~[kafka-clients-2.1.1.jar:?]
at org.apache.kafka.clients.consumer.internals.RequestFuture$1.onSuccess(RequestFuture.java:204) ~[kafka-clients-2.1.1.jar:?]
at org.apache.kafka.clients.consumer.internals.RequestFuture.fireSuccess(RequestFuture.java:167) ~[kafka-clients-2.1.1.jar:?]
at org.apache.kafka.clients.consumer.internals.RequestFuture.complete(RequestFuture.java:127) ~[kafka-clients-2.1.1.jar:?]
at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient$RequestFutureCompletionHandler.fireCompletion(ConsumerNetworkClient.java:575) ~[kafka-clients-2.1.1.jar:?]
at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.firePendingCompletedRequests(ConsumerNetworkClient.java:389) ~[kafka-clients-2.1.1.jar:?]
at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:297) ~[kafka-clients-2.1.1.jar:?]
at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:236) ~[kafka-clients-2.1.1.jar:?]
at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:215) ~[kafka-clients-2.1.1.jar:?]
at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.commitOffsetsSync(ConsumerCoordinator.java:662) ~[kafka-clients-2.1.1.jar:?]
at org.apache.kafka.clients.consumer.KafkaConsumer.commitSync(KafkaConsumer.java:1409) ~[kafka-clients-2.1.1.jar:?]
at org.apache.kafka.clients.consumer.KafkaConsumer.commitSync(KafkaConsumer.java:1370) ~[kafka-clients-2.1.1.jar:?]
at org.nuxeo.lib.stream.log.kafka.KafkaLogTailer.commit(KafkaLogTailer.java:334) ~[nuxeo-stream-10.10-HF45.jar:?]
at org.nuxeo.lib.stream.computation.log.ComputationRunner.saveOffsets(ComputationRunner.java:472) ~[nuxeo-stream-10.10-HF45.jar:?]
at org.nuxeo.lib.stream.computation.log.ComputationRunner.checkpoint(ComputationRunner.java:454) ~[nuxeo-stream-10.10-HF45.jar:?]
In this specific case, the processing of the record will be duplicated (this is expected when a checkpoint fails), but we don't want to generate a stream failure (which terminates the thread) and then have to restart nodes manually.
The exception can be disambiguated by checking the time of the last record read: if it is within the poll interval, the rebalance was not caused by a slow poll loop, and we can simply restart the consumer thread.
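A minimal sketch of this disambiguation, assuming a helper that records the last read time and compares it to max.poll.interval.ms when commitSync() throws CommitFailedException (the class and method names below are illustrative, not the actual ComputationRunner code):

import java.time.Duration;
import java.time.Instant;

import org.apache.kafka.clients.consumer.CommitFailedException;
import org.apache.kafka.clients.consumer.Consumer;

public class CheckpointHelper {

    private final Duration maxPollInterval;   // mirrors max.poll.interval.ms
    private volatile Instant lastReadTime = Instant.now();

    public CheckpointHelper(Duration maxPollInterval) {
        this.maxPollInterval = maxPollInterval;
    }

    /** Record the time of the last successful read from poll(). */
    public void onRecordRead() {
        lastReadTime = Instant.now();
    }

    /**
     * Commit the consumer position. Returns true when the commit succeeded,
     * false when the failure looks transient (e.g. a coordinator restart during
     * a rolling upgrade) and the consumer thread should simply be restarted.
     * Rethrows when the poll interval was genuinely exceeded.
     */
    public boolean tryCommit(Consumer<?, ?> consumer) {
        try {
            consumer.commitSync();
            return true;
        } catch (CommitFailedException e) {
            Duration sinceLastRead = Duration.between(lastReadTime, Instant.now());
            if (sinceLastRead.compareTo(maxPollInterval) < 0) {
                // We read recently: the rebalance was not caused by a slow poll loop,
                // so treat this as transient instead of terminating the thread.
                return false;
            }
            // The poll interval really was exceeded: the classic slow-consumer case.
            throw e;
        }
    }
}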