Skip to content

Paimon进阶使用

1. 管理快照

1.1 快照过期

Paimon中标记被删除的数据并没有真正被删除,因为Paimon还支持时间旅行到更早的快照。它们仅在快照过期时被删除。通过使旧快照过期,可以删除不再使用的旧数据文件和元数据文件,以释放磁盘空间。设置以下表属性:

选项必需默认类型描述
snapshot.time-retainedNo1 hDuration已完成快照的最长时间保留。
snapshot.num-retained.minNo10Integer要保留的已完成快照的最小数量。
snapshot.num-retained.maxNoInteger.MAX_VALUEInteger要保留的已完成快照的最大数量。

注意,保留时间太短或保留数量太少可能会导致如下问题:

  1. 批量查询找不到该文件。例如,表比较大,批量查询需要10分钟才能读取,但是10分钟前的快照过期了,此时批量查询会读取到已删除的快照。
  2. 表文件上的流式读取作业(没有外部日志系统)无法重新启动。当作业重新启动时,它记录的快照可能已过期。(可以使用Consumer Id来保护快照过期的小保留时间内的流式读取)。

1.2 回滚快照

回滚快照支持使用action的jar包进行操作:

sh
<FLINK_HOME>/bin/flink run \
    /path/to/paimon-flink-action-0.5-SNAPSHOT.jar \
    rollback-to \
    --warehouse <warehouse-path> \
    --database <database-name> \ 
    --table <table-name> \
    --snapshot <snapshot-id> \
    [--catalog-conf <paimon-catalog-conf> [--catalog-conf <paimon-catalog-conf> ...]]

另外支持使用FlinkSQL进行回滚:

sql
CALL sys.rollback_to(`table` => 'database_name.table_name', snapshot_id => <snasphot-id>);

2. 管理分区

创建分区表时可以设置partition.expiration-time。 Paimon会定期检查分区的状态,并根据时间删除过期的分区。判断分区是否过期逻辑为:将分区中提取的时间与当前时间进行比较,看生存时间是否超过partition.expiration-time

sql
CREATE TABLE T (...) PARTITIONED BY (dt) WITH (
    'partition.expiration-time' = '7 d',
    'partition.expiration-check-interval' = '1 d',
    'partition.timestamp-formatter' = 'yyyyMMdd'
);
选项默认类型描述
partition.expiration-check-interval1hDuration分区过期的检查间隔。
partition.expiration-time(none)Duration分区的过期时间间隔。
如果分区的生命周期超过此值,则该分区将过期。
分区时间是从分区值中提取的。
partition.timestamp-formatter(none)String用于格式化字符串时间戳的格式化程序。
它可以与partition.timestamp-pattern一起使用来创建使用指定值的格式化程序。
默认格式化程序为“yyyy-MM-dd HH:mm:ss”和“yyyy-MM-dd”。
支持多个分区字段,例如“$year-$month-$day $hour:00:00”。
时间戳格式化程序与 Java 的 DateTimeFormatter 兼容。
partition.timestamp-pattern(none)String可以指定一种模式来从分区获取时间戳。
格式化程序模式由“partition.timestamp-formatter”定义。
默认情况下,从第一个字段读取。
如果分区中的时间戳是名为“dt”的单个字段,则可以使用“$dt”。
如果它分布在年、月、日和小时的多个字段中,则可以使用“$year-$month-$day $hour:00:00”。
如果时间戳位于dt和hour字段中,则可以使用“$dt $hour:00:00”。

3. 管理小文件

小文件可能会导致:

  • 稳定性问题:HDFS中小文件过多,NameNode会承受过大的压力。
  • 成本问题:HDFS中的小文件会暂时使用最小1个Block的大小,例如128MB。
  • 查询效率:小文件过多查询效率会受到影响。

默认使用Flink Writer,每个checkpoint会生成1-2个快照,并且checkpoint会强制在DFS上生成文件,因此checkpoint间隔越小,会生成越多的小文件。另外writer的内存(write-buffer-size)耗尽也会将数据flush到DFS并生成相应的文件。可以启用write-buffer-spillable在writer中生成溢出文件,从而在DFS中生成更大的文件。
所以可以设置如下:

  • 增大checkpoint间隔
  • 增加write-buffer-size或启用write-buffer-spillable

3.2 快照因素

Paimon维护文件的多个版本,文件的Compaction和删除是逻辑上的,并没有真正删除文件。文件只有在Snapshot过期后才会被真正删除,因此减少文件的第一个方法就是减少 Snapshot 过期的时间。 Flink writer会自动使快照过期。
另外配置较少的桶数,否则会出现也有很多小文件。

3.3 主键表LSM因素

LSM树将文件组织成Sorted Runs的运行。num-sorted-run.compaction-trigger的值这意味着一个桶中至少有 5 个文件。如果要减少此数量,可以保留更少的文件,但写入性能可能会受到影响。

3.4 仅追加表的文件因素

默认情况下,Append-Only还会进行自动Compaction以减少小文件的数量,对于分桶的Append-only表,为了排序会对bucket内的文件行Compaction,可能会保留更多的小文件。

3.5 Full-Compaction因素

主键表是5个文件,但是Append-Only表(桶)可能单个桶里有50个小文件,这是很难接受的。更糟糕的是,不再活动的分区还保留了如此多的小文件。建议配置Full-Compaction,在Flink写入时配置full-compaction.delta-commits定期进行full-compaction。并且可以确保在写入结束之前分区被full-compaction。

4. 缩放Bucket

由于总桶数对性能影响很大,Paimon允许用户通过ALTER TABLE命令调整桶数,并通过INSERT OVERWRITE重新组织数据布局,而无需重新创建表/分区。当执行覆盖作业时,框架会自动扫描旧桶号的数据,并根据当前桶号对记录进行哈希处理。

警告

  • ALTER TABLE仅修改表的元数据,不会重新组织或重新格式化现有数据。重新组织现有数据必须通过INSERT OVERWRITE来实现。
  • 重新缩放桶数不会影响读取和正在运行的写入作业。
  • 一旦存储桶编号更改,任何新安排的INSERT INTO作业写入未重新组织的现有表/分区将抛出TableException ,并显示如下类似异常:
sh
Try to write table/partition ... with a new bucket num ..., 
but the previous bucket num is ... Please switch to batch mode, 
and perform INSERT OVERWRITE to rescale current data layout first.

在覆盖期间,确保没有其他作业写入同一表/分区。

对于分区表,不同的分区可以有不同的桶号。例如:

sql
ALTER TABLE my_table SET ('bucket' = '4');
INSERT OVERWRITE my_table PARTITION (dt = '2022-01-01')
SELECT * FROM ...;
  
ALTER TABLE my_table SET ('bucket' = '8');
INSERT OVERWRITE my_table PARTITION (dt = '2022-01-02')
SELECT * FROM ...;

4.2 案例实操

如下是正在跑的一个作业:

sql
-- 建表
CREATE TABLE verified_orders (
    trade_order_id BIGINT,
    item_id BIGINT,
    item_price DOUBLE,
    dt STRING,
    PRIMARY KEY (dt, trade_order_id, item_id) NOT ENFORCED 
) PARTITIONED BY (dt)
WITH (
    'bucket' = '16'
);

-- kafka表 
CREATE temporary TABLE raw_orders(
    trade_order_id BIGINT,
    item_id BIGINT,
    item_price BIGINT,
    gmt_create STRING,
    order_status STRING
) WITH (
    'connector' = 'kafka',
    'topic' = '...',
    'properties.bootstrap.servers' = '...',
    'format' = 'csv'
    ...
);

-- 流式插入16个分桶
INSERT INTO verified_orders
SELECT trade_order_id,
       item_id,
       item_price,
       DATE_FORMAT(gmt_create, 'yyyy-MM-dd') AS dt
FROM raw_orders
WHERE order_status = 'verified';

过去几周运行良好。然而,最近数据量增长很快,作业的延迟不断增加。为了提高数据新鲜度,用户可以执行如下操作缩放分桶:

sh
# 使用保存点暂停流作业
./bin/flink stop \
      --savepointPath /tmp/flink-savepoints \
      $JOB_ID

增加桶数:

sql
ALTER TABLE verified_orders SET ('bucket' = '32');
-- 切换到批处理模式并覆盖流作业正在写入的当前分区
SET 'execution.runtime-mode' = 'batch';
-- 假设今天是2022-06-22
-- 情况1:没有更新历史分区的延迟事件,因此覆盖今天的分区就足够了
INSERT OVERWRITE verified_orders PARTITION (dt = '2022-06-22')
SELECT trade_order_id,
       item_id,
       item_price
FROM verified_orders
WHERE dt = '2022-06-22';
  
- 情况2:有更新历史分区的延迟事件,但范围不超过3天
INSERT OVERWRITE verified_orders
SELECT trade_order_id,
       item_id,
       item_price,
       dt
FROM verified_orders
WHERE dt IN ('2022-06-20', '2022-06-21', '2022-06-22');

覆盖作业完成后,切换回流模式,从保存点恢复(可以增加并行度=新bucket数量):

sql
SET 'execution.runtime-mode' = 'streaming';
SET 'execution.savepoint.path' = <savepointPath>;

INSERT INTO verified_orders
SELECT trade_order_id,
     item_id,
     item_price,
     DATE_FORMAT(gmt_create, 'yyyy-MM-dd') AS dt
FROM raw_orders
WHERE order_status = 'verified';