-
Type: Bug
-
Status: Resolved
-
Priority: Minor
-
Resolution: Cannot Reproduce
-
Affects Version/s: None
-
Fix Version/s: None
-
Component/s: Streams
-
Tags:
-
Upgrade notes:
Despite NXP-30089, we've observed the following error in TestPatternQueuingKafka.killConsumers:
java.lang.NullPointerException at org.nuxeo.lib.stream.log.kafka.KafkaLogTailer.close(KafkaLogTailer.java:449) at java.base/java.util.stream.ForEachOps$ForEachOp$OfRef.accept(ForEachOps.java:183) at java.base/java.util.stream.ReferencePipeline$2$1.accept(ReferencePipeline.java:177) at java.base/java.util.concurrent.ConcurrentHashMap$KeySpliterator.forEachRemaining(ConcurrentHashMap.java:3566) at java.base/java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:484) at java.base/java.util.stream.AbstractPipeline.wrapAndCopyInto(AbstractPipeline.java:474) at java.base/java.util.stream.ForEachOps$ForEachOp.evaluateSequential(ForEachOps.java:150) at java.base/java.util.stream.ForEachOps$ForEachOp$OfRef.evaluateSequential(ForEachOps.java:173) at java.base/java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:234) at java.base/java.util.stream.ReferencePipeline.forEach(ReferencePipeline.java:497) at org.nuxeo.lib.stream.log.internals.AbstractLogManager.close(AbstractLogManager.java:221) at org.nuxeo.lib.stream.log.kafka.KafkaLogManager.close(KafkaLogManager.java:148) at org.nuxeo.lib.stream.tests.pattern.TestPatternQueuing.closeManager(TestPatternQueuing.java:68) at org.nuxeo.lib.stream.tests.pattern.TestPatternQueuing.resetManager(TestPatternQueuing.java:74) at org.nuxeo.lib.stream.tests.pattern.TestPatternQueuing.killConsumers(TestPatternQueuing.java:168)
Output:
2022-04-05 21:37:09,184 [main] INFO [KafkaUtils] Creating topic: nuxeo-test-1649194629177-killConsumers, partitions: 2, replications: 1 2022-04-05 21:37:09,222 [main] INFO [ConsumerPool] Creating consumer pool using Log subscribe on killConsumers 2022-04-05 21:37:09,222 [Nuxeo-ConsumerPool-1] WARN [AbstractCallablePool] Start Nuxeo-Consumer Pool on 2 thread(s). 2022-04-05 21:37:09,230 [Nuxeo-ConsumerPool-1] INFO [AbstractCallablePool] Pool is up and running 2022-04-05 21:37:09,441 [Nuxeo-Consumer-1] INFO [KafkaLogTailer] Rebalance assigned: [killConsumers-00] 2022-04-05 21:37:09,442 [Nuxeo-Consumer-2] INFO [KafkaLogTailer] Rebalance assigned: [killConsumers-01] 2022-04-05 21:37:09,447 [Nuxeo-Consumer-1-0-rebalance-assigned-KafkaLogTailer{id=default:killConsumers-00, closed=false, codec=org.nuxeo.lib.stream.codec.NoCodec@ca66933}] WARN [ConsumerRunner] Rollback current batch because of consumer rebalancing 2022-04-05 21:37:09,447 [Nuxeo-Consumer-1-0-rebalance-assigned-KafkaLogTailer{id=default:killConsumers-00, closed=false, codec=org.nuxeo.lib.stream.codec.NoCodec@ca66933}] INFO [ConsumerRunner] Rebalance 2022-04-05 21:37:09,468 [Nuxeo-ConsumerPool-1] ERROR [AbstractCallablePool] End of consumer interrupted. 2022-04-05 21:37:09,468 [Nuxeo-ConsumerPool-1] ERROR [AbstractCallablePool] End of consumer interrupted. 2022-04-05 21:37:09,468 [Nuxeo-Consumer-2-0-rebalance-assigned-KafkaLogTailer{id=default:killConsumers-01, closed=false, codec=org.nuxeo.lib.stream.codec.NoCodec@ca66933}] WARN [ConsumerRunner] Rollback batch java.lang.InterruptedException: java.lang.InterruptedException at org.nuxeo.lib.stream.log.kafka.KafkaLogTailer.poll(KafkaLogTailer.java:220) ~[nuxeo-stream-2021.19.2.jar:?] at org.nuxeo.lib.stream.log.kafka.KafkaLogTailer.read(KafkaLogTailer.java:175) ~[nuxeo-stream-2021.19.2.jar:?] at org.nuxeo.lib.stream.pattern.consumer.internals.ConsumerRunner.acceptBatch(ConsumerRunner.java:304) ~[classes/:?] at org.nuxeo.lib.stream.pattern.consumer.internals.ConsumerRunner.processBatch(ConsumerRunner.java:256) ~[classes/:?] at org.nuxeo.lib.stream.pattern.consumer.internals.ConsumerRunner.processBatchWithRetry(ConsumerRunner.java:218) ~[classes/:?] at org.nuxeo.lib.stream.pattern.consumer.internals.ConsumerRunner.consumerLoop(ConsumerRunner.java:202) ~[classes/:?] at org.nuxeo.lib.stream.pattern.consumer.internals.ConsumerRunner.call(ConsumerRunner.java:147) ~[classes/:?] at org.nuxeo.lib.stream.pattern.consumer.internals.ConsumerRunner.call(ConsumerRunner.java:60) ~[classes/:?] at org.nuxeo.lib.stream.pattern.consumer.internals.AbstractCallablePool.lambda$runPool$1(AbstractCallablePool.java:98) ~[classes/:?] at java.util.concurrent.CompletableFuture$AsyncSupply.run(CompletableFuture.java:1700) [?:?] at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128) [?:?] at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628) [?:?] at java.lang.Thread.run(Thread.java:829) [?:?] 2022-04-05 21:37:09,469 [Nuxeo-ConsumerPool-1] INFO [ArrayList] Consumer status FAILURE 2022-04-05 21:37:09,468 [Nuxeo-Consumer-1-3-foo] WARN [ConsumerRunner] Rollback batch java.lang.InterruptedException: java.lang.InterruptedException at org.nuxeo.lib.stream.log.kafka.KafkaLogTailer.poll(KafkaLogTailer.java:220) ~[nuxeo-stream-2021.19.2.jar:?] at org.nuxeo.lib.stream.log.kafka.KafkaLogTailer.read(KafkaLogTailer.java:175) ~[nuxeo-stream-2021.19.2.jar:?] at org.nuxeo.lib.stream.pattern.consumer.internals.ConsumerRunner.acceptBatch(ConsumerRunner.java:304) ~[classes/:?] at org.nuxeo.lib.stream.pattern.consumer.internals.ConsumerRunner.processBatch(ConsumerRunner.java:256) ~[classes/:?] at org.nuxeo.lib.stream.pattern.consumer.internals.ConsumerRunner.processBatchWithRetry(ConsumerRunner.java:218) ~[classes/:?] at org.nuxeo.lib.stream.pattern.consumer.internals.ConsumerRunner.consumerLoop(ConsumerRunner.java:202) ~[classes/:?] at org.nuxeo.lib.stream.pattern.consumer.internals.ConsumerRunner.call(ConsumerRunner.java:147) ~[classes/:?] at org.nuxeo.lib.stream.pattern.consumer.internals.ConsumerRunner.call(ConsumerRunner.java:60) ~[classes/:?] at org.nuxeo.lib.stream.pattern.consumer.internals.AbstractCallablePool.lambda$runPool$1(AbstractCallablePool.java:98) ~[classes/:?] at java.util.concurrent.CompletableFuture$AsyncSupply.run(CompletableFuture.java:1700) [?:?] at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128) [?:?] at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628) [?:?] at java.lang.Thread.run(Thread.java:829) [?:?] 2022-04-05 21:37:09,469 [Nuxeo-ConsumerPool-1] INFO [ArrayList] Consumer status FAILURE 2022-04-05 21:37:09,469 [Nuxeo-Consumer-2-0-rebalance-assigned-KafkaLogTailer{id=default:killConsumers-01, closed=false, codec=org.nuxeo.lib.stream.codec.NoCodec@ca66933}] WARN [AbstractCoordinator] [Consumer clientId=nuxeo-test-1649194629177-default-30, groupId=nuxeo-test-1649194629177-default] Interrupted while waiting for consumer heartbeat thread to close 2022-04-05 21:37:09,469 [Nuxeo-Consumer-1-3-foo] WARN [AbstractCoordinator] [Consumer clientId=nuxeo-test-1649194629177-default-29, groupId=nuxeo-test-1649194629177-default] Interrupted while waiting for consumer heartbeat thread to close 2022-04-05 21:37:09,469 [Nuxeo-ConsumerPool-1] WARN [ConsumerPool] Consumers status: threads: 2, failure 2, messages committed: 0, elapsed: 0.00s, throughput: 0.00 msg/s 2022-04-05 21:37:09,469 [Nuxeo-Consumer-2-0-rebalance-assigned-KafkaLogTailer{id=default:killConsumers-01, closed=false, codec=org.nuxeo.lib.stream.codec.NoCodec@ca66933}] INFO [KafkaLogTailer] Rebalance revoked: [killConsumers-01] 2022-04-05 21:37:09,469 [Nuxeo-Consumer-1-3-foo] INFO [KafkaLogTailer] Rebalance revoked: [killConsumers-00] 2022-04-05 21:37:09,469 [Nuxeo-Consumer-1-4-rebalance-revoked] ERROR [KafkaConsumer] [Consumer clientId=nuxeo-test-1649194629177-default-29, groupId=nuxeo-test-1649194629177-default] Failed to close coordinator org.apache.kafka.common.errors.InterruptException: java.lang.InterruptedException at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.maybeThrowInterruptException(ConsumerNetworkClient.java:520) ~[kafka-clients-2.6.0.jar:?] at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:281) ~[kafka-clients-2.6.0.jar:?] at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.pollNoWakeup(ConsumerNetworkClient.java:306) ~[kafka-clients-2.6.0.jar:?] at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.maybeLeaveGroup(AbstractCoordinator.java:1013) ~[kafka-clients-2.6.0.jar:?] at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.close(AbstractCoordinator.java:977) ~[kafka-clients-2.6.0.jar:?] at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.close(ConsumerCoordinator.java:895) ~[kafka-clients-2.6.0.jar:?] at org.apache.kafka.clients.consumer.KafkaConsumer.close(KafkaConsumer.java:2373) ~[kafka-clients-2.6.0.jar:?] at org.apache.kafka.clients.consumer.KafkaConsumer.close(KafkaConsumer.java:2340) ~[kafka-clients-2.6.0.jar:?] at org.apache.kafka.clients.consumer.KafkaConsumer.close(KafkaConsumer.java:2290) ~[kafka-clients-2.6.0.jar:?] at org.nuxeo.lib.stream.log.kafka.KafkaLogTailer.close(KafkaLogTailer.java:442) ~[nuxeo-stream-2021.19.2.jar:?] at org.nuxeo.lib.stream.pattern.consumer.internals.ConsumerRunner.call(ConsumerRunner.java:151) ~[classes/:?] at org.nuxeo.lib.stream.pattern.consumer.internals.ConsumerRunner.call(ConsumerRunner.java:60) ~[classes/:?] at org.nuxeo.lib.stream.pattern.consumer.internals.AbstractCallablePool.lambda$runPool$1(AbstractCallablePool.java:98) ~[classes/:?] at java.util.concurrent.CompletableFuture$AsyncSupply.run(CompletableFuture.java:1700) [?:?] at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128) [?:?] at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628) [?:?] at java.lang.Thread.run(Thread.java:829) [?:?] Caused by: java.lang.InterruptedException ... 17 more 2022-04-05 21:37:09,469 [Nuxeo-Consumer-2-0-rebalance-revoked] ERROR [KafkaConsumer] [Consumer clientId=nuxeo-test-1649194629177-default-30, groupId=nuxeo-test-1649194629177-default] Failed to close coordinator org.apache.kafka.common.errors.InterruptException: java.lang.InterruptedException at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.maybeThrowInterruptException(ConsumerNetworkClient.java:520) ~[kafka-clients-2.6.0.jar:?] at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:281) ~[kafka-clients-2.6.0.jar:?] at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.pollNoWakeup(ConsumerNetworkClient.java:306) ~[kafka-clients-2.6.0.jar:?] at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.maybeLeaveGroup(AbstractCoordinator.java:1013) ~[kafka-clients-2.6.0.jar:?] at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.close(AbstractCoordinator.java:977) ~[kafka-clients-2.6.0.jar:?] at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.close(ConsumerCoordinator.java:895) ~[kafka-clients-2.6.0.jar:?] at org.apache.kafka.clients.consumer.KafkaConsumer.close(KafkaConsumer.java:2373) ~[kafka-clients-2.6.0.jar:?] at org.apache.kafka.clients.consumer.KafkaConsumer.close(KafkaConsumer.java:2340) ~[kafka-clients-2.6.0.jar:?] at org.apache.kafka.clients.consumer.KafkaConsumer.close(KafkaConsumer.java:2290) ~[kafka-clients-2.6.0.jar:?] at org.nuxeo.lib.stream.log.kafka.KafkaLogTailer.close(KafkaLogTailer.java:442) ~[nuxeo-stream-2021.19.2.jar:?] at org.nuxeo.lib.stream.pattern.consumer.internals.ConsumerRunner.call(ConsumerRunner.java:151) ~[classes/:?] at org.nuxeo.lib.stream.pattern.consumer.internals.ConsumerRunner.call(ConsumerRunner.java:60) ~[classes/:?] at org.nuxeo.lib.stream.pattern.consumer.internals.AbstractCallablePool.lambda$runPool$1(AbstractCallablePool.java:98) ~[classes/:?] at java.util.concurrent.CompletableFuture$AsyncSupply.run(CompletableFuture.java:1700) [?:?] at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128) [?:?] at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628) [?:?] at java.lang.Thread.run(Thread.java:829) [?:?] Caused by: java.lang.InterruptedException ... 17 more 2022-04-05 21:37:09,473 [Nuxeo-Consumer-2-0-rebalance-revoked] INFO [KafkaLogTailer] Discard error while closing consumer: org.apache.kafka.common.errors.InterruptException: java.lang.InterruptedException at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.maybeThrowInterruptException(ConsumerNetworkClient.java:520) ~[kafka-clients-2.6.0.jar:?] at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:281) ~[kafka-clients-2.6.0.jar:?] at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.pollNoWakeup(ConsumerNetworkClient.java:306) ~[kafka-clients-2.6.0.jar:?] at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.maybeLeaveGroup(AbstractCoordinator.java:1013) ~[kafka-clients-2.6.0.jar:?] at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.close(AbstractCoordinator.java:977) ~[kafka-clients-2.6.0.jar:?] at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.close(ConsumerCoordinator.java:895) ~[kafka-clients-2.6.0.jar:?] at org.apache.kafka.clients.consumer.KafkaConsumer.close(KafkaConsumer.java:2373) ~[kafka-clients-2.6.0.jar:?] at org.apache.kafka.clients.consumer.KafkaConsumer.close(KafkaConsumer.java:2340) ~[kafka-clients-2.6.0.jar:?] at org.apache.kafka.clients.consumer.KafkaConsumer.close(KafkaConsumer.java:2290) ~[kafka-clients-2.6.0.jar:?] at org.nuxeo.lib.stream.log.kafka.KafkaLogTailer.close(KafkaLogTailer.java:442) ~[nuxeo-stream-2021.19.2.jar:?] at org.nuxeo.lib.stream.pattern.consumer.internals.ConsumerRunner.call(ConsumerRunner.java:151) ~[classes/:?] at org.nuxeo.lib.stream.pattern.consumer.internals.ConsumerRunner.call(ConsumerRunner.java:60) ~[classes/:?] at org.nuxeo.lib.stream.pattern.consumer.internals.AbstractCallablePool.lambda$runPool$1(AbstractCallablePool.java:98) ~[classes/:?] at java.util.concurrent.CompletableFuture$AsyncSupply.run(CompletableFuture.java:1700) [?:?] at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128) [?:?] at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628) [?:?] at java.lang.Thread.run(Thread.java:829) [?:?] Caused by: java.lang.InterruptedException ... 17 more 2022-04-05 21:37:09,473 [Nuxeo-Consumer-1-4-rebalance-revoked] INFO [KafkaLogTailer] Discard error while closing consumer: org.apache.kafka.common.errors.InterruptException: java.lang.InterruptedException at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.maybeThrowInterruptException(ConsumerNetworkClient.java:520) ~[kafka-clients-2.6.0.jar:?] at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:281) ~[kafka-clients-2.6.0.jar:?] at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.pollNoWakeup(ConsumerNetworkClient.java:306) ~[kafka-clients-2.6.0.jar:?] at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.maybeLeaveGroup(AbstractCoordinator.java:1013) ~[kafka-clients-2.6.0.jar:?] at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.close(AbstractCoordinator.java:977) ~[kafka-clients-2.6.0.jar:?] at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.close(ConsumerCoordinator.java:895) ~[kafka-clients-2.6.0.jar:?] at org.apache.kafka.clients.consumer.KafkaConsumer.close(KafkaConsumer.java:2373) ~[kafka-clients-2.6.0.jar:?] at org.apache.kafka.clients.consumer.KafkaConsumer.close(KafkaConsumer.java:2340) ~[kafka-clients-2.6.0.jar:?] at org.apache.kafka.clients.consumer.KafkaConsumer.close(KafkaConsumer.java:2290) ~[kafka-clients-2.6.0.jar:?] at org.nuxeo.lib.stream.log.kafka.KafkaLogTailer.close(KafkaLogTailer.java:442) ~[nuxeo-stream-2021.19.2.jar:?] at org.nuxeo.lib.stream.pattern.consumer.internals.ConsumerRunner.call(ConsumerRunner.java:151) ~[classes/:?] at org.nuxeo.lib.stream.pattern.consumer.internals.ConsumerRunner.call(ConsumerRunner.java:60) ~[classes/:?] at org.nuxeo.lib.stream.pattern.consumer.internals.AbstractCallablePool.lambda$runPool$1(AbstractCallablePool.java:98) ~[classes/:?] at java.util.concurrent.CompletableFuture$AsyncSupply.run(CompletableFuture.java:1700) [?:?] at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128) [?:?] at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628) [?:?] at java.lang.Thread.run(Thread.java:829) [?:?] Caused by: java.lang.InterruptedException ... 17 more 2022-04-05 21:37:09,473 [main] INFO [KafkaLogTailer] Closing tailer from another thread, send wakeup 2022-04-05 21:37:09,475 [main] INFO [KafkaUtils] Deleting topic: nuxeo-test-1649194629177-killConsumers
- is related to
-
NXP-30089 Possible NPE in KafkaLogTailer.close
- Resolved