Skip to content

Commit 7ab3e03

Browse files
yunfengzhou-hubSxnan
authored andcommitted
[FLINK-38581][model] Model Function supports error handling strategy
1 parent 5e54fa3 commit 7ab3e03

File tree

10 files changed

+475
-19
lines changed

10 files changed

+475
-19
lines changed

docs/layouts/shortcodes/generated/model_openai_common_section.html

Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,12 @@
2626
<td>String</td>
2727
<td>Full URL of the OpenAI API endpoint, e.g., <code class="highlighter-rouge">https://api.openai.com/v1/chat/completions</code> or <code class="highlighter-rouge">https://api.openai.com/v1/embeddings</code></td>
2828
</tr>
29+
<tr>
30+
<td><h5>error-handling-strategy</h5></td>
31+
<td style="word-wrap: break-word;">RETRY</td>
32+
<td><p>Enum</p></td>
33+
<td>Strategy for handling errors during model requests.<br /><br />Possible values:<ul><li>"RETRY": Retry sending the request.</li><li>"FAILOVER": Throw exceptions and fail the Flink job.</li><li>"IGNORE": Ignore the input that caused the error and continue. The error itself would be recorded in log.</li></ul></td>
34+
</tr>
2935
<tr>
3036
<td><h5>max-context-size</h5></td>
3137
<td style="word-wrap: break-word;">(none)</td>
@@ -38,5 +44,17 @@
3844
<td>String</td>
3945
<td>Model name, e.g., <code class="highlighter-rouge">gpt-3.5-turbo</code>, <code class="highlighter-rouge">text-embedding-ada-002</code>.</td>
4046
</tr>
47+
<tr>
48+
<td><h5>retry-fallback-strategy</h5></td>
49+
<td style="word-wrap: break-word;">FAILOVER</td>
50+
<td><p>Enum</p></td>
51+
<td>Fallback strategy to employ if the retry attempts are exhausted. This strategy is applied when error-handling-strategy is set to retry.<br /><br />Possible values:<ul><li>"FAILOVER": Throw exceptions and fail the Flink job.</li><li>"IGNORE": Ignore the input that caused the error and continue. The error itself would be recorded in log.</li></ul></td>
52+
</tr>
53+
<tr>
54+
<td><h5>retry-num</h5></td>
55+
<td style="word-wrap: break-word;">100</td>
56+
<td>Integer</td>
57+
<td>Number of retry for OpenAI client requests.</td>
58+
</tr>
4159
</tbody>
4260
</table>

docs/layouts/shortcodes/generated/openai_configuration.html

Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -32,6 +32,12 @@
3232
<td>String</td>
3333
<td>Full URL of the OpenAI API endpoint, e.g., <code class="highlighter-rouge">https://api.openai.com/v1/chat/completions</code> or <code class="highlighter-rouge">https://api.openai.com/v1/embeddings</code></td>
3434
</tr>
35+
<tr>
36+
<td><h5>error-handling-strategy</h5></td>
37+
<td style="word-wrap: break-word;">RETRY</td>
38+
<td><p>Enum</p></td>
39+
<td>Strategy for handling errors during model requests.<br /><br />Possible values:<ul><li>"RETRY": Retry sending the request.</li><li>"FAILOVER": Throw exceptions and fail the Flink job.</li><li>"IGNORE": Ignore the input that caused the error and continue. The error itself would be recorded in log.</li></ul></td>
40+
</tr>
3541
<tr>
3642
<td><h5>max-context-size</h5></td>
3743
<td style="word-wrap: break-word;">(none)</td>
@@ -68,6 +74,18 @@
6874
<td><p>Enum</p></td>
6975
<td>The format of the response, e.g., 'text' or 'json_object'.<br /><br />Possible values:<ul><li>"text"</li><li>"json_object"</li></ul></td>
7076
</tr>
77+
<tr>
78+
<td><h5>retry-fallback-strategy</h5></td>
79+
<td style="word-wrap: break-word;">FAILOVER</td>
80+
<td><p>Enum</p></td>
81+
<td>Fallback strategy to employ if the retry attempts are exhausted. This strategy is applied when error-handling-strategy is set to retry.<br /><br />Possible values:<ul><li>"FAILOVER": Throw exceptions and fail the Flink job.</li><li>"IGNORE": Ignore the input that caused the error and continue. The error itself would be recorded in log.</li></ul></td>
82+
</tr>
83+
<tr>
84+
<td><h5>retry-num</h5></td>
85+
<td style="word-wrap: break-word;">100</td>
86+
<td>Integer</td>
87+
<td>Number of retry for OpenAI client requests.</td>
88+
</tr>
7189
<tr>
7290
<td><h5>seed</h5></td>
7391
<td style="word-wrap: break-word;">(none)</td>

flink-models/flink-model-openai/pom.xml

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -127,6 +127,12 @@ under the License.
127127
<version>${project.version}</version>
128128
<scope>test</scope>
129129
</dependency>
130+
<dependency>
131+
<groupId>org.apache.flink</groupId>
132+
<artifactId>flink-test-utils-junit</artifactId>
133+
<version>${project.version}</version>
134+
<scope>test</scope>
135+
</dependency>
130136
</dependencies>
131137

132138
<repositories>

flink-models/flink-model-openai/src/main/java/org/apache/flink/model/openai/AbstractOpenAIModelFunction.java

Lines changed: 66 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -18,8 +18,9 @@
1818

1919
package org.apache.flink.model.openai;
2020

21+
import org.apache.flink.configuration.DescribedEnum;
2122
import org.apache.flink.configuration.ReadableConfig;
22-
import org.apache.flink.table.api.config.ExecutionConfigOptions;
23+
import org.apache.flink.configuration.description.InlineElement;
2324
import org.apache.flink.table.catalog.Column;
2425
import org.apache.flink.table.catalog.ResolvedSchema;
2526
import org.apache.flink.table.data.RowData;
@@ -41,13 +42,17 @@
4142
import java.util.concurrent.CompletableFuture;
4243
import java.util.stream.Collectors;
4344

45+
import static org.apache.flink.configuration.description.TextElement.text;
46+
4447
/** Abstract parent class for {@link AsyncPredictFunction}s for OpenAI API. */
4548
public abstract class AbstractOpenAIModelFunction extends AsyncPredictFunction {
4649
private static final Logger LOG = LoggerFactory.getLogger(AbstractOpenAIModelFunction.class);
4750

4851
protected transient OpenAIClientAsync client;
4952

53+
private final ErrorHandlingStrategy errorHandlingStrategy;
5054
private final int numRetry;
55+
private final RetryFallbackStrategy retryFallbackStrategy;
5156
private final String baseUrl;
5257
private final String apiKey;
5358
private final String model;
@@ -59,19 +64,16 @@ public AbstractOpenAIModelFunction(
5964
String endpoint = config.get(OpenAIOptions.ENDPOINT);
6065
this.baseUrl = endpoint.replaceAll(String.format("/%s/*$", getEndpointSuffix()), "");
6166
this.apiKey = config.get(OpenAIOptions.API_KEY);
62-
// The model service enforces rate-limiting constraints, necessitating retry mechanisms in
63-
// most operational scenarios. Within the asynchronous operator framework, the system is
64-
// designed to process up to
65-
// config.get(ExecutionConfigOptions.TABLE_EXEC_ASYNC_LOOKUP_BUFFER_CAPACITY) concurrent
66-
// requests in parallel. To mitigate potential performance degradation from simultaneous
67-
// requests, a dynamic retry strategy is implemented where the maximum retry count is
68-
// directly proportional to the configured parallelism level, ensuring robust error
69-
// resilience while maintaining throughput efficiency.
67+
68+
this.errorHandlingStrategy = config.get(OpenAIOptions.ERROR_HANDLING_STRATEGY);
7069
this.numRetry =
71-
config.get(ExecutionConfigOptions.TABLE_EXEC_ASYNC_LOOKUP_BUFFER_CAPACITY) * 10;
70+
this.errorHandlingStrategy == ErrorHandlingStrategy.RETRY
71+
? config.get(OpenAIOptions.RETRY_NUM)
72+
: 0;
7273
this.model = config.get(OpenAIOptions.MODEL);
7374
this.maxContextSize = config.get(OpenAIOptions.MAX_CONTEXT_SIZE);
7475
this.contextOverflowAction = config.get(OpenAIOptions.CONTEXT_OVERFLOW_ACTION);
76+
this.retryFallbackStrategy = config.get(OpenAIOptions.RETRY_FALLBACK_STRATEGY);
7577

7678
validateSingleColumnSchema(
7779
factoryContext.getCatalogModel().getResolvedInputSchema(),
@@ -148,4 +150,58 @@ protected void validateSingleColumnSchema(
148150
column.getDataType().getLogicalType()));
149151
}
150152
}
153+
154+
protected Collection<RowData> handleErrorsAndRespond(Throwable t) {
155+
ErrorHandlingStrategy finalErrorHandlingStrategy =
156+
this.errorHandlingStrategy == ErrorHandlingStrategy.RETRY
157+
? this.retryFallbackStrategy.strategy
158+
: this.errorHandlingStrategy;
159+
160+
if (finalErrorHandlingStrategy == ErrorHandlingStrategy.FAILOVER) {
161+
throw new RuntimeException(t);
162+
} else if (finalErrorHandlingStrategy == ErrorHandlingStrategy.IGNORE) {
163+
return Collections.emptyList();
164+
} else {
165+
throw new UnsupportedOperationException(
166+
"Unsupported error handling strategy: " + finalErrorHandlingStrategy);
167+
}
168+
}
169+
170+
/** Strategy for handling errors during model requests. */
171+
public enum ErrorHandlingStrategy implements DescribedEnum {
172+
RETRY("Retry sending the request."),
173+
FAILOVER("Throw exceptions and fail the Flink job."),
174+
IGNORE(
175+
"Ignore the input that caused the error and continue. The error itself would be recorded in log.");
176+
177+
private final String description;
178+
179+
ErrorHandlingStrategy(String description) {
180+
this.description = description;
181+
}
182+
183+
@Override
184+
public InlineElement getDescription() {
185+
return text(description);
186+
}
187+
}
188+
189+
/**
190+
* The fallback strategy for when retry attempts are exhausted. It should be identical to {@link
191+
* ErrorHandlingStrategy} except that it does not support {@link ErrorHandlingStrategy#RETRY}.
192+
*/
193+
public enum RetryFallbackStrategy implements DescribedEnum {
194+
FAILOVER(ErrorHandlingStrategy.FAILOVER),
195+
IGNORE(ErrorHandlingStrategy.IGNORE);
196+
private final ErrorHandlingStrategy strategy;
197+
198+
RetryFallbackStrategy(ErrorHandlingStrategy strategy) {
199+
this.strategy = strategy;
200+
}
201+
202+
@Override
203+
public InlineElement getDescription() {
204+
return text(strategy.description);
205+
}
206+
}
151207
}

flink-models/flink-model-openai/src/main/java/org/apache/flink/model/openai/OpenAIChatModelFunction.java

Lines changed: 7 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -34,7 +34,6 @@
3434

3535
import java.util.Arrays;
3636
import java.util.Collection;
37-
import java.util.List;
3837
import java.util.concurrent.CompletableFuture;
3938
import java.util.stream.Collectors;
4039

@@ -87,13 +86,15 @@ public CompletableFuture<Collection<RowData>> asyncPredictInternal(String input)
8786
.getOptional(OpenAIOptions.RESPONSE_FORMAT)
8887
.ifPresent(x -> builder.responseFormat(x.getResponseFormat()));
8988

90-
return client.chat()
91-
.completions()
92-
.create(builder.build())
93-
.thenApply(this::convertToRowData);
89+
return client.chat().completions().create(builder.build()).handle(this::convertToRowData);
9490
}
9591

96-
private List<RowData> convertToRowData(ChatCompletion chatCompletion) {
92+
private Collection<RowData> convertToRowData(
93+
ChatCompletion chatCompletion, Throwable throwable) {
94+
if (throwable != null) {
95+
return handleErrorsAndRespond(throwable);
96+
}
97+
9798
return chatCompletion.choices().stream()
9899
.map(
99100
choice ->

flink-models/flink-model-openai/src/main/java/org/apache/flink/model/openai/OpenAIEmbeddingModelFunction.java

Lines changed: 7 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -33,7 +33,6 @@
3333
import javax.annotation.Nullable;
3434

3535
import java.util.Collection;
36-
import java.util.List;
3736
import java.util.concurrent.CompletableFuture;
3837
import java.util.stream.Collectors;
3938

@@ -73,10 +72,15 @@ public CompletableFuture<Collection<RowData>> asyncPredictInternal(String input)
7372
builder.dimensions(dimensions);
7473
}
7574

76-
return client.embeddings().create(builder.build()).thenApply(this::convertToRowData);
75+
return client.embeddings().create(builder.build()).handle(this::convertToRowData);
7776
}
7877

79-
private List<RowData> convertToRowData(CreateEmbeddingResponse response) {
78+
private Collection<RowData> convertToRowData(
79+
CreateEmbeddingResponse response, Throwable throwable) {
80+
if (throwable != null) {
81+
return handleErrorsAndRespond(throwable);
82+
}
83+
8084
return response.data().stream()
8185
.map(
8286
embedding ->

flink-models/flink-model-openai/src/main/java/org/apache/flink/model/openai/OpenAIModelProviderFactory.java

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -69,6 +69,9 @@ public Set<ConfigOption<?>> optionalOptions() {
6969
Set<ConfigOption<?>> set = new HashSet<>();
7070
set.add(OpenAIOptions.MAX_CONTEXT_SIZE);
7171
set.add(OpenAIOptions.CONTEXT_OVERFLOW_ACTION);
72+
set.add(OpenAIOptions.ERROR_HANDLING_STRATEGY);
73+
set.add(OpenAIOptions.RETRY_NUM);
74+
set.add(OpenAIOptions.RETRY_FALLBACK_STRATEGY);
7275
set.add(OpenAIOptions.SYSTEM_PROMPT);
7376
set.add(OpenAIOptions.TEMPERATURE);
7477
set.add(OpenAIOptions.TOP_P);

flink-models/flink-model-openai/src/main/java/org/apache/flink/model/openai/OpenAIOptions.java

Lines changed: 28 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -83,6 +83,34 @@ public class OpenAIOptions {
8383
.text("Action to handle context overflows.")
8484
.build());
8585

86+
@Documentation.Section({Documentation.Sections.MODEL_OPENAI_COMMON})
87+
public static final ConfigOption<AbstractOpenAIModelFunction.ErrorHandlingStrategy>
88+
ERROR_HANDLING_STRATEGY =
89+
ConfigOptions.key("error-handling-strategy")
90+
.enumType(AbstractOpenAIModelFunction.ErrorHandlingStrategy.class)
91+
.defaultValue(AbstractOpenAIModelFunction.ErrorHandlingStrategy.RETRY)
92+
.withDescription("Strategy for handling errors during model requests.");
93+
94+
// The model service enforces rate-limiting constraints, necessitating retry mechanisms in
95+
// most operational scenarios.
96+
@Documentation.Section({Documentation.Sections.MODEL_OPENAI_COMMON})
97+
public static final ConfigOption<Integer> RETRY_NUM =
98+
ConfigOptions.key("retry-num")
99+
.intType()
100+
.defaultValue(100)
101+
.withDescription("Number of retry for OpenAI client requests.");
102+
103+
@Documentation.Section({Documentation.Sections.MODEL_OPENAI_COMMON})
104+
public static final ConfigOption<AbstractOpenAIModelFunction.RetryFallbackStrategy>
105+
RETRY_FALLBACK_STRATEGY =
106+
ConfigOptions.key("retry-fallback-strategy")
107+
.enumType(AbstractOpenAIModelFunction.RetryFallbackStrategy.class)
108+
.defaultValue(
109+
AbstractOpenAIModelFunction.RetryFallbackStrategy.FAILOVER)
110+
.withDescription(
111+
"Fallback strategy to employ if the retry attempts are exhausted."
112+
+ " This strategy is applied when error-handling-strategy is set to retry.");
113+
86114
// ------------------------------------------------------------------------
87115
// Options for Chat Completion Model Functions
88116
// ------------------------------------------------------------------------

0 commit comments

Comments
 (0)