Hive优化

平常工作中写HiveSQL比较多,对于一些常见的Hive问题和优化做一下总结:

1. MapJoin

如果不指定Mapjoin或者不符合MapJoin的条件,那么Hive解析器会将Join操作转换成Common Join, 即在Reduce阶段完成Join,容易发生数据倾斜。这时可以使用MapJoin把小表全部加载到内存中在Map端进行Join,避免Reduce处理。

2. 行列过滤

列处理:在SELECT中,只拿需要处理的列。如非必要,尽量避免SELECT *;

行处理:尽早的过滤数据,减少每个阶段的数据量,对于分区表尽量使用分区过滤。

3. 尽量原子化操作

尽量避免一个SQL包含复杂的逻辑,可以创建临时表来完成复杂的逻辑。

4. 采用分区技术

可以把数据根据数据量按照天或者按照周、月来分区。

5. 合理设置Map数量和Reduce数

Hive中的SQL查询会生成执行计划,执行计划以MapReduce的方式执行,那么结合数据和集群的大小,Map和Reduce的数量就会影响到SQL的执行效率,除了要控制Hive生成的Job的数量,也要控制map和reduce的数量。

Map的数量

通常情况下,作业会通过input的目录产生一个或者多个map任务。
主要决定因素有:input的文件总个数,input的文件大小,集群设置的文件块大小。
Hive中默认的hive.input.formatorg.apache.hadoop.hive.ql.io.CombineHivelnputFormat,对于combineHiveInputFormat,它的输入的map数量由三个配置决定:
mapred.min.split.size.per.node 一个节点上split的至少的大小
mapred.min.split.size.per.rack 一个交换机下split 至少的大小
mapred.max.split.size 一个split 最大的大小
主要思路是把输入目录下的大文件分成多个map的输入,并合并小文件,做为一个map的输入
具体的原理是下述三步:

a) 根据输入目录下的每个文件,如果其长度超过mapred.max.split.size,以Block 为单位分成多个Split(一个Split是一个map的输入),每个split的长度都大于mapred.max.split.size,因为以Block为单位,因此也会大于·BlockSize,此文件剩下的长度如果大于mapred.min.split.size.per.node,则生成一个Split,否则先暂时保留;

b) 现在剩下的都是一些长度较短的碎片,把每个rack 下碎片合并,只要长度超过
mapred.max.split.size就合并成一-个 split,最后如果剩下的碎片比mapred.min.split.size.per.rack 大,就合并成一个split,否则暂时保留;

c) 把不同rack下的碎片合并,只要长度超过mapred.max.split.size就合并成一个 split,剩下的碎片无论长度,合并成一个split。

Reduce的数量

reduce数量由以下三个参数决定:

  • mapred.reduce.tasks(强制指定reduce的任务数量)
  • hive.exec.reducers.bytes.per.reducer(每个reduce任务处理的数据量,默认为1000^3=1G)
  • hive.exec.reducers.max(每个任务最大的reduce数,默认为999)
    计算reducer 数的公式:
    N=min(hive.exec.reducers.max,总输入数据量/hive.exec.reducers.bytes.per.reducer)

6. 注意Join的使用

把重复关联键少的表放在join前面,可以提高join效率。网上所谓的hive会将join前面的表放在内存中,把小表放在前面能减少内存资源消耗这种说法在现在看来其实是有异议的。用1条记录的表和3亿条记录的表做join,无论小表是放在join的前面还是join的后面,执行的时间几乎都是相同的,原因是因为Hive在早期某个版本中,底层对此进行了优化。

7. 小文件处理

HiveInputFormat没有对小文件的合并功能
可以使用Combinefileinputformat,将多个小文件打包作为一个整体的inputsplit,减少map任务数
set mapred.max.split.size=256000000;
set mapred.min.split.size.per.node= 256000000
set mapred.min.split.size.per.rack=256000000;
set hive.input.format=org.apache.hadoop.hive.ql.io.CombineHivelnputFormat;

设置hive参数,额外启动-一个MR Job打包小文件:
hive.merge.mapredfiles=false 是否合并Reduce 输出文件,默认为False
hive.merge.size.per.task = 256*1000*1000 合并文件的大小

8. 注意数据倾斜

a) 通过 hive.groupby.skewindata=true控制生成两个MR Job,第一个 MR Job Map的输出结果随机分配到reduce做次预汇总,减少某些key值条数过多某些key条数过小造成的数据倾斜问题;

b) 通过hive.map.aggr = true(默认为true)在Map端做combiner,假如map各条数据基本上不一样,聚合没什么意义,做combiner反而画蛇添足,hive里也考虑的比较周到通过参数
hive.groupby.mapaggr.checkinterval = 100000(默认)

hive.map.aggr.hash.min.reduction=0.5(默认),预先取100000条数据聚合,如果聚合后的条数/100000>0.5,则不再聚合。

9. 合理使用multi insert,union all

multi insert适合基于同一个源表按照不同逻辑不同粒度处理插入不同表的场景,做到只需要扫描源表一次,job个数不变,减少源表扫描次数;

union all用好,可减少表的扫描次数,减少job 的个数,通常预先按不同逻辑不同条件生成的查询union all后,再统一Group by计算,不同表的union all相当于multiple inputs,同一个表的union all,相当map一次输出多条。

10.Group优化

对于Group操作,首先在map端聚合,最后在reduce端做聚合,以下是相关的参数:
hive.map.aggr=true 是否在Map端进行聚合,默认为True,
hive.groupby.mapaggr.checkinterval= 100000 在Map端进行聚合操作的条目数目。

11. 使用压缩

设置map端输出,中间结果压缩。不完全解决数据倾斜问题,但是减少了IO读写和网络传输,能提高很多效率。

set hive.exec.compress.intermediate = true;

对于中间数据压缩,选择一个低CPU开销的Codec要比选择一个压缩率高的Codec要重要的多。

SnappyCodec是一个比较好的中间文件Codec,因为其很好的结合了低CPU开销和好的压缩执行效率。

12. 开启JVM重用

JVM重用是Hadoop调优参数的内容,对hive的性能具有非常大的影响,特别是对于很难避免小文件的场景或者Task特别多的场景,这类场景大多数执行时间都很短。Hadoop默认配置是使用JVM来执行map和reduce任务的,这时Jvm的启动过程可能会造成相当大的开销,尤其是执行的job包含有成千上万个task任务的情况。

JVM重用可以使得JVM实例在同一个JOB中重新使用N次,N的值可以在Hadoop的mapre-site.xml文件中进行设置
mapred.job.reuse.jvm.num.tasks
也可在hive的执行设置:
set  mapred.job.reuse.jvm.num.tasks=10;

 JVM重用的一个缺点是,开启JVM重用将会一直占用使用到的task插槽,以便进行重用,直到任务完成后才能释放。如果某个不平衡的job中有几个reduce task 执行的时间要比其他reduce task消耗的时间多得多的话,那么保留的插槽就会一直空闲着却无法被其他的job使用,直到所有的task都结束了才会释放。

13.使用动态分区

在Hive中,有时候会希望根据输入的Key,把结果自动输出到不同的目录中,这可以通过动态分区来实现,就是把每一个 key当作一个 分区。
如果要启动动态分区,则需要进行下面的设置首先需要在hive语句中设置允许动态分区
set hive.exec.dynamic.partition=true
set hive.exec.dynamic.partition.mode=nonstrict
在动态分区有可能很大的情况下,还需要其他的调整
hive.exec.dynamic.partitions.pernode 参数指的是每个节点上能够生成的最大分区,这个在最坏情况下应该是跟最大分区一样的值
hive.exec.dynamic.partitions.partitions 参数指的是总共的最大的动态分区数hive.exec.max.createdfiles参数指的是能够创建的最多文件数(分区一多,文件必然就多了)
最后要注意的是select语句中要把distribute的key也select出来。

14.使用列式存储

根据数据的特点来进行技术选型:如果数据结构是比较扁平的,那么用 ORC 比较合适,如果嵌套较多,就用 Parquet
列存储主要有两个好处:数据压缩和查询性能提升,在节省了存储的同时还提升了查询性能,这个的收益是非常可观的。

15.使用索引

索引可以避免全表扫描和资源浪费
索引可以加快含有Group By语句的查询的计算速度
hive.optimize.index.filter=true; 使用自动索引
hive.optimie.index.groupby=true;使用聚合索引优化GROUP BY操作

16.利用好EXPLAIN

Explain命令对于优化查询语句很重要,针对某些查询语句,我们可以通过它查看各个执行计划,针对耗时的地方,采取优化。