Skip to content

分组聚合和Join优化

分组聚合主要指的group by计算,而Hive的join计算,包括Common Join,Map Join,Bucket Map Join,Sort Merge Buckt Map Join等。

1. 分组聚合优化

Hive中未经优化的分组聚合,是通过一个MapReduce Job实现的。Map端负责读取数据,并按照分组字段分区,通过Shuffle,将数据发往Reduce端,各组数据在Reduce端完成最终的聚合运算。
Hive对分组聚合的优化主要围绕着减少Shuffle数据量进行,具体做法是map-side聚合。所谓map-side聚合,就是在map端维护一个hash table,利用其完成部分的聚合,然后将部分聚合的结果,按照分组字段分区,发送至reduce端,完成最终的聚合。map-side聚合能有效减少shuffle的数据量,提高分组聚合运算的效率。
map-side 聚合相关的参数如下:

sql
--启用map-side聚合
set hive.map.aggr=true;

-- 用于检测源表数据是否适合进行map-side聚合。检测的方法是:先对若干条数据进行map-side聚合,
-- 若聚合后的条数和聚合前的条数比值小于该值,则认为该表适合进行map-side聚合;
-- 否则,认为该表数据不适合进行map-side聚合,
-- 后续数据便不再进行map-side聚合。
set hive.map.aggr.hash.min.reduction=0.5;

-- 用于检测源表是否适合map-side聚合的条数。
set hive.groupby.mapaggr.checkinterval=100000;

-- map-side聚合所用的hash table,占用map task堆内存的最大比例,若超出该值,则会对hash table进行一次flush。
set hive.map.aggr.hash.force.flush.memory.threshold=0.9;

提示

不一定所有的计算都适合Map端,比如按照主键进行聚合,数据条数并没有减少,这也是配置hive.map.aggr.hash.min.reduction的意义

2. Common Join

Common Join是Hive中最稳定的join算法,其通过一个MapReduce Job完成一个join操作。Map端负责读取join操作所需表的数据,并按照关联字段进行分区,通过Shuffle,将其发送到Reduce端,相同key的数据在Reduce端完成最终的Join操作。如下图所示:
Alt text 需要注意的是,sql语句中的join操作和执行计划中的Common Join任务并非一对一的关系,一个sql语句中的相邻的且关联字段相同的多个join操作可以合并为一个Common Join任务。

3. Map Join

3.1 Map Join概述

Map Join算法可以通过两个只有map阶段的Job完成一个join操作。其适用场景为大表join小表。若某join操作满足要求,则第一个Job会读取小表数据,将其制作为hash table,并上传至Hadoop分布式缓存(本质上是上传至HDFS)。第二个Job会先从分布式缓存中读取小表数据,并缓存在Map Task的内存中,然后扫描大表数据,这样在map端即可完成关联操作。如下图所示:
Alt text

3.2 Map Join触发方式

Map Join有两种触发方式,一种是用户在SQL语句中增加hint提示,另外一种是Hive优化器根据参与join表的数据量大小,自动触发。

  1. Hint提示
    用户可通过如下方式,指定通过map join算法,并且ta将作为map join中的小表。这种方式已经过时,不推荐使用。比如:
sql
select /*+ mapjoin(ta) */
    ta.id,
    tb.id
from table_a ta
join table_b tb
on ta.id=tb.id;
  1. 自动触发
    Hive在编译SQL语句阶段,起初所有的join操作均采用Common Join算法实现。
    之后在物理优化阶段,Hive会根据每个Common Join任务所需表的大小判断该Common Join任务是否能够转换为Map Join任务,若满足要求,便将Common Join任务自动转换为Map Join任务。
    但有些Common Join任务所需的表大小,在SQL的编译阶段是未知的(例如对子查询进行join操作),所以这种Common Join任务是否能转换成Map Join任务在编译阶是无法确定的。
    针对这种情况,Hive会在编译阶段生成一个条件任务(Conditional Task),其下会包含一个计划列表,计划列表中包含转换后的Map Join任务以及原有的Common Join任务。最终具体采用哪个计划,是在运行时决定的。大致思路如下图所示:
    Alt text Map join自动转换的具体判断逻辑如下图所示:
    Alt text 其中参数hive.auto.convert.join.noconditionaltask.sizehive.mapjoin.smalltable.filesize作用一样,可能是参与开发Hive的人太多导致参数功能重复的,都是指定小表的文件大小,值得注意的是这里指的小表的文件大小是表数据在内存中的大小而不是文件系统中存储的大小。实际生产中如果实际存储大小10M, 考虑到文件解压、序列化到内存中,参数应该设置为100M左右。
sql
--启动Map Join自动转换
set hive.auto.convert.join=true;

-- 一个Common Join operator转为Map Join operator的判断条件,若该Common Join相关的表中,
-- 存在n-1张表的已知大小总和<=该值,则生成一个Map Join计划,此时可能存在多种n-1张表的组合均满足该条件,
-- 则hive会为每种满足条件的组合均生成一个Map Join计划,同时还会保留原有的Common Join计划作为后备(back up)计划,
-- 实际运行时,优先执行Map Join计划,若不能执行成功,则启动Common Join后备计划。
set hive.mapjoin.smalltable.filesize=250000;

--开启无条件转Map Join
set hive.auto.convert.join.noconditionaltask=true;

-- 无条件转Map Join时的小表之和阈值,若一个Common Join operator相关的表中,存在n-1张表的大小总和<=该值,
-- 此时hive便不会再为每种n-1张表的组合均生成Map Join计划,同时也不会保留Common Join作为后备计划。
-- 而是只生成一个最优的Map Join计划。
set hive.auto.convert.join.noconditionaltask.size=10000000;

4. Map Join优化实操

4.1 SQL背景

  1. 目前查询数据SQL:
sql
set hive.auto.convert.join=FALSE ;
explain select
    *
from order_detail od
join product_info product on od.product_id = product.id
join province_info province on od.province_id = province.id;

执行计划结果:
上述SQL语句共有三张表进行两次join操作,且两次join操作的关联字段不同。故优化前的执行计划应该包含两个Common Join operator,也就是由两个MapReduce任务实现。执行计划如下图所示:
Alt text 2. 优化思路
经分析,参与join的三张表中有province_info小表,数据量如下:

表名大小
order_detail1176009934(约1122M)
product_info25285707(约24M)
province_info369(约0.36K)

提示

可使用如下语句获取表/分区的大小信息(也就是HDFS上面文件大小):

sql
desc formatted table_name partition(partition_col='partition');

三张表中,product_info和province_info数据量较小,可考虑将其作为小表,进行Map Join优化。

4.2 方案一

sql
-- 启用Map Join自动转换。
set hive.auto.convert.join=true;
-- 不使用无条件转Map Join
set hive.auto.convert.join.noconditionaltask=false;
-- 调整hive.mapjoin.smalltable.filesize参数,使其大于等于product_info
set hive.mapjoin.smalltable.filesize=25285707;

这样可保证将两个Common Join operator均可转为Map Join operator,并保留Common Join作为后备计划,保证计算任务的稳定。调整完的执行计划如下图:
Alt text

4.3 方案二

sql
-- 启用Map Join自动转换
set hive.auto.convert.join=true;
-- 使用无条件转Map Join
set hive.auto.convert.join.noconditionaltask=true;
-- 调整hive.auto.convert.join.noconditionaltask.size参数,
-- 使其大于等于product_info和province_info之和。
set hive.auto.convert.join.noconditionaltask.size=25286076;

这样可直接将两个Common Join operator转为两个Map Join operator,并且由于两个Map Join operator的小表大小之和小于等于hive.auto.convert.join.noconditionaltask.size,故两个Map Join operator任务可合并为同一个。这个方案计算效率最高,但需要的内存也是最多的。
调整完的执行计划如下图:
Alt text

4.4 方案三

sql
-- 启用Map Join自动转换。
set hive.auto.convert.join=true;
-- 使用无条件转Map Join
set hive.auto.convert.join.noconditionaltask=true;
-- 调整hive.auto.convert.join.noconditionaltask.size参数,使其等于product_info。
set hive.auto.convert.join.noconditionaltask.size=25285707;

这样可直接将两个Common Join operator转为Map Join operator,但不会将两个Map Join的任务合并。该方案计算效率比方案二低,但需要的内存也更少。
调整完的执行计划如下图:
Alt text

5. Bucket Map Join

5.1 Bucket Map Join概述

Bucket Map Join是对Map Join算法的改进,其打破了Map Join只适用于大表join小表的限制,可用于大表join大表的场景。
Bucket Map Join的核心思想是:若能保证参与join的表均为分桶表,且关联字段为分桶字段,且其中一张表的分桶数量是另外一张表分桶数量的整数倍,就能保证参与join的两张表的分桶之间具有明确的关联关系,所以就可以在两表的分桶间进行Map Join操作了。这样一来,第二个Job的Map端就无需再缓存小表的全表数据了,而只需缓存其所需的分桶即可。其原理如图所示:
Alt text 比如上图中,A-0/A-2和B-0是对应关系,A-1/A-4和B-1是对应关系。然后B-0和B-1将作为HashTable缓存起来,按照A表的4个桶启动4个Map任务,将刚刚的分布式缓存B-0、B-1分别进入这些Map任务,作为Local Task处理,之后和Map Join处理差不多了。

5.2 Bucket Map Join触发方式

Bucket Map Join不支持自动转换,发须通过用户在SQL语句中提供如下Hint提示,并配置如下相关参数,方可使用。

sql
-- Hint提示
select /*+ mapjoin(ta) */
    ta.id,
    tb.id
from table_a ta
join table_b tb on ta.id=tb.id;

相关参数配置:

sql
--关闭cbo优化,cbo会导致hint信息被忽略
set hive.cbo.enable=false;
--map join hint默认会被忽略(因为已经过时),需将如下参数设置为false
set hive.ignore.mapjoin.hint=false;
--启用bucket map join优化功能
set hive.optimize.bucketmapjoin = true;

提示

由于Hive从Hive2.x开始就已经不在维护MR计算引擎,而官方推荐Spark/Tez作为计算引擎,这就导致MR里面的Bucket Map Join自动转换烂尾了,使用Spark/Tez在Hive里面是得到支持Bucket Map Join自动转换功能的。

6. Bucket Map Join优化实操

6.1 SQL背景

  1. 目前待优化SQL如下:
sql
select
    *
from(
    select
        *
    from order_detail
    where dt='2020-06-14'
)od
join(
    select
        *
    from payment_detail
    where dt='2020-06-14'
)pd
on od.id=pd.order_detail_id;

6.2 优化前执行效果:

上述SQL语句共有两张表一次join操作,故优化前的执行计划应包含一个Common Join任务,通过一个MapReduce Job实现。执行计划如下图所示:
Alt text

6.3 优化思路

经分析比较两个表的大小:
Alt text
参与join的两张表数据量如下:

表名大小
order_detail1176009934(约1122M)
payment_detail334198480(约319M)

两张表都相对较大,若采用普通的Map Join算法,则Map端需要较多的内存来缓存数据,当然可以选择为Map段分配更多的内存,来保证任务运行成功。但是,Map端的内存不可能无上限的分配,所以当参与Join的表数据量均过大时,就可以考虑采用Bucket Map Join算法。下面演示如何使用Bucket Map Join。
首先需要依据源表创建两个分桶表,order_detail建议分16个bucket,payment_detail建议分8个bucket,注意分桶个数的倍数关系以及分桶字段。

6.4 创建分桶表

生产实际中应该提前规划好表,不会临时重建表,这里仅做演示,下面创建表并导入数据。

sql
-- 订单表
create table order_detail_bucketed(
    id           string comment '订单id',
    user_id      string comment '用户id',
    product_id   string comment '商品id',
    province_id  string comment '省份id',
    create_time  string comment '下单时间',
    product_num  int comment '商品件数',
    total_amount decimal(16, 2) comment '下单金额'
)
clustered by (id) into 16 buckets
row format delimited fields terminated by '\t';
-- 支付表
create table payment_detail_bucketed(
    id              string comment '支付id',
    order_detail_id string comment '订单明细id',
    user_id         string comment '用户id',
    payment_time    string comment '支付时间',
    total_amount    decimal(16, 2) comment '支付金额'
)
clustered by (order_detail_id) into 8 buckets
row format delimited fields terminated by '\t';
-- 导入分桶表数据
insert overwrite table order_detail_bucketed
select
    id,
    user_id,
    product_id,
    province_id,
    create_time,
    product_num,
    total_amount   
from order_detail
where dt='2020-06-14';
insert overwrite table payment_detail_bucketed
select
    id,
    order_detail_id,
    user_id,
    payment_time,
    total_amount
from payment_detail
where dt='2020-06-14';

6.5 设置以下参数调优

sql
-- 关闭cbo优化,cbo会导致hint信息被忽略,需将如下参数修改为false
set hive.cbo.enable=false;
-- map join hint默认会被忽略(因为已经过时),需将如下参数修改为false
set hive.ignore.mapjoin.hint=false;
-- 启用bucket map join优化功能,默认不启用,需将如下参数修改为true
set hive.optimize.bucketmapjoin = true;

6.6 重新执行SQL执行计划

sql
explain formatted select /*+ mapjoin(pd) */
    *
from order_detail_bucketed od
join payment_detail_bucketed pd on od.id = pd.order_detail_id;

优化后的执行计划如图所示: Alt text Bucket Map Join的执行计划的基本信息和普通的Map Join区别不大,若想看到执行计划走的是Bucket Map Join,可执行如下语句来查看执行计划的详细信息。
Alt text 在详细执行计划中,如在Map Join Operator中看到 "BucketMapJoin: true",则表明使用的Join算法为Bucket Map Join。

7. Sort Merge Bucket Map Join

7.1 Sort Merge Bucket Map Join概述

Sort Merge Bucket Map Join(简称SMB Map Join)基于Bucket Map Join。SMB Map Join要求,参与join的表均为分桶表,且需保证分桶内的数据是有序的,且分桶字段、排序字段和关联字段为相同字段,且其中一张表的分桶数量是另外一张表分桶数量的整数倍。
SMB Map Join同Bucket Join一样,同样是利用两表各分桶之间的关联关系,在分桶之间进行join操作,不同的是,分桶之间的join操作的实现原理。Bucket Map Join,两个分桶之间的join实现原理为Hash Join算法;而SMB Map Join,两个分桶之间的join实现原理为Sort Merge Join算法。
Hash Join和Sort Merge Join均为关系型数据库中常见的Join实现算法。Hash Join的原理相对简单,就是对参与join的一张表构建hash table,然后扫描另外一张表,然后进行逐行匹配。Sort Merge Join需要在两张按照关联字段排好序的表中进行,其原理如图所示:
Alt text Hive中的SMB Map Join就是对两个分桶的数据按照上述思路进行Join操作。可以看出,SMB Map Join与Bucket Map Join相比,在进行Join操作时,Map端是无需对整个Bucket构建hash table,也无需在Map端缓存整个Bucket数据的,每个Mapper只需按顺序逐个key读取两个分桶的数据进行join即可。

7.2 Sort Merge Bucket Map Join触发方式

Sort Merge Bucket Map Join有两种触发方式,包括Hint提示和自动转换。Hint提示已过时,不推荐使用。下面是自动转换的相关参数:

sql
--启动Sort Merge Bucket Map Join优化
set hive.optimize.bucketmapjoin.sortedmerge=true;
--使用自动转换SMB Join
set hive.auto.convert.sortmerge.join=true;

8. Sort Merge Bucket Map Join优化实操

8.1 SQL背景

  1. 需要优化的SQL如下:
sql
select
    *
from(
    select
        *
    from order_detail
    where dt='2020-06-14'
)od
join(
    select
        *
    from payment_detail
    where dt='2020-06-14'
)pd
on od.id=pd.order_detail_id;

8.2 优化前效果

上述SQL语句共有两张表一次join操作,故优化前的执行计划应包含一个Common Join任务,通过一个MapReduce Job实现。

8.3 优化思路

经分析,参与join的两张表,数据量如下:

表名大小
order_detail1176009934(约1122M)
payment_detail334198480(约319M)

两张表都相对较大,除了可以考虑采用Bucket Map Join算法,还可以考虑SMB Join。相较于Bucket Map Join,SMB Map Join对分桶大小是没有要求的。下面演示如何使用SMB Map Join。
首先需要依据源表创建两个的有序的分桶表,order_detail建议分16个bucket,payment_detail建议分8个bucket,注意分桶个数的倍数关系以及分桶字段和排序字段

8.4 创建有序分桶表

sql
-- 订单表
create table order_detail_sorted_bucketed(
    id           string comment '订单id',
    user_id      string comment '用户id',
    product_id   string comment '商品id',
    province_id  string comment '省份id',
    create_time  string comment '下单时间',
    product_num  int comment '商品件数',
    total_amount decimal(16, 2) comment '下单金额'
)
clustered by (id) sorted by(id) into 16 buckets
row format delimited fields terminated by '\t';
-- 支付表
create table payment_detail_sorted_bucketed(
    id              string comment '支付id',
    order_detail_id string comment '订单明细id',
    user_id         string comment '用户id',
    payment_time    string comment '支付时间',
    total_amount    decimal(16, 2) comment '支付金额'
)
clustered by (order_detail_id) sorted by(order_detail_id) into 8 buckets
row format delimited fields terminated by '\t';
-- 向两个分桶表导入数据
insert overwrite table order_detail_sorted_bucketed
select
    id,
    user_id,
    product_id,
    province_id,
    create_time,
    product_num,
    total_amount   
from order_detail
where dt='2020-06-14';
insert overwrite table payment_detail_sorted_bucketed
select
    id,
    order_detail_id,
    user_id,
    payment_time,
    total_amount
from payment_detail
where dt='2020-06-14';

8.5 设置参数调优

sql
-- 启动Sort Merge Bucket Map Join优化
set hive.optimize.bucketmapjoin.sortedmerge=true;
-- 使用自动转换SMB Join
set hive.auto.convert.sortmerge.join=true;

8.6 重新执行SQL执行计划

sql
explain formatted select
    *
from order_detail_sorted_bucketed od
join payment_detail_sorted_bucketed pd
on od.id = pd.order_detail_id;

优化后的执行计如图所示:
Alt text

7. Map优化总结

  1. 实际工作中,生产上面不会每次连接Hive才去配置优化参数,可以直接将优化参数配置到hive-site.xml文件中,比如这三个基本参数:hive.auto.convert.joinhive.mapjoin.smalltable.filesizehive.auto.convert.join.noconditionaltask
  2. hive.mapjoin.smalltable.filesize配置大小不能超过当前机器的2/3的内存,并且filesize/10才是对应的文件大小。比如hdfs上的表100MB,那么要在内存中缓存这张表就要占用1GB内存,那么当前机器至少1.5GB内存才合理。
  3. Map Join应用在大小表场景, Bucket Map Join、Sort Merge Bucket Map Join适用于都是大表的场景。