Today the lag for a consumer group is expressed as a number of records to process (org.nuxeo.lib.stream.log.LogManager#getLag); it is used by the WorkManager and exposed as metrics.
For the Computation pattern, the StreamProcessor exposes the low watermark of the processor node (org.nuxeo.lib.stream.computation.StreamProcessor#getLowWatermark()).
Any record older than the low watermark timestamp can be assumed to have been processed.
The minimum of the low watermarks of all nodes is the global low watermark.
This is interesting but hard to exploit: some streams may receive very few records, so their low watermark stays low and we cannot assume that the latency is now() - lowWatermark.
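To illustrate why the global low watermark is hard to exploit, here is a minimal Java sketch. It assumes the per-node low watermarks have already been collected into a map of plain epoch-millisecond timestamps; the GlobalLowWatermark class and the node names are hypothetical, not Nuxeo APIs. It takes the minimum across nodes and shows how an idle stream inflates now() - lowWatermark even though nothing is pending.

```java
import java.util.Map;

/**
 * Hypothetical illustration (not a Nuxeo class): the global low watermark is
 * the minimum of the per-node low watermarks, and now() - lowWatermark
 * overestimates the latency when a stream is idle.
 */
public class GlobalLowWatermark {

    /** Returns the minimum low watermark across all nodes (assumed epoch millis). */
    static long globalLowWatermark(Map<String, Long> lowWatermarkPerNode) {
        return lowWatermarkPerNode.values().stream()
                .mapToLong(Long::longValue)
                .min()
                .orElseThrow(() -> new IllegalArgumentException("no node"));
    }

    public static void main(String[] args) {
        long now = 1_000_000L;
        // node2 consumes a stream whose last record arrived long ago:
        // its low watermark stays old even though nothing is pending.
        Map<String, Long> perNode = Map.of(
                "node1", now - 200L,
                "node2", now - 600_000L);
        long global = globalLowWatermark(perNode);
        System.out.println("global low watermark: " + global); // prints 400000
        // Naive estimate: reports 10 minutes of "latency" for an idle stream.
        System.out.println("now - lowWatermark: " + (now - global)); // prints 600000
    }
}
```

Here the idle stream alone accounts for the 600000 ms result, although no record is waiting to be processed.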
The algorithm to compute the latency of a consumer group on a log is:
- For each partition of the log, get the last position committed by the consumer group.
- If this position is the end of the partition (i.e. the consumer has nothing left to process), the latency for this partition is 0.
- Otherwise, read the record at this committed position and get its watermark timestamp; the latency for this partition is now() - watermark.
The latency of the log is the maximum of the partition latencies: max(partition latency).
This gives the latency even when the consumers of the group are distributed across several nodes.
Note that requesting the latency is more costly than requesting the lag, because we need to seek to and read a record on each partition.
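The algorithm above can be sketched in Java over an in-memory model of a log. This is a minimal sketch, not the LogManager implementation: the Partition record, the use of list indexes as offsets, watermarks stored as plain epoch-millisecond timestamps, and the convention that the committed position is the offset of the next record to consume (as in Kafka) are all assumptions. Java records require Java 16 or later.

```java
import java.util.List;

/**
 * Minimal sketch of the consumer-group latency algorithm over an in-memory
 * log model. Partition is a hypothetical stand-in, not a Nuxeo class.
 */
public class LatencySketch {

    /**
     * A partition: the watermark timestamp of each record (index = offset)
     * and the position committed by the consumer group, assumed to be the
     * offset of the next record to consume.
     */
    record Partition(List<Long> watermarks, long committed) {
        long end() {
            return watermarks.size();
        }
    }

    /** Latency of one partition, in millis. */
    static long partitionLatency(Partition p, long now) {
        if (p.committed() >= p.end()) {
            // The consumer reached the end of the partition: nothing to do.
            return 0L;
        }
        // Stands in for "seek and read": the first record not yet processed.
        long watermark = p.watermarks().get((int) p.committed());
        return now - watermark;
    }

    /** Latency of the log for the group: max of the partition latencies. */
    static long logLatency(List<Partition> partitions, long now) {
        return partitions.stream()
                .mapToLong(p -> partitionLatency(p, now))
                .max()
                .orElse(0L);
    }

    public static void main(String[] args) {
        long now = 10_000L;
        List<Partition> log = List.of(
                new Partition(List.of(1_000L, 2_000L), 2),         // fully consumed
                new Partition(List.of(3_000L, 7_000L, 9_000L), 1)); // 2 records pending
        System.out.println(logLatency(log, now)); // prints 3000
    }
}
```

With a real log, the list lookup in partitionLatency becomes a seek plus a read on each partition that still has records to process, which is where the extra cost compared to the lag comes from.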
The goal is to have:
- a getLatency method in the LogManager, similar to getLag.
- a latency command in stream.sh, similar to the lag command.