Uploaded image for project: 'Nuxeo Platform'
  1. Nuxeo Platform
  2. NXP-30967

Possible NPE in KafkaLogTailer.close (bis)

    XMLWordPrintable

    Details

    • Type: Bug
    • Status: Open
    • Priority: Minor
    • Resolution: Unresolved
    • Affects Version/s: None
    • Fix Version/s: None
    • Component/s: Streams

      Description

      Despite NXP-30089, we've observed the following error in TestPatternQueuingKafka.killConsumers:

      java.lang.NullPointerException
      	at org.nuxeo.lib.stream.log.kafka.KafkaLogTailer.close(KafkaLogTailer.java:449)
      	at java.base/java.util.stream.ForEachOps$ForEachOp$OfRef.accept(ForEachOps.java:183)
      	at java.base/java.util.stream.ReferencePipeline$2$1.accept(ReferencePipeline.java:177)
      	at java.base/java.util.concurrent.ConcurrentHashMap$KeySpliterator.forEachRemaining(ConcurrentHashMap.java:3566)
      	at java.base/java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:484)
      	at java.base/java.util.stream.AbstractPipeline.wrapAndCopyInto(AbstractPipeline.java:474)
      	at java.base/java.util.stream.ForEachOps$ForEachOp.evaluateSequential(ForEachOps.java:150)
      	at java.base/java.util.stream.ForEachOps$ForEachOp$OfRef.evaluateSequential(ForEachOps.java:173)
      	at java.base/java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:234)
      	at java.base/java.util.stream.ReferencePipeline.forEach(ReferencePipeline.java:497)
      	at org.nuxeo.lib.stream.log.internals.AbstractLogManager.close(AbstractLogManager.java:221)
      	at org.nuxeo.lib.stream.log.kafka.KafkaLogManager.close(KafkaLogManager.java:148)
      	at org.nuxeo.lib.stream.tests.pattern.TestPatternQueuing.closeManager(TestPatternQueuing.java:68)
      	at org.nuxeo.lib.stream.tests.pattern.TestPatternQueuing.resetManager(TestPatternQueuing.java:74)
      	at org.nuxeo.lib.stream.tests.pattern.TestPatternQueuing.killConsumers(TestPatternQueuing.java:168)
      

      Output:

      2022-04-05 21:37:09,184 [main] INFO  [KafkaUtils] Creating topic: nuxeo-test-1649194629177-killConsumers, partitions: 2, replications: 1
      2022-04-05 21:37:09,222 [main] INFO  [ConsumerPool] Creating consumer pool using Log subscribe on killConsumers
      2022-04-05 21:37:09,222 [Nuxeo-ConsumerPool-1] WARN  [AbstractCallablePool] Start Nuxeo-Consumer Pool on 2 thread(s).
      2022-04-05 21:37:09,230 [Nuxeo-ConsumerPool-1] INFO  [AbstractCallablePool] Pool is up and running
      2022-04-05 21:37:09,441 [Nuxeo-Consumer-1] INFO  [KafkaLogTailer] Rebalance assigned: [killConsumers-00]
      2022-04-05 21:37:09,442 [Nuxeo-Consumer-2] INFO  [KafkaLogTailer] Rebalance assigned: [killConsumers-01]
      2022-04-05 21:37:09,447 [Nuxeo-Consumer-1-0-rebalance-assigned-KafkaLogTailer{id=default:killConsumers-00, closed=false, codec=org.nuxeo.lib.stream.codec.NoCodec@ca66933}] WARN  [ConsumerRunner] Rollback current batch because of consumer rebalancing
      2022-04-05 21:37:09,447 [Nuxeo-Consumer-1-0-rebalance-assigned-KafkaLogTailer{id=default:killConsumers-00, closed=false, codec=org.nuxeo.lib.stream.codec.NoCodec@ca66933}] INFO  [ConsumerRunner] Rebalance
      2022-04-05 21:37:09,468 [Nuxeo-ConsumerPool-1] ERROR [AbstractCallablePool] End of consumer interrupted.
      2022-04-05 21:37:09,468 [Nuxeo-ConsumerPool-1] ERROR [AbstractCallablePool] End of consumer interrupted.
      2022-04-05 21:37:09,468 [Nuxeo-Consumer-2-0-rebalance-assigned-KafkaLogTailer{id=default:killConsumers-01, closed=false, codec=org.nuxeo.lib.stream.codec.NoCodec@ca66933}] WARN  [ConsumerRunner] Rollback batch
      java.lang.InterruptedException: java.lang.InterruptedException
      	at org.nuxeo.lib.stream.log.kafka.KafkaLogTailer.poll(KafkaLogTailer.java:220) ~[nuxeo-stream-2021.19.2.jar:?]
      	at org.nuxeo.lib.stream.log.kafka.KafkaLogTailer.read(KafkaLogTailer.java:175) ~[nuxeo-stream-2021.19.2.jar:?]
      	at org.nuxeo.lib.stream.pattern.consumer.internals.ConsumerRunner.acceptBatch(ConsumerRunner.java:304) ~[classes/:?]
      	at org.nuxeo.lib.stream.pattern.consumer.internals.ConsumerRunner.processBatch(ConsumerRunner.java:256) ~[classes/:?]
      	at org.nuxeo.lib.stream.pattern.consumer.internals.ConsumerRunner.processBatchWithRetry(ConsumerRunner.java:218) ~[classes/:?]
      	at org.nuxeo.lib.stream.pattern.consumer.internals.ConsumerRunner.consumerLoop(ConsumerRunner.java:202) ~[classes/:?]
      	at org.nuxeo.lib.stream.pattern.consumer.internals.ConsumerRunner.call(ConsumerRunner.java:147) ~[classes/:?]
      	at org.nuxeo.lib.stream.pattern.consumer.internals.ConsumerRunner.call(ConsumerRunner.java:60) ~[classes/:?]
      	at org.nuxeo.lib.stream.pattern.consumer.internals.AbstractCallablePool.lambda$runPool$1(AbstractCallablePool.java:98) ~[classes/:?]
      	at java.util.concurrent.CompletableFuture$AsyncSupply.run(CompletableFuture.java:1700) [?:?]
      	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128) [?:?]
      	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628) [?:?]
      	at java.lang.Thread.run(Thread.java:829) [?:?]
      2022-04-05 21:37:09,469 [Nuxeo-ConsumerPool-1] INFO  [ArrayList] Consumer status FAILURE
      2022-04-05 21:37:09,468 [Nuxeo-Consumer-1-3-foo] WARN  [ConsumerRunner] Rollback batch
      java.lang.InterruptedException: java.lang.InterruptedException
      	at org.nuxeo.lib.stream.log.kafka.KafkaLogTailer.poll(KafkaLogTailer.java:220) ~[nuxeo-stream-2021.19.2.jar:?]
      	at org.nuxeo.lib.stream.log.kafka.KafkaLogTailer.read(KafkaLogTailer.java:175) ~[nuxeo-stream-2021.19.2.jar:?]
      	at org.nuxeo.lib.stream.pattern.consumer.internals.ConsumerRunner.acceptBatch(ConsumerRunner.java:304) ~[classes/:?]
      	at org.nuxeo.lib.stream.pattern.consumer.internals.ConsumerRunner.processBatch(ConsumerRunner.java:256) ~[classes/:?]
      	at org.nuxeo.lib.stream.pattern.consumer.internals.ConsumerRunner.processBatchWithRetry(ConsumerRunner.java:218) ~[classes/:?]
      	at org.nuxeo.lib.stream.pattern.consumer.internals.ConsumerRunner.consumerLoop(ConsumerRunner.java:202) ~[classes/:?]
      	at org.nuxeo.lib.stream.pattern.consumer.internals.ConsumerRunner.call(ConsumerRunner.java:147) ~[classes/:?]
      	at org.nuxeo.lib.stream.pattern.consumer.internals.ConsumerRunner.call(ConsumerRunner.java:60) ~[classes/:?]
      	at org.nuxeo.lib.stream.pattern.consumer.internals.AbstractCallablePool.lambda$runPool$1(AbstractCallablePool.java:98) ~[classes/:?]
      	at java.util.concurrent.CompletableFuture$AsyncSupply.run(CompletableFuture.java:1700) [?:?]
      	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128) [?:?]
      	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628) [?:?]
      	at java.lang.Thread.run(Thread.java:829) [?:?]
      2022-04-05 21:37:09,469 [Nuxeo-ConsumerPool-1] INFO  [ArrayList] Consumer status FAILURE
      2022-04-05 21:37:09,469 [Nuxeo-Consumer-2-0-rebalance-assigned-KafkaLogTailer{id=default:killConsumers-01, closed=false, codec=org.nuxeo.lib.stream.codec.NoCodec@ca66933}] WARN  [AbstractCoordinator] [Consumer clientId=nuxeo-test-1649194629177-default-30, groupId=nuxeo-test-1649194629177-default] Interrupted while waiting for consumer heartbeat thread to close
      2022-04-05 21:37:09,469 [Nuxeo-Consumer-1-3-foo] WARN  [AbstractCoordinator] [Consumer clientId=nuxeo-test-1649194629177-default-29, groupId=nuxeo-test-1649194629177-default] Interrupted while waiting for consumer heartbeat thread to close
      2022-04-05 21:37:09,469 [Nuxeo-ConsumerPool-1] WARN  [ConsumerPool] Consumers status: threads: 2, failure 2, messages committed: 0, elapsed: 0.00s, throughput: 0.00 msg/s
      2022-04-05 21:37:09,469 [Nuxeo-Consumer-2-0-rebalance-assigned-KafkaLogTailer{id=default:killConsumers-01, closed=false, codec=org.nuxeo.lib.stream.codec.NoCodec@ca66933}] INFO  [KafkaLogTailer] Rebalance revoked: [killConsumers-01]
      2022-04-05 21:37:09,469 [Nuxeo-Consumer-1-3-foo] INFO  [KafkaLogTailer] Rebalance revoked: [killConsumers-00]
      2022-04-05 21:37:09,469 [Nuxeo-Consumer-1-4-rebalance-revoked] ERROR [KafkaConsumer] [Consumer clientId=nuxeo-test-1649194629177-default-29, groupId=nuxeo-test-1649194629177-default] Failed to close coordinator
      org.apache.kafka.common.errors.InterruptException: java.lang.InterruptedException
      	at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.maybeThrowInterruptException(ConsumerNetworkClient.java:520) ~[kafka-clients-2.6.0.jar:?]
      	at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:281) ~[kafka-clients-2.6.0.jar:?]
      	at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.pollNoWakeup(ConsumerNetworkClient.java:306) ~[kafka-clients-2.6.0.jar:?]
      	at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.maybeLeaveGroup(AbstractCoordinator.java:1013) ~[kafka-clients-2.6.0.jar:?]
      	at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.close(AbstractCoordinator.java:977) ~[kafka-clients-2.6.0.jar:?]
      	at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.close(ConsumerCoordinator.java:895) ~[kafka-clients-2.6.0.jar:?]
      	at org.apache.kafka.clients.consumer.KafkaConsumer.close(KafkaConsumer.java:2373) ~[kafka-clients-2.6.0.jar:?]
      	at org.apache.kafka.clients.consumer.KafkaConsumer.close(KafkaConsumer.java:2340) ~[kafka-clients-2.6.0.jar:?]
      	at org.apache.kafka.clients.consumer.KafkaConsumer.close(KafkaConsumer.java:2290) ~[kafka-clients-2.6.0.jar:?]
      	at org.nuxeo.lib.stream.log.kafka.KafkaLogTailer.close(KafkaLogTailer.java:442) ~[nuxeo-stream-2021.19.2.jar:?]
      	at org.nuxeo.lib.stream.pattern.consumer.internals.ConsumerRunner.call(ConsumerRunner.java:151) ~[classes/:?]
      	at org.nuxeo.lib.stream.pattern.consumer.internals.ConsumerRunner.call(ConsumerRunner.java:60) ~[classes/:?]
      	at org.nuxeo.lib.stream.pattern.consumer.internals.AbstractCallablePool.lambda$runPool$1(AbstractCallablePool.java:98) ~[classes/:?]
      	at java.util.concurrent.CompletableFuture$AsyncSupply.run(CompletableFuture.java:1700) [?:?]
      	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128) [?:?]
      	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628) [?:?]
      	at java.lang.Thread.run(Thread.java:829) [?:?]
      Caused by: java.lang.InterruptedException
      	... 17 more
      2022-04-05 21:37:09,469 [Nuxeo-Consumer-2-0-rebalance-revoked] ERROR [KafkaConsumer] [Consumer clientId=nuxeo-test-1649194629177-default-30, groupId=nuxeo-test-1649194629177-default] Failed to close coordinator
      org.apache.kafka.common.errors.InterruptException: java.lang.InterruptedException
      	at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.maybeThrowInterruptException(ConsumerNetworkClient.java:520) ~[kafka-clients-2.6.0.jar:?]
      	at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:281) ~[kafka-clients-2.6.0.jar:?]
      	at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.pollNoWakeup(ConsumerNetworkClient.java:306) ~[kafka-clients-2.6.0.jar:?]
      	at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.maybeLeaveGroup(AbstractCoordinator.java:1013) ~[kafka-clients-2.6.0.jar:?]
      	at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.close(AbstractCoordinator.java:977) ~[kafka-clients-2.6.0.jar:?]
      	at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.close(ConsumerCoordinator.java:895) ~[kafka-clients-2.6.0.jar:?]
      	at org.apache.kafka.clients.consumer.KafkaConsumer.close(KafkaConsumer.java:2373) ~[kafka-clients-2.6.0.jar:?]
      	at org.apache.kafka.clients.consumer.KafkaConsumer.close(KafkaConsumer.java:2340) ~[kafka-clients-2.6.0.jar:?]
      	at org.apache.kafka.clients.consumer.KafkaConsumer.close(KafkaConsumer.java:2290) ~[kafka-clients-2.6.0.jar:?]
      	at org.nuxeo.lib.stream.log.kafka.KafkaLogTailer.close(KafkaLogTailer.java:442) ~[nuxeo-stream-2021.19.2.jar:?]
      	at org.nuxeo.lib.stream.pattern.consumer.internals.ConsumerRunner.call(ConsumerRunner.java:151) ~[classes/:?]
      	at org.nuxeo.lib.stream.pattern.consumer.internals.ConsumerRunner.call(ConsumerRunner.java:60) ~[classes/:?]
      	at org.nuxeo.lib.stream.pattern.consumer.internals.AbstractCallablePool.lambda$runPool$1(AbstractCallablePool.java:98) ~[classes/:?]
      	at java.util.concurrent.CompletableFuture$AsyncSupply.run(CompletableFuture.java:1700) [?:?]
      	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128) [?:?]
      	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628) [?:?]
      	at java.lang.Thread.run(Thread.java:829) [?:?]
      Caused by: java.lang.InterruptedException
      	... 17 more
      2022-04-05 21:37:09,473 [Nuxeo-Consumer-2-0-rebalance-revoked] INFO  [KafkaLogTailer] Discard error while closing consumer: 
      org.apache.kafka.common.errors.InterruptException: java.lang.InterruptedException
      	at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.maybeThrowInterruptException(ConsumerNetworkClient.java:520) ~[kafka-clients-2.6.0.jar:?]
      	at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:281) ~[kafka-clients-2.6.0.jar:?]
      	at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.pollNoWakeup(ConsumerNetworkClient.java:306) ~[kafka-clients-2.6.0.jar:?]
      	at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.maybeLeaveGroup(AbstractCoordinator.java:1013) ~[kafka-clients-2.6.0.jar:?]
      	at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.close(AbstractCoordinator.java:977) ~[kafka-clients-2.6.0.jar:?]
      	at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.close(ConsumerCoordinator.java:895) ~[kafka-clients-2.6.0.jar:?]
      	at org.apache.kafka.clients.consumer.KafkaConsumer.close(KafkaConsumer.java:2373) ~[kafka-clients-2.6.0.jar:?]
      	at org.apache.kafka.clients.consumer.KafkaConsumer.close(KafkaConsumer.java:2340) ~[kafka-clients-2.6.0.jar:?]
      	at org.apache.kafka.clients.consumer.KafkaConsumer.close(KafkaConsumer.java:2290) ~[kafka-clients-2.6.0.jar:?]
      	at org.nuxeo.lib.stream.log.kafka.KafkaLogTailer.close(KafkaLogTailer.java:442) ~[nuxeo-stream-2021.19.2.jar:?]
      	at org.nuxeo.lib.stream.pattern.consumer.internals.ConsumerRunner.call(ConsumerRunner.java:151) ~[classes/:?]
      	at org.nuxeo.lib.stream.pattern.consumer.internals.ConsumerRunner.call(ConsumerRunner.java:60) ~[classes/:?]
      	at org.nuxeo.lib.stream.pattern.consumer.internals.AbstractCallablePool.lambda$runPool$1(AbstractCallablePool.java:98) ~[classes/:?]
      	at java.util.concurrent.CompletableFuture$AsyncSupply.run(CompletableFuture.java:1700) [?:?]
      	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128) [?:?]
      	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628) [?:?]
      	at java.lang.Thread.run(Thread.java:829) [?:?]
      Caused by: java.lang.InterruptedException
      	... 17 more
      2022-04-05 21:37:09,473 [Nuxeo-Consumer-1-4-rebalance-revoked] INFO  [KafkaLogTailer] Discard error while closing consumer: 
      org.apache.kafka.common.errors.InterruptException: java.lang.InterruptedException
      	at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.maybeThrowInterruptException(ConsumerNetworkClient.java:520) ~[kafka-clients-2.6.0.jar:?]
      	at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:281) ~[kafka-clients-2.6.0.jar:?]
      	at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.pollNoWakeup(ConsumerNetworkClient.java:306) ~[kafka-clients-2.6.0.jar:?]
      	at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.maybeLeaveGroup(AbstractCoordinator.java:1013) ~[kafka-clients-2.6.0.jar:?]
      	at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.close(AbstractCoordinator.java:977) ~[kafka-clients-2.6.0.jar:?]
      	at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.close(ConsumerCoordinator.java:895) ~[kafka-clients-2.6.0.jar:?]
      	at org.apache.kafka.clients.consumer.KafkaConsumer.close(KafkaConsumer.java:2373) ~[kafka-clients-2.6.0.jar:?]
      	at org.apache.kafka.clients.consumer.KafkaConsumer.close(KafkaConsumer.java:2340) ~[kafka-clients-2.6.0.jar:?]
      	at org.apache.kafka.clients.consumer.KafkaConsumer.close(KafkaConsumer.java:2290) ~[kafka-clients-2.6.0.jar:?]
      	at org.nuxeo.lib.stream.log.kafka.KafkaLogTailer.close(KafkaLogTailer.java:442) ~[nuxeo-stream-2021.19.2.jar:?]
      	at org.nuxeo.lib.stream.pattern.consumer.internals.ConsumerRunner.call(ConsumerRunner.java:151) ~[classes/:?]
      	at org.nuxeo.lib.stream.pattern.consumer.internals.ConsumerRunner.call(ConsumerRunner.java:60) ~[classes/:?]
      	at org.nuxeo.lib.stream.pattern.consumer.internals.AbstractCallablePool.lambda$runPool$1(AbstractCallablePool.java:98) ~[classes/:?]
      	at java.util.concurrent.CompletableFuture$AsyncSupply.run(CompletableFuture.java:1700) [?:?]
      	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128) [?:?]
      	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628) [?:?]
      	at java.lang.Thread.run(Thread.java:829) [?:?]
      Caused by: java.lang.InterruptedException
      	... 17 more
      2022-04-05 21:37:09,473 [main] INFO  [KafkaLogTailer] Closing tailer from another thread, send wakeup
      2022-04-05 21:37:09,475 [main] INFO  [KafkaUtils] Deleting topic: nuxeo-test-1649194629177-killConsumers
      

        Attachments

          Issue Links

            Activity

              People

              • Votes:
                0 Vote for this issue
                Watchers:
                2 Start watching this issue

                Dates

                • Created:
                  Updated: