数据倾斜调优尝试

在数据处理中,有时候会遇到数据倾斜问题,导致任务运行时间过长,尝试调整一些Spark参数,但是效果不一定明显。那么就可能需要从业务逻辑上进行优化。

  1. 那么如何判断是否有数据倾斜?

可以通过Spark后台的历史运行日志来看。

我们找到实际运行时间最长的Job

点进看详细的运行信息

如图:

这个Stage运行时长16min,一共有2001个task,其中duration代表了实际运行时间。

这个task重点可以看Duration和Shuffle Read Size

Metric Min 25th percentile Median 75th percentile Max
Duration 0 ms 0.2 s 0.8 s 1 s 2.3 h
GC Time 0 ms 0 ms 0 ms 0.2 s 2.3 min
Shuffle Read Size / Records 0.0 B / 0 7.8 KB / 85 42.2 KB / 161 241.3 KB / 599 357.3 MB / 1555208
Shuffle Write Size / Records 0.0 B / 0 59.0 B / 1 59.0 B / 1 59.0 B / 1 59.0 B / 1

解读:按照Metric的量进行排序,数据在5个采样点(0%,25%,50%,75%,100%)的分布情况。

我们可以认为这个stage就发生了数据倾斜。

  1. 如何优化数据倾斜?

首先需要判断发生倾斜的部分在哪一段,这样才可以更好的应对。
还是这个job,在Sprak运行后台找到Duration最长的stage

根据describe中的代码片段找到具体这段代码:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
CREATE OR REPLACE TEMPORARY VIEW fmx_spf AS
SELECT
f.*
, s.adj_direction
, s.adj_rate
, CASE WHEN s.adj_direction = 'INCREASE' THEN 1 + CAST(s.adj_rate AS DECIMAL(8, 2))/100
WHEN s.adj_direction = 'DECREASE' THEN 1 - CAST(s.adj_rate AS DECIMAL(8, 2))/100
ELSE 1
END AS spf_rate
, s.adj_tiers
, sox_rule_match(json_split(s.adj_rules)
, CONCAT_WS(',', list_ids)
, site_group
, numeric1
, coupon_cd
, CAST(item_price_amt AS string)
, CONCAT(site_id, '-', meta_categ_id)
, customer_status
, cmpgn_id
, ebay_plus_type_desc
, item_cndtn_id
, item_cndtn_name
, own_invtry_ind
, pl_flag
) AS matched_spf_rule_str
FROM prs_w.fmx_vrtcl_comm_dedup f
LEFT JOIN spf_temp s
ON f.epn_pblshr_id = s.publisher_id
AND f.roi_click_event_dt >= s.start_date AND f.roi_click_event_dt < coalesce(s.end_date, '2099-12-31 00:00:00')
AND sox_rule_match(json_split(s.adj_rules)
, CONCAT_WS(',',list_ids)
, site_group
, numeric1
, coupon_cd
, CAST(item_price_amt AS string)
, CONCAT(site_id, '-', meta_categ_id)
, customer_status
, cmpgn_id
, ebay_plus_type_desc
, item_cndtn_id
, item_cndtn_name
, own_invtry_ind
, pl_flag
) IS NOT NULL
;

CACHE TABLE fmx_spf;

如果是join,就看关联条件epn_pblshr_id和roi_click_event_dt。

熟悉业务的话,很容易猜到,发生倾斜的部分是epn_pblshr_id

进行一下验证:

其中表prs_w.famx_gmb_numeric_snapshot 是这段SQL的主表

对其进行按照publisher_id分组看看数据分布:

1
2
3
4
5
6
7
8
select
epn_pblshr_id
, count(1) as cnt
from prs_w.famx_gmb_numeric_snapshot
where trans_dt between '2021-08-01' and '2021-08-31'
group by 1
order by cnt desc
;

运行结果(部分):

no epn_pblshr_id cnt
1 5575376664 1554352
2 5575532731 1292900
3 5575133559 1087001
4 5574735181 839112
5 5575403537 595610
6 5574651234 510958
7 5574630565 426040
8 5575403800 405502
9 5574631662 357304
10 5574858753 353657
11 5575407436 341465
12 5575578768 324448
13 5574672411 301772
14 5575086808 270397
15 5574635388 222064
16 5575402240 201475
17 5575322144 197538
18 5575383302 196567
19 5575420559 186998
20 5575400987 173364
21 5574933636 152722
22 5575635657 139925
23 5575082068 110783
24 5575115467 98180
25 5575418208 97703

其中超过10w的只有23个Publisher,超过1w的有124个。

trans_dt between '2021-08-01' and '2021-08-31'日期范围内一共有8746个publisher。

8月交易数量14740150条,其中:

超过10w交易量的前23个publisher有10241954条,占比69.4%

超过1w交易量的前124个publisher有13422869条,占比91.0%

换句话说,这前一百多个publisher占据了绝大多数的数据量。

  1. 尝试

如上图,1和2是倾斜的数据,3,4,5,6是正常的数据,按照这个原理可以对这些数据进行分别处理。

为了把这些数据量大的publisher给分散到不同的task中处理,可以对倾斜的key进行特殊处理

SQL的逻辑是事实表左关联合同表,取出每条交易符合合同的佣金比率等指标。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
-- PART 1: 准备
-- 取出交易量大于20000的publisher.
CREATE OR REPLACE TEMPORARY VIEW epn_skewed_pblshr_list AS
SELECT
epn_pblshr_id
FROM prs_w.famx_gmb_numeric_snapshot
WHERE trans_dt BETWEEN '${DATA_START_DT}' AND '${DATA_END_DT}'
GROUP BY 1
HAVING COUNT(1) > 20000
;

CACHE TABLE epn_skewed_pblshr_list;

-- 生成一个0-98的列,下面方便进行笛卡尔积扩容.
CREATE OR REPLACE TEMPORARY VIEW number_list AS
SELECT
0+idx AS num
FROM (SELECT SPLIT(SPACE(98), ' ') AS x) t
LATERAL VIEW POSEXPLODE(x) pe AS idx, ele
;
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
-- PART 2: 处理右表
-- 合同表做两部分处理:
-- 倾斜部分(交易量很大的publisher)进行笛卡尔积 -- 扩容99倍,这部分数据,每一个publisher的id前都加上0-98的前缀
CREATE OR REPLACE TEMPORARY VIEW spf_temp_skewed AS
SELECT
CONCAT(t3.num, '_', t1.publisher_id) AS publisher_id
, t1.start_date
, t1.end_date
, t1.adj_direction
, t1.adj_rate
, t1.adj_rules
, t1.adj_tiers
FROM prs_w.epn_pblshr_payout_adj_rules_w t1
INNER JOIN epn_skewed_pblshr_list t2
ON t1.publisher_id = t2.epn_pblshr_id
INNER JOIN number_list t3
ON 1 = 1
WHERE mtdt_dt IN (SELECT MAX(mtdt_dt) FROM prs_w.epn_pblshr_payout_adj_rules_w)
;

-- 非倾斜部分(交易量<20000 的publisher) --为了保持id一致,这部分打上99的前缀
CREATE OR REPLACE TEMPORARY VIEW spf_temp_unskewed AS
SELECT
CONCAT(99, '_', t1.publisher_id) AS publisher_id
, t1.start_date
, t1.end_date
, t1.adj_direction
, t1.adj_rate
, t1.adj_rules
, t1.adj_tiers
FROM prs_w.epn_pblshr_payout_adj_rules_w t1
LEFT JOIN epn_skewed_pblshr_list t2
ON t1.publisher_id = t2.epn_pblshr_id
WHERE t1.mtdt_dt IN (SELECT MAX(mtdt_dt) FROM prs_w.epn_pblshr_payout_adj_rules_w)
AND t2.epn_pblshr_id IS NULL
;

-- 这两部分union到一起,之所以不用nuion all,是因为这两部分不会重复,所以不需要再进行去重
CREATE OR REPLACE TEMPORARY VIEW spf_temp AS
SELECT
publisher_id
, start_date
, end_date
, adj_direction
, adj_rate
, adj_rules
, adj_tiers
FROM spf_temp_skewed
UNION
SELECT
publisher_id
, start_date
, end_date
, adj_direction
, adj_rate
, adj_rules
, adj_tiers
FROM spf_temp_unskewed
;
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
-- PART 3: 处理左表
-- 交易明细表部分
-- 左关联倾斜publishr的list,如果关联上则说明是倾斜的交易,打上一个随机(0-98)的前缀,如果没有关联上则打99的前缀
CREATE OR REPLACE TEMPORARY VIEW famx_gmb_numeric_snapshot_v AS
SELECT
f.click_event_dt
, f.item_id
, f.ck_trans_id
, CASE WHEN k.epn_pblshr_id IS NOT NULL THEN CONCAT(CAST(RAND(1)*99 AS INT), '_', f.epn_pblshr_id) ELSE CONCAT(99, '_', f.epn_pblshr_id) END AS epn_pblshr_id
, f.sap_categ_id
, f.gmb_usd_amt
, f.site_id
, f.meta_categ_id
, f.meta_categ_name
, f.vertical_name
, f.gmb_lc_amt
, f.gmb_plan_rate_amt
, f.item_price_amt
, f.refund_ind
, f.site_group
, f.cntry_desc
, f.created_time
, f.roi_click_event_dt
, f.customer_status
, f.list_ids
, f.coupon_cd
, f.coupon_usd_amt
, f.subsidy_ind
, f.charity_ind
, f.numeric1
, f.own_prchs_ind
, f.item_cndtn_id
, f.item_cndtn_name
, f.own_invtry_ind
, f.gmb_non_vat_usd_amt
, f.vat_gmb_amt
, f.pl_flag
, f.trans_dt
FROM prs_w.famx_gmb_numeric_snapshot f
LEFT JOIN epn_skewed_pblshr_list k
ON f.epn_pblshr_id = k.epn_pblshr_id
WHERE f.trans_dt BETWEEN '${DATA_START_DT}' AND '${DATA_END_DT}'
;

这样处理之后famx_gmb_numeric_snapshot_v 的数据量没有变多,但是publisher_id分布的均匀了,而需要左关联的合同表spf_temp 这交易量超过20000条的这些publisher,合同记录被扩大了99倍。

以空间换时间,joingroup by的性能会得到一定的提升。

(最后经过验证,这样操作并不影响数据的准确性。)

  1. 优化后的结果
Metric Min 25th percentile Median 75th percentile Max
Duration 27 s 34 s 38 s 42 s 1.1 min
GC Time 0.4 s 1 s 2 s 2 s 7 s
Input Size / Records 832.2 KB / 7110 856.0 KB / 7308 862.4 KB / 7365 868.9 KB / 7425 892.3 KB / 7650
Shuffle Write Size / Records 58.0 B / 1 59.0 B / 1 59.0 B / 1 59.0 B / 1 59.0 B / 1
  1. 缺点

    1. 代码会变的复杂,不利于阅读和理解,建议加上清晰的注释。

    2. 后续维护麻烦,如果有新的feature开发,修改的地方会变多。

  2. 总结

实际上,在进行笛卡尔积扩容,打随机前缀等操作也会产生一定的性能损耗,如果节省的时间消耗的时间差不多的话,就没有必要做这个优化。如果这个优化减少的时间远大于消耗的时间,则可以考虑做这个优化。

优化需要根据集群性能,业务逻辑,数据量,多方面来综合考量。

从Spark3.0开始,原生支持倾斜join了,由于公司的Spark还未升级到Spark 3版本,上述优化是非常有效的。

https://spark.apache.org/docs/3.1.2/sql-performance-tuning.html#optimizing-skew-join

|属性名| 默认值| 开始适用的版本|
|::|::|::|
|spark.sql.adaptive.skewJoin.enabled |true| 3.0.0|
|spark.sql.adaptive.skewJoin.skewedPartitionFactor| 10| 3.0.0|
|spark.sql.adaptive.skewJoin.skewedPartitionThresholdInBytes |256MB| 3.0.0|