Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
58 changes: 29 additions & 29 deletions docs/content.zh/docs/dev/table/functions/udfs.md
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ under the License.
当前 Flink 有如下几种函数:

- *标量函数* 将标量值转换成一个新标量值;
- *Asynchronous scalar functions* asynchronously map scalar values to a new scalar value.
- *异步标量函数* 异步地将标量值映射为新的标量值。
- *表值函数* 将标量值转换成新的行数据;
- *Async Table functions* asynchronously map scalar values to new rows and can be used for table sources that perform a lookup.
- *聚合函数* 将多行数据里的标量值转换成一个新标量值;
Expand Down Expand Up @@ -976,26 +976,26 @@ env.sqlQuery("SELECT HashFunction(myField) FROM MyTable")

{{< top >}}

Asynchronous Scalar Functions
异步标量函数
----------------

When interacting with external systems (for example when enriching stream events with data stored in a database), one needs to take care that network or other latency does not dominate the streaming application’s running time.
当与外部系统交互时(例如使用数据库中存储的数据来丰富流事件时),需要注意网络或其他延迟不要主导流应用程序的运行时间。

Naively accessing data in the external database, for example using a `ScalarFunction`, typically means **synchronous** interaction: A request is sent to the database and the `ScalarFunction` waits until the response has been received. In many cases, this waiting makes up the vast majority of the function’s time.
简单地访问外部数据库中的数据,例如使用 `ScalarFunction`,通常意味着**同步**交互:向数据库发送请求,然后 `ScalarFunction` 等待直到收到响应。在许多情况下,这种等待占据了函数执行时间的绝大部分。

To address this inefficiency, there is an `AsyncScalarFunction`. Asynchronous interaction with the database means that a single function instance can handle many requests concurrently and receive the responses concurrently. That way, the waiting time can be overlaid with sending other requests and receiving responses. At the very least, the waiting time is amortized over multiple requests. This leads in most cases to much higher streaming throughput.
为了解决这种低效问题,引入了 `AsyncScalarFunction`。与数据库的异步交互意味着单个函数实例可以并发处理多个请求并并发接收响应。这样,等待时间可以与发送其他请求和接收响应重叠。至少,等待时间会分摊到多个请求上。这在大多数情况下会带来更高的流处理吞吐量。

{{< img src="/fig/async_io.svg" width="50%" >}}

#### Defining an AsyncScalarFunction
#### 定义 AsyncScalarFunction

A user-defined asynchronous scalar function maps zero, one, or multiple scalar values to a new scalar value. Any data type listed in the [data types section]({{< ref "docs/dev/table/types" >}}) can be used as a parameter or return type of an evaluation method.
用户自定义的异步标量函数将零个、一个或多个标量值映射为新的标量值。[数据类型部分]({{< ref "docs/dev/table/types" >}})中列出的任何数据类型都可以用作求值方法的参数或返回类型。

In order to define an asynchronous scalar function, extend the base class `AsyncScalarFunction` in `org.apache.flink.table.functions` and implement one or more evaluation methods named `eval(...)`. The first argument must be a `CompletableFuture<...>` which is used to return the result, with subsequent arguments being the parameters passed to the function.
要定义异步标量函数,需要继承 `org.apache.flink.table.functions` 中的基类 `AsyncScalarFunction`,并实现一个或多个名为 `eval(...)` 的求值方法。第一个参数必须是 `CompletableFuture<...>`,用于返回结果,后续参数是传递给函数的参数。

The number of outstanding calls to `eval` may be configured by [`table.exec.async-scalar.max-concurrent-operations`]({{< ref "docs/dev/table/config#table-exec-async-scalar-max-concurrent-operations" >}}).
未完成的 `eval` 调用数量可以通过 [`table.exec.async-scalar.max-concurrent-operations`]({{< ref "docs/dev/table/config#table-exec-async-scalar-max-concurrent-operations" >}}) 配置。

The following example shows how to do work on a thread pool in the background, though any libraries exposing an async interface may be directly used to complete the `CompletableFuture` from a callback. See the [Implementation Guide](#implementation-guide) for more details.
以下示例展示了如何在后台线程池上执行工作,不过任何暴露异步接口的库都可以直接用于从回调中完成 `CompletableFuture`。更多详细信息请参阅[实现指南](#implementation-guide)

```java
import org.apache.flink.table.api.*;
Expand All @@ -1009,29 +1009,29 @@ import java.util.concurrent.Executors;
import static org.apache.flink.table.api.Expressions.*;

/**
* A function which simulates looking up a beverage name from a database.
* Since such lookups are often slow, we use an AsyncScalarFunction.
* 一个模拟从数据库查询饮料名称的函数。
* 由于这类查询通常很慢,我们使用 AsyncScalarFunction
*/
public static class BeverageNameLookupFunction extends AsyncScalarFunction {
private transient Executor executor;

@Override
public void open(FunctionContext context) {
// Create a thread pool for executing the background lookup.
// 创建一个线程池用于执行后台查询。
executor = Executors.newFixedThreadPool(10);
}

// The eval method takes a future for the result and the beverage ID to lookup.
// eval 方法接收一个用于返回结果的 future 和要查询的饮料 ID。
public void eval(CompletableFuture<String> future, Integer beverageId) {
// Submit a task to the thread pool. We don't want to block this main
// thread since that would prevent concurrent execution. The future can be
// completed from another thread when the lookup is done.
// 向线程池提交任务。我们不想阻塞主线程,
// 因为这会阻止并发执行。当查询完成时,
// 可以从另一个线程完成 future。
executor.execute(() -> {
// Simulate a database lookup by sleeping for 1s.
// 通过睡眠 1 秒来模拟数据库查询。
try {
Thread.sleep(1000);
} catch (InterruptedException e) {}
// Complete the future with the right beverage name.
// 用正确的饮料名称完成 future
switch (beverageId) {
case 0:
future.complete("Latte");
Expand All @@ -1043,7 +1043,7 @@ public static class BeverageNameLookupFunction extends AsyncScalarFunction {
future.complete("Espresso");
break;
default:
// In the exceptional case, return an error.
// 在异常情况下,返回错误。
future.completeExceptionally(new IllegalArgumentException("Bad beverageId: " + beverageId));
}
});
Expand All @@ -1054,28 +1054,28 @@ TableEnvironment env = TableEnvironment.create(...);
env.getConfig().set("table.exec.async-scalar.max-concurrent-operations", "5");
env.getConfig().set("table.exec.async-scalar.timeout", "1m");

// call function "inline" without registration in Table API
// Table API 中"内联"调用函数,无需注册
env.from("Beverages").select(call(BeverageNameLookupFunction.class, $("beverageId")));

// register function
// 注册函数
env.createTemporarySystemFunction("GetBeverageName", BeverageNameLookupFunction.class);

// call registered function in Table API
// Table API 中调用已注册的函数
env.from("Beverages").select(call("GetBeverageName", $("beverageId")));

// call registered function in SQL
// SQL 中调用已注册的函数
env.sqlQuery("SELECT GetBeverageName(beverageId) FROM Beverages");

```

#### Asynchronous Semantics
While calls to an `AsyncScalarFunction` may be completed out of the original input order, to maintain correct semantics, the outputs of the function are guaranteed to maintain that input order to downstream components of the query. The data itself could reveal completion order (e.g. by containing fetch timestamps), so the user should consider whether this is acceptable for their use-case.
#### 异步语义
虽然对 `AsyncScalarFunction` 的调用可能以与原始输入不同的顺序完成,但为了保持正确的语义,函数的输出会保证按照输入顺序传递给查询的下游组件。数据本身可能会显示完成顺序(例如包含获取时间戳),因此用户应该考虑这对于他们的使用场景是否可接受。

#### Error Handling
The primary way for a user to indicate an error is to call `CompletableFuture.completeExceptionally(Throwable)`. Similarly, if an exception is encountered by the system when invoking `eval`, that will also result in an error. When an error occurs, the system will consider the retry strategy, configured by [`table.exec.async-scalar.retry-strategy`]({{< ref "docs/dev/table/config#table-exec-async-scalar-retry-strategy" >}}). If this is `NO_RETRY`, the job is failed. If it is set to `FIXED_DELAY`, a period of [`table.exec.async-scalar.retry-delay`]({{< ref "docs/dev/table/config#table-exec-async-scalar-retry-delay" >}}) will be waited, and the function call will be retried. If there have been [`table.exec.async-scalar.max-attempts`]({{< ref "docs/dev/table/config#table-exec-async-scalar-max-attempts" >}}) failed attempts or if the timeout [`table.exec.async-scalar.timeout`]({{< ref "docs/dev/table/config#table-exec-async-scalar-timeout" >}}) expires (including all retry attempts), the job will fail.
#### 错误处理
用户指示错误的主要方式是调用 `CompletableFuture.completeExceptionally(Throwable)`。同样,如果系统在调用 `eval` 时遇到异常,也会导致错误。当发生错误时,系统会考虑重试策略,该策略由 [`table.exec.async-scalar.retry-strategy`]({{< ref "docs/dev/table/config#table-exec-async-scalar-retry-strategy" >}}) 配置。如果设置为 `NO_RETRY`,作业将失败。如果设置为 `FIXED_DELAY`,将等待 [`table.exec.async-scalar.retry-delay`]({{< ref "docs/dev/table/config#table-exec-async-scalar-retry-delay" >}}) 时间段后重试函数调用。如果已经有 [`table.exec.async-scalar.max-attempts`]({{< ref "docs/dev/table/config#table-exec-async-scalar-max-attempts" >}}) 次失败尝试,或者超时 [`table.exec.async-scalar.timeout`]({{< ref "docs/dev/table/config#table-exec-async-scalar-timeout" >}})(包括所有重试尝试)到期,作业将失败。

#### AsyncScalarFunction vs. ScalarFunction
One thing to consider is if the UDF contains CPU intensive logic with no blocking calls. If so, it likely doesn't require asynchronous functionality and could use a `ScalarFunction`. If the logic involves waiting for things like network or background operations (e.g. database lookups, RPCs, or REST calls), this may be a useful way to speed things up. There are also some queries that don't support `AsyncScalarFunction`, so when in doubt, `ScalarFunction` should be used.
需要考虑的一点是 UDF 是否包含 CPU 密集型逻辑且没有阻塞调用。如果是这样,它可能不需要异步功能,可以使用 `ScalarFunction`。如果逻辑涉及等待网络或后台操作(例如数据库查询、RPC 或 REST 调用),这可能是加速的有效方法。还有一些查询不支持 `AsyncScalarFunction`,因此在不确定时,应该使用 `ScalarFunction`

{{< top >}}

Expand Down