The computation policy should be part of the configuration so it can be customized on any Nuxeo node.
At the computation level, this means moving the policy into the settings (along with the concurrency and partitioning).
This makes it possible to configure a simple backoff retry mechanism that catches any exception:
- delay: the delay between retries, backing off exponentially up to maxDelay by multiplying successive delays by a factor of 2;
- maxRetries: the maximum number of retries to perform;
- continueOnFailure: if true, the computation checkpoints once the retries are exhausted and continues processing new records.
When unspecified, the default policy is no retry and abort on failure (continueOnFailure is false).
Any other policy must be explicitly defined.
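The semantics listed above can be sketched in plain Java. This is a hypothetical illustration, not the Nuxeo or failsafe implementation: `BackoffRetrySketch`, `retryDelays` and `process` are invented names, and the injected `sleeper` only exists so the waits can be observed without actually sleeping.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.LongConsumer;

/** Hypothetical illustration of the documented policy semantics, not Nuxeo code. */
final class BackoffRetrySketch {

    /** Delays (ms) waited before each retry: delay, then x2 each time, capped at maxDelay. */
    static List<Long> retryDelays(long delayMs, long maxDelayMs, int maxRetries) {
        List<Long> delays = new ArrayList<>();
        long current = Math.min(delayMs, maxDelayMs);
        for (int i = 0; i < maxRetries; i++) {
            delays.add(current);
            current = Math.min(current * 2, maxDelayMs); // factor 2, never above maxDelay
        }
        return delays;
    }

    /**
     * Processes one record, retrying on any unchecked exception.
     * Returns true if processed, false if skipped (continueOnFailure);
     * throws if every attempt failed and continueOnFailure is false (abort).
     */
    static boolean process(Runnable record, long delayMs, long maxDelayMs, int maxRetries,
                           boolean continueOnFailure, LongConsumer sleeper) {
        List<Long> delays = retryDelays(delayMs, maxDelayMs, maxRetries);
        RuntimeException last = null;
        for (int attempt = 0; attempt <= maxRetries; attempt++) {
            if (attempt > 0) {
                sleeper.accept(delays.get(attempt - 1)); // back off before retrying
            }
            try {
                record.run();
                return true;
            } catch (RuntimeException e) {
                last = e;
            }
        }
        if (continueOnFailure) {
            return false; // the caller checkpoints and moves on to new records
        }
        throw new IllegalStateException("Aborting after " + maxRetries + " retries", last);
    }
}
```

With delay=500ms, maxDelay=10s and maxRetries=7, this sketch waits 500, 1000, 2000, 4000, 8000, 10000 and 10000 ms before the successive retries.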
At the nuxeo-runtime-stream level, the settings are contributed when defining a processor:
<streamProcessor name="myStreamProcessor" logConfig="default" defaultConcurrency="4" defaultPartitions="12"
  class="org.nuxeo.runtime.stream.tests.MyStreamProcessor">
  <!-- options to configure streams -->
  <stream name="output" partitions="1" />
  <!-- option to configure computation -->
  <computation name="myComputation" concurrency="8" />
  <!-- option specific to the processor -->
  <option name="customOption">value</option>
  <!-- policy -->
  <!-- default policy for all the computations of the processor -->
  <policy name="default" continueOnFailure="false" maxRetries="0" />
  <!-- specific policy for a computation: backoff retry and skip failure -->
  <policy name="myComputation" continueOnFailure="true" maxRetries="3" delay="500ms" maxDelay="10s" />
  <!-- specific custom policy for a computation (alternative to the previous one, both target myComputation) -->
  <policy name="myComputation" continueOnFailure="true" maxRetries="10" class="org.nuxeo.runtime.stream.tests.MyPolicy" />
</streamProcessor>
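To illustrate how a computation could pick its policy from such a contribution (its own `<policy name="...">` first, then the processor `default`, then the built-in no-retry/abort default), here is a hypothetical sketch. The lookup order is inferred from the comments in the example above; `PolicyResolutionSketch`, `PolicySettings` and `parseMillis` are illustrative names, and the duration parser only handles the `ms`, `s`, `m` and `h` suffixes, which is an assumption about the attribute format.

```java
import java.util.HashMap;
import java.util.Map;

/** Hypothetical sketch of per-computation policy lookup; not the nuxeo-runtime-stream code. */
final class PolicyResolutionSketch {

    /** Simplified, hypothetical view of one <policy> element. */
    static final class PolicySettings {
        final int maxRetries;
        final long delayMs;
        final long maxDelayMs;
        final boolean continueOnFailure;

        PolicySettings(int maxRetries, long delayMs, long maxDelayMs, boolean continueOnFailure) {
            this.maxRetries = maxRetries;
            this.delayMs = delayMs;
            this.maxDelayMs = maxDelayMs;
            this.continueOnFailure = continueOnFailure;
        }
    }

    /** Documented default when nothing is specified: no retry, abort on failure. */
    static final PolicySettings NO_RETRY_ABORT = new PolicySettings(0, 0, 0, false);

    /** Policies keyed by their name attribute: "default" or a computation name. */
    private final Map<String, PolicySettings> policies = new HashMap<>();

    void contribute(String name, PolicySettings policy) {
        // in this sketch, a later policy with the same name replaces the earlier one
        policies.put(name, policy);
    }

    /** Computation-specific policy first, then the processor "default", then no retry/abort. */
    PolicySettings policyFor(String computationName) {
        PolicySettings specific = policies.get(computationName);
        return specific != null ? specific : policies.getOrDefault("default", NO_RETRY_ABORT);
    }

    /** Hypothetical parser for duration attributes such as "500ms" or "10s". */
    static long parseMillis(String value) {
        String v = value.trim();
        // "ms" must be checked before "s" and "m"
        if (v.endsWith("ms")) return Long.parseLong(v.substring(0, v.length() - 2));
        if (v.endsWith("s")) return Long.parseLong(v.substring(0, v.length() - 1)) * 1_000L;
        if (v.endsWith("m")) return Long.parseLong(v.substring(0, v.length() - 1)) * 60_000L;
        if (v.endsWith("h")) return Long.parseLong(v.substring(0, v.length() - 1)) * 3_600_000L;
        throw new IllegalArgumentException("Unsupported duration: " + value);
    }
}
```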
To build a custom policy that catches only specific exceptions or uses a different retry mechanism, you can pass a class that returns a policy, like this:
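package org.nuxeo.runtime.stream.tests;

import net.jodah.failsafe.RetryPolicy;
import org.nuxeo.lib.stream.computation.ComputationPolicy;
import org.nuxeo.lib.stream.computation.ComputationPolicyBuilder;
import org.nuxeo.runtime.stream.StreamComputationPolicy;
import org.nuxeo.runtime.stream.StreamProcessorDescriptor;

public class MyPolicy implements StreamComputationPolicy {

    @Override
    public ComputationPolicy getPolicy(StreamProcessorDescriptor.PolicyDescriptor descriptor) {
        // here we can use the full failsafe RetryPolicy API
        RetryPolicy retry = new RetryPolicy().withMaxRetries(descriptor.maxRetries);
        // keep the continueOnFailure setting from the <policy> contribution
        return new ComputationPolicyBuilder().retryPolicy(retry)
                                             .continueOnFailure(descriptor.continueOnFailure)
                                             .build();
    }
}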
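To show the selective-retry idea in isolation, the following plain-Java sketch retries only failures of configured exception types and lets everything else fail immediately. It is a hypothetical illustration with no delay between attempts (`SelectiveRetrySketch` is an invented name). Inside a real `MyPolicy`, the same effect would be configured on the failsafe `RetryPolicy` (for example with `retryOn(...)` in failsafe 1.x); check the failsafe version shipped with your Nuxeo before relying on a specific method.

```java
import java.util.Arrays;
import java.util.List;
import java.util.function.Supplier;

/** Hypothetical selective retry, illustrating "retry only on specific exceptions"; not failsafe or Nuxeo code. */
final class SelectiveRetrySketch {

    private final int maxRetries;
    private final List<Class<? extends Throwable>> retryOn;

    @SafeVarargs
    SelectiveRetrySketch(int maxRetries, Class<? extends Throwable>... retryOn) {
        this.maxRetries = maxRetries;
        this.retryOn = Arrays.asList(retryOn);
    }

    /** True if the failure matches one of the configured exception types (subclasses included). */
    boolean isRetryable(Throwable failure) {
        for (Class<? extends Throwable> type : retryOn) {
            if (type.isInstance(failure)) {
                return true;
            }
        }
        return false;
    }

    /** Runs the action, retrying only retryable failures, at most maxRetries times. */
    <T> T call(Supplier<T> action) {
        for (int attempt = 0; ; attempt++) {
            try {
                return action.get();
            } catch (RuntimeException e) {
                if (!isRetryable(e) || attempt >= maxRetries) {
                    throw e; // other failures, or exhausted retries, are propagated to the caller
                }
            }
        }
    }
}
```

Here a transient `UncheckedIOException` would be retried, while an `IllegalArgumentException` caused by a bad record would fail on the first attempt.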