Paimon进阶使用 
1. 管理快照 
1.1 快照过期 
Paimon中标记被删除的数据并没有真正被删除,因为Paimon还支持时间旅行到更早的快照。它们仅在快照过期时被删除。通过使旧快照过期,可以删除不再使用的旧数据文件和元数据文件,以释放磁盘空间。设置以下表属性:
| 选项 | 必需 | 默认 | 类型 | 描述 | 
|---|---|---|---|---|
| snapshot.time-retained | No | 1 h | Duration | 已完成快照的最长时间保留。 | 
| snapshot.num-retained.min | No | 10 | Integer | 要保留的已完成快照的最小数量。 | 
| snapshot.num-retained.max | No | Integer.MAX_VALUE | Integer | 要保留的已完成快照的最大数量。 | 
注意,保留时间太短或保留数量太少可能会导致如下问题:
- 批量查询找不到该文件。例如,表比较大,批量查询需要10分钟才能读取,但是10分钟前的快照过期了,此时批量查询会读取到已删除的快照。
- 表文件上的流式读取作业(没有外部日志系统)无法重新启动。当作业重新启动时,它记录的快照可能已过期。(可以使用Consumer Id来保护快照过期的小保留时间内的流式读取)。
1.2 回滚快照 
回滚快照支持使用action的jar包进行操作:
<FLINK_HOME>/bin/flink run \
    /path/to/paimon-flink-action-0.5-SNAPSHOT.jar \
    rollback-to \
    --warehouse <warehouse-path> \
    --database <database-name> \ 
    --table <table-name> \
    --snapshot <snapshot-id> \
    [--catalog-conf <paimon-catalog-conf> [--catalog-conf <paimon-catalog-conf> ...]]另外支持使用FlinkSQL进行回滚:
CALL sys.rollback_to(`table` => 'database_name.table_name', snapshot_id => <snasphot-id>);2. 管理分区 
创建分区表时可以设置partition.expiration-time。 Paimon会定期检查分区的状态,并根据时间删除过期的分区。判断分区是否过期逻辑为:将分区中提取的时间与当前时间进行比较,看生存时间是否超过partition.expiration-time。
CREATE TABLE T (...) PARTITIONED BY (dt) WITH (
    'partition.expiration-time' = '7 d',
    'partition.expiration-check-interval' = '1 d',
    'partition.timestamp-formatter' = 'yyyyMMdd'
);| 选项 | 默认 | 类型 | 描述 | 
|---|---|---|---|
| partition.expiration-check-interval | 1h | Duration | 分区过期的检查间隔。 | 
| partition.expiration-time | (none) | Duration | 分区的过期时间间隔。 如果分区的生命周期超过此值,则该分区将过期。 分区时间是从分区值中提取的。 | 
| partition.timestamp-formatter | (none) | String | 用于格式化字符串时间戳的格式化程序。 它可以与 partition.timestamp-pattern一起使用来创建使用指定值的格式化程序。默认格式化程序为“yyyy-MM-dd HH:mm:ss”和“yyyy-MM-dd”。 支持多个分区字段,例如“$year-$month-$day $hour:00:00”。 时间戳格式化程序与 Java 的 DateTimeFormatter 兼容。 | 
| partition.timestamp-pattern | (none) | String | 可以指定一种模式来从分区获取时间戳。 格式化程序模式由“partition.timestamp-formatter”定义。 默认情况下,从第一个字段读取。 如果分区中的时间戳是名为“dt”的单个字段,则可以使用“$dt”。 如果它分布在年、月、日和小时的多个字段中,则可以使用“$year-$month-$day $hour:00:00”。 如果时间戳位于dt和hour字段中,则可以使用“$dt $hour:00:00”。 | 
3. 管理小文件 
小文件可能会导致:
- 稳定性问题:HDFS中小文件过多,NameNode会承受过大的压力。
- 成本问题:HDFS中的小文件会暂时使用最小1个Block的大小,例如128MB。
- 查询效率:小文件过多查询效率会受到影响。
3.1 Flink Checkpoint因素 
默认使用Flink Writer,每个checkpoint会生成1-2个快照,并且checkpoint会强制在DFS上生成文件,因此checkpoint间隔越小,会生成越多的小文件。另外writer的内存(write-buffer-size)耗尽也会将数据flush到DFS并生成相应的文件。可以启用write-buffer-spillable在writer中生成溢出文件,从而在DFS中生成更大的文件。
 所以可以设置如下:
- 增大checkpoint间隔
- 增加write-buffer-size或启用write-buffer-spillable
3.2 快照因素 
Paimon维护文件的多个版本,文件的Compaction和删除是逻辑上的,并没有真正删除文件。文件只有在Snapshot过期后才会被真正删除,因此减少文件的第一个方法就是减少 Snapshot 过期的时间。 Flink writer会自动使快照过期。
 另外配置较少的桶数,否则会出现也有很多小文件。
3.3 主键表LSM因素 
LSM树将文件组织成Sorted Runs的运行。num-sorted-run.compaction-trigger的值这意味着一个桶中至少有 5 个文件。如果要减少此数量,可以保留更少的文件,但写入性能可能会受到影响。
3.4 仅追加表的文件因素 
默认情况下,Append-Only还会进行自动Compaction以减少小文件的数量,对于分桶的Append-only表,为了排序会对bucket内的文件行Compaction,可能会保留更多的小文件。
3.5 Full-Compaction因素 
主键表是5个文件,但是Append-Only表(桶)可能单个桶里有50个小文件,这是很难接受的。更糟糕的是,不再活动的分区还保留了如此多的小文件。建议配置Full-Compaction,在Flink写入时配置full-compaction.delta-commits定期进行full-compaction。并且可以确保在写入结束之前分区被full-compaction。
4. 缩放Bucket 
由于总桶数对性能影响很大,Paimon允许用户通过ALTER TABLE命令调整桶数,并通过INSERT OVERWRITE重新组织数据布局,而无需重新创建表/分区。当执行覆盖作业时,框架会自动扫描旧桶号的数据,并根据当前桶号对记录进行哈希处理。
警告
- ALTER TABLE仅修改表的元数据,不会重新组织或重新格式化现有数据。重新组织现有数据必须通过- INSERT OVERWRITE来实现。
- 重新缩放桶数不会影响读取和正在运行的写入作业。
- 一旦存储桶编号更改,任何新安排的INSERT INTO作业写入未重新组织的现有表/分区将抛出TableException ,并显示如下类似异常:
Try to write table/partition ... with a new bucket num ..., 
but the previous bucket num is ... Please switch to batch mode, 
and perform INSERT OVERWRITE to rescale current data layout first.在覆盖期间,确保没有其他作业写入同一表/分区。
对于分区表,不同的分区可以有不同的桶号。例如:
ALTER TABLE my_table SET ('bucket' = '4');
INSERT OVERWRITE my_table PARTITION (dt = '2022-01-01')
SELECT * FROM ...;
  
ALTER TABLE my_table SET ('bucket' = '8');
INSERT OVERWRITE my_table PARTITION (dt = '2022-01-02')
SELECT * FROM ...;4.2 案例实操 
如下是正在跑的一个作业:
-- 建表
CREATE TABLE verified_orders (
    trade_order_id BIGINT,
    item_id BIGINT,
    item_price DOUBLE,
    dt STRING,
    PRIMARY KEY (dt, trade_order_id, item_id) NOT ENFORCED 
) PARTITIONED BY (dt)
WITH (
    'bucket' = '16'
);
-- kafka表 
CREATE temporary TABLE raw_orders(
    trade_order_id BIGINT,
    item_id BIGINT,
    item_price BIGINT,
    gmt_create STRING,
    order_status STRING
) WITH (
    'connector' = 'kafka',
    'topic' = '...',
    'properties.bootstrap.servers' = '...',
    'format' = 'csv'
    ...
);
-- 流式插入16个分桶
INSERT INTO verified_orders
SELECT trade_order_id,
       item_id,
       item_price,
       DATE_FORMAT(gmt_create, 'yyyy-MM-dd') AS dt
FROM raw_orders
WHERE order_status = 'verified';过去几周运行良好。然而,最近数据量增长很快,作业的延迟不断增加。为了提高数据新鲜度,用户可以执行如下操作缩放分桶:
# 使用保存点暂停流作业
./bin/flink stop \
      --savepointPath /tmp/flink-savepoints \
      $JOB_ID增加桶数:
ALTER TABLE verified_orders SET ('bucket' = '32');
-- 切换到批处理模式并覆盖流作业正在写入的当前分区
SET 'execution.runtime-mode' = 'batch';
-- 假设今天是2022-06-22
-- 情况1:没有更新历史分区的延迟事件,因此覆盖今天的分区就足够了
INSERT OVERWRITE verified_orders PARTITION (dt = '2022-06-22')
SELECT trade_order_id,
       item_id,
       item_price
FROM verified_orders
WHERE dt = '2022-06-22';
  
- 情况2:有更新历史分区的延迟事件,但范围不超过3天
INSERT OVERWRITE verified_orders
SELECT trade_order_id,
       item_id,
       item_price,
       dt
FROM verified_orders
WHERE dt IN ('2022-06-20', '2022-06-21', '2022-06-22');覆盖作业完成后,切换回流模式,从保存点恢复(可以增加并行度=新bucket数量):
SET 'execution.runtime-mode' = 'streaming';
SET 'execution.savepoint.path' = <savepointPath>;
INSERT INTO verified_orders
SELECT trade_order_id,
     item_id,
     item_price,
     DATE_FORMAT(gmt_create, 'yyyy-MM-dd') AS dt
FROM raw_orders
WHERE order_status = 'verified';