Uploaded image for project: 'Nuxeo Platform'
  1. Nuxeo Platform
  2. NXP-28524

Improve ComputationRunner resiliency when Kafka is not reachable



    • Type: Bug
    • Status: Resolved
    • Priority: Minor
    • Resolution: Fixed
    • Affects Version/s: 10.10
    • Fix Version/s: 10.10-HF23, 11.1, 2021.0
    • Component/s: Streams
    • Environment:


      The ComputationRunner is a Runnable in charge of executing a computation when using Kafka we use the following API:

      1. Consumer API to poll records from input topics
      2. Consumer API to commit the consumer position on the input topics
      3. Producer API to send records to output streams

      We need to make sure that all this remote call can tolerate a network partition.

      Let's review the 3 cases:

      1. Polling record

      The loop of KafkaConsumer.poll  invocation tolerates network disconnection, just returning no record so we poll until the broker is back.

      2. Commit consumer position

      The checkpoint when using Kafka is done by committing consumer positions using KafkaConsumer.commitSync API.

      This call can be in failure for different reasons:

      1. Kafka doesn't reply within the default.api.timeout.ms (1 min by default) a TimeoutException is raised, (broker names resolve at DNS level and the connections are not refused).
      2. The computation has been fenced by Group Coordinator on the broker side and raise a CommitFailedException
        1. because the consumer doesn't respect the poll interval: consumes max.poll.records (2) within max.poll.interval.ms (1h).
          This happens after max.poll.interval.ms (1h).
        2. because the broker didn't receive a heartbeat during session.timeout.ms (50s in Nuxeo), a rebalancing is scheduled.
          This can happen after session.timeout.ms (50s).
        3. because a  rebalance is in progress the broker is unable to reach Zookeeper within zookeeper.session.timeout.ms (6s)
      3. The computation has been fenced on the client side even if brokers are not reachable, the are check to ensure the max poll interval is respected or a CommitFailedException is raised.
      4. Other unrecoverable errors like Authorization/Interruption/Deleted topic.

      Case 1 can be improved by increasing default.api.timeout.ms.

      Case 2.1 and Case 3 are a misconfiguration we are asking Kafka to fence consumers that are too slow, failure is the expected behavior.

      Case 2.2 can be improved by increasing the session.timeout.ms

      Case 2.3 may not be taken into account considering that Zookeeper must be within the same network than Kafka

      Case 4 fail fast is the better option

      Increasing default.api.timeout.ms to 2min will do the job, this change also affects positively:

      • moving consumer position (seek, seekToEnd, seekToBeginning, position)
      • getting consumer position (committed)

      The poll and send API are using their own timeout and are not impacted by this parameter.

      session.timeout.ms could be increased to 2min as well.

      3. Send records

      The code is using a KafkaProducer.send that timeouts after delivery.timeout.ms (2 minutes by default)

      Proposed solutions

      1. Expose and update parameters

      Expose or update the following configuration to nuxeo.conf:

      • kafka.session.timeout.ms: 50000 (same)
      • kafka.request.timeout.ms=30000 (reduced fixing NXP-28287)
      • kafka.heartbeat.interval.ms=4000 (increased)
      • kafka.max.poll.records=2 (same)
      • kafka.max.poll.interval.ms=720000 (increased)
      • kafka.default.api.timeout.ms=60000 (new option keep default)
      • kafka.delivery.timeout.ms=120000 (new option keep default)


      • network partition tolerance is 50s without triggering rebalancing or require node restart, the duration can be tuned from nuxeo.conf
      • consumer are fenced if processing is too long (2h for 2 records) and the processing fallback can be applied preventing for looping indefinitely in systematic failure


      • it takes at least 50s  before rebalancing when a Nuxeo is abruptly stopped or in case of computation failure
      • a Nuxeo rolling restart is needed to recover a network partition longer than 50s (tunable)

      2. Expose and update parameters and catch the Commit exception

      In addition to 1 catch the commit exception and reset the consumer so the next poll will generate a rebalancing.


      • in case of long network interruption, everything will be back without restart


      • systematic failure because max poll interval will loop forever without emitting alert

      To remove the cons we need to differentiate commit exception:

      1. related to rebalancing in progress due to session expiration or Kafka error
      2. related to max.poll interval not respected

      This could be done by checking the elapsed time between the last poll and the exception if we are < max.poll.interval. 


          Issue Links



              • Votes:
                0 Vote for this issue
                4 Start watching this issue


                • Created:

                  Time Tracking

                  Original Estimate - Not Specified
                  Not Specified
                  Remaining Estimate - 0 minutes
                  Time Spent - 2 days