Spark性能調(diào)優(yōu)——如何將運行時間從10小時以上壓縮到30分鐘以內(nèi)
火花Logo
情景🔍我们对两个数据集中的一个整数列进行了简单的连接操作,其中一个数据集包含19 GB,另一个包含16 GB,这两个数据集都是 Snappy 压缩的 Parquet 格式数据。
df_1 -> 19 GB -> 1.82亿条记录数
df_2 ->16 GB-> 1.62亿条
现在,我知道你们可能会问,等等,这些数据量是不是相对较少,而之前连接这些数据需要超过10小时?没错!为什么呢?让我们来弄明白吧。
集群详情: 💻注意:即使启用了AQE功能,它还是慢得让人抓狂。
我们有一个EMR,它有4个_r5dn.16xlarge_节点。每个节点都有64个虚拟内核、488GB内存和2400GB SSD。我们来计算一下集群的总规模。
*总vCPU核数 = 4 64 = 256**
*总计内存 = 4 488 = 1952 GB 约 1.9TB**
*总计SSD容量 = 4 2400 GB = 9600 GB 约 9.3TB**
我知道你在想什么,我真是一个糟糕的数据工程师,谁会用1.9TB内存集群来处理40-45GB的数据。好吧,后续步骤确实有数TB的数据,但这次连接的数据集相对较小。实际上,在后续步骤中我们处理的是7-8TB的数据。
为什么这个合并操作要花这么久的时间?
首先,让我们来看看这个连接的 PySpark 代码
df_1 = spark.read.load("abc").select("id", "col_a", "col_b") # 15GB
df_2 = spark.read.load("xyz").select("id", "col_c") # 6GB
df_join = df_1.join(df_2, "id", "inner") # 将两个DataFrame通过"id"列进行内连接操作
df_join.write.parquet(path) # 将结果写入Parquet格式的文件中
这个加入非常简单。现在我们来看看这个‘id’字段的特点。
来自两个数据表的样本记录
这里出了什么问题吗?🧐 一切都好像挺好的,不是吗?哎,并不像看起来那么简单!
问题!!!数学根本不像在做数学!
检查好了
这两个数据集都能轻松放进内存里
这只是一个内连接,更不用说是交叉连接。
那问题出在哪里呢?好吧,我们忘了这个小家伙了,也是数据工程师们最头疼的敌人,即SKEW。
数据工程师面对倾斜数据时
是的,数据有所偏斜。如果你检查数据,“id”字段的偏斜特别明显,这是我们用来连接的键。所以,比如,在第一张表中,每10条记录里有7条的值是1。这就造成了偏斜,只有少数任务会卡住不动,因为两张表中所有“id”为1的记录都处理在同一个执行器里,这造成了瓶颈。
数据在id字段上的分布严重失衡
让我们假设我们有三个执行者,执行者1获取所有id值为1的记录,执行者2获取所有id值为2的记录,等等……。两张表中的记录分别由各个执行者处理。
洗牌后,
执行者1共有10条记录(其中7条来自df_1,3条来自df_2)
程序2有4条记录
执行者 3 的 2 条记录,
现在,Executor 1 只有 10 条记录,看起来似乎很少。但想象一下,在实际情况中,Executor 1 有 1000 万 条记录,而其他 Executor 只有 1000 条记录。这就是我们所说的偏差。
观察Spark UI的阶段时间轴,我们就能看出是否有任何数据倾斜。
可能有一项或几项任务会比其他任务花很长时间。
上面的图显示数据分布不当,意味着少数几个核心承担了过多的记录处理工作。
不过,为什么是10多个小时?我们97%的任务 都会在20到30分钟内完成。只有剩下的 3%的任务 才需要运行 9到10个小时。因为这些3到4个任务会对数百万条记录进行连接匹配操作,两张表中的相同id值有数百万条记录。想象一下,单个核心试图将表1中id值为1的5百万条记录与表2中id值也为1的3百万条记录进行连接。
所以,那个核心正在为 ID 值为 '1' 的情况准备 1500 万条记录(即 5 乘以 3 等于 15 百万条记录)。相比之下,其他执行核心只需处理几千条记录即可完成任务。
要怎么才能均匀地分发这些数据?🤔有的是。那就是撒盐!🧂
放盐!
加盐是一种技术,通过在现有数字中加入少量随机数字(称为盐),并将这些处理后的数字用作连接键,这样其中值为“1”的记录就会分布在多个执行器中。
但是怎么做呢?我们一步步来,我会详细告诉你每一步怎么做。
第一步:我们选择一个盐值范围(0到X),称作salt_num。
步骤2: 在大数据集里,我们添加一个名为 id_salted 的列,该列的值将是通过将id与一个随机数(范围在0到saltnumber-1之间)拼接而成,中间用下划线分隔,即 **CONCAT(id, ‘’, random(0,salt_number-1))**。
步骤3:爆破较小的那个数据集,使其包含所有来自盐编号值0到X的记录组合。这张表将有*N盐编号值**条记录。
步骤4:将它与新的 ‘id_salted’ 连接起来
让我们假设_saltnum=4,看看这两个数据集会是怎样的。
在大规模数据集(df_1)中将增加一列,而小数据集(df_2)的数据将会展开后的数据。
请观察那个大的表并没有数据膨胀,只是多了一个一个额外的字段,里面包含随机生成的盐值。而小表的数据则膨胀成了(记录数乘以盐值数量),即每个记录会被复制盐值数量次。
现在,id 值为 1 的 7 条记录不会被分配到同一个执行器,而是会被分配到 4 个不同的执行器(因为我们有 4 个不同的 id_salted 值)。
所以,我们不再让单个执行者负担过重,而是使用了四个执行者,并且是并行工作的。
我们现在来看看代码实现
import pyspark.sql.functions as F
salt_num = 4
# 对 df_1 - 大表 添加盐值
df_1_salted_tmp = df_1.withColumn("salt", F.floor(F.rand()*salt_num).cast('int'))
df_1_salted = df_1_salted_tmp.withColumn("id_salted", F.concat(F.col('id'),F.lit('_'),F.col('salt'))).drop('salt')
## 接下来,我们对 df_2 - 小表 添加盐值
df_salt_range = spark.range(salt_num).toDF("salt") ## 因为 salt_num 是 4,创建一个包含4条记录的DF,范围从0到3
df_2_salted_tmp = df_2.crossJoin(df_salt_range) ## 小表进行交叉连接
df_2_salted = df_2_salted_tmp.withColumn("id_salted", F.concat(F.col('id'),F.lit('_'),F.col('salt'))).drop('salt')
## 按 id_salted 列连接
df_join = df_1_salted.join(df_2_salted, "id_salted").drop("id_salted")
df_join.write.parquet(path) ## 将结果保存为 parquet 文件
而且就这样了!!! 我们成功优化了这个连接 👏
这是Spark UI现在看起来是这样。
所有任务几乎都用了差不多的时间!(示例图片,仅供参考)
如何决定加多少盐❓好吧,这里并没有一个完美的x=a+b公式。这只是个粗略的估计,有时,我们可能得试试不同的盐数。
但一般来说,我们需要找到平衡点。
可用的核心数。
✔️复制这张num_salt次数能放得进内存里。
例如,在我们之前提到的集群中,有256个VCores和足够的RAM。因此,我们可以将salt_num设为10。因此,小表的总记录数为1.62亿乘以10,也就是16.2亿,这很容易处理。
注意⚠️🚨虽然加盐是一种很好的方法,但它应该被认真对待。 lol。也就是说,如果我们在不适当的地方使用加盐,或者使用一个非常高的盐量,这样会破坏整个流程,甚至使情况变得更糟。重要的是做一些简单的计算,决定加盐是否真正解决问题,以及理想的盐量是多少。
💭额外的思考内容实际上,在实际部署时,我们将数据分成了两部分。一部分是id的记录数分布几乎均衡。另一部分则是存在偏差,我们仅在这部分偏差的记录集上应用了salting。因此,我们只是在表的一部分上应用了salting,而不是在整个表上。例如,大约只对1百万条记录进行了处理。
所以我们选择了一个 盐值为100的高 数字,这将产生 一亿条 记录,并且我们的集群可以轻松处理这些数据。
在生产过程中,带有盐处理的倾斜部分运行了20分钟,而无盐处理的数据(直接按ID进行连接)运行了12分钟。因此,总耗时大约为32分钟。
耶!我们把运行时间从10多个小时缩短到了大约30分钟,真是太棒了!
总结当数据高度倾斜时,salting真的对我们很有帮助。不仅在连接操作时,在任何聚合查询,例如groupBy中也可以使用salting。
虽然这样很好,但必须小心不要让数据膨胀得太厉害,以免数据过多引发问题。
谢谢读到最后。如果你喜欢,可以考虑关注我一下。
如果你想通过我的 LinkedIn 与我连接,可以在这里找到我的个人资料链接 -> https://www.linkedin.com/in/praveen-kumar-bn/
Praveen讲解数据 #大资料 #数据工程 #Spark #PySpark共同學習,寫下你的評論
評論加載中...
作者其他優(yōu)質(zhì)文章