集成Flink
1. 版本选择
Paimon目前支持Flink1.15~20,我们使用Flink1.16.2
2. 环境准备
2.1 安装Flink
请参考Flink部署模式
2.2 上传Paimon包
- jar包下载地址:https://paimon.apache.org/docs/0.8/project/download/, 选择Flink16的版本:
- 拷贝paimon的jar包到flink的lib目录下:
cp paimon-flink-1.16-0.8.2.jar /opt/gbaseHD/flink-1.16.2/lib
2.3 启动sql-client
- 修改flink-conf.yaml配置
vim /opt/gbaseHD/flink-1.16.2/conf/flink-conf.yaml
# 解决中文乱码,1.17之后参数是env.java.opts.all
env.java.opts: -Dfile.encoding=UTF-8
classloader.check-leaked-classloader: false
taskmanager.numberOfTaskSlots: 4
execution.checkpointing.interval: 10s
state.backend: rocksdb
state.checkpoints.dir: hdfs://hadoop102:8020/ckps
state.backend.incremental: true
- 解决依赖问题
cp /opt/gbaseHD/hadoop-3.3.3/share/hadoop/mapreduce/hadoop-mapreduce-client-core-3.3.3.jar /opt/gbaseHD/flink-1.16.2/lib/
- 以Yarn-Session模式启动
## 开启yarn-session模式,执行过程稍微等一下会自动完成退出
/opt/gbaseHD/flink-1.16.2/bin/yarn-session.sh -nm test -d
## 启动Flink的sql-client
/opt/gbaseHD/flink-1.16.2/bin/sql-client.sh -s yarn-session
4. 设置结果显示模式
SET 'sql-client.execution.result-mode' = 'tableau';
SET 'rest.address' = 'gbasehd113';
SET 'rest.port' = '38381';
如果执行任务报错:
Flink SQL> select 1;
[ERROR] Could not execute SQL statement. Reason:
java.lang.NullPointerException: rest.address must be set
可以设置两个属性:
Flink SQL> SET 'rest.address' = 'gbasehd113';
Flink SQL> SET 'rest.port' = '38381';
3. Catalog
Paimon提供了Catalog抽象概念来管理表数据和元数据,Paimon目前支持四种类型的元存储:
- filesystemmetastore(默认),将元数据和表文件存储在文件系统中。
- hivemetastore,它还将元数据存储在Hive元存储中。用户可以直接从Hive访问表。
- jdbcmetastore,它额外将元数据存储在关系数据库中,例如MySQL、Postgres等。
- restmetastore,旨在提供一种从单个客户端访问任何目录后端的轻量级方式。
3.1 Filesystem Catalog
CREATE CATALOG fs_catalog WITH (
'type' = 'paimon',
'warehouse' = 'hdfs://Node01:8020/paimon/warehouse'
);
## 如果想创建外部表的Catalog, Paimon不支持filesystem类型:
-- table.type属性为external
CREATE CATALOG external_catalog_fs WITH (
'type'='paimon',
'warehouse'='hdfs://Node01:8020/paimon/external/fs',
'table.type' = 'external'
);
执行报错: 在不指定table.type' = 'external'属性的catalog中,需要指定说明conetcor表属性创建外部表,查看当前所有的catalog:
Flink SQL> show catalogs;
+-----------------+
| catalog name |
+-----------------+
| default_catalog |
| fs_catalog |
+-----------------+
2 rows in set
在HDFS的页面上,可以发现默认会创建一个default的数据库:
3.2 Hive Catalog
通过使用Hive Catalog,对Catalog的更改将直接影响相应的hive metastore。在此类Catalog中创建的表也可以直接从Hive访问。要使用Hive Catalog,数据库名称、表名称和字段名称应小写。
- 上传hive-connector
由于要连接hive的metastore服务,需要使用hive的连接器,以及需要提前启动metastore服务。访问Flink官网的Hive连接器章节,将flink-sql-connector-hive-3.1.3_2.12-1.16.2.jar下载后上传到Flink的lib目录下即可。 - 重启yarn-session集群
由于新添加了jar包,重启yarn-session生效。同时sql-client也要重启。 - 创建catalog
create catalog hive_catalog with (
'type'='paimon',
'metastore'='hive',
'uri'='thrift://Node02:9083',
'hive-conf-dir' = '/opt/module/hive-4.1.0/conf',
-- 一定要按照真实hive仓库地址,自定义warehouse会被覆盖
'warehouse'='hdfs://Node01:8020/user/hive/warehouse'
);
-- 创建支持外部表的catalog
CREATE CATALOG external_catalog_hive WITH (
'type'='paimon',
'metastore' = 'hive',
'uri' = 'thrift://192.168.7.113:9083',
'warehouse'='hdfs://Node01/user/hive/warehouse',
'table.type' = 'external'
);
自定义warehouse地址会无效,因为Paimon会读取hive-conf-dir的配置文件内容。
3.3 JDBC Catalog
通过JDBC方式将传统RDMS上的数据库纳入到Paimon的管理,需要提前把数据库驱动包放到Flink的lib目录下。
- 将MySQL的驱动包放到lib目录
[jack@Node02 software]$ cp mysql-connector-j-8.0.33.jar /opt/module/flink-1.20.2/lib/
- 重启yarn-session集群
由于新添加了jar包,重启yarn-session生效。同时sql-client也要重启。 - 创建catalog
CREATE CATALOG mysql_catalog WITH (
'type' = 'paimon',
'metastore' = 'jdbc',
'uri' = 'jdbc:mysql://Node03:3306/dm',
'jdbc.user' = 'root',
'jdbc.password' = '123456',
'catalog-key'='jdbc',
'warehouse' = 'hdfs://Node01:8020/mysql/warehouse'
);
查看mysql_catalog中的表,发现Paimon并不会主动管理MySQL上已有的表:
Flink SQL> use catalog mysql_catalog;
[INFO] Execute statement succeeded.
Flink SQL> show tables;
Empty set
3.4 REST Catalog
4. sql初始化文件
每次关闭sql-clinet,会发现再次启动后之前的catalog和数据库、表都不在了,并非真正的被删除了而是表信息没有持久化保存,可以创建sql-client的初始化文件,再启动之前加载我们要使用的catalog。
4.1 创建初始化sql文件
vim sql-client-init.sql
## 添加的内容如下
CREATE CATALOG fs_catalog WITH (
'type' = 'paimon',
'warehouse' = 'hdfs://Node01:8020/paimon/warehouse'
);
create catalog hive_catalog with (
'type'='paimon',
'metastore'='hive',
'uri'='thrift://Node02:9083',
'hive-conf-dir' = '/opt/module/hive-4.1.0/conf',
'warehouse'='hdfs://Node01:8020/user/hive/warehouse'
);
USE CATALOG hive_catalog;
SET 'sql-client.execution.result-mode' = 'tableau';
4.2 启动sql-client
./sql-client.sh -s yarn-session -i sql-client-init.sql
4.3 查看Catalog
show catalogs;
show current catalog;
5. 建表
在Paimon Catalog中创建的表就是Paimon的管理表。当表从Catalog中删除时,默认其表文件也将被删除,类似于Hive的内部表。若需创建外部表,需要创建外部表的Catalog, 在Paimon Catalog中建表只能创建paimon格式的表,不支持除了paimon之外其他connector,也就是不能指明外部表。
5.1 创建普通表
CREATE TABLE t_test (
user_id BIGINT,
item_id BIGINT,
behavior STRING,
dt STRING,
hh STRING,
PRIMARY KEY (dt, hh, user_id) NOT ENFORCED
);
5.2 查看表结构
查看创建好的表:
Flink SQL> desc t_test;
+----------+--------+-------+----------------------+--------+-----------+
| name | type | null | key | extras | watermark |
+----------+--------+-------+----------------------+--------+-----------+
| user_id | BIGINT | FALSE | PRI(dt, hh, user_id) | | |
| item_id | BIGINT | TRUE | | | |
| behavior | STRING | TRUE | | | |
| dt | STRING | FALSE | PRI(dt, hh, user_id) | | |
| hh | STRING | FALSE | PRI(dt, hh, user_id) | | |
+----------+--------+-------+----------------------+--------+-----------+
5 rows in set
查看建表信息:
Flink SQL> show create t_test
CREATE TABLE t_test (
user_id BIGINT,
item_id BIGINT,
behavior STRING,
dt STRING,
hh STRING,
PRIMARY KEY (dt, hh, user_id) NOT ENFORCED
);
5.3 创建分区表
如果定义了主键,则分区字段必须是主键的子集。通过配置partition.expiration-time表属性,可以自动删除过期的分区。
CREATE TABLE t_test_p (
user_id BIGINT,
item_id BIGINT,
behavior STRING,
dt STRING,
hh STRING,
PRIMARY KEY (dt, hh, user_id) NOT ENFORCED
) PARTITIONED BY (dt, hh);
常见的,一般会考虑定义以下三类字段为分区字段:
- 创建时间字段(推荐):创建时间通常是不可变的,因此您可以放心地将其视为分区字段并将其添加到主键中。
- 事件时间字段:事件时间是原表中的一个字段(比如updated_time)。对于CDC数据来说,比如从MySQL CDC同步的表或者Paimon生成的Changelogs。它们都是完整的CDC数据,包括了UPDATE_BEFORE记录,从而Paimon可以自动删除之前老分区的数据。
- CDC工具提供的op_ts字段:不推荐定义为分区字段,无法知道之前的记录时间戳。
5.4 CTAS建表
Paimon表可以通过查询的结果创建和填充:
CREATE TABLE test1(
user_id BIGINT,
item_id BIGINT
);
CREATE TABLE test2 AS SELECT * FROM test1;
通过CTAS建表可以重新指定主键等信息。
-- 指定分区
CREATE TABLE test2_p WITH ('partition' = 'dt') AS SELECT * FROM test_p;
-- 指定格式配置
CREATE TABLE test3(
user_id BIGINT,
item_id BIGINT
) WITH ('file.format' = 'orc');
CREATE TABLE test3_op WITH ('file.format' = 'parquet') AS SELECT * FROM test3;
-- 指定主键
CREATE TABLE test_pk WITH ('primary-key' = 'dt,hh') AS SELECT * FROM test;
-- 指定主键和分区
CREATE TABLE test_all WITH ('primary-key' = 'dt,hh', 'partition' = 'dt') AS SELECT * FROM test_p;
5.5 CTL建表
创建与另一个表具有相同schema、分区和表属性的表,但不包含数据。
CREATE TABLE test_ctl LIKE test;
5.6 表属性
表的属性有很多,具体请参阅配置,通过with指定属性:
CREATE TABLE tbl(
user_id BIGINT,
item_id BIGINT,
behavior STRING,
dt STRING,
hh STRING,
PRIMARY KEY (dt, hh, user_id) NOT ENFORCED
) PARTITIONED BY (dt, hh)
WITH (
'bucket' = '2',
'bucket-key' = 'user_id'
);
5.7 外部表
类似于Hive的外部表,Paimon的外部表被删除不会清除数据文件,Paimon的外部表可以在任何非Paimon类型的Catalog中使用,使用Paimon的外部表需要在建表的时候申明connector为paimon和path属性即可。
-- 在paimon的catalog中创建外部表,需要指明'auto-create'属性为false
create table t_fs_external(
id INT,
name string,
age INT
)
with (
'connector' = 'paimon',
'path' = 'hdfs://gbasehd111:8020/paimon/external/fs/t_fs_external',
'auto-create' = 'false'
);
--而在非Paimon的catalog中,建表没有限制外部表
create table tt(
id INT,
name string,
age INT
)
with (
'connector' = 'paimon',
'path' = 'hdfs://gbasehd111:8020/paimon/external/fs/tt',
'auto-create' = 'true'
);
5.8 临时表
相对Spark\Hive, 只有Flink支持临时表。如果临时表被删除,其资源将不会被删除,Flink SQL会话关闭时,临时表也会被删除。再次创建Catalog后临时表不像外部表还能找到,与外部表的区别在于,临时表在Paimon Catalog中创建。
CREATE TEMPORARY TABLE t_temp (
k INT,
v STRING
) WITH (
'connector' = 'filesystem',
'path' = 'hdfs://hadoop102:8020/temp.csv',
'format' = 'csv'
);
6. 修改表
6.1 更改/添加表属性
ALTER TABLE test SET (
'write-buffer-size' = '256 MB'
);
6.2 重命名表
ALTER TABLE test1 RENAME TO test_new;
6.3 删除表属性
ALTER TABLE test RESET ('write-buffer-size');
6.4 添加新列
ALTER TABLE test ADD (c1 INT, c2 STRING);
6.5 重命名列
ALTER TABLE test RENAME c1 TO c0;
6.6 删除列
ALTER TABLE test DROP (c0, c2);
6.7 更改列的可为空
CREATE TABLE test_null(
id INT PRIMARY KEY NOT ENFORCED,
coupon_info FLOAT NOT NULL
);
-- 列coupon_info修改成允许为null
ALTER TABLE test_null MODIFY coupon_info FLOAT;
-- 列coupon_info修改成不允许为null
-- 如果表中已经有null值, 修改之前先设置如下参数删除null值
SET 'table.exec.sink.not-null-enforcer' = 'DROP';
ALTER TABLE test_null MODIFY coupon_info FLOAT NOT NULL;
6.8 更改列注释
ALTER TABLE test MODIFY user_id BIGINT COMMENT 'user id';
6.9 更改列位置
ALTER TABLE test MODIFY b INT FIRST;
ALTER TABLE test MODIFY a INT AFTER user_id;
6.10 更改列类型
ALTER TABLE test MODIFY a DOUBLE;
6.11 添加水印
CREATE TABLE test_wm (
id INT,
name STRING,
ts BIGINT
);
ALTER TABLE test_wm ADD(
et AS to_timestamp_ltz(ts,3),
WATERMARK FOR et AS et - INTERVAL '1' SECOND
);
6.12 更改水印
ALTER TABLE test_wm MODIFY WATERMARK FOR et AS et - INTERVAL '2' SECOND;
6.12 删除水印
ALTER TABLE test_wm DROP WATERMARK;
7. DDL
7.1 插入数据
INSERT语句向表中插入新行或覆盖表中的现有数据。sql语法如下:
INSERT { INTO | OVERWRITE } table_identifier [ part_spec ] [ column_list ] { value_expr | query }
其中:
- part_spec:可选,指定分区的键值对列表,多个用逗号分隔。格式为PARTITION (分区列名称 = 分区列值 [ , … ] )
- column_list:可选,指定以逗号分隔的字段列表。
- value_expr:指定要插入的值。可以指定多于一组的值来插入多行。
目前,Flink 不支持直接使用NULL,因此需要将NULL转换为实际数据类型值,比如“CAST (NULL AS STRING)”再插入。也不能将另一个表的可为空列插入到一个表的非空列中。Flink可以使用COALESCE函数来处理:
INSERT INTO A key1 SELECT COALESCE(key2, <non-null expression>) FROM B
7.2 插入有界数据
需要提前设置批模式:
SET 'execution.runtime-mode' = 'batch';
insert into user_info values (1, 'test', 123);
7.3 插入无界数据
需要提前设置流模式(默认不改就是流模式):
-- 流模式
SET 'execution.checkpointing.interval' = '5s';
CREATE TABLE IF NOT EXISTS kafka_input (
id int,
name STRING,
age INT,
`offset` STRING METADATA VIRTUAL
)
WITH
(
'connector' = 'kafka',
'topic' = 'NewTopic',
'properties.bootstrap.servers' = 'gbasehd111:9092,gbasehd112:9092,gbasehd113:9092',
'properties.group.id' = 'test-group',
'scan.startup.mode' = 'earliest-offset',
'format' = 'json'
);
insert into user_source(id, name, age) select id, name, age from default_catalog.default_database.kafka_input;
7.4 覆盖数据
覆盖数据只支持batch模式。默认情况下,流式读取将忽略INSERT OVERWRITE生成的提交。
- 覆盖未分区的表
SET 'execution.runtime-mode' = 'batch';
--覆盖前数据
Flink SQL> select * from t_test_internal;
+----+--------+
| id | name |
+----+--------+
| 1 | <NULL> |
+----+--------+
1 row in set (19.70 seconds)
INSERT OVERWRITE t_fs_external VALUES(3, 'pay', 2);
-- 覆盖后数据,说明覆盖和Hive不一样,Paimon是全部覆盖,Hive是相关的覆盖
Flink SQL> select * from t_fs_external;
+----+------+-----+
| id | name | age |
+----+------+-----+
| 3 | pay | 2 |
+----+------+-----+
1 row in set
- 覆盖分区表 对于分区表,Paimon默认的覆盖模式是动态分区覆盖(即Paimon只删除insert overwrite数据中出现的分区)
INSERT OVERWRITE test_p SELECT * from test;
-- 覆盖指定分区
INSERT OVERWRITE test_p PARTITION (dt = '2023-07-01', hh = '2') SELECT user_id,item_id,behavior from test;
7.5 清空表
可以使用INSERT OVERWRITE通过插入空值来清除表(关闭动态分区覆盖)。
INSERT OVERWRITE test_p/*+ OPTIONS('dynamic-partition-overwrite'='false') */ SELECT * FROM test_p WHERE false;
7.6 更新数据
目前,Paimon在Flink1.17及后续版本中支持使用UPDATE更新记录。只有主键表支持此功能,但不支持更新主键。MergeEngine需要deduplicate
或partial-update
才能支持此功能。
UPDATE test SET item_id = 4, behavior = 'pv' WHERE user_id = 3;
配置MergeEngine只需要with加上相关属性:'merge-engine'='deduplicate'
,默认为deduplicate。
7.7 删除数据
目前,Paimon 在Flink1.17及后续版本中支持使用DELETE删除记录。需要表有主键并且MergeEngine需要为deduplicate
(默认deduplicate)
DELETE FROM test WHERE user_id = 3;
7.8 Merge Into
通过merge into实现行级更新,只有主键表支持此功能。需要用到paimon-flink-action-1.2.0.jar,上传文件:
[root@gbasehd111 lib]# pwd
/opt/gbaseHD/flink/lib
[root@gbasehd111 lib]# ll |grep action
-rw-r--r--. 1 root root 11381 9月 8 15:05 paimon-flink-action-1.2.0.jar
语法说明:
<FLINK_HOME>/bin/flink run \
/path/to/paimon-flink-action-1.2.0.jar \
merge-into \
--warehouse <warehouse-path> \
--database <database-name> \
--table <target-table> \
[--target-as <target-table-alias>] \
--source-table <source-table-name> \
[--source-sql <sql> ...]\
--on <merge-condition> \
--merge-actions <matched-upsert,matched-delete,not-matched-insert,not-matched-by-source-upsert,not-matched-by-source-delete> \
--matched-upsert-condition <matched-condition> \
--matched-upsert-set <upsert-changes> \
--matched-delete-condition <matched-condition> \
--not-matched-insert-condition <not-matched-condition> \
--not-matched-insert-values <insert-values> \
--not-matched-by-source-upsert-condition <not-matched-by-source-condition> \
--not-matched-by-source-upsert-set <not-matched-upsert-changes> \
--not-matched-by-source-delete-condition <not-matched-by-source-condition> \
[--catalog-conf <paimon-catalog-conf> [--catalog-conf <paimon-catalog-conf> ...]]
# 参数说明
--source-sql <sql> 可以传递sql来配置环境并在运行时创建源表。
"match"的说明:
1. matched:更改的行来自目标表,每个行都可以根据条件匹配源表行(source ∩ target):
合并条件(--on)
匹配条件(--matched-xxx-condition)
2. not-matched:更改的行来自源表,并且根据条件所有行都不能与任何目标表的行匹配(source – target):
合并条件(--on)
不匹配条件(--not-matched-xxx-condition):不能使用目标表的列来构造条件表达式。
3. not-matched-by-source:更改的行来自目标表,并且基于条件所有行都不能与任何源表的行匹配(target – source):
合并条件(--on)
源不匹配条件(--not-matched-by-source-xxx-condition):不能使用源表的列来构造条件表达式。
案例实操:
CREATE TABLE ws1 (
id INT,
ts BIGINT,
vc INT,
PRIMARY KEY (id) NOT ENFORCED
);
INSERT INTO ws1 VALUES(1,1,1),(2,2,2),(3,3,3);
CREATE TABLE ws_t (
id INT,
ts BIGINT,
vc INT,
PRIMARY KEY (id) NOT ENFORCED
);
INSERT INTO ws_t VALUES(2,2,2),(3,3,3),(4,4,4),(5,5,5);
- 案例一: ws_t与ws1匹配id,将ws_t中ts>2的vc改为10,ts<=2的删除
./bin/flink run ./lib/paimon-flink-action-1.2.0.jar \
merge_into \
--warehouse hdfs://node01:8020/paimon/fs/internal/warehouse \
--database default \
--table ws_t \
--source_table default.ws1 \
--on "ws1.id"="ws_t.id" \
--merge_actions matched-upsert,matched-delete \
--matched_upsert_condition "ws_t.ts>2" \
--matched_upsert_set "vc=10" \
--matched_delete_condition "ws_t.ts<=2"
- 案例二: ws_t与ws1匹配id,匹配上的将ws_t中vc加10,ws1中没匹配上的插入ws_t中
./bin/flink run ./lib/paimon-flink-action-1.2.0.jar \
merge_into \
--warehouse hdfs://node01:8020/paimon/fs/internal/warehouse \
--database default \
--table ws_t \
--source_table default.ws1 \
--on "ws1.id"="ws_t.id" \
--merge_actions matched-upsert,not-matched-insert \
--matched_upsert_set "vc=ws_t.vc+10" \
--not_matched_insert_values "*"
- 案例三: ws_t与ws1匹配id,ws_t中没匹配上的,ts大于4则vc加20,ts=4则删除
./bin/flink run ./lib/paimon-flink-action-1.2.0.jar \
merge_into \
--warehouse hdfs://node01:8020/paimon/fs/internal/warehouse \
--database default \
--table ws_t \
--source_table default.ws1 \
--on "ws1.id"="ws_t.id" \
--merge_actions not-matched-by-source-upsert,not-matched-by-source-delete \
--not_matched_by_source_upsert_condition "ws_t.ts>4" \
--not_matched_by_source_upsert_set "vc=ws_t.vc+20" \
--not_matched_by_source_delete_condition "ws_t.ts=4"
- 案例四: 使用--source-sql创建新catalog下的源表,匹配ws_t的id,没匹配上的插入ws_t
./bin/flink run ./lib/paimon-flink-action-1.2.0.jar \
merge_into \
--warehouse hdfs://node01:8020/paimon/fs/internal/warehouse \
--database default \
--table ws_t \
--source_sql "CREATE CATALOG fs2 WITH ('type' = 'paimon','warehouse' = 'hdfs://node01:8020/paimon/fs2/internal/warehouse')" \
--source_sql "CREATE DATABASE IF NOT EXISTS fs2.test" \
--source_sql "CREATE TEMPORARY VIEW fs2.test.ws2 AS SELECT id+10 as id,ts,vc FROM ws_t" \
--source_table fs2.test.ws2 \
--on "ws_t.id = ws2.id" \
--merge_actions not-matched-insert \
--not_matched_insert_values "*"
8. DQL查询表
Paimon的批量读取返回表快照中的所有数据。默认情况下,批量读取返回最新快照。
-- 设置执行模式为批模式
SET 'execution.runtime-mode' = 'batch';
8.1 时间旅行
paimon按照日期保存了表的快照数据, 同时使用EARLIEST和LATEST标识最早的和当前的快照版本:
- 读取指定id的快照
Flink SQL> SELECT * FROM ws_t /*+ OPTIONS('scan.snapshot-id' = '1') */;
+----+----+----+
| id | ts | vc |
+----+----+----+
| 2 | 2 | 2 |
| 3 | 3 | 3 |
| 4 | 4 | 4 |
| 5 | 5 | 5 |
+----+----+----+
4 rows in set
- 读取指定时间戳的快照
SELECT * FROM ws_t$snapshots;
SELECT * FROM ws_t /*+ OPTIONS('scan.timestamp-millis' = '1688369660841') */;
8.2 增量查询
读取开始快照(不包括)和结束快照之间的增量更改。
-- “3,5”表示快照 3 和快照 5 之间的更改
Flink SQL> SELECT * FROM ws_t /*+ OPTIONS('incremental-between' = '3,5') */;
+----+----+----+
| id | ts | vc |
+----+----+----+
| 4 | 4 | 4 |
| 5 | 5 | 25 |
| 11 | 1 | 1 |
| 12 | 2 | 2 |
| 13 | 3 | 20 |
| 15 | 5 | 25 |
+----+----+----+
6 rows in set (1.75 seconds)
在batch模式中,不返回DELETE记录,因此-D的记录将被删除。如果你想查看DELETE记录,可以查询audit_log表:
Flink SQL> SELECT * FROM ws_t$audit_log /*+ OPTIONS('incremental-between' = '3,5') */;
+---------+----+----+----+
| rowkind | id | ts | vc |
+---------+----+----+----+
| -D | 4 | 4 | 4 |
| +U | 5 | 5 | 25 |
| +I | 11 | 1 | 1 |
| +I | 12 | 2 | 2 |
| +I | 13 | 3 | 20 |
| +I | 15 | 5 | 25 |
+---------+----+----+----+
6 rows in set (1.22 seconds)
8.3 流式查询
设置流模式读取数据:
SET 'execution.checkpointing.interval'='30s';
SET 'execution.runtime-mode' = 'streaming';
设置从最新的地方开始读取,也就是当前没有新数据进来,在页面上不会显示数据:
SELECT * FROM ws_t /*+ OPTIONS('scan.mode' = 'latest') */
- 时间旅行
-- 如果只想处理今天及以后的数据,则可以使用分区过滤器来实现
SELECT * FROM test_p WHERE dt > '2023-07-01'
如果不是分区表那就需要使用时间旅行的流读取:
-- 从指定快照id开始读取变更数据
SELECT * FROM ws_t /*+ OPTIONS('scan.snapshot-id' = '1') */;
--从指定时间戳开始读取
SELECT * FROM ws_t /*+ OPTIONS('scan.timestamp-millis' = '1688369660841') */;
--第一次启动时读取指定快照数据,并继续读取变化
SELECT * FROM ws_t /*+ OPTIONS('scan.mode'='from-snapshot-full','scan.snapshot-id' = '3') */;
需要注意的的是如果不是从首次位置开始读取的话,查询结果的变更记录是不准确的。 2. 使用ConsumerID 如果数据源是Kafka,在流式读取表时可以指定consumer-id,这样可以实现和Flink类似的savepoint的功能:
-- 指定consumer-id开始流式查询
SELECT * FROM ws_t /*+ OPTIONS('consumer-id' = 'atguigu') */;
- 当之前的作业停止后,新启动的作业可以继续消耗之前的进度,而不需要从状态恢复。新的读取将从消费者文件中找到的下一个快照ID开始读取。
- 在判断一个快照是否过期时,Paimon会查看文件系统中该表的所有消费者,如果还有消费者依赖这个快照,那么这个快照就不会因为过期而被删除。
- 当没有水印定义时,Paimon表会将快照中的水印传递到下游Paimon表,这意味着您可以跟踪整个管道的水印进度。
9. 查询优化
建议在查询时指定分区和主键过滤器,这将加快查询数据的速度。Paimon会按主键对数据进行排序,从而加快点查询和范围查询的速度。使用复合主键时,查询过滤器最好形成主键的最左边前缀,以获得良好的加速效果。
-- 通过为主键最左边的前缀指定范围过滤器
SELECT * FROM orders WHERE catalog_id=1025 AND order_id=29495;
-- 下面例子的过滤器不能很好地加速查询:
-- order_id不是主键
SELECT * FROM orders WHERE order_id=29495;
-- order_id不是主键,OR会让加速失效
SELECT * FROM orders WHERE catalog_id=1025 OR order_id=29495;
10. 系统表
系统表保存了每个表的元数据和信息,可以通过批量模式查询系统表。
10.1 快照表(Snapshots Table)
通过查询快照表,可以了解该表的提交和过期信息以及数据的时间旅行。
SELECT * FROM ws_t$snapshots;
10.2 模式表(Schemas Table)
通过schemas表可以查询该表的历史schema
SELECT * FROM ws_t$schemas;
可以连接快照表和模式表以获取给定快照的字段:
SELECT s.snapshot_id, t.schema_id, t.fields
FROM ws_t$snapshots s JOIN ws_t$schemas t
ON s.schema_id=t.schema_id where s.snapshot_id=3;
10.3 选项表(Options Table)
可以通过选项表查询DDL中指定的表的选项信息。未显示的选项将是默认值。
Flink SQL> SELECT * FROM ws_t$options;
Empty set (8.94 seconds)
10.4 审计日志表(Audit log Table)
如果需要审计表的changelog,可以使用audit_log系统表。通过audit_log表,获取表增量数据时可以获取rowkind列。您可以利用该栏目进行过滤等操作来完成审核。 rowkind 有四个值:
- +I:插入操作。
- -U:使用更新行的先前内容进行更新操作。
- +U:使用更新行的新内容进行更新操作。
- -D:删除操作。
SELECT * FROM ws_t$audit_log;
10.5 文件表(Files Table)
可以查询特定快照表的文件。
-- 查询最新快照的文件
SELECT * FROM ws_t$files;
-- 查询指定快照的文件
SELECT * FROM ws_t$files /*+ OPTIONS('scan.snapshot-id'='1') */;
10.6 标签表(Tags Table)
通过tags表可以查询表的标签历史信息,包括基于哪些快照进行标签以及快照的一些历史信息。您还可以通过名称获取所有标签名称和时间旅行到特定标签的数据。
SELECT * FROM ws_t$tags;
11. 维表Join
Paimon支持Lookup Join语法,它用于从Paimon查询的数据来补充维度字段。要求一个表具有处理时间属性,而另一个表由查找源连接器支持。
11.1 数据准备
CREATE TABLE customers (
id INT PRIMARY KEY NOT ENFORCED,
name STRING,
country STRING,
zip STRING
);
INSERT INTO customers VALUES(1,'zs','ch','123'),(2,'ls','ch','456'), (3,'ww','ch','789');
CREATE TEMPORARY TABLE Orders (
order_id INT,
total INT,
customer_id INT,
proc_time AS PROCTIME()
) WITH (
'connector' = 'datagen',
'rows-per-second'='1',
'fields.order_id.kind'='sequence',
'fields.order_id.start'='1',
'fields.order_id.end'='1000000',
'fields.total.kind'='random',
'fields.total.min'='1',
'fields.total.max'='1000',
'fields.customer_id.kind'='random',
'fields.customer_id.min'='1',
'fields.customer_id.max'='3'
);
11.2 使用Join
SELECT o.order_id, o.total, c.country, c.zip
FROM Orders AS o
JOIN customers
FOR SYSTEM_TIME AS OF o.proc_time AS c
ON o.customer_id = c.id;
Lookup Join算子会在本地维护一个RocksDB缓存并实时拉取表的最新更新。查找连接运算符只会提取必要的数据,因此您的过滤条件对于性能非常重要。 如果Orders(主表)的记录Join缺失,因为customers(查找表)对应的数据还没有准备好。可以考虑使用Flink的Delayed Retry Strategy For Lookup。