9.2 Spark中checkpoint原理和源码详解
本节对Spark中checkpoint原理及Spark中checkpoint源码进行详解。
9.2.1 Spark中checkpoint原理详解
checkpoint到底是什么?
(1)Spark在生产环境下经常会面临Tranformations的RDD非常多(例如,一个Job中包含10 000个RDD)或者具体Tranformation产生的RDD本身计算特别复杂和耗时(例如,计算时常超过1h),此时我们必须考虑对计算结果数据的持久化。
(2)Spark擅长多步骤迭代,同时擅长基于Job的复用,这时如果能够对曾经计算的过程产生的数据进行复用,就可以极大地提升效率。
(3)如果采用persist把数据放在内存中,虽然是最快速的,但是也是最不可靠的。如果放在磁盘上,也不是完全可靠的。例如,磁盘会损坏,管理员可能清空磁盘等。
(4)checkpoint的产生就是为了相对更加可靠地持久化数据,checkpoint可以指定把数据放在本地并且是多副本的方式,但是在正常的生产情况下是放在HDFS,这就自然地借助HDFS高容错、高可靠的特征完成了最大化的、可靠的持久化数据的方式。
(5)为确保RDD复用计算的可靠性,checkpoint把数据持久化到HDFS中,保证数据最大程度的安全性。
(6)checkpoint就是针对整个RDD计算链条中特别需要数据持久化的环节(后面会反复使用当前环节的RDD)开始基于HDFS等的数据持久化复用策略,通过对RDD启动checkpoint机制来实现容错和高可用。
9.2.2 Spark中checkpoint源码详解
1.checkpoint的运行原理和源码实现彻底详解
RDD进行计算前须先看一下是否有checkpoint,如果有checkpoint,就不需要再进行计算了。
RDD.scala的iterator方法的源码如下:
进入RDD.scala的getOrCompute方法,源码如下:
getOrCompute方法的getOrElseUpdate方法传入的第四个参数是匿名函数,调用computeOrReadCheckpoint(partition, context)检查checkpoint中是否有数据。
RDD.scala的computeOrReadCheckpoint的源码如下:
computeOrReadCheckpoint方法中的isCheckpointedAndMaterialized是一个布尔值,判断这个RDD是否checkpointed和被物化,Spark 2.0 checkpoint中有两种方式:reliably或者locally。computeOrReadCheckpoint作为isCheckpointed语义的别名返回值。
isCheckpointedAndMaterialized方法的源码如下:
回到RDD.scala的computeOrReadCheckpoint,如果已经持久化及物化isCheckpointed-AndMaterialized,就调用firstParent[T]的iterator。如果没有持久化,则进行compute。
2.checkpoint原理机制
(1)通过调用SparkContext.setCheckpointDir方法指定进行checkpoint操作的RDD把数据放在哪里,在生产集群中是放在HDFS上的,同时为了提高效率,在进行checkpoint的使用时,可以指定很多目录。
SparkContext为即将计算的RDD设置checkpoint保存的目录。如果在集群中运行,必须是HDFS的目录路径。
SparkContext.scala的setCheckpointDir的源码如下:
RDD.scala的checkpoint方法标记RDD的检查点checkpoint。它将保存到SparkContext#setCheckpointDir的目录检查点内的文件中,所有引用它的父RDDs将被移除。须在任何作业之前调用此函数。建议RDD在内存中缓存,否则保存在文件中时需要重新计算。
RDD.scala的checkpoint的源码如下:
其中的checkpointData是RDDCheckpointData。
1. private[spark] var checkpointData: Option[RDDCheckpointData[T]] = None
RDDCheckpointData标识某个RDD要进行checkpoint。如果某个RDD要进行checkpoint,那在Spark框架内部就会生成RDDCheckpointData。
(2)在进行RDD的checkpoint的时候,其所依赖的所有的RDD都会从计算链条中清空掉。
(3)作为最佳实践,一般在进行checkpoint方法调用前都要进行persist把当前RDD的数据持久化到内存或者磁盘上,这是因为checkpoint是Lazy级别,必须有Job的执行,且在Job执行完成后,才会从后往前回溯哪个RDD进行了checkpoint标记,然后对标记过的RDD新启动一个Job执行具体的checkpoint过程。
(4)checkpoint改变了RDD的Lineage。
(5)当调用checkpoint方法要对RDD进行checkpoint操作,此时框架会自动生成RDDCheckpointData,当RDD上运行过一个Job后,就会立即触发RDDCheckpointData中的checkpoint方法,在其内部会调用doCheckpoint,实际上在生产时会调用ReliableRDDCheckpointData的doCheckpoint,在生产过程中会导致ReliableCheckpointRDD的writeRDDToCheckpointDirectory的调用,而在writeRDDToCheckpointDirectory方法内部,会触发runJob来执行把当前的RDD中的数据写到checkpoint的目录中,同时会产生ReliableCheckpointRDD实例。
RDDCheckpointData.scala的checkpoint方法进行真正的checkpoint:在RDDCheckpointData.synchronized同步块中先判断cpState的状态,然后调用doCheckpoint()。
RDDCheckpointData.scala的checkpoint方法的源码如下:
其中的doCheckpoint方法是RDDCheckpointData.scala中的方法,这里没有具体的实现。
1. protected def doCheckpoint(): CheckpointRDD[T]
RDDCheckpointData的子类包括LocalRDDCheckpointData、ReliableRDDCheckpointData。ReliableRDDCheckpointData子类中doCheckpoint方法具体的实现,在方法中进行writeRDDToCheckpointDirectory的调用。
ReliableRDDCheckpointData.scala的doCheckpoint的源码如下:
writeRDDToCheckpointDirectory将RDD的数据写入到checkpoint的文件中,返回一个ReliableCheckpointRDD。
首先找到sparkContext,赋值给sc变量。
基于checkpointDir创建checkpointDirPath。
fs获取文件系统的内容。
然后是广播sc.broadcast,将路径信息广播给所有的Executor。
接下来是sc.runJob,触发runJob执行,把当前的RDD中的数据写到checkpoint的目录中。
最后返回ReliableCheckpointRDD。无论是对哪个RDD进行checkpoint,最终都会产生ReliableCheckpointRDD,以checkpointDirPath.toString中的数据为数据来源;以originalRDD.partitioner的分区器partitioner作为partitioner;这里的originalRDD就是要进行checkpoint的RDD。
writeRDDToCheckpointDirectory的源码如下:
ReliableCheckpointRDD是读取以前写入可靠存储系统检查点文件数据的RDD。其中的partitioner是构建ReliableCheckpointRDD的时候传进来的。其中的getPartitions是构建一个一个的分片。其中,getPreferredLocations获取数据本地性,fs.getFileBlockLocations获取文件的位置信息。compute方法通过ReliableCheckpointRDD.readCheckpointFile读取数据。
ReliableCheckpointRDD.scala的源码如下:
下面看一下ReliableCheckpointRDD.scala中compute方法中的ReliableCheckpointRDD.readCheckpointFile。readCheckpointFile读取指定检查点文件checkpoint的内容。readCheckpointFile方法通过deserializeStream反序列化fileInputStream文件输入流,然后将deserializeStream变成一个Iterator。
Spark 2.2.1版本的ReliableCheckpointRDD.scala的readCheckpointFile的源码如下:
Spark 2.4.3版本的ReliableCheckpointRDD.scala源码与Spark 2.2.1版本相比具有如下特点。
上段代码中第20行新增一个返回类型Unit。
ReliableRDDCheckpointData.scala的cleanCheckpoint方法,清理RDD数据相关的checkpoint文件。
在生产环境中不使用LocalCheckpointRDD。LocalCheckpointRDD的getPartitions直接从toArray级别中调用new()函数创建CheckpointRDDPartition。LocalCheckpointRDD的compute方法直接报异常。
LocalCheckpointRDD的源码如下:
checkpoint运行流程图如图9-2所示。
图9-2 Checkpoint运行流程图
通过SparkContext设置Checkpoint数据保存的目录,RDD调用checkpoint方法,生产RDDCheckpointData,当RDD上运行一个Job后,就会立即触发RDDCheckpointData中的checkpoint方法,在其内部会调用doCheckpoint;然后调用ReliableRDDCheckpointData的doCheckpoint;ReliableCheckpointRDD的writeRDDToCheckpointDirectory的调用;在writeRDDToCheckpointDirectory方法内部会触发runJob,来执行把当前的RDD中的数据写到Checkpoint的目录中,同时会产生ReliableCheckpointRDD实例。
checkpoint保存在HDFS中,具有多个副本;persist保存在内存中或者磁盘中。在Job作业调度的时候,checkpoint沿着finalRDD的“血统”关系lineage从后往前回溯向上查找,查找哪些RDD曾标记为要进行checkpoint,标记为checkpointInProgress;一旦进行checkpoint,RDD所有父RDD就被清空。