Hive优化
平常工作中写HiveSQL比较多,对于一些常见的Hive问题和优化做一下总结:
1. MapJoin
如果不指定Mapjoin
或者不符合MapJoin
的条件,那么Hive解析器会将Join
操作转换成Common Join
, 即在Reduce
阶段完成Join
,容易发生数据倾斜。这时可以使用MapJoin把小表全部加载到内存中在Map
端进行Join
,避免Reduce
处理。
2. 行列过滤
列处理:在SELECT中,只拿需要处理的列。如非必要,尽量避免SELECT *;
行处理:尽早的过滤数据,减少每个阶段的数据量,对于分区表尽量使用分区过滤。
3. 尽量原子化操作
尽量避免一个SQL包含复杂的逻辑,可以创建临时表来完成复杂的逻辑。
4. 采用分区技术
可以把数据根据数据量按照天或者按照周、月来分区。
5. 合理设置Map数量和Reduce数
Hive中的SQL查询会生成执行计划,执行计划以MapReduce的方式执行,那么结合数据和集群的大小,Map和Reduce的数量就会影响到SQL的执行效率,除了要控制Hive生成的Job的数量,也要控制map和reduce的数量。
Map的数量
通常情况下,作业会通过input的目录产生一个或者多个map任务。
主要决定因素有:input的文件总个数,input的文件大小,集群设置的文件块大小。
Hive中默认的hive.input.format
是org.apache.hadoop.hive.ql.io.CombineHivelnputFormat
,对于combineHiveInputFormat
,它的输入的map数量由三个配置决定:mapred.min.split.size.per.node
一个节点上split的至少的大小mapred.min.split.size.per.rack
一个交换机下split 至少的大小mapred.max.split.size
一个split 最大的大小
主要思路是把输入目录下的大文件分成多个map的输入,并合并小文件,做为一个map的输入
具体的原理是下述三步:
a) 根据输入目录下的每个文件,如果其长度超过mapred.max.split.size
,以Block 为单位分成多个Split(一个Split是一个map的输入),每个split的长度都大于mapred.max.split.size
,因为以Block为单位,因此也会大于·BlockSize
,此文件剩下的长度如果大于mapred.min.split.size.per.node
,则生成一个Split,否则先暂时保留;
b) 现在剩下的都是一些长度较短的碎片,把每个rack 下碎片合并,只要长度超过mapred.max.split.size
就合并成一-个 split,最后如果剩下的碎片比mapred.min.split.size.per.rack
大,就合并成一个split,否则暂时保留;
c) 把不同rack下的碎片合并,只要长度超过mapred.max.split.size
就合并成一个 split,剩下的碎片无论长度,合并成一个split。
Reduce的数量
reduce数量由以下三个参数决定:
mapred.reduce.tasks
(强制指定reduce的任务数量)hive.exec.reducers.bytes.per.reducer
(每个reduce任务处理的数据量,默认为1000^3=1G)hive.exec.reducers.max
(每个任务最大的reduce数,默认为999)
计算reducer 数的公式:N=min(hive.exec.reducers.max,总输入数据量/hive.exec.reducers.bytes.per.reducer)
6. 注意Join的使用
把重复关联键少的表放在join前面,可以提高join效率。网上所谓的hive会将join前面的表放在内存中,把小表放在前面能减少内存资源消耗
这种说法在现在看来其实是有异议的。用1条记录的表和3亿条记录的表做join,无论小表是放在join的前面还是join的后面,执行的时间几乎都是相同的,原因是因为Hive在早期某个版本中,底层对此进行了优化。
7. 小文件处理
HiveInputFormat
没有对小文件的合并功能
可以使用Combinefileinputformat
,将多个小文件打包作为一个整体的inputsplit,减少map任务数set mapred.max.split.size=256000000
;set mapred.min.split.size.per.node= 256000000
;set mapred.min.split.size.per.rack=256000000
;set hive.input.format=org.apache.hadoop.hive.ql.io.CombineHivelnputFormat
;
设置hive参数,额外启动-一个MR Job打包小文件:hive.merge.mapredfiles=false
是否合并Reduce 输出文件,默认为Falsehive.merge.size.per.task = 256*1000*1000
合并文件的大小
8. 注意数据倾斜
a) 通过 hive.groupby.skewindata=true
控制生成两个MR Job,第一个 MR Job Map的输出结果随机分配到reduce做次预汇总,减少某些key值条数过多某些key条数过小造成的数据倾斜问题;
b) 通过hive.map.aggr = true
(默认为true)在Map端做combiner,假如map各条数据基本上不一样,聚合没什么意义,做combiner反而画蛇添足,hive里也考虑的比较周到通过参数hive.groupby.mapaggr.checkinterval = 100000
(默认)
hive.map.aggr.hash.min.reduction=0.5
(默认),预先取100000条数据聚合,如果聚合后的条数/100000>0.5,则不再聚合。
9. 合理使用multi insert,union all
multi insert
适合基于同一个源表按照不同逻辑不同粒度处理插入不同表的场景,做到只需要扫描源表一次,job个数不变,减少源表扫描次数;
union all用好,可减少表的扫描次数,减少job 的个数,通常预先按不同逻辑不同条件生成的查询union all
后,再统一Group by
计算,不同表的union all
相当于multiple inputs
,同一个表的union all
,相当map一次输出多条。
10.Group优化
对于Group操作,首先在map端聚合,最后在reduce端做聚合,以下是相关的参数:hive.map.aggr=true
是否在Map端进行聚合,默认为True,hive.groupby.mapaggr.checkinterval= 100000
在Map端进行聚合操作的条目数目。
11. 使用压缩
设置map端输出,中间结果压缩。不完全解决数据倾斜问题,但是减少了IO读写和网络传输,能提高很多效率。
set hive.exec.compress.intermediate = true;
对于中间数据压缩,选择一个低CPU开销的Codec要比选择一个压缩率高的Codec要重要的多。
SnappyCodec
是一个比较好的中间文件Codec,因为其很好的结合了低CPU开销和好的压缩执行效率。
12. 开启JVM重用
JVM重用是Hadoop调优参数的内容,对hive的性能具有非常大的影响,特别是对于很难避免小文件的场景或者Task特别多的场景,这类场景大多数执行时间都很短。Hadoop默认配置是使用JVM来执行map和reduce任务的,这时Jvm的启动过程可能会造成相当大的开销,尤其是执行的job包含有成千上万个task任务的情况。
JVM重用可以使得JVM实例在同一个JOB中重新使用N次,N的值可以在Hadoop的mapre-site.xml
文件中进行设置mapred.job.reuse.jvm.num.tasks
也可在hive的执行设置:set mapred.job.reuse.jvm.num.tasks=10
;
JVM重用的一个缺点是,开启JVM重用将会一直占用使用到的task插槽,以便进行重用,直到任务完成后才能释放。如果某个不平衡的job中有几个reduce task
执行的时间要比其他reduce task
消耗的时间多得多的话,那么保留的插槽就会一直空闲着却无法被其他的job使用,直到所有的task都结束了才会释放。
13.使用动态分区
在Hive中,有时候会希望根据输入的Key,把结果自动输出到不同的目录中,这可以通过动态分区来实现,就是把每一个 key当作一个 分区。
如果要启动动态分区,则需要进行下面的设置首先需要在hive语句中设置允许动态分区set hive.exec.dynamic.partition=true
;set hive.exec.dynamic.partition.mode=nonstrict
;
在动态分区有可能很大的情况下,还需要其他的调整hive.exec.dynamic.partitions.pernode
参数指的是每个节点上能够生成的最大分区,这个在最坏情况下应该是跟最大分区一样的值hive.exec.dynamic.partitions.partitions
参数指的是总共的最大的动态分区数hive.exec.max.createdfiles
参数指的是能够创建的最多文件数(分区一多,文件必然就多了)
最后要注意的是select语句中要把distribute的key也select出来。
14.使用列式存储
根据数据的特点来进行技术选型:如果数据结构是比较扁平的,那么用 ORC
比较合适,如果嵌套较多,就用 Parquet
。
列存储主要有两个好处:数据压缩和查询性能提升,在节省了存储的同时还提升了查询性能,这个的收益是非常可观的。
15.使用索引
索引可以避免全表扫描和资源浪费
索引可以加快含有Group By语句的查询的计算速度hive.optimize.index.filter=true
; 使用自动索引hive.optimie.index.groupby=true
;使用聚合索引优化GROUP BY操作
16.利用好EXPLAIN
Explain命令对于优化查询语句很重要,针对某些查询语句,我们可以通过它查看各个执行计划,针对耗时的地方,采取优化。