Hive进阶

最近当当网书香节,满100减50,购物车里的书迫不及待的下单买了,5折价格真香。

拿到手后花了约一周时间把《Hive性能调优实战》 这本看完了。

由于最近工作中遇到Hive的优化问题,对我来说收获颇丰,让我对Hive有了更深层次的理解。书的主要内容如下:

总结了一些对我来说比较有帮助的点:

MUTI-INSERT写法

找到student_stat表中每个年龄段最早出生和最晚出生的人的出生日期,插入tp分区里:
正常来说会这样写:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
INSERT into table student_stat partition(tp)
select
s_age,
min(s_birth) stat,
'min' tp
from student_tb_txt
group by s_age

union all
select
s_age,
max(s_birth) stat,
'max' tp
from student_tb_txt
group by s_age;

上面的HQL会形成5个MR的job,执行过程类似这样

正常来说,优化会把这个union all改写成两个insert,类似这样:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
--计算max值
insert into table student_stat partition(tp)
select
s_age,
max(s_birth) stat,
'max' tp
from student_tb_txt
group by s_age;
--计算min值
insert into table student_stat partition(tp)
select
s_age,
min(s_birth) stat,
'min' tp
from student_tb_txt
group by s_age;

执行过程会是下面这样,同样是两次表扫描,两次求聚合操作(存在Shuffle)并写入结果表。

看起来优化掉了union all操作,计算max和min值的两个job可直接将数据放到student_stat下,减少了一次MapReduce作业,看似方案比较合理,但是实际执行起来反而比优化之前时间更久。其实Hive的早期版本确实可以优化,但是随着Hive版本的迭代,对union操作进行了优化,导致拆分后的代码执行效率更低了。

那么是否可以只经过一次Table Scan并写入呢?答案是肯定的

1
2
3
4
5
6
7
8
9
10
11
12
13
from student_tb_txt
INSERT into table student_stat partition(tp)
select
s_age,
min(s_birth) stat,
'min' tp
group by s_age
insert into table student_stat partition(tp)
select
s_age,
max(s_birth) stat,
'max' tp
group by s_age;

执行过程如下:

这样写就只会产生一个MR的Job,也就是说只执行了一次MapReduce作业。相比优化之前的5个Job,极大的减少了磁盘IO和网络通信,提高执行效率。

Count(distinct)优化

统计年龄枚举值个数:
下意识的觉得group by去重会比distinct效率高所以会写成这样,能有效避免Reduce阶段数据倾斜

1
2
3
4
5
select count(1) from(
select s_age
from student_tb_orc
group by s_age
) b
  • 因为去重的是s_age列,实际上业务含义表示年龄,枚举值个数非常有限,在Map阶段会对s_age去重,因此每个Map得到的s_age有限,最后到达Reduce阶段的非常有限,根本不会达到数据倾斜的量。
  • 另外group by在不同版本间变动比较大,有的版本会用构建hashtable的形式去重,有的版本会通过排序的方式,排序最优时间复杂度无法到O(1) 。另外上面写法转化为两个任务,会消耗更多的磁盘网络I/O资源。

所以,这样的写法有点过度优化,执行过程如下:

如下写法是正常写法:

1
2
select count(distinct s_age)
from student_tb_orc

distinct的命令会在内存中构建一个hashtable,查找去重的时间复杂度是O(1);

Hive 3.0中新增了count(distinct)优化,通过配置set hive.optimize.countdistinct = true设置,即使真的出现数据倾斜也可以自动优化,自动改变SQL执行的逻辑。

执行过程如下:

理透需求原则,这是优化的根本;

把握数据全链路原则,这是优化的脉络;

坚持代码的简洁原则,这让优化更加简单;

没有瓶颈时谈论优化,是自寻烦恼。

学会查看执行计划

查看执行计划的命令:explain sql
查看执行计划的扩展信息:explain extended sql
执行计划包含两部分:

  1. 作业的依赖关系图,即STAGE DEPENDENCIES;
  2. 每个作业的详细信息,即STAGE PLANS;

例如:

1
2
3
4
5
6
7
8
explain
select
s_age,
count(1) as num
from prac.student_tb_txt
where s_age<30
and s_name like '%红'
group by s_age;

执行计划如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
STAGE DEPENDENCIES:
Stage-1 is a root stage
Stage-0 depends on stages: Stage-1

STAGE PLANS:
Stage: Stage-1
Map Reduce
Map Operator Tree:
TableScan // 表扫描
alias: student_tb_txt
Statistics: Num rows: 25083517 Data size: 2709019904 Basic stats: COMPLETE Column stats: NONE
Filter Operator
predicate: ((s_age < 30) and (s_name like '%红')) (type: boolean)
Statistics: Num rows: 4180586 Data size: 451503299 Basic stats: COMPLETE Column stats: NONE
Select Operator
expressions: s_age (type: bigint)
outputColumnNames: s_age
Statistics: Num rows: 4180586 Data size: 451503299 Basic stats: COMPLETE Column stats: NONE
Group By Operator
aggregations: count(1)
keys: s_age (type: bigint)
mode: hash
outputColumnNames: _col0, _col1
Statistics: Num rows: 4180586 Data size: 451503299 Basic stats: COMPLETE Column stats: NONE
Reduce Output Operator// 输出结果给Reduce
key expressions: _col0 (type: bigint)
sort order: +
Map-reduce partition columns: _col0 (type: bigint)
Statistics: Num rows: 4180586 Data size: 451503299 Basic stats: COMPLETE Column stats: NONE
value expressions: _col1 (type: bigint)
Reduce Operator Tree:// Reduce阶段
Group By Operator
aggregations: count(VALUE._col0)
keys: KEY._col0 (type: bigint)
mode: mergepartial
outputColumnNames: _col0, _col1
Statistics: Num rows: 2090293 Data size: 225751649 Basic stats: COMPLETE Column stats: NONE
File Output Operator
compressed: false
Statistics: Num rows: 2090293 Data size: 225751649 Basic stats: COMPLETE Column stats: NONE
table:
input format: org.apache.hadoop.mapred.SequenceFileInputFormat
output format: org.apache.hadoop.hive.ql.io.HiveSequenceFileOutputFormat
serde: org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe

Stage: Stage-0
Fetch Operator
limit: -1
Processor Tree:
ListSink

STAGE DEPENDENCIES描述了作业之间的依赖关系,即Stage-0依赖Stage-1的执行结果 。Stage-1表示如下的SQL,即SQL select * from prac.student_tb_txt where s_age<30 and s_name like '%红'的执行结果。

Stage–1分为Map和 Reduce两个阶段,对应的执行计划关键词解读如下:

  • MapReduce:表示当前任务执行所用的计算引擎是 MapReduce
  • Map Operator Tree当前描述的Map阶段执行的操作信息。
    Reduce Operator Tree:表示当前描述的是 Reduce阶段的操作信息。

Map操作树(Map Operator Tree)信息解读如下:

  • TableScan:表示对关键字 alias声明的结果集,这里指代 student tb orc,进行表扫描操作。
  • Statistics:表示对当前阶段的统计信息。例如,当前处理的数据行和数据量,这两个都是预估值。
  • Filter Operator:表示在之前操作(TableScan)的结果集上进行数据的过滤
  • predicate:表示 filter Operator进行过滤时,所用的谓词,即s_age<30 and s_e like ‘’%红%’’。
  • Select Operator:表示在之前的结果集上对列进行投影,即筛选列。
  • expressions:表示需要投影的列,即筛选的列。
  • outputColumnNames:表示输出的列名。
  • Group By Operator:表示在之前的结果集上分组聚合。
  • aggregations:表示分组聚合使用的算法,这里是 count(1)。
  • keys:表示分组的列,在该例子表示的是s_age
  • Reduce output Operator:表示当前描述的是对之前结果聚会后的输出信息,这里表示Map端聚合后的输出信息。
  • key expressions/value expressions: MapReduce计算引擎,在Map阶段和 Reduce阶段输出的都是键-值对的形式,这里 key expression value expressions分别描述的就是Map阶段输出的键(key)和值(value)所用的数据列这里的例子 key expressions指代的就是s_age列, value expressions指代的就是 count(1)列。
  • sort order:表示输出是否进行排序,+表示正序,-表示倒序。
  • Map- -reduce partition columns:表示Map阶段输出到 Reduce阶段的分区列,在Hive-SQL中,可以用 distribute by指代分区的列。
  • Reduce阶段所涉及的关键词与Map阶段的关键词是一样的,字段表示含义也相同,
    因此这里不再罗列。下面是 Reduce中出现但是在Map阶段没有出现的关键词。
  • compressed:在 File Output Operator中这个关键词表示文件输出的结果是否进行压缩, false表示不进行输出压缩。
  • table:表示当前操作表的信息。
  • input format/output format:分别表示文件输入和输出的文件类型。
  • serde:表示读取表数据的序列化和反序列化的方式。

开启map端聚合

关闭map端聚合的情况下,普通Group by 的执行计划

1
2
3
set hive.map.aggr=false;
explain
select id,name from students group by id,name;
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
STAGE DEPENDENCIES:
Stage-1 is a root stage
Stage-0 depends on stages: Stage-1

STAGE PLANS:
Stage: Stage-1
Map Reduce
Map Operator Tree:
TableScan
alias: students
Statistics: Num rows: 1 Data size: 22 Basic stats: COMPLETE Column stats: NONE
Select Operator
expressions: id (type: string), name (type: string)
outputColumnNames: id, name
Statistics: Num rows: 1 Data size: 22 Basic stats: COMPLETE Column stats: NONE
Reduce Output Operator
key expressions: id (type: string), name (type: string)
sort order: ++
Map-reduce partition columns: id (type: string), name (type: string)
Statistics: Num rows: 1 Data size: 22 Basic stats: COMPLETE Column stats: NONE
Reduce Operator Tree:
Group By Operator
keys: KEY._col0 (type: string), KEY._col1 (type: string)
mode: complete
outputColumnNames: _col0, _col1
Statistics: Num rows: 1 Data size: 22 Basic stats: COMPLETE Column stats: NONE
File Output Operator
compressed: false
Statistics: Num rows: 1 Data size: 22 Basic stats: COMPLETE Column stats: NONE
table:
input format: org.apache.hadoop.mapred.SequenceFileInputFormat
output format: org.apache.hadoop.hive.ql.io.HiveSequenceFileOutputFormat
serde: org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe

Stage: Stage-0
Fetch Operator
limit: -1
Processor Tree:
ListSink

可以看到Group by操作在Reduce阶段,模式为complete。

下面开启map端聚合测试普通的Group by

1
2
3
hive> set hive.map.aggr=true;
hive> explain
> select id,name from students group by id,name;

执行结果如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
STAGE DEPENDENCIES:
Stage-1 is a root stage
Stage-0 depends on stages: Stage-1

STAGE PLANS:
Stage: Stage-1
Map Reduce
Map Operator Tree:
TableScan
alias: students
Statistics: Num rows: 1 Data size: 22 Basic stats: COMPLETE Column stats: NONE
Select Operator
expressions: id (type: string), name (type: string)
outputColumnNames: id, name
Statistics: Num rows: 1 Data size: 22 Basic stats: COMPLETE Column stats: NONE
Group By Operator
keys: id (type: string), name (type: string)
mode: hash
outputColumnNames: _col0, _col1
Statistics: Num rows: 1 Data size: 22 Basic stats: COMPLETE Column stats: NONE
Reduce Output Operator
key expressions: _col0 (type: string), _col1 (type: string)
sort order: ++
Map-reduce partition columns: _col0 (type: string), _col1 (type: string)
Statistics: Num rows: 1 Data size: 22 Basic stats: COMPLETE Column stats: NONE
Reduce Operator Tree:
Group By Operator
keys: KEY._col0 (type: string), KEY._col1 (type: string)
mode: mergepartial
outputColumnNames: _col0, _col1
Statistics: Num rows: 1 Data size: 22 Basic stats: COMPLETE Column stats: NONE
File Output Operator
compressed: false
Statistics: Num rows: 1 Data size: 22 Basic stats: COMPLETE Column stats: NONE
table:
input format: org.apache.hadoop.mapred.SequenceFileInputFormat
output format: org.apache.hadoop.hive.ql.io.HiveSequenceFileOutputFormat
serde: org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe

Stage: Stage-0
Fetch Operator
limit: -1
Processor Tree:
ListSink

可以看到Group by操作被提到了Map阶段,模式为 hash,而在Reduce阶段,同样进行了一次Group by操作,但这次的模式为mergepartial,也就是把Map阶段和结果合并起来。

[1]林志煌.Hive性能调优实战[E]机械工业出版社,2020.01