7.4 Sorted Based Shuffle
在历史的发展中,为什么Spark最终还是放弃了HashShuffle,使用了Sorted-Based Shuffle,而且作为后起之秀的Tungsten-based Shuffle到底是在什么样的背景下产生的。Tungsten-Sort Shuffle已经并入了Sorted-Based Shuffle,Spark的引擎会自动识别程序需要的是Sorted-Based Shuffle,还是Tungsten-Sort Shuffle,Spark会检查相对的应用程序有没有Aggregrate的操作。Sorted-Based Shuffle也有缺点,其缺点反而是它排序的特性,它强制要求数据在Mapper端必须先进行排序(注意,这里没有说对计算结果进行排序),所以导致它排序的速度有点慢。而Tungsten-Sort Shuffle对它的排序算法进行了改进,优化了排序的速度。
Spark会根据宽依赖把它一系列的算子划分成不同的Stage,Stage的内部会进行Pipeline、Stage与Stage之间进行Shuffle。Shuffle的过程包含三部分,如图7-6所示。
图7-6 Shuffle的过程示意图
第一部分是Shuffle的Writer;第二部分是网络传输;第三部分是Shuffle的Read,这三大部分设置了内存操作、磁盘I/O、网络I/O以及JVM的管理。而这些东西是影响了Spark应用程序95%以上效率的唯一原因。假设程序代码本身非常好,性能的95%都消耗在Shuffle阶段的本地写磁盘文件、网络传输数据以及抓取数据这样的生命周期中,如图7-7所示。
在Shuffle写数据的时候,内存中有一个缓存区叫Buffer,可以将其想象成一个Map,同时在本地磁盘有对应的本地文件。如果本地磁盘有文件,在内存中肯定也需要有对应的管理句柄。也就是说,单从ShuffleWriter内存占用的角度讲,已经有一部分内存空间用在存储Buffer数据,另一部分内存空间是用来管理文件句柄的,回顾HashShuffle所产生小文件的个数是Mapper分片数量×Reducer分片数量(M×R)。例如,Mapper端有1000个数据分片,Reducer端也有1000个数据分片,在HashShuffle的机制下,它在本地内存空间中会产生1000×1000=1000000个小文件,结果可想而知,这么多的I/O,这么多的内存消耗、这么容易产生OOM,以及这么沉重的CG负担。再说,如果Reducer端去读取Mapper端的数据时,Mapper端有这么多的小文件,要打开很多网络通道去读数据,打开1000000端口不是一件很轻松的事。这会导致一个非常经典的错误:Reducer端下一个Stage通过Driver去抓取上一个Stage属于它自己的数据的时候,说文件找不到。其实,这个时候不是真的在磁盘上找不到文件,而是程序不响应,因为它在进行垃圾回收(GC)操作。
图7-7 Shuffle示意图
Spark最根本要优化和迫切要解决的问题是:减少Mapper端ShuffleWriter产生的文件数量,这样便可以让Spark从几百台集群的规模瞬间变成可以支持几千台,甚至几万台集群的规模(一个Task背后可能是一个Core去运行,也可能是多个Core去运行,但默认情况下是用一个Core去运行一个Task)。
减少Mapper端的小文件带来的好处如下。
(1)Mapper端的内存占用变少了。
(2)Spark不仅仅可以处理小规模的数据,即使处理大规模的数据,也不会很容易达到性能瓶颈。
(3)Reducer端抓取数据的次数变少了。
(4)网络通道的句柄变少了。
(5)不仅仅减少了数据级别内存的消耗,更极大减少了Spark框架运行时必须消耗Reducer的内容。
7.4.1 概述
Sorted-Based Shuffle的出现,最显著的优势是把Spark从只能处理中小规模数据的平台,变成可以处理无限大规模数据的平台。集群规模意味着Spark处理数据的规模,也意味着Spark的运算能力。
Sorted-Based Shuffle不会为每个Reducer中的Task生产一个单独的文件,相反,Sorted-Based Shuffle会把Mapper中每个ShuffleMapTask所有的输出数据Data只写到一个文件中,因为每个ShuffleMapTask中的数据会被分类,所以Sort-based Shuffle使用了index文件,存储具体ShuffleMapTask输出数据在同一个Data文件中是如何分类的信息。基于Sort-based Shuffle会在Mapper中的每个ShuffleMapTask中产生两个文件(并发度的个数×2),如图7-8所示。
图7-8 Sorted-Based Shuffle示意图
图7-8会产生一个Data文件和一个Index文件。其中,Data文件是存储当前Task的Shuffle输出的,而Index文件则存储了Data文件中的数据通过Partitioner的分类信息,此时下一个阶段的Stage中的Task就是根据这个Index文件获取自己所需要抓取的上一个Stage中ShuffleMapTask所产生的数据。
假设现在Mapper端有1000个数据分片,Reducer端也有1000个数据分片,它的并发度是100,使用Sorted-Based Shuffle会产生多少个Mapper端的小文件,答案是100×2 = 200个。它的MapTask会独自运行,每个MapTask在运行时写两个文件,运行成功后就不需要这个MapTask的文件句柄,无论是文件本身的句柄,还是索引的句柄,都不需要,所以如果它的并发度是100个Core,每次运行100个任务,它最终只会占用200个文件句柄,这与HashShuffle的机制不一样,HashShuffle最差的情况是Hashed句柄存储在内存中。
图7-9中,Sorted-Based Shuffle主要在Mapper阶段,这个跟Reducer端没有任何关系,在Mapper阶段,Sorted-Based Shuffle要进行排序,可以认为是二次排序,它的原理是有两个Key进行排序,第一个是PartitionId进行排序,第二个是本身数据的Key进行排序。它会把PartitionId分成3个,索引分别为0、1、2,这个在Mapper端进行排序的过程其实是让Reducer去抓取数据的时候变得更高效。例如,第一个Reducer,它会到Mapper端的索引为0的数据分片中抓取数据。具体而言,Reducer首先找Driver去获取父Stage中每个ShuffleMapTask输出的位置信息,根据位置信息获取Index文件,解析Index文件,从解析的Index文件中获取Data文件中属于自己的那部分内容。
图7-9 Sorted-Based Shuffle流程图
一个Mapper任务除了有一个数据文件外,它也会有一个索引文件,Map Task把数据写到文件磁盘的顺序是根据自身的Key写进去的,同时也是按照Partition写进去的,因为它是顺序写数据,记录每个Partition的大小。
Sort-Based Shuffle的弱点如下。
(1)如果Mapper中Task的数量过大,依旧会产生很多小文件,此时在Shuffle传数据的过程中到Reducer端,Reducer会需要同时大量地记录进行反序列化,导致大量内存消耗和GC负担巨大,造成系统缓慢,甚至崩溃!
(2)强制了在Mapper端必须要排序,这里的前提是数据本身不需要排序。
(3)如果在分片内也需要进行排序,此时需要进行Mapper端和Reducer端的两次排序。
(4)它要基于记录本身进行排序,这就是Sort-Based Shuffle最致命的性能消耗。
7.4.2 Sorted Based Shuffle内核
Sorted-Based Shuffle的核心是借助于ExternalSorter把每个ShuffleMapTask的输出排序到一个文件中(FileSegmentGroup),为了区分下一个阶段Reducer Task不同的内容,它还需要有一个索引文件(Index)来告诉下游Stage的并行任务,那一部分是属于下游Stage的,如图7-10所示。
图7-10中,在Reducer端有4个Reducer Task,它会产生一组File Group和一个索引文件,File Group里的FileSegement会进行排序,下游的Task很容易根据索引(index)定位到这个File中的那一部分。FileSegement是属于下游的,相当于一个指针,下游的Task要向Driver去确定文件在哪里,然后到这个File文件所在的地方,实际上会与BlockManager进行沟通,BlockManager首先会读一个Index文件,根据它的命名规则进行解析。例如,下一个阶段的第一个Task,一般就是抓取第一个Segment,这是一个指针定位的过程。
Sort-Based Shuffle最大的意义是减少临时文件的输出数量,且只会产生两个文件:一个是包含不同内容,划分成不同FileSegment构成的单一文件File;另外一个是索引文件Index。图7-10中,Sort-Based Shuffle展示了一个Sort and Spill的过程(它是Spill到磁盘的时候再进行排序的)。
图7-10 Sorted-Based Shuffle的核心示意图
7.4.3 Sorted Based Shuffle数据读写的源码解析
Sorted Based Shuffle,即基于Sorted的Shuffle实现机制,在该Shuffle过程中,Sorted体现在输出的数据会根据目标的分区Id(即带Shuffle过程的目标RDD中各个分区的Id值)进行排序,然后写入一个单独的Map端输出文件中。相应地,各个分区内部的数据并不会再根据Key值进行排序,除非调用带排序目的的方法,在方法中指定Key值的Ordering实例,才会在分区内部根据该Ordering实例对数据进行排序。当Map端的输出数据超过内存容纳大小时,会将各个排序结果Spill到磁盘上,最终再将这些Spill的文件合并到一个最终的文件中。在Spark的各种计算算子中到处体现了一种惰性的理念,在此也类似,在需要提升性能时,引入根据分区Id排序的设计,同时仅在指定分区内部排序的情况下,才会全局去排序。而Hadoop的MapReduce相比之下带有一定的学术气息,中规中矩,严格设计Shuffle阶段中的各个步骤。
基于Hash的Shuffle实现,ShuffleManager的具体实现子类为HashShuffleManager,对应的具体实现机制如7-11所示。
在图7-11中,各个不同的ShuffleHandle与不同的具体Shuffle写入器实现子类是一一对应的,可以认为是通过注册时生成的不同ShuffleHandle设置不同的Shuffle写入器实现子类。
从ShuffleManager注册的配置属性与具体实现子类的映射关系,即前面提及的在SparkEnv中实例化的代码,可以看出sort与tungsten-sort对应的具体实现子类都是org.apache.spark.shuffle.sort.SortShuffleManager。也就是当前基于Sort的Shuffle实现机制与使用Tungsten项目的Shuffle实现机制都是通过SortShuffleManager类来提供接口,两种实现机制的区别在于,该类中使用了不同的Shuffle数据写入器。
SortShuffleManager根据内部采用的不同实现细节,对应有两种不同的构建Map端文件输出的写方式,分别为序列化排序模式与反序列化排序模式。
图7-11 基于Sorted的Shuffle实现机制的框架类图
(1)序列化排序(Serialized sorting)模式:这种方式对应了新引入的基于Tungsten项目的方式。
(2)反序列化排序(Deserialized sorting)模式:这种方式对应除了前面这种方式之外的其他方式。
基于Sort的Shuffle实现机制采用的是反序列化排序模式。下面分析该实现机制下的数据写入器的实现细节。
基于Sort的Shuffle实现机制,具体的写入器的选择与注册得到的ShuffleHandle类型有关,参考SortShuffleManager类的registerShuffle方法,相关代码如下所示。
SortShuffleManager.scala的源码如下:
Sorted Based Shuffle写数据的源码解析如下。
基于Sort的Shuffle实现机制中相关的ShuffleHandle包含BypassMergeSortShuffleHandle与BaseShuffleHandle。对应这两种ShuffleHandle及其相关的Shuffle数据写入器类型的相关代码可以参考SortShuffleManager类的getWriter方法,关键代码如下所示。
SortShuffleManager的getWriter的源码如下:
在对应构建的两种数据写入器类BypassMergeSortShuffleWriter与SortShuffleWriter中,都是通过变量shuffleBlockResolver对逻辑数据块与物理数据块的映射进行解析,而该变量使用的是与基于Hash的Shuffle实现机制不同的解析类,即当前使用的IndexShuffleBlockResolver。
下面开始解析这两种写数据块方式的源码实现。
1.BypassMergeSortShuffleWriter写数据的源码解析
该类实现了带Hash风格的基于Sort的Shuffle机制,为每个Reduce端的任务构建一个输出文件,将输入的每条记录分别写入各自对应的文件中,并在最后将这些基于各个分区的文件合并成一个输出文件。
在Reducer端任务数比较少的情况下,基于Hash的Shuffle实现机制明显比基于Sort的Shuffle实现机制要快,因此基于Sort的Shuffle实现机制提供了一个fallback方案,对于Reducer端任务数少于配置属性spark.shuffle.sort.bypassMergeThreshold设置的个数时,使用带Hash风格的fallback计划,由BypassMergeSortShuffleWriter具体实现。
使用该写入器的条件如下。
(1)不能指定Ordering,从前面数据读取器的解析可以知道,当指定Ordering时,会对分区内部的数据进行排序。因此,对应的BypassMergeSortShuffleWriter写入器避免了排序开销。
(2)不能指定Aggregator。
(3)分区个数小于spark.shuffle.sort.bypassMergeThreshold配置属性指定的个数。
和其他ShuffleWriter的具体子类一样,BypassMergeSortShuffleWriter写数据的具体实现位于实现的write方法中,关键代码如下所示。
BypassMergeSortShuffleWriter.scala的write的源码如下:
其中调用的createTempShuffleBlock方法描述了各个分区生成的中间临时文件的格式与对应的BlockId。
DiskBlockManager的createTempShuffleBlock的源码如下:
从上面的分析中可以知道,每个Map端的任务最终会生成两个文件,即数据(Data)文件和索引(Index)文件。
另外,使用DiskBlockObjectWriter写记录时,是以32条记录批次写入的,不会占用太大的内存。但由于对应不能指定聚合器(Aggregator),写数据时也是直接写入记录,因此对应后续的网络I/O的开销也会很大。
2.SortShuffleWriter写数据的源码解析
前面BypassMergeSortShuffleWriter的写数据是在Reducer端的分区个数较少的情况下提供的一种优化方式,但当数据集规模非常大时,使用该写数据方式不合适时,就需要使用SortShuffleWriter来写数据块。
和其他ShuffleWriter的具体子类一样,SortShuffleWriter写数据的具体实现位于实现的write方法中。
Spark 2.2.1版本的SortShuffleWriter的write的源码如下:
Spark 2.4.3版本的SortShuffleWriter.scala源码与Spark 2.2.1版本相比具有如下特点。
上段代码中第5行删掉。
在这种基于Sort的Shuffle实现机制中引入了外部排序器(ExternalSorter)。ExternalSorter继承了Spillable,因此内存使用达到一定阈值时,会Spill到磁盘,可以减少内存带来的开销。
外部排序器的insertAll方法内部在处理完(包含聚合和非聚合两种方式)每条记录时,都会检查是否需要Spill。内部各种细节比较多,这里以Spill条件判断为主线,简单描述一下条件相关的代码。具体判断是否需要Spill的相关代码可以参考Spillable类中的maybeSpill方法(该方法的简单调用流程为:ExternalSorter #insterAll–>ExternalSorter #maybeSpillCollection–>Spillable#maybeSpill)。
Spillable.scala的maybeSpill的源码如下:
对于外部排序器,除了insertAll方法外,它的writePartitionedFile方法也非常重要。
ExternalSorter.scala的writePartitionedFile的源码如下:
其中,BlockId是数据块的逻辑位置,File参数是对应逻辑位置的物理存储位置。这两个参数值的获取方法和使用BypassMergeSortShuffleHandle及其对应的ShuffleWriter是一样的。
在该方法中,有一个容易混淆的地方,与Shuffle的度量(Metric)信息有关,对应代码如下:
其中,第1行对应修改了Spilled的数据在内存中的字节大小,第2行则对应修改了Spilled的数据在磁盘中的字节大小。在内存中时,数据是以反序列化形式存放的,而存储到磁盘(默认会序列化)时,会对数据进行序列化。反序列化后的数据会远远大于序列化后的数据(也可以通过UI界面查看这两个度量信息的大小差异来确认,具体差异的大小和数据以及选择的序列化器有关,有兴趣的读者可以参考各序列器间的性能等比较文档)。
从这一点也可以看出,如果在内存中使用反序列化的数据,会大大增加内存的开销(也意味着增加GC负载),并且反序列化也会增加CPU的开销,因此引入了利用Tungsten项目的基于Tungsten Sort的Shuffle实现机制。Tungsten项目的优化主要有三个方面,这里从避免反序列化的数据量会极大消耗内存这方面考虑,主要是借助Tungsten项目的内存管理模型,可以直接处理序列化的数据;同时,CPU开销方面,直接处理序列化数据,可以避免数据反序列化的这部分处理开销。