Skip to content

集成FlinkCDC

Paimon 支持多种通过模式演化将数据提取到 Paimon 表中的方法。这意味着添加的列会实时同步到Paimon表中,并且不会为此重新启动同步作业。 目前支持以下同步方式:

  • MySQL同步表:将MySQL中的一张或多张表同步到一张Paimon表中。
  • MySQL同步数据库:将整个MySQL数据库同步到一个Paimon数据库中。
  • API同步表:将您的自定义DataStream输入同步到一张Paimon表中。
  • Kafka同步表:将一个Kafka topic的表同步到一张Paimon表中。
  • Kafka同步数据库:将一个包含多表的Kafka主题或多个各包含一表的主题同步到一个Paimon数据库中。

1. MySQL

添加Flink CDC连接器,然后重启yarn-session集群和sql-client:

sh
[jack@Node02 ~]$ cd /opt/module/flink-1.20.2/lib
[jack@Node02 lib]$ ll 
总用量 312152
-rw-r--r--. 1 jack jack    198366  6月 13 19:00 flink-cep-1.20.2.jar
-rw-r--r--. 1 jack jack    563714  6月 13 19:01 flink-connector-files-1.20.2.jar
-rw-r--r--. 1 jack jack    412253  9月 10 07:55 flink-connector-mysql-cdc-3.4.0.jar
-rw-r--r--. 1 jack jack    102375  6月 13 19:02 flink-csv-1.20.2.jar
-rw-r--r--. 1 jack jack 125897594  6月 13 19:05 flink-dist-1.20.2.jar
-rw-r--r--. 1 jack jack    204407  6月 13 19:02 flink-json-1.20.2.jar
-rw-r--r--. 1 jack jack  21060634  6月 13 19:04 flink-scala_2.12-1.20.2.jar
-rw-r--r--. 1 jack jack  53226520  9月  1 00:46 flink-sql-connector-hive-3.1.3_2.12-1.20.2.jar
-rw-r--r--. 1 jack jack  15714643  6月 13 19:05 flink-table-api-java-uber-1.20.2.jar
-rw-r--r--. 1 jack jack  38424518  6月 13 19:04 flink-table-planner-loader-1.20.2.jar
-rw-r--r--. 1 jack jack   3548975  6月 13 19:00 flink-table-runtime-1.20.2.jar
-rw-r--r--. 1 jack jack   1832290  8月 31 23:44 hadoop-mapreduce-client-core-3.4.1.jar
-rw-r--r--. 1 jack jack    356379  5月 19 13:13 log4j-1.2-api-2.24.3.jar
-rw-r--r--. 1 jack jack    348513 12月 10  2024 log4j-api-2.24.3.jar
-rw-r--r--. 1 jack jack   1914666 12月 10  2024 log4j-core-2.24.3.jar
-rw-r--r--. 1 jack jack     25319 12月 10  2024 log4j-slf4j-impl-2.24.3.jar
-rw-r--r--. 1 jack jack   2481560  9月  1 06:25 mysql-connector-j-8.0.33.jar
-rw-r--r--. 1 jack jack  53287592  8月 31 23:38 paimon-flink-1.20-1.2.0.jar
-rw-r--r--. 1 jack jack     11628  9月  9 07:42 paimon-flink-action-1.2.0.jar

1.1 同步MySQL的表

语法说明如下:

sh
<FLINK_HOME>/bin/flink run \
    /path/to/paimon-flink-action-0.8.2.jar \
    mysql-sync-table
    --warehouse <warehouse-path> \
    --database <database-name> \
    --table <table-name> \
    [--partition-keys <partition-keys>] \
    [--primary-keys <primary-keys>] \
    [--computed-column <'column-name=expr-name(args[, ...])'> [--computed-column ...]] \
    [--mysql-conf <mysql-cdc-source-conf> [--mysql-conf <mysql-cdc-source-conf> ...]] \
    [--catalog-conf <paimon-catalog-conf> [--catalog-conf <paimon-catalog-conf> ...]] \
    [--table-conf <paimon-table-sink-conf> [--table-conf <paimon-table-sink-conf> ...]]

参数说明:

配置描述
--warehousePaimon仓库路径
--databasePaimon Catalog中的数据库名称
--tablePaimon表名称
--partition-keysPaimon表的分区键。如果有多个分区键,
请用逗号连接,例如"dt,hh,mm"。
--primary-keysPaimon表的主键。如果有多个主键,请用逗号连接,
例如"buyer_id,seller_id"。
--computed-column计算列的定义。参数字段来自MySQL表字段名称。
--mysql-confFlink CDC MySQL源表的配置。每个配置都应以"key=value"的格式指定。
主机名、用户名、密码、数据库名和表名是必需配置,其他是可选配置。
--catalog-confPaimon Catalog的配置。
每个配置都应以"key=value"的格式指定。
--table-confPaimon表sink的配置。
每个配置都应以"key=value"的格式指定。

如果指定的Paimon表不存在,此操作将自动创建该表。其schema将从所有指定的MySQL表派生。如果Paimon表已存在,则其schema将与所有指定MySQL表的schema进行比较。

sh
## MySQL一张表同步到Paimon一张表, =两边不能有空格
bin/flink run \
    -Drest.address=gbasehd112 \
    -Drest.port=44214 \
    /opt/gbaseHD/flink-1.16.2/lib/paimon-flink-action-0.8.2.jar \
    mysql-sync-table \
    --warehouse hdfs://gbasehd111:8020/user/hive/warehouse/ \
    --database test \
    --table sensor_cdc \
    --primary-keys id \
    --mysql-conf hostname=gbasehd111 \
    --mysql-conf username=root \
    --mysql-conf password=gbasehd123456 \
    --mysql-conf database-name=test \
    --mysql-conf table-name='sensor' \
    --mysql-conf jdbc.properties.useSSL=false \
    --mysql-conf jdbc.properties.useUnicode=true \
    --mysql-conf jdbc.properties.characterEncoding=utf8 \
    --catalog-conf metastore=hive \
    --catalog-conf uri=thrift://gbasehd113:9083 \
    --table-conf bucket=4 \
    --table-conf changelog-producer=input \
    --table-conf sink.parallelism=4

:::tip 什么是changelog-producer changelog-producer表示将数据另外写入到变更日志文件,该日志文件可以在读取期间直接读取,配置的选项有:

  • "none":没有变更日志文件。
  • "input":刷新内存表时双写变更日志文件,变更日志来自输入。
  • "full-compaction":每次需要压缩生成变更日志文件。
  • "lookup":在提交数据写入之前通过"lookup"生成变更日志文件。
    ::: 如果报错如下,提示rest.address must be set:
    alt text 解决办法是加入参数:
sh
-Drest.address=gbasehd112 \
-Drest.port=44214 \

执行后会发现Paimon会自动给我们建表:

sh

Flink SQL> use catalog hive_catalog;
[INFO] Execute statement succeeded.

Flink SQL> use test;
[INFO] Execute statement succeeded.

Flink SQL> show tables;
+-------------------+
|        table name |
+-------------------+
| dm_source_tbl_cdc |
+-------------------+
1 row in set

Flink SQL> desc dm_source_tbl_cdc;
+-------------+--------------+-------+---------+--------+-----------+---------+
|        name |         type |  null |     key | extras | watermark | comment |
+-------------+--------------+-------+---------+--------+-----------+---------+
|          id |       BIGINT | FALSE | PRI(id) |        |           |    主键 |
|    tbl_name |  VARCHAR(50) | FALSE |         |        |           |    表名 |
| tbl_comment | VARCHAR(500) |  TRUE |         |        |           |    注释 |
|  tbl_remark | VARCHAR(300) |  TRUE |         |        |           |    备注 |
+-------------+--------------+-------+---------+--------+-----------+---------+
4 rows in set

如果修改MySQL的数据,把id=1的数据修改,发现Paimon会自动变更数据:

sh
Flink SQL> select * from dm_source_tbl_cdc;
+----+----------------------+--------------------------------+--------------------------------+--------------------------------+
| op |                   id |                       tbl_name |                    tbl_comment |                     tbl_remark |
+----+----------------------+--------------------------------+--------------------------------+--------------------------------+
| +I |                    1 |                           test |                           测试 |                           备注 |
| +I |                    4 |                           jack |                           杰哥 |                           杰哥 |
| +I |                    3 |                         tuanzi |                         tuanzi |                         tuanzi |
| +I |                    2 |                           user |                         用户表 |                           test |
| +I |                    5 |                       新网银行 |                           二楼 |                             it |
| +I |                    6 |                       久远银海 |                           22楼 |                           民政 |
| +I |                    8 |                           北京 |                           成都 |                           test |
| +I |                  999 |                           中国 |                           test |                           四川 |
| +I |                    7 |                       直通万联 |                           11楼 |                           车险 |
| +I |                    9 |                           南航 |                     白云山机场 |                          learn |
| -D |                    1 |                           test |                           测试 |                           备注 |
| +I |                    1 |                           圆圆 |                           测试 |                           备注 |

在HDFS上面可以看到不仅存储数据,由于设置的是changelog-producer=input, 所以还保存了changelog的信息:
alt text

1.2 同步数据库

语法说明如下:

sh
<FLINK_HOME>/bin/flink run \
    /path/to/paimon-flink-action-0.8.2.jar \
    mysql-sync-database
    --warehouse <warehouse-path> \
    --database <database-name> \
    [--ignore-incompatible <true/false>] \
    [--table-prefix <paimon-table-prefix>] \
    [--table-suffix <paimon-table-suffix>] \
    [--including-tables <mysql-table-name|name-regular-expr>] \
    [--excluding-tables <mysql-table-name|name-regular-expr>] \
    [--mysql-conf <mysql-cdc-source-conf> [--mysql-conf <mysql-cdc-source-conf> ...]] \
    [--catalog-conf <paimon-catalog-conf> [--catalog-conf <paimon-catalog-conf> ...]] \
    [--table-conf <paimon-table-sink-conf> [--table-conf <paimon-table-sink-conf> ...]]

参数说明:

配置描述
--warehousePaimon仓库路径
--databasePaimon Catalog中的数据库名称
--ignore-incompatible默认为false,在这种情况下,如果Paimon中存在MySQL表名,
并且它们的schema不兼容,则会抛出异常。
您可以显式将其指定为true以忽略不兼容的表和异常。
--table-prefix所有需要同步的Paimon表的前缀。
例如希望所有同步表都以"ods_"作为前缀,则可以指定"--table-prefix ods_"。
--table-suffix所有需要同步的Paimon表的后缀。
用法与"--table-prefix"相同。
--including-tables用于指定要同步哪些源表。
您必须使用"
--excluding-tables用于指定哪些源表不同步。用法与"--include-tables"相同。
如果同时指定了"--except-tables",则"--except-tables"的优先级高于"--include-tables"。
--excluding-tables用于指定哪些源表不同步。用法与"--include-tables"相同。
如果同时指定了"--except-tables",
则"--except-tables"的优先级高于"--include-tables"。
--mysql-confFlink CDC MySQL源表的配置。每个配置都应以"key=value"的格式指定。
主机名、用户名、密码、数据库名和表名是必需配置,其他是可选配置。
--catalog-confPaimon Catalog的配置。每个配置都应以"key=value"的格式指定。
--table-confPaimon表sink的配置。每个配置都应以"key=value"的格式指定。

只有具有主键的表才会被同步。对于每个需要同步的MySQL表,如果对应的Paimon表不存在,该操作会自动创建该表。其schema将从所有指定的MySQL表派生。如果 Paimon 表已存在,则其schema将与所有指定MySQL表的schema进行比较。

sh
## MySQL中的数据库同步到Paimon中
bin/flink run \
    -Drest.address=gbasehd112 \
    -Drest.port=44214 \
    /opt/gbaseHD/flink-1.16.2/lib/paimon-flink-action-0.8.2.jar \
    mysql-sync-database \
    --warehouse hdfs://gbasehd111:8020/user/hive/warehouse/ \
    --database test \
    --table-prefix "ods_" \
    --table-suffix "_cdc" \
    --mysql-conf hostname=gbasehd111 \
    --mysql-conf username=root \
    --mysql-conf password=gbasehd123456 \
    --mysql-conf database-name=test \
    --catalog-conf metastore=hive \
    --catalog-conf uri=thrift://gbasehd113:9083 \
    --table-conf bucket=4 \
    --table-conf changelog-producer=input \
    --table-conf sink.parallelism=4 \
    --including-tables 'user_info|order_info|activity_rule'

如果前面同步表的命令已经在执行了, 如何在不停止任务的情况下额外的表呢?效果达到已有的表继续从停止的位置继续同步,新加入的表同步也包含历史数据?可以参考下面的做法:

1.3 添加同步表

sh
/bin/flink run \
    --fromSavepoint /sp/paimon \
    -Drest.address=gbasehd112 \
    -Drest.port=44214 \
    /opt/gbaseHD/flink-1.16.2/lib/paimon-flink-action-0.8.2.jar \
    mysql-sync-database \
    --warehouse hdfs://gbasehd111:8020/user/hive/warehouse/ \
    --database test \
    --mysql-conf hostname=gbasehd111 \
    --mysql-conf username=root \
    --mysql-conf password=gbasehd123456 \
    --mysql-conf database-name=test \
    --catalog-conf metastore=hive \
    --catalog-conf uri=thrift://gbasehd113:9083 \
    --table-conf bucket=4 \
    --including-tables 'product|user|address|order|custom'

使用savepoint, 我们可以通过从作业的先前快照中恢复并从而重用作业的现有状态来实现这一点。恢复的作业将首先对新添加的表进行快照,然后自动从之前的位置继续读取变更日志。

2. Kafka

Flink提供了几种Kafka CDC格式:canal-json、debezium-json、ogg-json、maxwell-json、aws-dms-json、。因为CDC的数据来源实际是CDC工具产生的,Kafka本身不产生CDC数据,针对这些CDC工具数据格式,Paimon都做了支持,Paimon版本1.2列出支持的格式如下: alt text

2.1 添加Kafka连接器