Nuxeo Platform / NXP-30709

Support MSK rolling upgrade to avoid stream failure


    Details

    • Type: Bug
    • Status: Resolved
    • Priority: Critical
    • Resolution: Fixed
    • Affects Version/s: None
    • Fix Version/s: 10.10-HF57, 2021.15
    • Component/s: Streams
    • Release Notes Summary: Avoid Stream failures during Kafka rolling upgrade
    • Team: PLATFORM
    • Sprint: nxplatform #50, nxplatform #51, nxplatform #52
    • Story Points: 3

      Description

      A rolling upgrade of an Amazon MSK (Managed Streaming for Apache Kafka) cluster causes stream failures during checkpoint.
      The consumer cannot commit its position because the group coordinator (the broker that manages the consumer group and its committed offsets) has restarted and is no longer aware of the consumer.
      The resulting exception is misleading, because its message points to a poll interval problem:

      org.apache.kafka.clients.consumer.CommitFailedException: Commit cannot be completed since the group has already rebalanced and assigned the partitions to another member. This means that the time between subsequent calls to poll() was longer than the configured max.poll.interval.ms, which typically implies that the poll loop is spending too much time message processing. You can address this either by increasing max.poll.interval.ms or by reducing the maximum size of batches returned in poll() with max.poll.records.
      	at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator$OffsetCommitResponseHandler.handle(ConsumerCoordinator.java:840) ~[kafka-clients-2.1.1.jar:?]
      	at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator$OffsetCommitResponseHandler.handle(ConsumerCoordinator.java:790) ~[kafka-clients-2.1.1.jar:?]
      	at org.apache.kafka.clients.consumer.internals.AbstractCoordinator$CoordinatorResponseHandler.onSuccess(AbstractCoordinator.java:870) ~[kafka-clients-2.1.1.jar:?]
      	at org.apache.kafka.clients.consumer.internals.AbstractCoordinator$CoordinatorResponseHandler.onSuccess(AbstractCoordinator.java:850) ~[kafka-clients-2.1.1.jar:?]
      	at org.apache.kafka.clients.consumer.internals.RequestFuture$1.onSuccess(RequestFuture.java:204) ~[kafka-clients-2.1.1.jar:?]
      	at org.apache.kafka.clients.consumer.internals.RequestFuture.fireSuccess(RequestFuture.java:167) ~[kafka-clients-2.1.1.jar:?]
      	at org.apache.kafka.clients.consumer.internals.RequestFuture.complete(RequestFuture.java:127) ~[kafka-clients-2.1.1.jar:?]
      	at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient$RequestFutureCompletionHandler.fireCompletion(ConsumerNetworkClient.java:575) ~[kafka-clients-2.1.1.jar:?]
      	at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.firePendingCompletedRequests(ConsumerNetworkClient.java:389) ~[kafka-clients-2.1.1.jar:?]
      	at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:297) ~[kafka-clients-2.1.1.jar:?]
      	at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:236) ~[kafka-clients-2.1.1.jar:?]
      	at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:215) ~[kafka-clients-2.1.1.jar:?]
      	at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.commitOffsetsSync(ConsumerCoordinator.java:662) ~[kafka-clients-2.1.1.jar:?]
      	at org.apache.kafka.clients.consumer.KafkaConsumer.commitSync(KafkaConsumer.java:1409) ~[kafka-clients-2.1.1.jar:?]
      	at org.apache.kafka.clients.consumer.KafkaConsumer.commitSync(KafkaConsumer.java:1370) ~[kafka-clients-2.1.1.jar:?]
      	at org.nuxeo.lib.stream.log.kafka.KafkaLogTailer.commit(KafkaLogTailer.java:334) ~[nuxeo-stream-10.10-HF45.jar:?]
      	at org.nuxeo.lib.stream.computation.log.ComputationRunner.saveOffsets(ComputationRunner.java:472) ~[nuxeo-stream-10.10-HF45.jar:?]
      	at org.nuxeo.lib.stream.computation.log.ComputationRunner.checkpoint(ComputationRunner.java:454) ~[nuxeo-stream-10.10-HF45.jar:?]
      

      In this specific case, the records processed since the last successful checkpoint will be processed again (this is expected after a checkpoint failure), but we don't want to raise a stream failure, which terminates the computation thread and forces a manual restart of the nodes.
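      The duplicate processing follows from at-least-once semantics: a restarted consumer resumes from the last committed offset, not from its in-memory position. The toy model below is plain Java with no Kafka or Nuxeo classes; the class and method names are hypothetical. It processes records in batches, checkpoints after each batch, and makes one chosen checkpoint fail.

```java
import java.util.ArrayList;
import java.util.List;

/**
 * Hypothetical toy model, not Nuxeo or Kafka code: records are processed in
 * batches and each batch ends with a checkpoint that commits the consumer position.
 * A failed commit restarts the consumer from the last committed offset.
 */
class AtLeastOnceReplay {

    /**
     * Returns every offset passed to processing, in order, duplicates included.
     *
     * @param totalRecords      number of records in the partition (offsets 0..totalRecords-1)
     * @param batchSize         records processed between two checkpoints
     * @param failingCheckpoint 1-based index of the checkpoint whose commit fails (0 = none)
     */
    static List<Integer> run(int totalRecords, int batchSize, int failingCheckpoint) {
        if (batchSize <= 0) {
            throw new IllegalArgumentException("batchSize must be positive");
        }
        List<Integer> processed = new ArrayList<>();
        int committed = 0;   // offset a restarted consumer resumes from
        int position = 0;    // in-memory position of the running consumer
        int checkpoints = 0; // checkpoints attempted so far
        boolean failed = false;
        while (position < totalRecords) {
            int end = Math.min(position + batchSize, totalRecords);
            for (int offset = position; offset < end; offset++) {
                processed.add(offset); // "process" the record
            }
            position = end;
            checkpoints++;
            if (!failed && checkpoints == failingCheckpoint) {
                // Commit failed: the consumer is restarted and rewinds to the committed
                // offset, so the batch that was just processed will be processed again.
                failed = true;
                position = committed;
            } else {
                committed = position; // commit succeeded
            }
        }
        return processed;
    }

    public static void main(String[] args) {
        // Offsets 2 and 3 are processed twice because the second checkpoint failed.
        System.out.println(run(6, 2, 2)); // prints [0, 1, 2, 3, 2, 3, 4, 5]
    }
}
```

      Only the records processed since the last successful commit are replayed, so the duplication is bounded; the real damage comes from terminating the thread, not from the replay.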

      The exception can be disambiguated by checking the time of the last record read: if it is within the poll interval (max.poll.interval.ms), the consumer was not evicted for being too slow, so the consumer thread can simply be restarted.
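      The check can be sketched as a small decision function. This is a hedged illustration, not the actual Nuxeo fix. The class name, the Action values, and passing the last read time explicitly are all assumptions. The function compares the time elapsed since the last read with max.poll.interval.ms. If the consumer read recently, it cannot have been evicted for a slow poll loop, so the CommitFailedException is treated as a coordinator change (for example a broker restarted during an MSK rolling upgrade) and the consumer is restarted. Otherwise the original stream failure is kept.

```java
import java.util.concurrent.TimeUnit;

/**
 * Hypothetical sketch (not Nuxeo code): decides how to react when committing
 * offsets during a checkpoint fails with a CommitFailedException.
 */
class CommitFailureClassifier {

    enum Action {
        /** Recreate the consumer and resume from the last committed offset. */
        RESTART_CONSUMER,
        /** Keep the current behavior: raise a stream failure. */
        FAIL_STREAM
    }

    private final long maxPollIntervalMs;

    CommitFailureClassifier(long maxPollIntervalMs) {
        this.maxPollIntervalMs = maxPollIntervalMs;
    }

    /**
     * @param lastReadTimeMs time of the last read from the consumer, in epoch millis
     * @param failureTimeMs  time at which the commit failed, in epoch millis
     */
    Action classify(long lastReadTimeMs, long failureTimeMs) {
        long sinceLastRead = failureTimeMs - lastReadTimeMs;
        if (sinceLastRead <= maxPollIntervalMs) {
            // The poll interval was respected, so the rebalance was not caused by a slow
            // poll loop: most likely the group coordinator restarted.
            return Action.RESTART_CONSUMER;
        }
        // The poll interval was really exceeded: the exception message is accurate.
        return Action.FAIL_STREAM;
    }

    public static void main(String[] args) {
        // 300000 ms (5 minutes) is the Kafka default for max.poll.interval.ms.
        CommitFailureClassifier classifier =
                new CommitFailureClassifier(TimeUnit.MINUTES.toMillis(5));
        long now = System.currentTimeMillis();
        System.out.println(classifier.classify(now - 10_000L, now));  // prints RESTART_CONSUMER
        System.out.println(classifier.classify(now - 600_000L, now)); // prints FAIL_STREAM
    }
}
```

      In a consumer loop, the caller would record the read time after each poll() and, on RESTART_CONSUMER, close and recreate the consumer instead of terminating the thread, accepting that the records processed since the last commit are read again.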
