A computation that fails to process a record has several options if the exception is caught:
1. log an error and continue (skip the record)
2. log an error and abort
3. retry using a delay that backs off exponentially
4. append the record to an error stream and continue; this requires an additional consumer to handle errors, and the record processing order is not preserved in this case.
All of the above behaviors are valid solutions, depending on the case.
Note that if an exception is left uncaught, the computation thread dies and its partitions are reassigned to other computation threads. This is equivalent to a retry, but if the failure is systematic, all computation threads will eventually abort. This is a safe behavior, because human intervention is required to fix the problem.
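As a rough illustration of how the four options fit together, here is a minimal hand-rolled sketch in plain Java. It does not use any existing library: the class FailureHandling, the Fallback enum and the in-memory errorStream list are all hypothetical names chosen for this example, and a real error stream would be a proper stream with its own consumer.

```java
import java.util.List;
import java.util.function.Consumer;

/** Hypothetical sketch: the names below are illustrative, not part of an existing API. */
final class FailureHandling {

    /** What to do once retries are exhausted (options 1, 2 and 4). */
    enum Fallback { SKIP, ABORT, ERROR_STREAM }

    /**
     * Processes one record, retrying with an exponentially growing delay (option 3),
     * then applies the fallback. Returns true if the record was processed.
     */
    static <R> boolean process(R record, Consumer<R> processor, int maxRetries,
                               long initialDelayMs, Fallback fallback, List<R> errorStream) {
        long delayMs = initialDelayMs;
        for (int attempt = 0; ; attempt++) {
            try {
                processor.accept(record);
                return true;
            } catch (RuntimeException e) {
                // Every option starts by logging the error.
                System.err.println("Failed to process " + record
                        + " (attempt " + (attempt + 1) + "): " + e);
                if (attempt >= maxRetries) {
                    switch (fallback) {
                        case ABORT:
                            throw e; // option 2: stop the computation
                        case ERROR_STREAM:
                            errorStream.add(record); // option 4: handled later by another consumer
                            return false;
                        default:
                            return false; // option 1: skip the record
                    }
                }
                try {
                    Thread.sleep(delayMs);
                } catch (InterruptedException ie) {
                    Thread.currentThread().interrupt();
                    throw new IllegalStateException("Interrupted while backing off", ie);
                }
                delayMs *= 2; // exponential backoff
            }
        }
    }
}
```

With maxRetries set to 0 the retry step disappears and the sketch reduces to plain skip, abort or error-stream handling.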
Integration with a retry library such as Failsafe should be provided:
https://github.com/jhalterman/failsafe
It could easily cover options 1, 2 and 3.
The goal is to make the retry policy explicit for each computation.
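To make "explicit for each computation" more concrete, one possible shape is a small policy value that each computation declares next to its own definition. The sketch below is an assumption about what that could look like, written in plain Java rather than against the Failsafe API. ComputationPolicy, IndexingComputation and all field names are hypothetical.

```java
import java.time.Duration;

/** Hypothetical per-computation policy; none of these names come from an existing library. */
final class ComputationPolicy {
    /** No retry, abort on the first failure. */
    static final ComputationPolicy NO_RETRY =
            new ComputationPolicy(0, Duration.ZERO, Duration.ZERO, false);

    final int maxRetries;
    final Duration initialDelay;
    final Duration maxDelay;
    final boolean skipOnFailure; // true: skip the record (option 1); false: abort (option 2)

    ComputationPolicy(int maxRetries, Duration initialDelay, Duration maxDelay,
                      boolean skipOnFailure) {
        this.maxRetries = maxRetries;
        this.initialDelay = initialDelay;
        this.maxDelay = maxDelay;
        this.skipOnFailure = skipOnFailure;
    }

    /** Delay before the given retry (1-based): initialDelay doubled each time, capped at maxDelay. */
    Duration delayBeforeRetry(int retry) {
        Duration delay = initialDelay;
        for (int i = 1; i < retry && delay.compareTo(maxDelay) < 0; i++) {
            delay = delay.multipliedBy(2);
        }
        return delay.compareTo(maxDelay) > 0 ? maxDelay : delay;
    }
}

/** A hypothetical computation that states its policy where it is defined. */
final class IndexingComputation {
    static final ComputationPolicy POLICY =
            new ComputationPolicy(5, Duration.ofSeconds(1), Duration.ofSeconds(30), false);
}
```

Someone reading IndexingComputation sees at a glance that it retries up to five times, backs off from 1 to 30 seconds, and aborts if the failure persists.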