The Processor API now serves as a unified replacement for all these methods. It simplifies the API surface
while maintaining support for both stateless and stateful operations.
-
@@ -3499,26 +3504,26 @@ Migration Examples
| Categorizing Logs by Severity |
flatTransform |
- process |
+ process with Processor |
Stateless |
| Replacing Slang in Text Messages |
flatTransformValues |
- processValues |
+ process with FixedKeyProcessor |
Stateless |
| Cumulative Discounts for a Loyalty Program
|
transform |
- process |
+ process with Processor |
Stateful |
| Traffic Radar Monitoring Car Count |
transformValues |
- processValues |
+ process with FixedKeyProcessor |
Stateful |
@@ -3615,8 +3620,8 @@ Categorizing Logs by Severity
Replacing Slang in Text Messages
- Below, methods replaceWithFlatTransformValues and replaceWithProcessValues show how you can
- migrate from flatTransformValues to processValues.
+ Below, methods replaceWithFlatTransformValues and replaceWithProcess show how you can
+ migrate from flatTransformValues to process.
public class ReplacingSlangTextInMessagesExample {
private static final Map<String, String> SLANG_DICTIONARY = Map.of(
"u", "you",
@@ -3632,9 +3637,9 @@ Replacing Slang in Text Messag
messageStream.flatTransformValues(SlangReplacementTransformer::new).to(OUTPUT_MESSAGES_TOPIC);
}
- public static void replaceWithProcessValues(final StreamsBuilder builder) {
+ public static void replaceWithProcess(final StreamsBuilder builder) {
KStream<String, String> messageStream = builder.stream(INPUT_MESSAGES_TOPIC);
- messageStream.processValues(SlangReplacementProcessor::new).to(OUTPUT_MESSAGES_TOPIC);
+ messageStream.process(SlangReplacementProcessor::new).to(OUTPUT_MESSAGES_TOPIC);
}
private static class SlangReplacementTransformer implements ValueTransformer<String, Iterable<String>> {
@@ -3680,6 +3685,8 @@ Replacing Slang in Text Messag
}
}
Cumulative Discounts for a Loyalty Program
+ Below, methods applyDiscountWithTransform and applyDiscountWithProcess show how you can
+ migrate from transform to process.
public class CumulativeDiscountsForALoyaltyProgramExample {
private static final double DISCOUNT_THRESHOLD = 100.0;
private static final String CUSTOMER_SPENDING_STORE = "customer-spending-store";
@@ -3798,8 +3805,8 @@ Cumulative Discounts
}
}
Traffic Radar Monitoring Car Count
- Below, methods countWithTransformValues and countWithProcessValues show how you can migrate
- from transformValues to processValues.
+ Below, methods countWithTransformValues and countWithProcess show how you can migrate
+ from transformValues to process.
public class TrafficRadarMonitoringCarCountExample {
private static final String DAILY_COUNT_STORE = "price-state-store";
private static final String DAILY_COUNT_TOPIC = "price-state-topic";
@@ -3831,7 +3838,7 @@ Traffic Radar Monitoring Car
);
final KStream<Void, String> radarStream = builder.stream(RADAR_COUNT_TOPIC);
// Apply the FixedKeyProcessor with the state store
- radarStream.processValues(DailyCarCountProcessor::new, DAILY_COUNT_STORE)
+ radarStream.process(DailyCarCountProcessor::new, DAILY_COUNT_STORE)
.to(DAILY_COUNT_TOPIC);
}
@@ -3919,6 +3926,11 @@ Keynotes
Unified API: Consolidates multiple methods into a single, versatile API.
Future-Proof: Ensures compatibility with the latest Kafka Streams releases.
+ Deprecation of processValues Method
+ Note that processValues was deprecated in Kafka Streams 4.3.0 release, in favor of a new
+ overload process(FixedKeyProcessorSupplier) to provide a backward compatible upgrade
+ for Kafka Streams 3.x for users of [flat]TransformValues methods
+ (cf. KAFKA-19668).
Removal of Old process Method
It is worth mentioning that, in addition to the methods mentioned above, the process method, which
integrated the 'old' Processor API (i.e., Processor as opposed to the new
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/KStream.java b/streams/src/main/java/org/apache/kafka/streams/kstream/KStream.java
index acbe1a6b7e251..2410be398344c 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/KStream.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/KStream.java
@@ -52,8 +52,7 @@
* {@link GlobalKTable}, or can be aggregated into a {@link KTable}.
* A {@link KStream} can also be directly {@link KStream#toTable() converted} into a {@code KTable}.
* Kafka Streams DSL can be mixed-and-matched with the Processor API (PAPI) (cf. {@link Topology}) via
- * {@link #process(ProcessorSupplier, String...) process(...)} and {@link #processValues(FixedKeyProcessorSupplier,
- * String...) processValues(...)}.
+ * {@link #process(ProcessorSupplier, String...) process(...)}.
*
* @param the key type of this stream
* @param the value type of this stream
@@ -63,7 +62,7 @@ public interface KStream {
/**
* Create a new {@code KStream} that consists of all records of this stream which satisfy the given predicate.
* All records that do not satisfy the predicate are dropped.
- * This is a stateless record-by-record operation (cf. {@link #processValues(FixedKeyProcessorSupplier, String...)}
+ * This is a stateless record-by-record operation (cf. {@link #process(FixedKeyProcessorSupplier, String...)}
* for stateful record processing or if you need access to the record's timestamp, headers, or other metadata).
*
* @param predicate
@@ -86,7 +85,7 @@ public interface KStream {
* Create a new {@code KStream} that consists all records of this stream which do not satisfy the given
* predicate.
* All records that do satisfy the predicate are dropped.
- * This is a stateless record-by-record operation (cf. {@link #processValues(FixedKeyProcessorSupplier, String...)}
+ * This is a stateless record-by-record operation (cf. {@link #process(FixedKeyProcessorSupplier, String...)}
* for stateful record processing or if you need access to the record's timestamp, headers, or other metadata).
*
* @param predicate
@@ -156,7 +155,7 @@ KStream selectKey(final KeyValueMapper super K, ? super V, ? e
* Thus, an input record {@code } can be transformed into an output record {@code }.
* If you need read access to the input record key, use {@link #mapValues(ValueMapperWithKey)}.
* This is a stateless record-by-record operation (cf.
- * {@link #processValues(FixedKeyProcessorSupplier, String...)} for stateful value processing or if you need access
+ * {@link #process(FixedKeyProcessorSupplier, String...)} for stateful value processing or if you need access
* to the record's timestamp, headers, or other metadata).
*
* The example below counts the number of token of the value string.
@@ -320,7 +319,7 @@ KStream flatMap(final KeyValueMapper super K, ? super V,
* (possibly of a different type) for it.
* Thus, an input record {@code } can be transformed into output records {@code , , ...}.
* If you need read access to the input record key, use {@link #flatMapValues(ValueMapperWithKey)}.
- * This is a stateless record-by-record operation (cf. {@link #processValues(FixedKeyProcessorSupplier, String...)}
+ * This is a stateless record-by-record operation (cf. {@link #process(FixedKeyProcessorSupplier, String...)}
* for stateful record processing or if you need access to the record's timestamp, headers, or other metadata).
*
* The example below splits input records {@code } containing sentences as values into their words.
@@ -1540,7 +1539,7 @@ KStream leftJoin(final GlobalKTable KStream process(
* However, because the key cannot be modified, some restrictions apply to a {@link FixedKeyProcessor} compared
* to a {@link Processor}: for example, forwarding result records from a {@link Punctuator} is not possible.
*/
+ KStream process(
+ final FixedKeyProcessorSupplier super K, ? super V, ? extends VOut> processorSupplier,
+ final String... stateStoreNames
+ );
+
+ /**
+ * See {@link #process(FixedKeyProcessorSupplier, String...)}.
+ *
+ * Takes an additional {@link Named} parameter that is used to name the processor in the topology.
+ */
+ KStream process(
+ final FixedKeyProcessorSupplier super K, ? super V, ? extends VOut> processorSupplier,
+ final Named named,
+ final String... stateStoreNames
+ );
+
+ /**
+ * Process all records in this stream, one record at a time, by applying a {@link FixedKeyProcessor} (provided by
+ * the given {@link FixedKeyProcessorSupplier}) to each input record.
+ * This method is similar to {@link #process(ProcessorSupplier, String...)}, however the key of the input
+ * {@link Record} cannot be modified.
+ *
+ * Because the key cannot be modified, this method is not a key changing operation and preserves data
+ * co-location with respect to the key (cf. {@link #flatMapValues(ValueMapper)}).
+ * Thus, no internal data redistribution is required if a key-based operator (like an aggregation or join)
+ * is applied to the result {@code KStream}.
+ *
+ *
However, because the key cannot be modified, some restrictions apply to a {@link FixedKeyProcessor} compared
+ * to a {@link Processor}: for example, forwarding result records from a {@link Punctuator} is not possible.
+ *
+ * @deprecated Since 4.3. Use {@link #process(FixedKeyProcessorSupplier, String...)} instead.
+ * Note, that upgrading from {@code processValues()} to {@code process()} is not always backward compatibly.
+ * Please consult the Kafka Streams upgrade documentation for more details.
+ */
+ @Deprecated
KStream processValues(
final FixedKeyProcessorSupplier super K, ? super V, ? extends VOut> processorSupplier,
final String... stateStoreNames
@@ -1586,7 +1620,12 @@ KStream processValues(
* See {@link #processValues(FixedKeyProcessorSupplier, String...)}.
*
* Takes an additional {@link Named} parameter that is used to name the processor in the topology.
+ *
+ * @deprecated Since 4.3. Use {@link #process(FixedKeyProcessorSupplier, Named, String...)} instead.
+ * Note, that upgrading from {@code processValues()} to {@code process()} is not always backward compatibly.
+ * Please consult the Kafka Streams upgrade documentation for more details.
*/
+ @Deprecated
KStream processValues(
final FixedKeyProcessorSupplier super K, ? super V, ? extends VOut> processorSupplier,
final Named named,
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImpl.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImpl.java
index 02342a9de4a75..1db3d46d5fae9 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImpl.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImpl.java
@@ -1326,6 +1326,33 @@ public KStream process(
builder);
}
+ @Override
+ public KStream process(
+ final FixedKeyProcessorSupplier super K, ? super V, ? extends VOut> processorSupplier,
+ final String... stateStoreNames
+ ) {
+ return process(
+ processorSupplier,
+ Named.as(builder.newProcessorName(PROCESSVALUES_NAME)),
+ stateStoreNames
+ );
+ }
+
+ @Override
+ public KStream process(
+ final FixedKeyProcessorSupplier super K, ? super V, ? extends VOut> processorSupplier,
+ final Named named,
+ final String... stateStoreNames
+ ) {
+ return processValuesInternal(
+ true,
+ processorSupplier,
+ named,
+ stateStoreNames
+ );
+ }
+
+ @Deprecated
@Override
public KStream processValues(
final FixedKeyProcessorSupplier super K, ? super V, ? extends VOut> processorSupplier,
@@ -1338,11 +1365,26 @@ public KStream processValues(
);
}
+ @Deprecated
@Override
public KStream processValues(
final FixedKeyProcessorSupplier super K, ? super V, ? extends VOut> processorSupplier,
final Named named,
final String... stateStoreNames
+ ) {
+ return processValuesInternal(
+ builder.processProcessValueFixEnabled(),
+ processorSupplier,
+ named,
+ stateStoreNames
+ );
+ }
+
+ private KStream processValuesInternal(
+ final boolean enableProcessProcessValueFix,
+ final FixedKeyProcessorSupplier super K, ? super V, ? extends VOut> processorSupplier,
+ final Named named,
+ final String... stateStoreNames
) {
ApiUtils.checkSupplier(processorSupplier);
Objects.requireNonNull(named, "named cannot be null");
@@ -1357,7 +1399,7 @@ public KStream processValues(
new ProcessorParameters<>(processorSupplier, name),
stateStoreNames
);
- if (builder.processProcessValueFixEnabled()) {
+ if (enableProcessProcessValueFix) {
processNode.setValueChangingOperation(true);
}