Skip to content

任务并行和小文件合并优化

1. 优化之任务并行度

对于一个分布式的计算任务而言,设置一个合适的并行度十分重要。Hive的计算任务由MapReduce完成,故并行度的调整需要分为Map端和Reduce端。

1.1 Map端并行度

Map端的并行度,也就是Map的个数。是由输入文件的切片数决定的。一般情况下,Map端的并行度无需手动调整。以下特殊情况可考虑调整map端并行度:

  1. 查询的表中存在大量小文件 按照Hadoop默认的切片策略,一个小文件会单独启动一个map task负责计算。若查询的表中存在大量小文件,则会启动大量map task,造成计算资源的浪费。这种情况下,可以使用Hive提供的CombineHiveInputFormat,多个小文件合并为一个切片,从而控制map task个数。相关参数如下:
sql
set hive.input.format=org.apache.hadoop.hive.ql.io.CombineHiveInputFormat;
  1. map端有复杂的查询逻辑 若SQL语句中有正则替换、json解析等复杂耗时的查询逻辑时,map端的计算会相对慢一些。若想加快计算速度,在计算资源充足的情况下,可考虑增大map端的并行度,令map task多一些,每个map task计算的数据少一些。相关参数如下:
sql
-- 一个切片的最大值
set mapreduce.input.fileinputformat.split.maxsize=256000000;

1.2 Reduce端并行度

Reduce端的并行度,也就是Reduce个数。相对来说,更需要关注。Reduce端的并行度,可由用户自己指定,也可由Hive自行根据该MR Job输入的文件大小进行估算。
Reduce端的并行度的相关参数如下:

sql
-- 指定Reduce端并行度,默认值为-1,表示用户未指定
set mapreduce.job.reduces;
-- Reduce端并行度最大值
set hive.exec.reducers.max;
-- 单个Reduce Task计算的数据量,用于估算Reduce并行度
set hive.exec.reducers.bytes.per.reducer;

Reduce端并行度的确定逻辑如下:
若指定参数mapreduce.job.reduces的值为一个非负整数,则Reduce并行度为指定值。否则,Hive自行估算Reduce并行度,估算逻辑如下:
假设Job输入的文件大小为totalInputBytes

  • 参数hive.exec.reducers.bytes.per.reducer的值为bytesPerReducer。
  • 参数hive.exec.reducers.max的值为maxReducers。
    则Reduce端的并行度为:
    Alt text 根据上述描述,可以看出,Hive自行估算Reduce并行度时,是以整个MR Job输入的文件大小作为依据的。因此,在某些情况下其估计的并行度很可能并不准确,此时就需要用户根据实际情况来指定Reduce并行度了。

Reduce并行度在MR引擎中不太准确

MR引擎从Hive2.X就已经过时了,推荐使用Spark和Tez引擎,Spark和Tez引擎是根据Map计算结果来进行判断输入文件大小的。

2. 优化实操

2.1 SQL背景

sql
select
    province_id,
    count(*)
from order_detail
group by province_id;

2.2 优化前

上述sql语句,在不指定Reduce并行度时,Hive自行估算并行度的逻辑如下:

ini
totalInputBytes= 1136009934
bytesPerReducer=256000000
maxReducers=1009

经计算,Reduce并行度为:
Alt text 执行sql后,查看Hadoop上任务执行历史记录:
Alt text

2.3 优化思路

上述sql语句,在默认情况下,是会进行map-side聚合的,也就是Reduce端接收的数据,实际上是map端完成聚合之后的结果。观察任务的执行过程,会发现,每个map端输出的数据只有34条记录,共有5个map task。
Alt text 也就是说Reduce端实际只会接收170(34*5)条记录,故理论上Reduce端并行度设置为1就足够了。这种情况下,用户可通过以下参数,自行设置Reduce端并行度为1。

sql
-- 指定Reduce端并行度,默认值为-1,表示用户未指定
set mapreduce.job.reduces=1;

3. 优化之小文件合并

小文件合并优化,分为两个方面,分别是Map端输入的小文件合并、Reduce端输出的小文件合并。

3.1 Map端输入文件合并

合并Map端输入的小文件,是指将多个小文件划分到一个切片中,进而由一个Map Task去处理。目的是防止为单个小文件启动一个Map Task,浪费计算资源。
相关参数为:

sql
-- 可将多个小文件切片,合并为一个切片,进而由一个map任务处理
set hive.input.format=org.apache.hadoop.hive.ql.io.CombineHiveInputFormat;

3.2 Reduce输出文件合并

合并Reduce端输出的小文件,是指将多个小文件合并成大文件。目的是减少HDFS小文件数量。其原理是根据计算任务输出文件的平均大小进行判断,若符合条件,则单独启动一个额外的任务进行合并。
相关参数为:

sql
--开启合并map only任务输出的小文件
set hive.merge.mapfiles=true;
--开启合并map reduce任务输出的小文件
set hive.merge.mapredfiles=true;
--合并后的文件大小
set hive.merge.size.per.task=256000000;
--触发小文件合并任务的阈值,若某计算任务输出的文件平均大小低于该值,则触发合并
set hive.merge.smallfiles.avgsize=16000000;

4. 优化实操

4.1 需求背景

计算各省份订单金额总和,下表为结果表。

sql
create table order_amount_by_province(
    province_id string comment '省份id',
    order_amount decimal(16,2) comment '订单金额'
)
location '/order_amount_by_province';
-- 数据录入
insert overwrite table order_amount_by_province
select
    province_id,
    sum(total_amount)
from order_detail
group by province_id;

4.2 优化前

根据任务并行度一节所需内容,可分析出,默认情况下,该sql语句的Reduce端并行度为5,故最终输出文件个数也为5,下图为输出文件,可以看出,5个均为小文件。
Alt text

4.3 优化思路

若想避免小文件的产生,可采取方案有两个。

  1. 合理设置任务的Reduce端并行度。 若将上述计算任务的并行度设置为1,就能保证其输出结果只有一个文件。
  2. 启用Hive合并小文件优化 设置以下参数:
sql
-- 开启合并map reduce任务输出的小文件
set hive.merge.mapredfiles=true;
-- 合并后的文件大小
set hive.merge.size.per.task=256000000;
-- 触发小文件合并任务的阈值,若某计算任务输出的文件平均大小低于该值,则触发合并
set hive.merge.smallfiles.avgsize=16000000;

再次执行上述的insert语句,观察结果表中的文件,只剩一个了。
Alt text HDFS上面执行效果如下图:
Alt text