Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
78 changes: 45 additions & 33 deletions docs/streams/developer-guide/dsl-api.html
Original file line number Diff line number Diff line change
Expand Up @@ -3127,15 +3127,12 @@ <h3>
<h4>Operations and concepts</h4>
<ul>
<li><code>KStream#process</code>: Process all records in a stream, one record at a time, by applying a
<code>Processor</code> (provided by a given <code>ProcessorSupplier</code>);
<code>Processor</code/<code>FixedKeyProcessor</code> (provided by a given <code>ProcessorSupplier</code> or <code>FixedKeyProcessorSupplier</code>);
</li>
<li><code>KStream#processValues</code>: Process all records in a stream, one record at a time, by applying a
<code>FixedKeyProcessor</code> (provided by a given <code>FixedKeyProcessorSupplier</code>)
[<b>CAUTION:</b> If you are deploying a new Kafka Streams application, and you are using the
"merge repartition topics" optimization, you should enable the fix for
<a href="https://issues.apache.org/jira/browse/KAFKA-19668">KAFKA-19668</a> to avoid compatibility
issues for future upgrades to newer versions of Kafka Streams;
For more details, see the <a href="#transformers-removal-and-migration-to-processors">migration guide</a> below];
<li>If you are upgrading from an older version of Kafka Streams, and migrate your code from
<code>[flat]TransformValues()</code> or <code>processValues()</code>, see the
<a href="#transformers-removal-and-migration-to-processors">migration guide</a> below,
to ensure backward compatibility.
</li>
<li><code>Processor</code>: A processor of key-value pair records;</li>
<li><code>ContextualProcessor</code>: An abstract implementation of <code>Processor</code> that manages the
Expand Down Expand Up @@ -3166,23 +3163,23 @@ <h4>Examples</h4>
<tbody>
<tr>
<td><a href="#categorizing-logs-by-severity">Categorizing Logs by Severity</a></td>
<td><code>process</code></td>
<td><code>process</code> with <code>Processor</code></td>
<td>Stateless</td>
</tr>
<tr>
<td><a href="#replacing-slang-in-text-messages">Replacing Slang in Text Messages</a></td>
<td><code>processValues</code></td>
<td><code>process</code> with <code>FixedKeyProcessor</code></td>
<td>Stateless</td>
</tr>
<tr>
<td><a href="#cumulative-discounts-for-a-loyalty-program">Cumulative Discounts for a Loyalty Program</a>
</td>
<td><code>process</code></td>
<td><code>process with <code>Processor</code></code></td>
<td>Stateful</td>
</tr>
<tr>
<td><a href="#traffic-radar-monitoring-car-count">Traffic Radar Monitoring Car Count</a></td>
<td><code>processValues</code></td>
<td><code>process</code> with <code>FixedKeyProcessor</code></td>
<td>Stateful</td>
</tr>
</tbody>
Expand All @@ -3205,7 +3202,7 @@ <h5 id="categorizing-logs-by-severity">Categorizing Logs by Severity</h5>
private static final String UNKNOWN_LOGS_TOPIC = &quot;unknown-logs-topic&quot;;
private static final String WARN_LOGS_TOPIC = &quot;warn-logs-topic&quot;;

public static void categorizeWithProcess(final StreamsBuilder builder) {
public static void categorizeWithProcessor(final StreamsBuilder builder) {
final KStream&lt;String, String&gt; logStream = builder.stream(INPUT_LOGS_TOPIC);
logStream.process(LogSeverityProcessor::new)
.to((key, value, recordContext) -&gt; {
Expand Down Expand Up @@ -3267,9 +3264,9 @@ <h5 id="replacing-slang-in-text-messages">Replacing Slang in Text Messages</h5>
private static final String INPUT_MESSAGES_TOPIC = &quot;input-messages-topic&quot;;
private static final String OUTPUT_MESSAGES_TOPIC = &quot;output-messages-topic&quot;;

public static void replaceWithProcessValues(final StreamsBuilder builder) {
public static void replaceWithFixedKeyProcessor(final StreamsBuilder builder) {
KStream&lt;String, String&gt; messageStream = builder.stream(INPUT_MESSAGES_TOPIC);
messageStream.processValues(SlangReplacementProcessor::new).to(OUTPUT_MESSAGES_TOPIC);
messageStream.process(SlangReplacementProcessor::new).to(OUTPUT_MESSAGES_TOPIC);
}

private static class SlangReplacementProcessor extends ContextualFixedKeyProcessor&lt;String, String, String&gt; {
Expand Down Expand Up @@ -3307,7 +3304,7 @@ <h5 id="cumulative-discounts-for-a-loyalty-program">Cumulative Discounts for a L
private static final String DISCOUNT_NOTIFICATIONS_TOPIC = &quot;discount-notifications-topic&quot;;
private static final String PURCHASE_EVENTS_TOPIC = &quot;purchase-events-topic&quot;;

public static void applyDiscountWithProcess(final StreamsBuilder builder) {
public static void applyDiscountWithStatefulProcessor(final StreamsBuilder builder) {
// Define the state store for tracking cumulative spending
builder.addStateStore(
Stores.keyValueStoreBuilder(
Expand Down Expand Up @@ -3375,7 +3372,7 @@ <h5 id="traffic-radar-monitoring-car-count">Traffic Radar Monitoring Car Count</
private static final String DAILY_COUNT_TOPIC = &quot;price-state-topic&quot;;
private static final String RADAR_COUNT_TOPIC = &quot;car-radar-topic&quot;;

public static void countWithProcessValues(final StreamsBuilder builder) {
public static void countWithStatefulFixedKeyProcessor(final StreamsBuilder builder) {
// Define a state store for tracking daily car counts
builder.addStateStore(
Stores.keyValueStoreBuilder(
Expand All @@ -3386,7 +3383,7 @@ <h5 id="traffic-radar-monitoring-car-count">Traffic Radar Monitoring Car Count</
);
final KStream&lt;Void, String&gt; radarStream = builder.stream(RADAR_COUNT_TOPIC);
// Apply the FixedKeyProcessor with the state store
radarStream.processValues(DailyCarCountProcessor::new, DAILY_COUNT_STORE)
radarStream.process(DailyCarCountProcessor::new, DAILY_COUNT_STORE)
.to(DAILY_COUNT_TOPIC);
}

Expand Down Expand Up @@ -3428,7 +3425,7 @@ <h5 id="traffic-radar-monitoring-car-count">Traffic Radar Monitoring Car Count</

<h4>Keynotes</h4>
<ul>
<li><strong>Type Safety and Flexibility:</strong> The process and processValues APIs utilize
<li><strong>Type Safety and Flexibility:</strong> The process APIs utilize
<code>ProcessorContext</code> and <code>Record</code> or <code>FixedKeyRecord</code> objects for better type
safety and flexibility of custom processing logic.
</li>
Expand Down Expand Up @@ -3462,7 +3459,14 @@ <h3>
<p>The Processor API now serves as a unified replacement for all these methods. It simplifies the API surface
while maintaining support for both stateless and stateful operations.</p>

<p><b>CAUTION:</b> If you are using <code>KStream.transformValues()</code> and you have the "merge repartition topics"
<p><b>Notes for 4.3 and newer releases:</b>
If you are using <code>KStream.transformValues()</code> or <code>KStream.flatTransformValues()</code>
and migrate to <code>KStream.process(FixedKeyProcessorSupplier)</code> the code is fully backward compatible,
and no further action is needed.
If you are using <code>KStream.processValues()</code> and migrate to <code>KStream.process(FixedKeyProcessorSupplier)</code>
might <b>not</b> be backward compatible; compare the below paragraph for more details.
</p>
<p><b>CAUTION (for 4.0, 4.1) releases:</b> If you are using <code>KStream.transformValues()</code> and you have the "merge repartition topics"
optimization enabled, rewriting your program to <code>KStream.processValues()</code> might not be safe due to
<a href="https://issues.apache.org/jira/browse/KAFKA-19668">KAFKA-19668</a>. For this case, you should not upgrade
to Kafka Streams 4.0.0 or 4.1.0, but use Kafka Streams 4.0.1 instead, which contains a fix.
Expand All @@ -3477,13 +3481,14 @@ <h3>
final StreamsBuilder builder = new StreamsBuilder(new TopologyConfig(new StreamsConfig(properties)));</code></pre>

<p>It is recommended, that you compare the output of <code>Topology.describe()</code> for the old and new topology,
to verify if the rewrite to <code>processValues()</code> is correct, and that it does not introduce any incompatibilities.
to verify if the rewrite to <code>processValues()</code> or <code>process(FixedKeyProcessorSupplier)</code> is correct,
and that it does not introduce any incompatibilities.
You should also test the upgrade in a non-production environment.</p>

<h4>Migration Examples</h4>
<p>To migrate from the deprecated <code>transform</code>, <code>transformValues</code>, <code>flatTransform</code>, and
<code>flatTransformValues</code> methods to the Processor API (PAPI) in Kafka Streams, let&#39;s resume the
previouss examples. The new <code>process</code> and <code>processValues</code> methods enable a more flexible
previous examples. The new <code>process</code> method enable a more flexible
and reusable approach by requiring implementations of the <code>Processor</code>
or <code>FixedKeyProcessor</code> interfaces.</p>
<table>
Expand All @@ -3499,26 +3504,26 @@ <h4>Migration Examples</h4>
<tr>
<td><a href="#categorizing-logs-by-severity-removal">Categorizing Logs by Severity</a></td>
<td><code>flatTransform</code></td>
<td><code>process</code></td>
<td><code>process</code> with <code>Processor</code></td>
<td>Stateless</td>
</tr>
<tr>
<td><a href="#replacing-slang-in-text-messages-removal">Replacing Slang in Text Messages</a></td>
<td><code>flatTransformValues</code></td>
<td><code>processValues</code></td>
<td><code>process</code> with <code>FixedKeyProcessor</code></td>
<td>Stateless</td>
</tr>
<tr>
<td><a href="#cumulative-discounts-for-a-loyalty-program-removal">Cumulative Discounts for a Loyalty Program</a>
</td>
<td><code>transform</code></td>
<td><code>process</code></td>
<td><code>process</code> with <code>Processor</code></td>
<td>Stateful</td>
</tr>
<tr>
<td><a href="#traffic-radar-monitoring-car-count-removal">Traffic Radar Monitoring Car Count</a></td>
<td><code>transformValues</code></td>
<td><code>processValues</code></td>
<td><code>process</code> with <code>FixedKeyProcessor</code></td>
<td>Stateful</td>
</tr>
</tbody>
Expand Down Expand Up @@ -3615,8 +3620,8 @@ <h5 id="categorizing-logs-by-severity-removal">Categorizing Logs by Severity</h5
}
}</code></pre>
<h5 id="replacing-slang-in-text-messages-removal">Replacing Slang in Text Messages</h5>
<p>Below, methods <code>replaceWithFlatTransformValues</code> and <code>replaceWithProcessValues</code> show how you can
migrate from <code>flatTransformValues</code> to <code>processValues</code>.</p>
<p>Below, methods <code>replaceWithFlatTransformValues</code> and <code>replaceWithProcess</code> show how you can
migrate from <code>flatTransformValues</code> to <code>process</code>.</p>
<pre class="line-numbers"><code class="language-java">public class ReplacingSlangTextInMessagesExample {
private static final Map&lt;String, String&gt; SLANG_DICTIONARY = Map.of(
&quot;u&quot;, &quot;you&quot;,
Expand All @@ -3632,9 +3637,9 @@ <h5 id="replacing-slang-in-text-messages-removal">Replacing Slang in Text Messag
messageStream.flatTransformValues(SlangReplacementTransformer::new).to(OUTPUT_MESSAGES_TOPIC);
}

public static void replaceWithProcessValues(final StreamsBuilder builder) {
public static void replaceWithProcess(final StreamsBuilder builder) {
KStream&lt;String, String&gt; messageStream = builder.stream(INPUT_MESSAGES_TOPIC);
messageStream.processValues(SlangReplacementProcessor::new).to(OUTPUT_MESSAGES_TOPIC);
messageStream.process(SlangReplacementProcessor::new).to(OUTPUT_MESSAGES_TOPIC);
}

private static class SlangReplacementTransformer implements ValueTransformer&lt;String, Iterable&lt;String&gt;&gt; {
Expand Down Expand Up @@ -3680,6 +3685,8 @@ <h5 id="replacing-slang-in-text-messages-removal">Replacing Slang in Text Messag
}
}</code></pre>
<h5 id="cumulative-discounts-for-a-loyalty-program-removal">Cumulative Discounts for a Loyalty Program</h5>
<p>Below, methods <code>applyDiscountWithTransform</code> and <code>applyDiscountWithProcess</code> show how you can
migrate from <code>transform</code> to <code>process</code>.</p>
<pre class="line-numbers"><code class="language-java">public class CumulativeDiscountsForALoyaltyProgramExample {
private static final double DISCOUNT_THRESHOLD = 100.0;
private static final String CUSTOMER_SPENDING_STORE = &quot;customer-spending-store&quot;;
Expand Down Expand Up @@ -3798,8 +3805,8 @@ <h5 id="cumulative-discounts-for-a-loyalty-program-removal">Cumulative Discounts
}
}</code></pre>
<h5 id="traffic-radar-monitoring-car-count-removal">Traffic Radar Monitoring Car Count</h5>
<p>Below, methods <code>countWithTransformValues</code> and <code>countWithProcessValues</code> show how you can migrate
from <code>transformValues</code> to <code>processValues</code>.</p>
<p>Below, methods <code>countWithTransformValues</code> and <code>countWithProcess</code> show how you can migrate
from <code>transformValues</code> to <code>process</code>.</p>
<pre class="line-numbers"><code class="language-java">public class TrafficRadarMonitoringCarCountExample {
private static final String DAILY_COUNT_STORE = &quot;price-state-store&quot;;
private static final String DAILY_COUNT_TOPIC = &quot;price-state-topic&quot;;
Expand Down Expand Up @@ -3831,7 +3838,7 @@ <h5 id="traffic-radar-monitoring-car-count-removal">Traffic Radar Monitoring Car
);
final KStream&lt;Void, String&gt; radarStream = builder.stream(RADAR_COUNT_TOPIC);
// Apply the FixedKeyProcessor with the state store
radarStream.processValues(DailyCarCountProcessor::new, DAILY_COUNT_STORE)
radarStream.process(DailyCarCountProcessor::new, DAILY_COUNT_STORE)
.to(DAILY_COUNT_TOPIC);
}

Expand Down Expand Up @@ -3919,6 +3926,11 @@ <h4>Keynotes</h4>
<li><strong>Unified API:</strong> Consolidates multiple methods into a single, versatile API.</li>
<li><strong>Future-Proof:</strong> Ensures compatibility with the latest Kafka Streams releases.</li>
</ul>
<h4>Deprecation of <code>processValues</code> Method</h4>
<p>Note that <code>processValues</code> was deprecated in Kafka Streams 4.3.0 release, in favor of a new
overload <code>process(FixedKeyProcessorSupplier)</code> to provide a backward compatible upgrade
for Kafka Streams 3.x for users of <code>[flat]TransformValues</code> methods
(cf. <a href="https://issues.apache.org/jira/browse/KAFKA-19668">KAFKA-19668</a>).</p>
<h4>Removal of Old <code>process</code> Method</h4>
<p>It is worth mentioning that, in addition to the methods mentioned above, the <code>process</code> method, which
integrated the &#39;old&#39; Processor API (i.e., <code>Processor</code> as opposed to the new
Expand Down
Loading
Loading