在数据处理中,有时候会遇到数据倾斜问题,导致任务运行时间过长,尝试调整一些Spark参数,但是效果不一定明显。那么就可能需要从业务逻辑上进行优化。
那么如何判断是否有数据倾斜?
可以通过Spark后台的历史运行日志来看。
我们找到实际运行时间最长的Job
点进看详细的运行信息
如图:
这个Stage运行时长16min,一共有2001个task,其中duration代表了实际运行时间。
这个task重点可以看Duration和Shuffle Read Size
Metric
Min
25th percentile
Median
75th percentile
Max
Duration
0 ms
0.2 s
0.8 s
1 s
2.3 h
GC Time
0 ms
0 ms
0 ms
0.2 s
2.3 min
Shuffle Read Size / Records
0.0 B / 0
7.8 KB / 85
42.2 KB / 161
241.3 KB / 599
357.3 MB / 1555208
Shuffle Write Size / Records
0.0 B / 0
59.0 B / 1
59.0 B / 1
59.0 B / 1
59.0 B / 1
解读:按照Metric的量进行排序,数据在5个采样点(0%,25%,50%,75%,100%)的分布情况。
我们可以认为这个stage就发生了数据倾斜。
如何优化数据倾斜?
首先需要判断发生倾斜的部分在哪一段,这样才可以更好的应对。 还是这个job,在Sprak运行后台找到Duration最长的stage
根据describe中的代码片段找到具体这段代码:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 CREATE OR REPLACE TEMPORARY VIEW fmx_spf AS SELECT f.* , s.adj_direction , s.adj_rate , CASE WHEN s.adj_direction = 'INCREASE' THEN 1 + CAST (s.adj_rate AS DECIMAL (8 , 2 ))/ 100 WHEN s.adj_direction = 'DECREASE' THEN 1 - CAST (s.adj_rate AS DECIMAL (8 , 2 ))/ 100 ELSE 1 END AS spf_rate , s.adj_tiers , sox_rule_match(json_split(s.adj_rules) , CONCAT_WS(',' , list_ids) , site_group , numeric1 , coupon_cd , CAST (item_price_amt AS string) , CONCAT(site_id, '-' , meta_categ_id) , customer_status , cmpgn_id , ebay_plus_type_desc , item_cndtn_id , item_cndtn_name , own_invtry_ind , pl_flag ) AS matched_spf_rule_str FROM prs_w.fmx_vrtcl_comm_dedup fLEFT JOIN spf_temp s ON f.epn_pblshr_id = s.publisher_id AND f.roi_click_event_dt >= s.start_date AND f.roi_click_event_dt < coalesce (s.end_date, '2099-12-31 00:00:00' ) AND sox_rule_match(json_split(s.adj_rules) , CONCAT_WS(',' ,list_ids) , site_group , numeric1 , coupon_cd , CAST (item_price_amt AS string) , CONCAT(site_id, '-' , meta_categ_id) , customer_status , cmpgn_id , ebay_plus_type_desc , item_cndtn_id , item_cndtn_name , own_invtry_ind , pl_flag ) IS NOT NULL ; CACHE TABLE fmx_spf;
如果是join,就看关联条件epn_pblshr_id和roi_click_event_dt。
熟悉业务的话,很容易猜到,发生倾斜的部分是epn_pblshr_id
进行一下验证:
其中表prs_w.famx_gmb_numeric_snapshot
是这段SQL的主表
对其进行按照publisher_id分组看看数据分布:
1 2 3 4 5 6 7 8 select epn_pblshr_id , count (1 ) as cnt from prs_w.famx_gmb_numeric_snapshotwhere trans_dt between '2021-08-01' and '2021-08-31' group by 1 order by cnt desc ;
运行结果(部分):
no
epn_pblshr_id
cnt
1
5575376664
1554352
2
5575532731
1292900
3
5575133559
1087001
4
5574735181
839112
5
5575403537
595610
6
5574651234
510958
7
5574630565
426040
8
5575403800
405502
9
5574631662
357304
10
5574858753
353657
11
5575407436
341465
12
5575578768
324448
13
5574672411
301772
14
5575086808
270397
15
5574635388
222064
16
5575402240
201475
17
5575322144
197538
18
5575383302
196567
19
5575420559
186998
20
5575400987
173364
21
5574933636
152722
22
5575635657
139925
23
5575082068
110783
24
5575115467
98180
25
5575418208
97703
其中超过10w的只有23个Publisher,超过1w的有124个。
在trans_dt between '2021-08-01' and '2021-08-31'
日期范围内一共有8746个publisher。
8月交易数量14740150条,其中:
超过10w交易量的前23个publisher有10241954条,占比69.4%
超过1w交易量的前124个publisher有13422869条,占比91.0%
换句话说,这前一百多个publisher占据了绝大多数的数据量。
尝试
如上图,1和2是倾斜的数据,3,4,5,6是正常的数据,按照这个原理可以对这些数据进行分别处理。
为了把这些数据量大的publisher给分散到不同的task中处理,可以对倾斜的key进行特殊处理
SQL的逻辑是事实表 左关联合同表 ,取出每条交易符合合同的佣金比率等指标。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 CREATE OR REPLACE TEMPORARY VIEW epn_skewed_pblshr_list AS SELECT epn_pblshr_id FROM prs_w.famx_gmb_numeric_snapshotWHERE trans_dt BETWEEN '${DATA_START_DT}' AND '${DATA_END_DT}' GROUP BY 1 HAVING COUNT (1 ) > 20000 ; CACHE TABLE epn_skewed_pblshr_list; CREATE OR REPLACE TEMPORARY VIEW number_list AS SELECT 0 + idx AS numFROM (SELECT SPLIT(SPACE(98 ), ' ' ) AS x) tLATERAL VIEW POSEXPLODE(x) pe AS idx, ele;
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 CREATE OR REPLACE TEMPORARY VIEW spf_temp_skewed AS SELECT CONCAT(t3.num, '_' , t1.publisher_id) AS publisher_id , t1.start_date , t1.end_date , t1.adj_direction , t1.adj_rate , t1.adj_rules , t1.adj_tiers FROM prs_w.epn_pblshr_payout_adj_rules_w t1INNER JOIN epn_skewed_pblshr_list t2ON t1.publisher_id = t2.epn_pblshr_idINNER JOIN number_list t3ON 1 = 1 WHERE mtdt_dt IN (SELECT MAX (mtdt_dt) FROM prs_w.epn_pblshr_payout_adj_rules_w); CREATE OR REPLACE TEMPORARY VIEW spf_temp_unskewed AS SELECT CONCAT(99 , '_' , t1.publisher_id) AS publisher_id , t1.start_date , t1.end_date , t1.adj_direction , t1.adj_rate , t1.adj_rules , t1.adj_tiers FROM prs_w.epn_pblshr_payout_adj_rules_w t1LEFT JOIN epn_skewed_pblshr_list t2ON t1.publisher_id = t2.epn_pblshr_idWHERE t1.mtdt_dt IN (SELECT MAX (mtdt_dt) FROM prs_w.epn_pblshr_payout_adj_rules_w)AND t2.epn_pblshr_id IS NULL ; CREATE OR REPLACE TEMPORARY VIEW spf_temp AS SELECT publisher_id , start_date , end_date , adj_direction , adj_rate , adj_rules , adj_tiers FROM spf_temp_skewedUNION SELECT publisher_id , start_date , end_date , adj_direction , adj_rate , adj_rules , adj_tiers FROM spf_temp_unskewed;
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 CREATE OR REPLACE TEMPORARY VIEW famx_gmb_numeric_snapshot_v AS SELECT f.click_event_dt , f.item_id , f.ck_trans_id , CASE WHEN k.epn_pblshr_id IS NOT NULL THEN CONCAT(CAST (RAND(1 )* 99 AS INT ), '_' , f.epn_pblshr_id) ELSE CONCAT(99 , '_' , f.epn_pblshr_id) END AS epn_pblshr_id , f.sap_categ_id , f.gmb_usd_amt , f.site_id , f.meta_categ_id , f.meta_categ_name , f.vertical_name , f.gmb_lc_amt , f.gmb_plan_rate_amt , f.item_price_amt , f.refund_ind , f.site_group , f.cntry_desc , f.created_time , f.roi_click_event_dt , f.customer_status , f.list_ids , f.coupon_cd , f.coupon_usd_amt , f.subsidy_ind , f.charity_ind , f.numeric1 , f.own_prchs_ind , f.item_cndtn_id , f.item_cndtn_name , f.own_invtry_ind , f.gmb_non_vat_usd_amt , f.vat_gmb_amt , f.pl_flag , f.trans_dt FROM prs_w.famx_gmb_numeric_snapshot fLEFT JOIN epn_skewed_pblshr_list kON f.epn_pblshr_id = k.epn_pblshr_idWHERE f.trans_dt BETWEEN '${DATA_START_DT}' AND '${DATA_END_DT}' ;
这样处理之后famx_gmb_numeric_snapshot_v
的数据量没有变多,但是publisher_id分布的均匀了,而需要左关联的合同表spf_temp
这交易量超过20000条的这些publisher,合同记录被扩大了99倍。
以空间换时间,join
和group by
的性能会得到一定的提升。
(最后经过验证,这样操作并不影响数据的准确性。)
优化后的结果
Metric
Min
25th percentile
Median
75th percentile
Max
Duration
27 s
34 s
38 s
42 s
1.1 min
GC Time
0.4 s
1 s
2 s
2 s
7 s
Input Size / Records
832.2 KB / 7110
856.0 KB / 7308
862.4 KB / 7365
868.9 KB / 7425
892.3 KB / 7650
Shuffle Write Size / Records
58.0 B / 1
59.0 B / 1
59.0 B / 1
59.0 B / 1
59.0 B / 1
缺点
代码会变的复杂,不利于阅读和理解,建议加上清晰的注释。
后续维护麻烦,如果有新的feature开发,修改的地方会变多。
总结
实际上,在进行笛卡尔积扩容,打随机前缀等操作也会产生一定的性能损耗,如果节省的时间 和消耗的时间 差不多的话,就没有必要做这个优化。如果这个优化减少的时间远大于消耗的时间,则可以考虑做这个优化。
优化需要根据集群性能,业务逻辑,数据量,多方面来综合考量。
从Spark3.0开始,原生支持倾斜join了,由于公司的Spark还未升级到Spark 3版本,上述优化是非常有效的。
https://spark.apache.org/docs/3.1.2/sql-performance-tuning.html#optimizing-skew-join
属性名
默认值
开始适用的版本
spark.sql.adaptive.skewJoin.enabled
true
3.0.0
spark.sql.adaptive.skewJoin.skewedPartitionFactor
10
3.0.0
spark.sql.adaptive.skewJoin.skewedPartitionThresholdInBytes
256MB
3.0.0