NXP-26691

StreamWorkManager workaround for large work


    Details

    • Release Notes Summary:
      StreamWorkManager can manage large works.
    • Release Notes Description:

      It is now possible to use the StreamWorkManager implementation with large Works that exceed 1MB when serialized. The value is stored outside of the stream, in an external storage. For now, the possible storages are the KeyValue store and the Transient store.

      Here are the nuxeo.conf options used to activate this feature for the StreamWorkManager:

      # Filter big work to be stored outside of the stream
      nuxeo.stream.work.computation.filter.enabled=true
      # Above this threshold in bytes the record value is stored outside of the stream
      nuxeo.stream.work.computation.filter.thresholdSize=1000000
      nuxeo.stream.work.computation.filter.class=org.nuxeo.ecm.core.transientstore.computation.TransientStoreOverflowRecordFilter
      nuxeo.stream.work.computation.filter.storeName=default
      nuxeo.stream.work.computation.filter.storeKeyPrefix=bigRecord:
      # An alternative storage using the KeyValue store
      #nuxeo.stream.work.computation.filter.class=org.nuxeo.ecm.core.work.KeyValueStoreOverflowRecordFilter
      # Note: the TTL below is only taken into account with the KeyValue implementation;
      # for the TransientStore implementation, configure the TransientStore garbage collector instead
      #nuxeo.stream.work.computation.filter.storeTTL=4d
      

      When using the TransientStore, its TTL (firstLevelTTL) needs to be adjusted so that the record value is not garbage collected before the work has been processed.

      Note that in Nuxeo 9.10 the nuxeo.stream.work.computation.filter.storeTTL option, which is used by the KeyValue store implementation, needs to be expressed as a number of seconds, while in Nuxeo 10.10 and above it can be expressed using a duration string like "48h" or "4d".
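      For illustration, the same four-day TTL would be written as follows in each version (345600 = 4 × 86400 seconds; only the accepted format differs):

      # Nuxeo 9.10: TTL expressed in seconds
      #nuxeo.stream.work.computation.filter.storeTTL=345600
      # Nuxeo 10.10 and above: TTL expressed as a duration string
      #nuxeo.stream.work.computation.filter.storeTTL=4d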

      Note also that this ability to use an external storage for large record values is not tied to the StreamWorkManager and can be used with any StreamProcessor.
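      To make the overflow mechanism concrete, here is a minimal, self-contained Java sketch of the idea, using a plain in-memory map as a stand-in for the KeyValue or Transient store; it is not the actual Nuxeo filter API, and the real classes (TransientStoreOverflowRecordFilter, KeyValueStoreOverflowRecordFilter) may expose a different interface:

      import java.nio.charset.StandardCharsets;
      import java.util.Map;
      import java.util.UUID;
      import java.util.concurrent.ConcurrentHashMap;

      // Hypothetical sketch of a threshold-based overflow filter.
      public class OverflowFilterSketch {
          static final int THRESHOLD = 1_000_000;      // mirrors filter.thresholdSize (bytes)
          static final String PREFIX = "bigRecord:";   // mirrors filter.storeKeyPrefix
          final Map<String, byte[]> externalStore = new ConcurrentHashMap<>(); // stand-in store

          // Before appending to the stream: replace a big value with a key into the external store.
          byte[] beforeAppend(byte[] value) {
              if (value.length <= THRESHOLD) {
                  return value;
              }
              String key = PREFIX + UUID.randomUUID();
              externalStore.put(key, value);
              return key.getBytes(StandardCharsets.UTF_8);
          }

          // After reading from the stream: resolve the key back to the original value.
          byte[] afterRead(byte[] value) {
              String maybeKey = new String(value, StandardCharsets.UTF_8);
              return maybeKey.startsWith(PREFIX) ? externalStore.get(maybeKey) : value;
          }
      }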

    • Tags:
    • Backlog priority:
      1,000
    • Upgrade notes:

      The wrongly named class LogConfigDescriptor.StreamDescriptor has been renamed to LogConfigDescriptor.LogDescriptor.

    • Sprint:
      nxcore 11.1.2, nxcore 11.1.3, nxcore 11.1.4, nxcore 11.1.5, nxcore 11.1.6, nxcore 11.1.7
    • Story Points:
      5

      Description

      When scheduling a Work with the StreamWorkManager, the Work is serialized and sent into a Nuxeo stream Record.
      The record has a maximum size limit that depends on the Nuxeo Stream implementation (1MB for Kafka, and the same for Chronicle Queue by default).
      Even if this limit can be tuned at the backend level, allowing very big records (bigger than ~10MB) is not recommended for performance reasons.
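      For reference, on the Kafka side this limit is governed by standard Kafka settings; a sketch of the relevant properties, with values corresponding roughly to the default 1MB limit (to be adapted to the deployment):

      # Kafka producer setting: maximum size of a produced request (see the max.request.size error below)
      max.request.size=1048576
      # Kafka broker/topic setting: maximum record batch size accepted by the broker
      message.max.bytes=1048576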

      This means that we want to make sure that Works are always serialized with a limited size.
      Some changes have already been made in NXP-25716, but there are still possible cases of overflow:

      • when the JSF UI Bulk File Import action is used with a large number of files: the AsyncEventExecutor creates an instance of AsyncEventExecutor.ListenerWork with an event bundle containing a large number of events, so the size of the work exceeds the default Kafka record size when importing 900 files:
        [Transaction] Unexpected exception from afterCompletion; continuing
        java.lang.RuntimeException: Unable to send record: ProducerRecord ...
            at org.nuxeo.lib.stream.log.kafka.KafkaLogAppender.append(KafkaLogAppender.java:130)
            at org.nuxeo.lib.stream.log.kafka.KafkaLogAppender.append(KafkaLogAppender.java:110)
            at org.nuxeo.ecm.core.work.StreamWorkManager.schedule(StreamWorkManager.java:155)
            at org.nuxeo.ecm.core.work.WorkManagerImpl.schedule(WorkManagerImpl.java:717)
            at org.nuxeo.ecm.core.event.impl.AsyncEventExecutor.scheduleListeners(AsyncEventExecutor.java:124)
            at org.nuxeo.ecm.core.event.impl.AsyncEventExecutor.run(AsyncEventExecutor.java:92)
            at org.nuxeo.ecm.core.event.impl.EventServiceImpl.fireEventBundle(EventServiceImpl.java:361)
            at org.nuxeo.ecm.core.event.impl.EventServiceImpl.handleTxCommited(EventServiceImpl.java:531)
            at org.nuxeo.ecm.core.event.impl.EventServiceImpl.afterCompletion(EventServiceImpl.java:512)
            at org.apache.geronimo.transaction.manager.TransactionImpl.afterCompletion(TransactionImpl.java:540)
            ...
        Caused by: java.util.concurrent.ExecutionException: org.apache.kafka.common.errors.RecordTooLargeException: The message is 3060227 bytes when serialized which is larger than the maximum request size you have configured with the max.request.size configuration
            at org.apache.kafka.clients.producer.KafkaProducer$FutureFailure.<init>(KafkaProducer.java:1124)
            at org.apache.kafka.clients.producer.KafkaProducer.doSend(KafkaProducer.java:823)
            ...
        
      • Any existing custom (non-Nuxeo) Work may exceed this limit.

      Because we want backward compatibility, we should add a workaround to support such large Works.
      In the case of record overflow:

      • we should warn that the serialized Work size is too big: either the work is serializing too many things by mistake, or it should be refactored so that it does not pass blobs or a large amount of data
      • we should fall back to a KeyValue store or Transient store for the record value.

      Again, this is a workaround until the works are fixed, because having the record value outside of the record has consequences:

      • the Log is no longer the single source of truth
      • failover based on Log replication will depend on an additional storage replication
      • retention of the additional storage must match the Log retention
      • constant throughput will be impaired by mixing very different record sizes and access patterns
