The records used by the Nuxeo Stream Processor are computation Records, with the following fields:
- a key (string) used as routing or partition key
- a watermark (timestamp and sequence)
- some internal flags (stored on a single byte)
- the data (a byte array) representing the message
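The field list above can be sketched as a plain Java value class. This is an illustrative simplification, an assumption for clarity, not the actual Nuxeo Record implementation:

```java
// Illustrative sketch of a computation record; an assumption for clarity,
// not the actual Nuxeo Record class.
public class RecordSketch {
    public final String key;     // routing or partition key
    public final long watermark; // timestamp and sequence packed together
    public final byte flags;     // internal flags, stored on a single byte
    public final byte[] data;    // the encoded message

    public RecordSketch(String key, long watermark, byte flags, byte[] data) {
        this.key = key;
        this.watermark = watermark;
        this.flags = flags;
        this.data = data;
    }
}
```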
This record can be encoded using different codecs:
- Avro Message, using a local filesystem schema registry
- Avro binary; in this case you need a hard-coded schema
- Avro in JSON, mostly for debugging purposes
- Java Externalizable, for complex messages or legacy code
- Avro Confluent message, which uses the Confluent Schema Registry.
Also, the data byte array can represent different things depending on the convention between the producer and consumer. This message can be encoded using any of the above codecs. The record and its message don't have to use the same codec.
So when we have an encoded Record, we first decode it to get the binary data, then decode that data to get the inner message: it is a two-level codec.
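The two-level decoding can be sketched as follows. The framing below is a toy format invented for illustration, not the actual Avro wire encoding used by Nuxeo; it only shows that the record envelope and its message are decoded separately:

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;

// Toy two-level codec: the framing is NOT the actual Avro wire format.
public class TwoLevelCodec {

    // Decoded record envelope: key, watermark and the still-encoded message data.
    public static class Decoded {
        public final String key;
        public final long watermark;
        public final byte[] data;

        Decoded(String key, long watermark, byte[] data) {
            this.key = key;
            this.watermark = watermark;
            this.data = data;
        }
    }

    // Level 2 (inner) codec: here the message is just a UTF-8 string.
    public static byte[] encodeMessage(String message) {
        return message.getBytes(StandardCharsets.UTF_8);
    }

    public static String decodeMessage(byte[] data) {
        return new String(data, StandardCharsets.UTF_8);
    }

    // Level 1 (outer) codec: the record envelope wrapping the message bytes.
    public static byte[] encodeRecord(String key, long watermark, byte[] data) {
        try {
            ByteArrayOutputStream bos = new ByteArrayOutputStream();
            DataOutputStream out = new DataOutputStream(bos);
            out.writeUTF(key);
            out.writeLong(watermark);
            out.writeInt(data.length);
            out.write(data);
            return bos.toByteArray();
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static Decoded decodeRecord(byte[] encoded) {
        try {
            DataInputStream in = new DataInputStream(new ByteArrayInputStream(encoded));
            String key = in.readUTF();
            long watermark = in.readLong();
            byte[] data = new byte[in.readInt()];
            in.readFully(data);
            return new Decoded(key, watermark, data);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }
}
```

A consumer must first call the outer decoder to recover the envelope, then the inner decoder on the data field; a tool that only knows the outer format stops at the opaque byte array.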
The record envelope makes it easy to gather latency statistics because all records are homogeneous, but it hurts interoperability: the double-decoding logic is hard to put in place in external tools. For instance, when using KSQL on a stream encoded with the Avro Confluent codec, only the record fields are accessible (roughly the key and the watermark).
So we need a new codec, usable when the message is encoded in Avro, that produces a flat record holding both the computation Record fields and the message fields, encoded using the Avro Confluent format.
Not only should KSQL be able to access all the fields (record and message), but we should also be able to read from a KSQL output stream using a normal computation.
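As a sketch of what the flat encoding could look like, assuming a message carrying a single docId field (the schema name, the watermark split, and the message field are all hypothetical), the flat Avro record merges the envelope fields with the message fields:

```
{
  "type": "record",
  "name": "FlatComputationRecord",
  "fields": [
    {"name": "key", "type": "string", "doc": "routing or partition key"},
    {"name": "watermarkTimestamp", "type": "long"},
    {"name": "watermarkSequence", "type": "int"},
    {"name": "flags", "type": "int"},
    {"name": "docId", "type": "string", "doc": "example message field, hypothetical"}
  ]
}
```

Registered in the Confluent Schema Registry, such a flat record would be directly queryable from KSQL with no double decoding.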
Also, this format will be handy for other tools: