9.1 Spark中Cache原理和源码详解
本节对Spark中Cache原理及Spark中Cache源码进行详解。
9.1.1 Spark中Cache原理详解
Spark中Cache机制原理:首先,RDD是通过iterator进行计算的。
(1)CacheManager会通过BlockManager从Local或者Remote获取数据直接通过RDD的compute进行计算,有可能需要考虑checkpoint。
(2)通过BlockManager首先从本地获取数据,如果得不到数据,就会从远程获取数据。
(3)首先查看当前的RDD是否进行了checkpoint,如果进行了的话,就直接读取checkpoint的数据,否则必须进行计算;因为此时RDD需要缓存,所以计算如果需要,则通过BlockManager再次进行持久化。
(4)如果持久化的时候只是缓存到磁盘中,就直接使用BlockManager的doPut方法写入磁盘(需要考虑Replication)。
(5)如果指定内存作缓存,优先保存到内存中,此时会使用MemoryStore.unrollSafely方法来尝试安全地将数据保存在内存中,如果内存不够,会使用一个方法来整理一部分内存空间,然后基于整理出来的内存空间放入我们想缓存的最新数据。
(6)直接通过RDD的compute进行计算,有可能需要考虑checkpoint。
Spark中,Cache原理示意图如图9-1所示。
9.1.2 Spark中Cache源码详解
CacheManager管理是缓存,而缓存可以是基于内存的缓存,也可以是基于磁盘的缓存。CacheManager需要通过BlockManager来操作数据。
Task发生计算时要调用RDD的compute进行计算。下面看一下MapPartitionsRDD的compute方法。
图9-1 Cache原理示意图
Spark 2.2.1版本的MapPartitionsRDD的源码如下:
Spark 2.4.3版本的MapPartitionsRDD.scala源码与Spark 2.2.1版本相比具有如下特点。
上段代码中第4行之后新增2个参数isFromBarrier、isOrderSensitive。isFromBarrier指示此RDD是否从RDDBarrier转换,包含至少一个RDDBarrier的阶段应转换为屏障阶段。isOrderSensitive指示函数是否区分顺序。如果它是顺序敏感的,当输入顺序改变时,它可能返回完全不同的结果。大多数状态函数是顺序敏感的。
上段代码中第17行之后新增代码,isBarrier_方法指示RDD是否处于屏障阶段。Spark必须同时启动屏障阶段的所有任务。如果RDD处于屏障阶段,至少有一个父RDD或其自身映射自一个RDDBarrier。对于ShuffledRDD,此函数总是返回false,因为ShuffledRDD表示新阶段的开始;对于MapPartitionsRDD,可以从RDDBarrier转换,在这种情况下,MapPartitionsRDD应标记为屏障。重写GetOutputDeterministicLevel方法实现自定义逻辑计算RDD输出的确定级别。
compute真正计算的时候通过iterator计算,MapPartitionsRDD的iterator依赖父RDD计算。iterator是RDD内部的方法,如有缓存,将从缓存中读取数据,否则进行计算。这不是被用户直接调用,但可用于实现自定义子RDD。
RDD.scala的iterator方法如下:
RDD.scala的iterator方法中判断storageLevel != StorageLevel.NONE,说明数据可能存放在内存、磁盘中,调用getOrCompute(split, context)方法。如果之前计算过一次,再次计算可以找CacheManager要数据。
RDD.scala的getOrCompute的源码如下:
在有缓存的情况下,缓存可能基于内存,也可能基于磁盘,getOrCompute获取缓存;如没有缓存,则需重新计算RDD。为何需要重新计算?如果数据放在内存中,假设缓存了100万个数据分片,下一个步骤计算的时候需要内存,因为需要进行计算的内存空间占用比之前缓存的数据占用内存空间重要,假设须腾出10000个数据分片所在的空间,因此从BlockManager中将内存中的缓存数据drop到磁盘上,如果不是内存和磁盘的存储级别,那10000个数据分片的缓存数据就可能丢失,99万个数据分片可以复用,而这10000个数据分片须重新进行计算。
Cache在工作的时候会最大化地保留数据,但是数据不一定绝对完整,因为当前的计算如果需要内存空间,那么Cache在内存中的数据必须让出空间,此时如何在RDD持久化的时候同时指定可以把数据放在Disk上,那么部分Cache的数据就可以从内存转入磁盘,否则数据就会丢失。
getOrCompute方法返回的是Iterator。进行Cache以后,BlockManager对其进行管理,通过blockId可以获得曾经缓存的数据。具体CacheManager在获得缓存数据的时候会通过BlockManager来抓到数据。
getOrElseUpdate方法中,如果block存在,检索给定的块block;如果不存在,则调用提供makeIterator方法计算块block,对块block进行持久化,并返回block的值。
BlockManager.scala的getOrElseUpdate的源码如下:
BlockManager.scala的getOrElseUpdate中根据blockId调用了get[T](blockId)方法,get方法从block块管理器(本地或远程)获取一个块block。如果块在本地存储且没获取锁,则先获取块block的读取锁。如果该块是从远程块管理器获取的,当data迭代器被完全消费以后,那么读取锁将自动释放。get的时候,如果本地有数据,从本地获取数据返回;如果没有数据,则从远程节点获取数据。
BlockManager.scala的get方法的源码如下:
BlockManager的get方法从Local的角度讲,如果数据在本地,get方法调用getLocalValues获取数据。如果数据在内存中(level.useMemory且memoryStore包含了blockId),则从memoryStore中获取数据;如果数据在磁盘中(level.useDisk且diskStore包含了blockId),则从diskStore中获取数据。这说明数据在本地缓存,可以在内存中,也可以在磁盘上。
BlockManager的get方法从remote的角度讲,get方法中将调用getRemoteValues方法。
BlockManager.Scala的getRemoteValues的源码如下:
getRemoteValues方法中调用getRemoteBytes方法,通过blockTransferService.fetchBlockSync从远程节点获取数据。
Spark 2.2.1版本的BlockManager.Scala的getRemoteBytes的源码如下:
Spark 2.4.3版本的BlockManager.scala源码与Spark 2.2.1版本相比具有如下特点。
上段代码中第5行之后新增代码,因为所有的远程块都注册在driver中,所以不需要所有的从属执行器获取块状态。
上段代码中第6行将getLocations方法调整为sortLocations方法。sortLocations方法返回给定块的位置列表,本地计算机的优先级从多个块管理器可以共享同一个主机,然后是同一机架上的主机。
上段代码中第14行新增一个tempFileManager参数。如果块大小超过阈值,将FileManager传递给BlockTransferService,利用它来溢出块;如果没有,传递空值意味着块将持久存在内存中。
上段代码中第31行将getLocations方法调整为使用sortLocations(master.getLocations(blockId)).iterator方法,如果有大量执行者,则位置列表可以包含大量过时的条目导致大量重试,可能花大量的时间。除去这些陈旧的条目,在一定数量的提取失败后刷新块位置。
上段代码中第42行代码进行替换,对于未记录的escape hatch,如果ChunkedByteBuffer时出现任何问题,返回到旧代码路径。如果新路径稳定,可在Spark 2.4以后的版本中清除。
BlockManager的get方法,如果本地有数据,则从本地获取数据返回;如果远程有数据,则从远程获取数据返回;如果都没有数据,就返回None。get方法的返回类型是Option[BlockResult],Option的结果分为两种情况:①如果有内容,则返回Some[BlockResult;②如果没有内容,则返回None。这是Option的基础语法。
Option.scala的源码如下:
回到BlockManager的getOrElseUpdate方法,从get方法返回的结果进行模式匹配,如果有数据,则对Some(block)返回Left(block),这是获取到block的情况;如果没数据,则是None,须计算block。
回到RDD.scala的getOrCompute方法,在getOrCompute方法中调用SparkEnv.get.blockManager.getOrElseUpdate方法时,传入blockId、storageLevel、elementClassTag,其中第四个参数是一个匿名函数,在匿名函数中调用了computeOrReadCheckpoint(partition,context)。然后在getOrElseUpdate方法中,根据blockId获取数据,如果获取到缓存数据,就返回;如果没有数据,就调用doPutIterator(blockId, makeIterator, level, classTag, keepReadLock = true)进行计算,doPutIterator其中第二个参数makeIterator就是getOrElseUpdate方法中传入的匿名函数,在匿名函数中获取到Iterator数据。
RDD.getOrCompute中computeOrReadCheckpoint方法,如果RDD进行了checkpoint,则从父RDD的iterator中直接获取数据;或者没有Checkpoint物化,则重新计算RDD的数据。
RDD.scala的computeOrReadCheckpoint的源码如下:
BlockManager.scala的getOrElseUpdate方法中如果根据blockID没有获取到本地数据,则调用doPutIterator将通过BlockManager再次进行持久化。
BlockManager.scala的getOrElseUpdate方法的源码如下:
BlockManager.scala的getOrElseUpdate方法中调用了doPutIterator。doPutIterator将makeIterator从父RDD的checkpoint读取的数据或者重新计算的数据存放到内存中,如果内存不够,就溢出到磁盘中持久化。
BlockManager.scala的doPutIterator方法的源码如下: