7.3 Hash Based Shuffle
本节讲解Hash Based Shuffle,包括Hash Based Shuffle概述、Hash Based Shuffle内核、Hash Based Shuffle的数据读写的源码解析等内容。
7.3.1 概述
在Spark 1.1之前,Spark中只实现了一种Shuffle方式,即基于Hash的Shuffle。在Spark 1.1版本中引入了基于Sort的Shuffle实现方式,并且在Spark 1.2版本之后,默认的实现方式从基于Hash的Shuffle,修改为基于Sort的Shuffle实现方式,即使用的ShuffleManager从默认的hash修改为sort。说明在Spark 2.0版本中,Hash的Shuffle方式已经不再使用。
Spark之所以一开始就提供基于Hash的Shuffle实现机制,其主要目的之一就是为了避免不需要的排序(这也是Hadoop Map Reduce被人诟病的地方,将Sort作为固定步骤,导致许多不必要的开销)。但基于Hash的Shuffle实现机制在处理超大规模数据集的时候,由于过程中会产生大量的文件,导致过度的磁盘I/O开销和内存开销,会极大地影响性能。
但在一些特定的应用场景下,采用基于Hash的实现Shuffle机制的性能会超过基于Sort的Shuffle实现机制。关于基于Hash与基于Sort的Shuffle实现机制的性能测试方面,可以参考Spark创始人之一的ReynoldXin给的测试:“sort-basedshuffle has lower memory usage and seems to outperformhash-based in almost all of our testing”。
相关数据可以参考https://issues.apache.org/jira/browse/SPARK-3280。
因此,在Spark 1.2版本中修改为默认基于Sort的Shuffle实现机制时,同时也给出了特定应用场景下回退的机制。
7.3.2 Hash Based Shuffle内核
1.基于Hash的Shuffle实现机制的内核框架
基于Hash的Shuffle实现,ShuffleManager的具体实现子类为HashShuffleManager,对应的具体实现机制如图7-3所示。
图7-3 基于哈希算法的Shuffle实现机制的内核框架
其中,HashShuffleManager是ShuffleManager的基于哈希算法实现方式的具体实现子类。数据块的读写分别由BlockStoreShuffleReader与HashShuffleWriter实现;数据块的文件解析器则由具体子类FileShuffleBlockResolver实现;BaseShuffleHandle是ShuffleHandle接口的基本实现,保存Shuffle注册的信息。
HashShuffleManager继承自ShuffleManager,对应实现了各个抽象接口。基于Hash的Shuffle,内部使用的各组件的具体子类如下所示。
(1)BaseShuffleHandle:携带了Shuffle最基本的元数据信息,包括shuffleId、numMaps和dependency。
(2)BlockStoreShuffleReader:负责写入的Shuffle数据块的读操作。
(3)FileShuffleBlockResolver:负责管理,为Shuffle任务分配基于磁盘的块数据的Writer。每个ShuffleShuffle任务为每个Reduce分配一个文件。
(4)HashShuffleWriter:负责Shuffle数据块的写操作。
在此与解析整个Shuffle过程一样,以HashShuffleManager类作为入口进行解析。
首先看一下HashShuffleManager具体子类的注释,如下所示。
Spark 1.6.0版本的HashShuffleManager.scala的源码(Spark 2.4版本已无HashShuffleManager方式)如下:
2.基于Hash的Shuffle实现方式一
为了避免Hadoop中基于Sort方式的Shuffle所带来的不必要的排序开销,Spark在开始时采用了基于Hash的Shuffle方式。但这种方式存在不少缺陷,这些缺陷大部分是由于在基于Hash的Shuffle实现过程中创建了太多的文件所造成的。在这种方式下,每个Mapper端的Task运行时都会为每个Reduce端的Task生成一个文件,具体如图7-4所示。
图7-4 基于Hash的Shuffle实现方式——文件的输出细节图
Executor-Mapper表示执行Mapper端的Tasks的工作点,可以分布到集群中的多台机器节点上,并且可以以不同的形式出现,如以Spark Standalone部署模式中的Executor出现,也可以以Spark On Yarn部署模式中的容器形式出现,关键是它代表了实际执行Mapper端的Tasks的工作点的抽象概念。其中,M表示Mapper端的Task的个数,R表示Reduce端的Task的个数。
对应在右侧的本地文件系统是在该工作点上所生成的文件,其中R表示Reduce端的分区个数。生成的文件名格式为:shuffle_shuffleId_mapId_reduceId,其中的shuffle_shuffleId_1_1表示mapId为1,同时reduceId也为1。
在Mapper端,每个分区对应启动一个Task,而每个Task会为每个Reducer端的Task生成一个文件,因此最终生成的文件个数为M×R。
由于这种实现方式下,对应生成文件个数仅与Mapper端和Reducer端各自的分区数有关,因此图中将Mapper端的全部M个Task抽象到一个Executor-Mapper中,实际场景中通常是分布到集群中的各个工作点中。
生成的各个文件位于本地文件系统的指定目录中,该目录地址由配置属性spark.local.dir设置。说明:分区数与Task数,一个是静态的数据分块个数,一个是数据分块对应执行的动态任务个数,因此,在特定的、描述个数的场景下,两者是一样的。
3.基于Hash的Shuffle实现方式二
为了减少Hash所生成的文件个数,对基于Hash的Shuffle实现方式进行了优化,引入文件合并的机制,该机制设置的开关为配置属性spark.shuffle.consolidateFiles。在引入文件合并的机制后,当设置配置属性为true,即启动文件合并时,在Mapper端的输出文件会进行合并,在一定程度上可以大量减少文件的生成,降低不必要的开销。文件合并的实现方式可以参考图7-5。
图7-5 基于Hash的Shuffle的合并文件机制的输出细节图
Executor-Mapper表示集群中分配的某个工作点,其中,C表示在该工作点上所分配到的内核(Core)个数,T表示在该工作点上为每个Task分配的内核个数。C/T表示在该工作点上调度时最大的Task并行个数。
右侧的本地文件系统是在该工作点上所生成的文件,其中R表示Reduce端的分区个数。生成的文件名格式为:merged_shuffle_shuffleId_bucketId_fileId,其中的merged_shuffle_shuffleId_1_1表示bucketId为1,同时fileId也为1。
在Mapper端,Task会复用文件组,由于最大并行个数为C/T,因此文件组最多分配C/T个,当某个Task运行结束后,会释放该文件组,之后调度的Task则复用前一个Task所释放的文件组,因此会复用同一个文件。最终在该工作点上生成的文件总数为C/T*R,如果设工作点个数为E,则总的文件数为E*C/T*R。
4.基于Hash的Shuffle机制的优缺点
1)优点
可以省略不必要的排序开销。
避免了排序所需的内存开销。
2)缺点
生成的文件过多,会对文件系统造成压力。
大量小文件的随机读写会带来一定的磁盘开销。
数据块写入时所需的缓存空间也会随之增加,会对内存造成压力。
7.3.3 Hash Based Shuffle数据读写的源码解析
1.基于Hash的Shuffle实现方式一的源码解析
下面针对Spark 1.6版本中的基于Hash的Shuffle实现在数据写方面进行源码解析(Spark2.0版本中已无Hash的Shuffle实现方式)。在基于Hash的Shuffle实现机制中,采用HashShuffleWriter作为数据写入器。在HashShuffleWriter中控制Shuffle写数据的关键代码如下所示。
Spark 1.6.0版本的HashShuffleWriter.scala的源码(Spark 2.4版本已无HashShuffle-Manager方式)如下:
当需要在Map端进行聚合时,使用的是聚合器(Aggregator)的combineValuesByKey方法,在该方法中使用ExternalAppendOnlyMap类对记录集进行处理,处理时如果内存不足,会引发Spill操作。早期的实现会直接缓存到内存,在数据量比较大时容易引发内存泄漏。
在HashShuffleManager中,ShuffleBlockResolver特质使用的具体子类为FileShuffleBlock-Resolver,即指定了具体如何从一个逻辑Shuffle块标识信息来获取一个块数据,对应为下面第7行调用的forMapTask方法。
Spark 1.6.0版本的FileShuffleBlockResolver.scala的源码(Spark 2.4版本已无HashShuffleManager方式)如下:
其中,ShuffleBlockId实例构建的源码如下:
从name方法的重载上可以看出,后续构建的文件与代码中的mapId、reduceId的关系。当然,所有同一个Shuffle的输出数据块,都会带上shuffleId这个唯一标识的,因此全局角度上,逻辑数据块name不会重复(针对一些推测机制或失败重试机制之类的场景而已,逻辑name没有带上时间信息,因此缺少多次执行的输出区别,但在管理这些信息时会维护一个时间作为有效性判断)。
2.基于Hash的Shuffle实现方式二的源码解析
下面通过详细解析FileShuffleBlockResolver源码来加深对文件合并机制的理解。
由于在Spark 1.6中,文件合并机制已经删除,因此下面基于Spark 1.5版本的代码对文件合并机制的具体实现细节进行解析。以下代码位于FileShuffleBlockResolver类中。
合并机制的关键控制代码如下所示。
Spark 1.5.0版本的FileShuffleBlockResolver.scala的源码(Spark 2.4版本已无HashShuffleManager方式)如下:
其中,第10行中的consolidateShuffleFiles变量,是判断是否设置了文件合并机制,当设置consolidateShuffleFiles为true后,会继续调用getUnusedFileGroup方法,在该方法中会获取未使用的文件组,即重新分配或已经释放可以复用的文件组。
获取未使用的文件组(ShuffleFileGroup)的相关代码getUnusedFileGroup如下所示。
Spark 1.5.0版本的FileShuffleBlockResolver.scala的源码(Spark 2.4版本已无HashShuffleManager方式)如下:
其中,第13行代码对应生成的文件名,即物理文件名,相关代码如下所示。
Spark 1.5.0版本的FileShuffleBlockResolver.scala的源码(Spark 2.4版本已无HashShuffleManager方式)如下:
可以看到,与未使用文件合并时的基于Hash的Shuffle实现方式不同的是,在生成的文件名中没有对应的mapId,取而代之的是与文件组相关的fileId,而fileId则是多个Mapper端的Task所共用的,在此仅从生成的物理文件名中也可以看出文件合并的某些实现细节。
另外,对应生成的文件组既然是复用的,当一个Mapper端的Task执行结束后,便会释放该文件组(ShuffleFileGroup),之后继续调度时便会复用该文件组。对应地,调度到某个Executor工作点上同时运行的Task最大个数,就对应了最多分配的文件组个数。
而在TaskSchedulerImpl调度Task时,各个Executor工作点上Task调度控制的源码说明了在各个Executor工作点上调度并行的Task数,具体代码如下所示。
Spark 2.2.1版本的TaskSchedulerImpl.scala的源码如下:
Spark 2.4.3版本的TaskSchedulerImpl.scala源码与Spark 2.2.1版本相比具有如下特点。
上段代码中第6行之后新增一个参数addressesWithDescs。
其中,设置每个Task所需的内核个数的配置属性如下:
对于这些会影响Executor中并行执行的任务数的配置信息,设置时需要多方面考虑,包括内核个数与任务个数的合适比例,在内存模型中,为任务分配内存的具体策略等。任务分配内存的具体策略可以参考Spark官方给出的具体设计文档,以及文档中各种设计方式的权衡等内容。