163 changes: 163 additions & 0 deletions docs/content/cdc-ingestion/flink-cdc-paimon-pipeline-connector.md
@@ -0,0 +1,163 @@
---
title: "Flink CDC Paimon Pipeline Connector"
weight: 10
type: docs
aliases:
- /cdc-ingestion/flink-cdc-paimon-pipeline-connector.html
---
<!--
Licensed to the Apache Software Foundation (ASF) under one
or more contributor license agreements. See the NOTICE file
distributed with this work for additional information
regarding copyright ownership. The ASF licenses this file
to you under the Apache License, Version 2.0 (the
"License"); you may not use this file except in compliance
with the License. You may obtain a copy of the License at

http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing,
software distributed under the License is distributed on an
"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
KIND, either express or implied. See the License for the
specific language governing permissions and limitations
under the License.
-->

# Flink CDC Paimon Pipeline Connector

Flink CDC is a streaming data integration tool for the Flink engine. It lets users describe their ETL pipeline logic
elegantly in YAML and automatically generates the corresponding Flink operators and submits the job.

The Paimon Pipeline connector can be used as either the Data Source or the Data Sink of a Flink CDC pipeline. This
document describes how to set up the Paimon Pipeline connector as the Data Source. If you are interested in using
Paimon as the Data Sink, please refer to Flink CDC's
[Paimon Pipeline Connector](https://nightlies.apache.org/flink/flink-cdc-docs-release-3.5/docs/connectors/pipeline-connectors/paimon/)
document.

## What can the connector do?

* Synchronizes data from a Paimon warehouse, database, or table to an external system supported by Flink CDC.
* Synchronizes schema changes.
* Automatically discovers newly created tables in the source Paimon warehouse.

How to create Pipeline
----------------

The pipeline for reading data from Paimon and writing it to Doris can be defined as follows:

```yaml
source:
  type: paimon
  name: Paimon Source
  database: default
  table: test_table
  catalog.properties.metastore: filesystem
  catalog.properties.warehouse: /path/warehouse

sink:
  type: doris
  name: Doris Sink
  fenodes: 127.0.0.1:8030
  username: root
  password: pass

pipeline:
  name: Paimon to Doris Pipeline
  parallelism: 2
```

Pipeline Connector Options
----------------

{{< generated/cdc_configuration >}}
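
If both `database` and `table` are left unset, the connector scans the whole warehouse and periodically discovers
newly created tables. The following is a minimal sketch of such a pipeline; it assumes that `table.discovery-interval`
can be set directly under `source:` like the other options, and that `30s` is an accepted duration format.

```yaml
source:
  type: paimon
  name: Paimon Source
  # database and table are omitted, so every database and table in the
  # warehouse is scanned, and newly created tables are discovered
  # roughly every 30 seconds (assumed duration syntax)
  table.discovery-interval: 30s
  catalog.properties.metastore: filesystem
  catalog.properties.warehouse: /path/warehouse

sink:
  type: doris
  name: Doris Sink
  fenodes: 127.0.0.1:8030
  username: root
  password: pass

pipeline:
  name: Paimon Warehouse to Doris Pipeline
  parallelism: 2
```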

Usage Notes
--------

* Update events (-U, +U) on primary key tables are converted into -D and +I events.
* Dropping tables is not supported. If you need to drop a table from the Paimon warehouse, drop it and then restart the Flink CDC job. After the restart, the job stops reading from the dropped table, and the target table in the external system keeps the state it had before the job was stopped.
* All data from a given table is consumed by the same Flink source task. If the amount of data varies significantly across tables, the resulting data skew may become a performance bottleneck for the Flink CDC job.

Data Type Mapping
----------------
<div class="wy-table-responsive">
<table class="colwidths-auto docutils">
<thead>
<tr>
<th class="text-left">Paimon type</th>
<th class="text-left">CDC type</th>
<th class="text-left" style="width:60%;">NOTE</th>
</tr>
</thead>
<tbody>
<tr>
<td>TINYINT</td>
<td>TINYINT</td>
<td></td>
</tr>
<tr>
<td>SMALLINT</td>
<td>SMALLINT</td>
<td></td>
</tr>
<tr>
<td>INT</td>
<td>INT</td>
<td></td>
</tr>
<tr>
<td>BIGINT</td>
<td>BIGINT</td>
<td></td>
</tr>
<tr>
<td>FLOAT</td>
<td>FLOAT</td>
<td></td>
</tr>
<tr>
<td>DOUBLE</td>
<td>DOUBLE</td>
<td></td>
</tr>
<tr>
<td>DECIMAL(p, s)</td>
<td>DECIMAL(p, s)</td>
<td></td>
</tr>
<tr>
<td>BOOLEAN</td>
<td>BOOLEAN</td>
<td></td>
</tr>
<tr>
<td>DATE</td>
<td>DATE</td>
<td></td>
</tr>
<tr>
<td>TIMESTAMP</td>
<td>TIMESTAMP</td>
<td></td>
</tr>
<tr>
<td>TIMESTAMP_LTZ</td>
<td>TIMESTAMP_LTZ</td>
<td></td>
</tr>
<tr>
<td>CHAR(n)</td>
<td>CHAR(n)</td>
<td></td>
</tr>
<tr>
<td>VARCHAR(n)</td>
<td>VARCHAR(n)</td>
<td></td>
</tr>
</tbody>
</table>
</div>

{{< top >}}
48 changes: 48 additions & 0 deletions docs/layouts/shortcodes/generated/cdc_configuration.html
@@ -0,0 +1,48 @@
{{/*
Licensed to the Apache Software Foundation (ASF) under one
or more contributor license agreements. See the NOTICE file
distributed with this work for additional information
regarding copyright ownership. The ASF licenses this file
to you under the Apache License, Version 2.0 (the
"License"); you may not use this file except in compliance
with the License. You may obtain a copy of the License at

http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing,
software distributed under the License is distributed on an
"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
KIND, either express or implied. See the License for the
specific language governing permissions and limitations
under the License.
*/}}
<table class="configuration table table-bordered">
<thead>
<tr>
<th class="text-left" style="width: 20%">Key</th>
<th class="text-left" style="width: 15%">Default</th>
<th class="text-left" style="width: 10%">Type</th>
<th class="text-left" style="width: 55%">Description</th>
</tr>
</thead>
<tbody>
<tr>
<td><h5>database</h5></td>
<td style="word-wrap: break-word;">(none)</td>
<td>String</td>
<td>Name of the database to be scanned. By default, all databases will be scanned.</td>
</tr>
<tr>
<td><h5>table</h5></td>
<td style="word-wrap: break-word;">(none)</td>
<td>String</td>
<td>Name of the table to be scanned. By default, all tables will be scanned.</td>
</tr>
<tr>
<td><h5>table.discovery-interval</h5></td>
<td style="word-wrap: break-word;">1 min</td>
<td>Duration</td>
<td>The discovery interval of new tables. Only effective when database or table is not set.</td>
</tr>
</tbody>
</table>
@@ -83,6 +83,8 @@ public class ConfigOptionsDocGenerator {
"paimon-flink/paimon-flink-common", "org.apache.paimon.flink"),
new OptionsClassLocation(
"paimon-flink/paimon-flink-cdc", "org.apache.paimon.flink.kafka"),
new OptionsClassLocation(
"paimon-flink/paimon-flink-cdc", "org.apache.paimon.flink.pipeline.cdc"),
new OptionsClassLocation(
"paimon-hive/paimon-hive-catalog", "org.apache.paimon.hive"),
new OptionsClassLocation(
54 changes: 47 additions & 7 deletions paimon-flink/paimon-flink-cdc/pom.xml
@@ -92,6 +92,31 @@ under the License.

<!-- CDC dependencies -->

<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-cdc-common</artifactId>
<version>${flink.cdc.version}</version>
</dependency>

<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-cdc-runtime</artifactId>
<version>${flink.cdc.version}</version>
<exclusions>
<exclusion>
<groupId>org.apache.calcite</groupId>
<artifactId>calcite-core</artifactId>
</exclusion>
</exclusions>
</dependency>

<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-files</artifactId>
<version>${flink.version}</version>
<scope>provided</scope>
</dependency>

<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-postgres-cdc</artifactId>
@@ -220,13 +245,6 @@
<type>test-jar</type>
</dependency>

<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-files</artifactId>
<version>${flink.version}</version>
<scope>test</scope>
</dependency>

<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-json</artifactId>
@@ -307,8 +325,30 @@
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-cdc-composer</artifactId>
<version>${flink.cdc.version}</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-cdc-pipeline-connector-values</artifactId>
<version>${flink.cdc.version}</version>
<scope>test</scope>
</dependency>
</dependencies>

<!-- <dependencyManagement>-->
<!-- <dependencies>-->
<!-- <dependency>-->
<!-- <groupId>com.google.guava</groupId>-->
<!-- <artifactId>guava</artifactId>-->
<!-- <version>32.1.2-jre</version>-->
<!-- </dependency>-->
<!-- </dependencies>-->
<!-- </dependencyManagement>-->

<build>
<plugins>
<plugin>