-
Type: Improvement
-
Status: Resolved
-
Priority: Minor
-
Resolution: Fixed
-
Affects Version/s: 9.10
-
Component/s: Streams
-
Epic Link:
-
Sprint:nxcore 10.1.6
-
Story Points:2
Since NXP-23954 we can set the consumer position to a timestamp when using Kafka.
The timestamp used will be by default the LogAppendTime of Kafka (the time at which the record is received by the Kafka broker).
If the Kafka cluster is mirrored with Kafka Mirror, the LogAppendTime in the target cluster is not the same as in the source cluster and we better rely on our internal timestamp (Record.watermark) when using Nuxeo stream.
So we can extend again the position command to have an option --to-watermark-timestamp, this will work only with a Log of org.nuxeo.lib.stream.computation.Record. The logic will be to read records until a watermark with a greater value is found and commit this position.
Also if the latencies has been tracked using NXP-24512, the command should use this _consumer_latencies log to set consumer positions to the latest or choosen time. This is much more precise than juste using a timestamp, because here we are doing an exact positionning per partition, like PITR.