
Integration with Flink SQL

Apache Iceberg supports both Apache Flink's DataStream API and Table API.

1. Environment Preparation

1.1 Download the Iceberg Package

Visit https://repo1.maven.org/maven2/org/apache/iceberg/ and download the Iceberg runtime jar that matches your Flink version.

Tip

The latest release is currently 1.8.0, but starting with 1.7.0 Iceberg no longer supports JDK 1.8; Iceberg 1.6.1 still supports JDK 1.8.

sh
## copy the Iceberg runtime jar into Flink's lib directory
cp /opt/software/iceberg-flink-runtime-1.19-1.6.1.jar /opt/module/flink-1.17.2/lib/

1.2 Hive Catalog Support

Visit https://repo.maven.apache.org/maven2/org/apache/flink/flink-sql-connector-hive-3.1.3_2.12/1.17.2/ and download the Flink SQL Hive connector jar that matches your Hive version, then copy it into Flink's lib directory:

sh
cp /opt/software/flink-sql-connector-hive-3.1.3_2.12-1.17.2.jar /opt/module/flink-1.17.2/lib/

1.3 Configure Environment Variables

  1. Edit my_env.sh
sh
sudo vim /etc/profile.d/my_env.sh
## add the following line
export HADOOP_CLASSPATH=`$HADOOP_HOME/bin/hadoop classpath`
  2. Edit config.yaml
yml
## disable the leaked-classloader warning
classloader: 
  check-leaked-classloader: false
taskmanager: 
  numberOfTaskSlots: 4
## enable checkpointing
execution:
  checkpointing:
    interval: 30s
## state backend and checkpoint storage
state:
  backend:
    type: rocksdb
    incremental: true
  checkpoints:
    dir: hdfs://hadoop102:8020/flink/ckps

Tip

Starting with Flink 1.19, the default configuration file has changed to config.yaml, which uses standard YAML 1.2 syntax.
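
For Flink versions before 1.19 (such as the 1.17.2 used in this article), the same settings go into conf/flink-conf.yaml using flat keys. The block below is an assumed equivalent mapping, not taken from the original:

yml
## assumed flat-key form for conf/flink-conf.yaml (Flink < 1.19)
classloader.check-leaked-classloader: false
taskmanager.numberOfTaskSlots: 4
execution.checkpointing.interval: 30s
state.backend: rocksdb
state.backend.incremental: true
state.checkpoints.dir: hdfs://hadoop102:8020/flink/ckps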

1.4 Start the Required Services

Start the Hadoop cluster and the Hive metastore service.
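
For reference, a minimal sketch of starting these services, assuming the standard start scripts and that the Hive metastore runs on hadoop103 (as used later in this article); the exact scripts and hosts depend on your deployment:

sh
## start HDFS and YARN (run on the appropriate master node; assumes HADOOP_HOME is set)
$HADOOP_HOME/sbin/start-dfs.sh
$HADOOP_HOME/sbin/start-yarn.sh
## start the Hive metastore in the background on its host (hadoop103 here)
nohup hive --service metastore > /tmp/metastore.log 2>&1 &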

2. Start sql-client

2.1 Local Mode

Edit the workers file:

sh
vim /opt/module/flink-1.17.2/conf/workers
# three localhost entries start a local cluster with 3 TaskManagers
localhost
localhost
localhost

Start the local cluster:

sh
[jack@hadoop102 flink-1.17.2]$ ./bin/start-cluster.sh
Starting cluster.
Starting standalonesession daemon on host hadoop102.
Starting taskexecutor daemon on host hadoop102.
[INFO] 1 instance(s) of taskexecutor are already running on hadoop102.
Starting taskexecutor daemon on host hadoop102.
[INFO] 2 instance(s) of taskexecutor are already running on hadoop102.
Starting taskexecutor daemon on host hadoop102.

Check the Web UI at http://hadoop102:8081.

2.2 Start sql-client

sh
./bin/sql-client.sh embedded


3. Create and Use Catalogs

3.1 Syntax

Iceberg catalogs are created using Flink's CREATE CATALOG syntax:

sql
CREATE CATALOG <catalog_name> WITH (
  'type'='iceberg', -- required; must be set explicitly
  '<config_key>'='<config_value>'
);

The config_key can be any of the following optional properties:

  • catalog-type: hive and hadoop are the two built-in catalog types; a custom catalog can be used instead via catalog-impl.
  • catalog-impl: Fully qualified class name of a custom catalog implementation. Must be set if catalog-type is not set.
  • property-version: Version number describing the property format, kept for backward compatibility in case the format changes. The current property version is 1.
  • cache-enabled: Whether to enable catalog caching. Default is true.
  • cache.expiration-interval-ms: How long catalog entries are cached locally, in milliseconds; a negative value such as -1 means no time limit, and 0 is not allowed. Default is -1.
  • uri: The thrift URI of the Hive metastore.
  • clients: Hive metastore client pool size. Default is 2.
  • warehouse: The warehouse directory.
  • hive-conf-dir: Path to a directory containing hive-site.xml; the value of hive.metastore.warehouse.dir in hive-site.xml is overridden by warehouse.
  • hadoop-conf-dir: Path to a directory containing core-site.xml and hdfs-site.xml.
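
As a quick illustration of the keys above, a Hive catalog can also pick up its connection details from configuration files rather than explicit settings. This is only a sketch; the directory paths are placeholders, not from the original:

sql
CREATE CATALOG hive_catalog_from_conf WITH (
  'type'='iceberg',
  'catalog-type'='hive',
  -- directories containing hive-site.xml and core-site.xml/hdfs-site.xml (placeholder paths)
  'hive-conf-dir'='/opt/module/hive/conf',
  'hadoop-conf-dir'='/opt/module/hadoop/etc/hadoop',
  -- overrides hive.metastore.warehouse.dir from hive-site.xml
  'warehouse'='hdfs://hadoop102:8020/flink/warehouse/'
);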

3.2 Hive Catalog

  1. Create a Hive catalog
sh
Flink SQL> CREATE CATALOG hive_catalog WITH (
>   'type'='iceberg',
>   'catalog-type'='hive',
>   'uri'='thrift://hadoop103:9083',
>   'clients'='3',
>   'property-version'='1',
>   'warehouse'='hdfs://hadoop102:8020/flink/warehouse/'
> );
[INFO] Execute statement succeed.
Flink SQL> use catalog hive_catalog;
[INFO] Execute statement succeed.

3.3 Hadoop Catalog

Iceberg also supports a directory-based catalog in HDFS, configured with 'catalog-type'='hadoop'.

sh
Flink SQL> CREATE CATALOG hadoop_catalog WITH (
>   'type'='iceberg',
>   'catalog-type'='hadoop',
>   'warehouse'='hdfs://hadoop102:8020/warehouse/flink-iceberg',
>   'property-version'='1'
> );
[INFO] Execute statement succeed.

3.4 Configure an sql-client Initialization File

Catalogs created in the Flink SQL client are cleared once the client exits, so it is convenient to put them in an initialization script:

sh
vim /opt/module/flink-1.17.2/conf/sql-client-init.sql
## initialization script contents
CREATE CATALOG hive_catalog WITH (
  'type'='iceberg',
  'catalog-type'='hive',
  'uri'='thrift://hadoop103:9083',
  'clients'='3',
  'property-version'='1',
  'warehouse'='hdfs://hadoop102:8020/flink/warehouse/'
);
CREATE CATALOG hadoop_catalog WITH (
  'type'='iceberg',
  'catalog-type'='hadoop',
  'warehouse'='hdfs://hadoop102:8020/warehouse/flink-iceberg',
  'property-version'='1'
);
USE CATALOG hive_catalog;
-- set the result display mode to tableau
SET sql-client.execution.result-mode=tableau;

When starting sql-client afterwards, add -i <path to the SQL file> to initialize the catalogs.

sh
./bin/sql-client.sh embedded -i conf/sql-client-init.sql
Flink SQL> show catalogs;
+-----------------+
|    catalog name |
+-----------------+
| default_catalog |
|  hadoop_catalog |
|    hive_catalog |
+-----------------+
3 rows in set

4. Create a Database

sh
Flink SQL> create database flink_db;
[INFO] Execute statement succeed.

Flink SQL> use flink_db;
[INFO] Execute statement succeed.

5. Create Tables

As of Iceberg 1.8, computed columns and watermarks are still not supported (primary keys are supported). Flink DDL does not distinguish managed from external tables, so there is no EXTERNAL keyword; the tables created are effectively external tables.

5.1 Create a Regular Table

sql
-- created in the current catalog (hive_catalog here)
CREATE TABLE ice_flink_sample1 (
    id BIGINT COMMENT 'unique id',
    data STRING
);
-- create a table in hadoop_catalog
CREATE TABLE hadoop_catalog.default_database.ice_sample2 (
    id INT,
    name STRING
);
-- outside an Iceberg catalog, the 'connector' option must be used to create an Iceberg table
CREATE TABLE default_catalog.default_database.ice_sample3 (
    id INT,
    name STRING
)
WITH (
    'connector' = 'iceberg',
    'catalog-name' = 'default_catalog',
    'catalog-type' = 'hadoop',
    'warehouse' = 'hdfs://hadoop102:8020/warehouse/flink-iceberg/',
    'format-version' = '2'
);

5.2 Create a Partitioned Table

sql
CREATE TABLE ice_flink_sample2 (
    id BIGINT COMMENT 'unique id',
    category STRING
) 
partitioned by (category);
-- create a partitioned table with a primary key
CREATE TABLE ice_flink_sample3 (
    id BIGINT COMMENT 'unique id',
    category STRING,
    ts  timestamp(3),
    primary key (id) not enforced -- primary keys are supported
) 
partitioned by (category);

Tip

Apache Iceberg supports hidden partitioning, but Apache Flink does not currently support partitioning a table by a function applied to a column, so the Flink integration cannot create hidden-partitioned Iceberg tables.
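
For illustration, hidden partitioning would require a transform in the partition spec, as in the Spark-style DDL sketched below; Flink DDL only accepts plain column names in PARTITIONED BY, so a statement like this is rejected (table and column names are made up):

sql
-- NOT valid Flink DDL: a transform such as days(ts) cannot be used in PARTITIONED BY
CREATE TABLE ice_flink_hidden (
    id BIGINT,
    ts TIMESTAMP(3)
)
PARTITIONED BY (days(ts));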

5.3 Create a Table with LIKE

The LIKE syntax creates a table with the same schema, partitioning, and properties as another table.

sql
create table ice_flink_sample5 like ice_flink_sample3;

5.4 Create a Table with CTAS

sql
create table ice_flink_sample6 as select * from ice_flink_sample3;

5.5 Create a Table with a Watermark

Creating an Iceberg table with a watermark definition is currently not supported, and the statement fails (see section 10).
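
A minimal sketch of such an attempt (table and column names are made up); since the integration does not support watermarks on Iceberg tables, a statement like this is rejected:

sql
-- fails: watermark definitions are not supported on Iceberg tables
CREATE TABLE ice_flink_wm (
    id BIGINT,
    ts TIMESTAMP(3),
    WATERMARK FOR ts AS ts - INTERVAL '5' SECOND
);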

5.6 Create a Table with Computed Columns

Creating an Iceberg table with computed columns is likewise not supported, and the statement fails (see section 10).
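
Again a minimal sketch (made-up names); computed columns are not supported on Iceberg tables, so this is also rejected:

sql
-- fails: computed columns are not supported on Iceberg tables
CREATE TABLE ice_flink_cc (
    id BIGINT,
    price DECIMAL(10, 2),
    total AS price * 2
);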

6. Alter Tables

6.1 Rename a Table

sh
Flink SQL> alter table ice_flink_sample6 rename to new_ice_flink_sample6;
[INFO] Execute statement succeed.

6.2 Modify Table Properties

sh
Flink SQL> alter table new_ice_flink_sample6 set ('write.format.default'='avro');
[INFO] Execute statement succeed.

6.3 Add a Column

sh
Flink SQL> alter table ice_flink_sample1 add catagory string;
[INFO] Execute statement succeed.
Flink SQL> show create table ice_flink_sample1;
+----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
|                                                                                                                                                                                                                                 result |
+----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
| CREATE TABLE `hive_catalog`.`flink_db`.`ice_flink_sample1` (
  `id` BIGINT,
  `data` VARCHAR(2147483647),
  `catagory` VARCHAR(2147483647)
) WITH (
  'write.parquet.compression-codec' = 'zstd',
  'write.upsert.enabled' = 'true'
)
 |
+----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
1 row in set

6.4 Modify a Column

sh
## rename a column
Flink SQL> alter table ice_flink_sample1 rename catagory to category;
[INFO] Execute statement succeed.
## change the type of the id column to bigint
Flink SQL> alter table ice_flink_sample1 modify id bigint;
[INFO] Execute statement succeed.

6.5 Drop a Column

sh
Flink SQL> alter table ice_flink_sample1 drop category;
[INFO] Execute statement succeed.

7. Drop Tables

After inserting data into new_ice_flink_sample6, files can be seen under the table's data directory on HDFS.

sql
drop table new_ice_flink_sample6;

Checking HDFS afterwards, the table directory still exists, but the data directory has been emptied.

8. Insert Data

8.1 INSERT INTO

sql
INSERT INTO sample1 VALUES (1, 'a');
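
Multi-row inserts and INSERT ... SELECT also work. The sketch below additionally uses a static-partition insert (Flink's INSERT ... PARTITION syntax, as supported by Iceberg) and assumes the partitioned table ice_flink_sample2 from section 5.2:

sql
-- multi-row insert
INSERT INTO sample1 VALUES (1, 'a'), (2, 'b');
-- write the query result into a static partition of a partitioned table
INSERT INTO ice_flink_sample2 PARTITION (category = 'books') SELECT id FROM sample1;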

8.2 INSERT OVERWRITE

INSERT OVERWRITE is only supported in Flink's batch mode; the default runtime mode is streaming.

sh
Flink SQL> insert overwrite sample1 values(3, 'china');
[ERROR] Could not execute SQL statement. Reason:
java.lang.IllegalStateException: Unbounded data stream doesn't support overwrite operation.
Flink SQL> SET execution.runtime-mode = batch;
[INFO] Execute statement succeed.
Flink SQL> insert overwrite sample1 values(3, 'china');
[INFO] Submitting SQL update statement to the cluster...
[INFO] SQL update statement has been successfully submitted to the cluster:
Job ID: 77fdc0e6f11cafa42e9fb39e1745c0e4
Flink SQL> select * from sample1;
+----+-------+
| id |  data |
+----+-------+
|  3 | china |
+----+-------+
1 row in set (2.51 seconds)

8.3 UPSERT

When writing to the v2 table format, Iceberg supports primary-key-based UPSERT (a table without a primary key obviously will not work 🤔). Upsert can be enabled through table properties (at creation time or later via ALTER TABLE) or per insert with a hint.

  1. Enable it when creating the table (v2 format)
sql
-- without 'format-version', tables are created as v1 by default
CREATE TABLE ice_flink_sample6 (
  `id`  INT COMMENT 'unique id',
  `data` STRING NOT NULL,
  PRIMARY KEY(`id`) NOT ENFORCED
) with (
'format-version'='2', 
'write.upsert.enabled'='true'
);

After inserting, only 2 rows end up in the table:

sh
Flink SQL> insert into ice_flink_sample6 values (1, 'china'),(2, 'russia'),(2, 'japan');
[INFO] Submitting SQL update statement to the cluster...
[INFO] SQL update statement has been successfully submitted to the cluster:
Job ID: 210a266aa8fa1bfb93d5d075fe09afb9

Flink SQL> select * from ice_flink_sample6;
+----+-------+
| id |  data |
+----+-------+
|  1 | china |
|  2 | japan |
+----+-------+
2 rows in set (2.06 seconds)
  2. Enable it at insert time with a hint
sql
-- the table must still be a v2 table; if write.upsert.enabled is not set on it, upsert can be enabled per statement with a hint
INSERT INTO sample1 /*+ OPTIONS('upsert-enabled'='true') */
values (3, 'test');
  3. Enable it by changing table properties
sql
create table ice_flink_sample7 (
  id int,
  category string,
  primary key (id) not enforced
) partitioned by (id);
alter table ice_flink_sample7 set ('format-version'='2');
alter table ice_flink_sample7 set ('write.upsert.enabled'='true');
insert into ice_flink_sample7 values (1, 'china'),(2, 'russia'),(2, 'japan');

Querying ice_flink_sample7 shows the same upsert behavior: only two rows remain, with the last value kept for id = 2.

Tip

OVERWRITE and UPSERT cannot be enabled at the same time. In UPSERT mode, if the table is partitioned, the partition fields must also be part of the primary key. In streaming jobs, upsert writes produce large numbers of small files in Iceberg, which easily leads to performance problems.

9. Query Data

Iceberg supports both streaming and batch reads in Flink. For the examples below, the Flink SQL Kafka connector jar needs to be placed in the lib directory beforehand:

sh
cp /opt/software/flink-sql-connector-kafka-3.3.0-1.19.jar /opt/module/flink-1.17.2/lib/

Then create a streaming source table in Flink's default catalog (tables that do not use Iceberg features are best not created in an Iceberg catalog):

sql
-- the default catalog is Flink's built-in in-memory catalog, so this table definition is lost on restart
create table default_catalog.default_database.kafka_data(
  id int,
  data string
) with (
  'connector' = 'kafka'
  ,'topic' = 'test111'
  ,'properties.bootstrap.servers' = 'hadoop102:9092'
  ,'format' = 'json'
  ,'properties.group.id'='iceberg'
  ,'scan.startup.mode'='earliest-offset'
);
INSERT INTO hive_catalog.flink_db.sample1 SELECT * FROM default_catalog.default_database.kafka_data;

9.1 Batch Mode

sql
-- switch to batch reads; the query returns as soon as the data has been read
SET execution.runtime-mode = batch;

After producing some data to topic test111, querying the table returns the consumed records:

sh
Flink SQL> select * from sample1;
+----+-------+
| id |  data |
+----+-------+
|  3 | china |
|  3 | Korea |
+----+-------+
2 rows in set (2.77 seconds)

9.2 Streaming Mode

sql
SET execution.runtime-mode = streaming;
-- streaming reads use the SQL hint syntax, which must be enabled first
SET table.dynamic-table-options.enabled=true;
  1. Read all records from the current snapshot, then read incremental data committed after it (see the sketch at the end of this subsection). In practice the results do not show up every 1s; the latency is noticeably higher 🐌.
  2. Read incremental data committed after a specified snapshot id (exclusive):
sh
SELECT * FROM sample1 /*+ OPTIONS('streaming'='true', 'monitor-interval'='1s', 'start-snapshot-id'='1480938328156972338')*/ ;


  • monitor-interval: the interval for continuously monitoring newly committed data files (default 10s).
  • start-snapshot-id: the snapshot id from which the streaming job starts.
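
For case 1 above, the query uses the same hint syntax, just without start-snapshot-id (a sketch based on the options listed above):

sql
-- read everything in the current snapshot, then keep reading incremental changes
SELECT * FROM sample1 /*+ OPTIONS('streaming'='true', 'monitor-interval'='1s') */ ;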

9.3 Read Metadata Tables

  1. Query table history
sh
Flink SQL> SELECT * FROM sample1$history;
+----+----------------------------+----------------------+----------------------+---------------------+
| op |            made_current_at |          snapshot_id |            parent_id | is_current_ancestor |
+----+----------------------------+----------------------+----------------------+---------------------+
| +I | 2025-02-20 16:27:42.219000 |  7624841869579953721 |               <NULL> |                TRUE |
| +I | 2025-02-20 18:25:48.871000 |  7746000265018688527 |  7624841869579953721 |                TRUE |
| +I | 2025-02-20 23:39:53.333000 |   331542884841832756 |  7746000265018688527 |                TRUE |
| +I | 2025-02-20 23:44:52.346000 |  7842208569105275211 |   331542884841832756 |                TRUE |
| +I | 2025-02-20 23:48:52.545000 |  4972667660659945835 |  7842208569105275211 |                TRUE |
| +I | 2025-02-20 23:49:52.551000 |  8765394301691897863 |  4972667660659945835 |                TRUE |
| +I | 2025-02-20 23:50:52.567000 |   888884632272991439 |  8765394301691897863 |                TRUE |
| +I | 2025-02-20 23:51:52.518000 |  1480938328156972338 |   888884632272991439 |                TRUE |
| +I | 2025-02-20 23:56:52.421000 |  8015754396930444358 |  1480938328156972338 |                TRUE |
+----+----------------------------+----------------------+----------------------+---------------------+
Received a total of 9 rows (2.53 seconds)
  2. Query table snapshots
sql
SELECT * FROM sample1$snapshots;

  3. Query table partitions

sql
SELECT * FROM sample1$partitions;

10. Limitations of the Flink Integration

The following features are not yet supported by the current Flink-Iceberg integration:

  • Creating Iceberg tables with hidden partitioning is not supported.
  • Creating Iceberg tables with computed columns is not supported.
  • Creating Iceberg tables with watermarks is not supported.