  1. Nuxeo Platform
  2. NXP-26145

Make computation retry policy configurable


    • Type: Improvement
    • Status: Resolved
    • Priority: Minor
    • Resolution: Fixed
    • Affects Version/s: None
    • Fix Version/s: 10.3
    • Component/s: Streams


      The computation policy should be part of the configuration so it can be customized on any Nuxeo node.

      At the computation level this means moving the policy into the settings (along with the concurrency and partitioning).
      This should make it possible to configure a simple backoff retry mechanism that catches any exception:

      delay is the delay between retries; it backs off exponentially up to maxDelay, multiplying each successive delay by a factor of 2.
      maxRetries sets the maximum number of retries to perform.
      If continueOnFailure is true, the computation checkpoints once the retries are exhausted and continues processing new records.
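
      The backoff rule above can be sketched in plain Java. This is only an illustration, not Nuxeo or Failsafe code: the class BackoffSchedule and the method delaysMs are hypothetical names, and the sample values (500 ms initial delay, 10 s cap) are chosen for the example. It assumes the wait before the n-th retry is the initial delay doubled n-1 times and capped at maxDelay.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical helper, not part of Nuxeo: lists the wait before each retry.
class BackoffSchedule {

    // Wait before retry i (0-based) is min(delay * 2^i, maxDelay), in milliseconds.
    static List<Long> delaysMs(long delayMs, long maxDelayMs, int maxRetries) {
        List<Long> delays = new ArrayList<>();
        long current = delayMs;
        for (int i = 0; i < maxRetries; i++) {
            delays.add(Math.min(current, maxDelayMs));
            // Double the delay for the next retry, never exceeding maxDelay.
            current = Math.min(current * 2, maxDelayMs);
        }
        return delays;
    }

    public static void main(String[] args) {
        // Sample values: 500 ms initial delay, 10 s cap.
        System.out.println(delaysMs(500, 10_000, 3)); // prints [500, 1000, 2000]
        System.out.println(delaysMs(500, 10_000, 6)); // prints [500, 1000, 2000, 4000, 8000, 10000]
    }
}
```

      With three retries the cap is never reached; with six, the last delay would be 16 s uncapped and is cut to 10 s.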

      The default policy, when none is specified, is no retry and abort on failure (continueOnFailure is false).
      Any other policy must be explicitly defined.
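
      The difference between the default policy and a "retry then skip" policy can be illustrated with a small simulation. This is a sketch under stated assumptions, not the Nuxeo implementation: RetrySimulation and process are hypothetical names, records are plain strings, the backoff wait is omitted, and "abort" is modelled as an IllegalStateException.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

// Hypothetical simulation, not Nuxeo code: shows how maxRetries and
// continueOnFailure interact when a record keeps failing.
class RetrySimulation {

    // Processes records in order and returns those processed successfully.
    // Throws IllegalStateException when a record still fails after all retries
    // and continueOnFailure is false (the "abort" behaviour).
    static List<String> process(List<String> records, Consumer<String> computation,
                                int maxRetries, boolean continueOnFailure) {
        List<String> done = new ArrayList<>();
        for (String record : records) {
            boolean success = false;
            // One initial attempt plus up to maxRetries retries.
            for (int attempt = 0; attempt <= maxRetries && !success; attempt++) {
                try {
                    computation.accept(record);
                    success = true;
                } catch (RuntimeException e) {
                    // A real policy would wait here (back off) before retrying.
                }
            }
            if (success) {
                done.add(record);
            } else if (!continueOnFailure) {
                throw new IllegalStateException("Aborting on record: " + record);
            }
            // Otherwise the failed record is skipped and processing continues.
        }
        return done;
    }

    public static void main(String[] args) {
        Consumer<String> failsOnB = r -> {
            if (r.equals("b")) {
                throw new RuntimeException("cannot process " + r);
            }
        };
        List<String> records = List.of("a", "b", "c");
        // Retry then skip: "b" is dropped after 3 retries, "c" is still processed.
        System.out.println(process(records, failsOnB, 3, true)); // prints [a, c]
        // Default policy: no retry, abort on the first failure.
        try {
            process(records, failsOnB, 0, false);
        } catch (IllegalStateException e) {
            System.out.println(e.getMessage()); // prints Aborting on record: b
        }
    }
}
```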

      At the nuxeo-runtime-stream level the settings are contributed when defining a processor:

        <streamProcessor name="myStreamProcessor" logConfig="default" defaultConcurrency="4" defaultPartitions="12">
            <!-- options to configure streams -->
            <stream name="output" partitions="1" />
            <!-- option to configure computation -->
            <computation name="myComputation" concurrency="8" />
            <!-- option specific to the processor -->
            <option name="customOption">value</option>
            <!-- policy -->
            <!-- default policy for all the computations of the processor -->
            <policy name="default" continueOnFailure="false" maxRetries="0" />
            <!-- specific policy for a computation: backoff retry and skip failure -->
            <policy name="myComputation" continueOnFailure="true" maxRetries="3" delay="500ms" maxDelay="10s" />
            <!-- alternatively, a specific custom policy for a computation -->
            <policy name="myComputation" continueOnFailure="true" maxRetries="10" class="org.nuxeo.runtime.stream.tests.MyPolicy" />
        </streamProcessor>

      To build a custom policy that catches only specific exceptions or uses a different retry mechanism, you can pass a class that returns a policy, like this:

      public class MyPolicy implements StreamComputationPolicy {
          @Override
          public ComputationPolicy getPolicy(StreamProcessorDescriptor.PolicyDescriptor descriptor) {
              // here we can use the full Failsafe RetryPolicy API
              RetryPolicy retry = new RetryPolicy().withMaxRetries(descriptor.maxRetries);
              return new ComputationPolicyBuilder().retryPolicy(retry).continueOnFailure(descriptor.continueOnFailure).build();
          }
      }




                Time Tracking

                Original Estimate - Not Specified
                Remaining Estimate - 0 minutes
                Time Spent - 3 days

