7.5 Tungsten Sorted Based Shuffle
本节讲解Tungsten Sorted Based Shuffle,包括Tungsten Sorted Based Shuffle概述、Tungsten Sorted Based内核、Tungsten Sorted Based数据读写的源码解析等内容。
7.5.1 概述
基于Tungsten Sort的Shuffle实现机制主要是借助Tungsten项目所做的优化来高效处理Shuffle。
Spark提供了配置属性,用于选择具体的Shuffle实现机制,但需要说明的是,虽然默认情况下Spark默认开启的是基于Sort的Shuffle实现机制(对应spark.shuffle.manager的默认值),但实际上,参考Shuffle的框架内核部分可知基于Sort的Shuffle实现机制与基于Tungsten Sort的Shuffle实现机制都是使用SortShuffleManager,而内部使用的具体的实现机制,是通过提供的两个方法进行判断的。对应非基于Tungsten Sort时,通过SortShuffleWriter.shouldBypassMergeSort方法判断是否需要回退到Hash风格的Shuffle实现机制,当该方法返回的条件不满足时,则通过SortShuffleManager.canUseSerializedShuffle方法判断是否需要采用基于Tungsten Sort的Shuffle实现机制,而当这两个方法返回都为false,即都不满足对应的条件时,会自动采用常规意义上的基于Sort的Shuffle实现机制。
因此,当设置了spark.shuffle.manager=tungsten-sort时,也不能保证就一定采用基于Tungsten Sort的Shuffle实现机制。有兴趣的读者可以参考Spark 1.5及之前的注册方法的实现,该实现中SortShuffleManager的注册方法仅构建了BaseShuffleHandle实例,同时对应的getWriter中也只对应构建了BaseShuffleHandle实例。
7.5.2 Tungsten Sorted Based Shuffle内核
基于Tungsten Sort的Shuffle实现机制的入口点仍然是SortShuffleManager类,与同样在SortShuffleManager类控制下的其他两种实现机制不同的是,基于Tungsten Sort的Shuffle实现机制使用的ShuffleHandle与ShuffleWriter分别为SerializedShuffleHandle与UnsafeShuffleWriter。因此,对应的具体实现机制如图7-12所示。
图7-12 基于TungstenSort的Shuffle实现机制的框架类图
在Sorted Based Shuffle中,SortShuffleManager根据内部采用的不同实现细节,分别给出两种排序模式,而基于TungstenSort的Shuffle实现机制对应的就是序列化排序模式。
从图7-12中可以看到基于Sort的Shuffle实现机制,具体的写入器的选择与注册得到的ShuffleHandle类型有关,参考SortShuffleManager类的registerShuffle方法。
registerShuffle方法中会判断是否满足序列化模式的条件,如果满足,则使用基于TungstenSort的Shuffle实现机制,对应在代码中,表现为使用类型为SerializedShuffleHandle的ShuffleHandle。上述代码进一步说明了在spark.shuffle.manager设置为sort时,内部会自动选择具体的实现机制。对应代码的先后顺序,就是选择的先后顺序。
对应的序列化排序(Serialized sorting)模式需要满足的条件如下所示。
(1)Shuffle依赖中不带聚合操作或没有对输出进行排序的要求。
(2)Shuffle的序列化器支持序列化值的重定位(当前仅支持KryoSerializer以及Spark SQL子框架自定义的序列化器)。
(3)Shuffle过程中的输出分区个数少于16 777 216个。
实际上,使用过程中还有其他一些限制,如引入那个Page形式的内存管理模型后,内部单条记录的长度不能超过128MB(具体内存模型可以参考PackedRecordPointer类)。另外,分区个数的限制也是该内存模型导致的(同样参考PackedRecordPointer类)。
所以,目前使用基于TungstenSort的Shuffle实现机制条件还是比较苛刻的。
7.5.3 Tungsten Sorted Based Shuffle数据读写的源码解析
对应这种SerializedShuffleHandle及其相关的Shuffle数据写入器类型的相关代码,可以参考SortShuffleManager类的getWriter方法。
SortShuffleManager.scala的源码如下:
数据写入器类UnsafeShuffleWriter中使用SortShuffleManager实例中的变量shuffleBlockResolver来对逻辑数据块与物理数据块的映射进行解析,而该变量使用的是与基于Hash的Shuffle实现机制不同的解析类,即当前使用的IndexShuffleBlockResolver。
UnsafeShuffleWriter构建时传入了一个与其他两种基于Sorted的Shuffle实现机制不同的参数:context.taskMemoryManager(),在此构建了一个TaskMemoryManager实例并传入UnsafeShuffleWriter。TaskMemoryManager与Task是一对一的关系,负责管理分配给Task的内存。
下面开始解析写数据块的UnsafeShuffleWriter类的源码实现。首先来看其write的方法。
UnsafeShuffleWriter.scala的源码如下:
写过程的关键步骤有以下三步。
(1)通过insertRecordIntoSorter(records.next())方法将每条记录插入外部排序器。
(2)closeAndWriteOutput方法写数据文件与索引文件,在写的过程中,会先合并外部排序器在插入过程中生成的Spill中间文件。
(3)sorter.cleanupResources()最后释放外部排序器的资源。
首先查看将每条记录插入外部排序器(ShuffleExternalSorter)时所使用的insertRecordIntoSorter方法。
UnsafeShuffleWriter.scala的源码如下:
下面继续查看第二步写数据文件与索引文件的closeAndWriteOutput方法。
closeAndWriteOutput的源码如下:
closeAndWriteOutput方法主要有以下三步。
(1)触发外部排序器,获取Spill信息。
(2)合并中间的Spill文件,生成数据文件,并返回各个分区对应的数据量信息。
(3)根据各个分区的数据量信息生成数据文件对应的索引文件。
writeIndexFileAndCommit方法和Sorted Based Shuffle机制的实现一样,在此仅分析过程中不同的Spill文件合并步骤,即mergeSpills方法的具体实现。
UnsafeShuffleWriter.scala的mergeSpills方法的源码如下:
各种合并策略在性能上具有一定差异,会根据具体的条件采用,主要有基于Java NIO(New I/O)和基于普通文件流合并文件的方式。下面简单描述一下基于文件合并流的处理过程。
Spark 2.2.1版本的UnsafeShuffleWriter.scala的mergeSpillsWithFileStream方法的源码如下:
Spark 2.4.3版本的UnsafeShuffleWriter.scala源码与Spark 2.2.1版本相比具有如下特点。
上段代码中第15行之前新增构建BufferedOutputStream实例bos的代码。
上段代码中第16行将new FileOutputStream(outputFile)调整为bos。
上段代码中第22行将构建FileInputStream实例,调整为构建NioBufferedFileInputStream实例。
基于NIO的文件合并流程基本类似,只是底层采用NIO的技术实现。