5.1 Master启动原理和源码详解
本节讲解Master启动的原理和源码;Master HA双机切换;Master的注册机制和状态管理解密等内容。
5.1.1 Master启动的原理详解
Spark应用程序作为独立的集群进程运行,由主程序中的SparkContext对象(称为驱动程序)协调。Spark集群部署组件如图5-1所示。
图5-1 Spark集群部署组件图
其中各个术语及相关术语的描述如下。
(1)Driver Program:运行Application的main函数并新建SparkContext实例的程序,称为驱动程序(Driver Program)。通常可以使用SparkContext代表驱动程序。
(2)Cluster Manager:集群管理器(Cluster Manager)是集群资源管理的外部服务。Spark上现在主要有Standalone、YARN、Mesos 3种集群资源管理器。Spark自带的Standalone模式能够满足绝大部分纯粹的Spark计算环境中对集群资源管理的需求,基本上只有在集群中运行多套计算框架的时候才建议考虑YARN和Mesos。
(3)Worker Node:集群中可以运行Application代码的工作节点(Worker Node),相当于Hadoop的Slave节点。
(4)Executor:在Worker Node上为Application启动的一个工作进程,在进程中负责任务(Task)的运行,并且负责将数据存放在内存或磁盘上,在Executor内部通过多线程的方式(即线程池)并发处理应用程序的具体任务。
每个Application都有各自独立的Executors,因此应用程序之间是相互隔离的。
(5)Task:任务(Task)是指被Driver送到Executor上的工作单元。通常,一个任务会处理一个Partition的数据,每个Partition一般是一个HDFS的Block块的大小。
(6)Application:是创建了SparkContext实例对象的Spark用户程序,包含了一个Driver program和集群中多个Worker上的Executor。
(7)Job:和Spark的action对应,每个action,如count、savaAsTextFile等都会对应一个Job实例,每个Job会拆分成多个Stages,一个Stage中包含一个任务集(TaskSet),任务集中的各个任务通过一定的调度机制发送到工作单位(Executor)上并行执行。
Spark Standalone集群的部署采用典型的Master/Slave架构。其中,Master节点负责整个集群的资源管理与调度,Worker节点(也可以称Slave节点)在Master节点的调度下启动Executor,负责执行具体工作(包括应用程序以及应用程序提交的任务)。
5.1.2 Master启动的源码详解
Spark中各个组件是通过脚本来启动部署的。下面以脚本为入口点开始分析Master的部署。每个组件对应提供了启动的脚本,同时也会提供停止的脚本。停止脚本比较简单,在此仅分析启动脚本。
1.Master部署的启动脚本解析
首先看一下Master的启动脚本./sbin/start-master.sh,内容如下:
通过脚本的简单分析,可以看出Master组件是以后台守护进程的方式启动的,对应的后台守护进程的启动脚本spark-daemon.sh,在后台守护进程的启动脚本spark-daemon.sh内部,通过脚本spark-class启动一个指定主类的JVM进程,相关代码如下:
通过脚本的分析,可以知道最终执行的是Master类(对应的代码为前面的CLASS="org.apache.spark.deploy.master.Master"),对应的入口点是Master伴生对象中的main方法。下面以该方法作为入口点进一步解析Master部署框架。
部署Master组件时,最简单的方式是直接启动脚本,不带任何选项参数,命令如下:
1. ./sbin/start-master.sh
如需设置选项参数,可以查看帮助信息,根据自己的需要进行设置。
2.Master的源码解析
首先查看Master伴生对象中的main方法。
Spark 2.2.1版本的Master.scala的源码如下:
Spark 2.4.3版本的源码与Spark 2.2.1版本相比具有如下特点。
上段代码中第2行之前新增SparkUncaughtExceptionHandler处理的代码。
和其他类(如SparkSubmit)一样,Master类的入口点处也包含了对应的参数类MasterArguments。MasterArguments类包括Spark属性配置相关的一些解析。
MasterArguments.scala的源码如下:
MasterArguments中的printUsageAndExit方法对应的就是命令行中的帮助信息。
MasterArguments.scala的源码如下:
解析完Master的参数后,调用startRpcEnvAndEndpoin方法启动RPC通信环境以及Master的RPC通信终端。
Master.scala的startRpcEnvAndEndpoint的源码如下:
startRpcEnvAndEndpoint方法中定义了ENDPOINT_NAME。
Master.scala的源码如下:
startRpcEnvAndEndpoint方法中通过masterEndpoint.askWithRetry[BoundPortsResponse](BoundPortsRequest)给Master自己发送一个消息BoundPortsRequest,是一个case object。发送消息BoundPortsRequest给自己,确保masterEndpoint正常启动起来。返回消息的类型是BoundPortsResponse,是一个case class。
MasterMessages.scala的源码如下:
Master收到消息BoundPortsRequest,发送返回消息BoundPortsResponse。
Master.scala的源码如下:
在BoundPortsResponse传入的参数restServerBoundPort是在Master的onStart方法中定义的。
Master.scala的源码如下:
而restServerBoundPort是通过restServer进行map操作启动赋值。下面看一下restServer。
Master.scala的源码如下:
其中调用new()函数创建一个StandaloneRestServer。StandaloneRestServer服务器响应请求提交的[RestSubmissionClient],将被嵌入到standalone Master中,仅用于集群模式。服务器根据不同的情况使用不同的HTTP代码进行响应。
200 OK-请求已成功处理。
400错误请求:请求格式错误,未成功验证,或意外类型。
468未知协议版本:请求指定了此服务器不支持的协议。
500内部服务器错误:服务器在处理请求时引发内部异常。
服务器在HTTP主体中总包含一个JSON表示的[SubmitRestProtocolResponse]。如果发生错误,服务器将包括一个[ErrorResponse]。如果构造了这个错误响应内部失败时,响应将由一个空体组成。响应体指示内部服务器错误。
StandaloneRestServer.scala的源码如下:
下面看一下RestSubmissionClient客户端。客户端提交申请[RestSubmissionServer]。在协议版本V1中,REST URL以表单形式出现http://[host:port]/v1/submissions/[action],[action]可以是create、kill或状态中的其中一种。每种请求类型都表示为发送到以下前缀的HTTP消息。
(1)submit-POST to /submissions/create;
(2)kill-POST /submissions/kill/[submissionId];
(3)status-GET /submissions/status/[submissionId]。
在(1)情况下,参数以JSON字段的形式发布到HTTP主体中。否则,URL指定按客户端的预期操作。由于该协议预计将在Spark版本中保持稳定,因此现有字段不能添加或删除,但可以添加新的可选字段。如在少见的事件中向前或向后兼容性被破坏,Spark须引入一个新的协议版本(如V2)。客户机和服务器必须使用协议的同一版本进行通信。如果不匹配,服务器将用它支持的最高协议版本进行响应。此客户机的实现可以用指定的版本使用该信息重试。
RestSubmissionClient.scala的源码如下:
Restful把一切都看成是资源。利用Restful API可以对Spark进行监控。程序运行的每一个步骤、Task的计算步骤都可以可视化,对Spark的运行进行详细监控。
回到startRpcEnvAndEndpoint方法中,新创建了一个Master实例。Master实例化时会对所有的成员进行初始化,如默认的Cores个数等。
Master.scala的源码如下:
Master继承了ThreadSafeRpcEndpoint和LeaderElectable,其中继承LeaderElectable涉及Master的高可用性(High Availability,HA)机制。这里先关注ThreadSafeRpcEndpoint,继承该类后,Master作为一个RpcEndpoint,实例化后首先会调用onStart方法。
Spark 2.2.1版本的Master.scala的源码如下:
Spark 2.4.3版本的源码与Spark 2.2.1版本相比具有如下特点。
上段代码中第9行之后新增webUi.addProxy()的代码。
1. webUi.addProxy()
其中在Master的onStart方法中用new()函数创建MasterWebUI,启动一个webServer。
Master.scala的源码如下:
如MasterWebUI的spark.ui.killEnabled设置为True,可以通过WebUI页面把Spark的进程杀掉。
Spark 2.2.1版本的MasterWebUI.scala的源码如下:
Spark 2.4.3版本的源码与Spark 2.2.1版本相比具有如下特点。
上段代码中第17行将attachHandler方法调整为addStaticHandler方法。
1. addStaticHandler(MasterWebUI.STATIC_RESOURCE_DIR)
MasterWebUI中在初始化时用new()函数创建MasterPage,在MasterPage中通过代码去写Web页面。
MasterPage.scala的源码如下:
回到MasterWebUI.scala的initialize()方法,其中调用了attachPage方法,在WebUI中增加Web页面。
Spark 2.2.1版本的WebUI.scala的源码如下:
Spark 2.4.3版本的WebUI.scala的源码与Spark 2.2.1版本相比具有如下特点。
上段代码中第1行进行修改,增加attachPage方法的返回类型。
1. def attachPage(page: WebUIPage): Unit = {
在WebUI的bind方法中启用了JettyServer。
WebUI.scala的bind的源码如下:
JettyUtils.scala的startJettyServer尝试将Jetty服务器绑定到所提供的主机名、端口。
startJettyServer的源码如下:
5.1.3 Master HA双机切换
Spark生产环境下一般采用ZooKeeper作HA,且建议为3台Master,ZooKeeper会自动化管理Masters的切换。
采用ZooKeeper作HA时,ZooKeeper会保存整个Spark集群运行时的元数据,包括Workers、Drivers、Applications、Executors。
ZooKeeper遇到当前Active级别的Master出现故障时会从Standby Masters中选取出一台作为Active Master,但是要注意,被选举后到成为真正的Active Master之前需要从ZooKeeper中获取集群当前运行状态的元数据信息并进行恢复。
在Master切换的过程中,所有已经在运行的程序皆正常运行。因为Spark Application在运行前就已经通过Cluster Manager获得了计算资源,所以在运行时,Job本身的调度和处理和Master是没有任何关系的。
在Master的切换过程中唯一的影响是不能提交新的Job:一方面不能够提交新的应用程序给集群,因为只有Active Master才能接收新的程序提交请求;另一方面,已经运行的程序中也不能因为Action操作触发新的Job提交请求。
ZooKeeper下Master HA的基本流程如图5-2所示。
ZooKeeper下Master HA的基本流程如下。
(1)使用ZooKeeperPersistenceEngine读取集群的状态数据,包括Drivers、Applications、Workers、Executors等信息。
(2)判断元数据信息是否有空的内容。
(3)把通过ZooKeeper持久化引擎获得了Drivers、Applications、Workers、Executors等信息,重新注册到Master的内存中缓存起来。
(4)验证获得的信息和当前正在运行的集群状态的一致性。
(5)将Application和Workers的状态标识为Unknown,然后向Application中的Driver以及Workers发送现在是Leader的Standby模式的Master的地址信息。
(6)当Driver和Workers收到新的Master的地址信息后会响应该信息。
(7)Master接收到来自Drivers和Workers响应的信息后会使用一个关键的方法:completeRecovery()来对没有响应的Applications (Drivers)、Workers (Executors)进行处理。处理完毕后,Master的State会变成RecoveryState.ALIVE,从而开始对外提供服务。
图5-2 ZooKeeper下Master HA的基本流程
(8)此时Master使用自己的Schedule方法对正在等待的Application和Drivers进行资源调度。
Master HA的4大方式分别是ZOOKEEPER、FILESYSTEM、CUSTOM、NONE。
需要说明的是:
(1)ZOOKEEPER是自动管理Master。
(2)FILESYSTEM的方式在Master出现故障后需要手动重新启动机器,机器启动后会立即成为Active级别的Master来对外提供服务(接收应用程序提交的请求、接收新的Job运行的请求)。
(3)CUSTOM的方式允许用户自定义Master HA的实现,这对高级用户特别有用。
(4)NONE,这是默认情况,Spark集群中就采用这种方式,该方式不会持久化集群的数据,Master启动后立即管理集群。
Master.scala的HA的源码如下:
Spark默认的HA方式是NONE。
1. private val RECOVERY_MODE = conf.get("spark.deploy.recoveryMode", "NONE")
如使用ZOOKEEPER的HA方式,RecoveryModeFactory.scala的源码如下:
通过调用zkFactory.createPersistenceEngine()用new()函数创建一个ZooKeeper-PersistenceEngine。
ZooKeeperPersistenceEngine.scala的源码如下:
PersistenceEngine中有至关重要的方法persist来实现数据持久化,readPersistedData来恢复集群中的元数据。
PersistenceEngine.scala的源码如下:
下面来看createdLeaderElectionAgent方法。在createdLeaderElectionAgent方法中调用new()函数创建ZooKeeperLeaderElectionAgent实例。
StandaloneRecoveryModeFactory.scala的源码如下:
ZooKeeperLeaderElectionAgent的源码如下:
FILESYSTEM和NONE的方式采用MonarchyLeaderAgent的方式来完成Leader的选举,其实现是直接把传入的Master作为Leader。
LeaderElectionAgent.scala的源码如下:
FileSystemRecoveryModeFactory.scala的源码如下:
如果WorkerState状态为UNKNOWN(Worker不响应),就把它删除,如果以集群方式运行,driver失败后可以重新启动,最后把状态变回ALIVE。注意,这里要加入--supervise这个参数。
Master.scala的源码如下:
5.1.4 Master的注册机制和状态管理解密
1.Master对其他组件注册的处理
Master接收注册的对象主要是Driver、Application、Worker;Executor不会注册给Master,Executor是注册给Driver中的SchedulerBackend的。
Worker是在启动后主动向Master注册的,所以如果在生产环境下加入新的Worker到正在运行的Spark集群上,此时不需要重新启动Spark集群就能够使用新加入的Worker,以提升处理能力。假如在生产环境中的集群中有500台机器,可能又新加入100台机器,这时不需要重新启动整个集群,就可以将100台新机器加入到集群。
Spark 2.2.1版本的Worker的源码如下:
Spark 2.4.3版本的Worker.scala源码与Spark 2.2.1版本相比具有如下特点。
上段代码中第10行之后新增externalShuffleServiceSupplier的成员变量,externalShuffleServiceSupplier是ExternalShuffleService类型,ExternalShuffleService提供一个server,Executors可以从server中读取shuffle文件(而不是彼此之间直接读取),在executors被关闭或杀死的情况下提供对文件的不间断访问,需要SASL身份验证才能读取。
Worker是一个消息循环体,继承自ThreadSafeRpcEndpoint,可以收消息,也可以发消息。Worker的onStart方法如下:
Worker的onStart方法中调用了registerWithMaster()。
registerWithMaster方法中调用了tryRegisterAllMasters,向所有的Master进行注册。
Worker.scala的源码如下:
tryRegisterAllMasters方法中,由于实际运行时有很多Master,因此使用线程池的线程进行提交,然后获取masterEndpoint。masterEndpoint是一个RpcEndpointRef,通过sendRegister-MessageToMaster (masterEndpoint)进行注册。
sendRegisterMessageToMaster方法仅将RegisterWorker消息发送给Master消息循环体。sendRegisterMessageToMaster方法内部不作其他处理。
sendRegisterMessageToMaster方法中的masterEndpoint.send传进去的是RegisterWorker。RegisterWorker是一个case class,包括id、host、port、worker、cores、memory等信息,这里Worker是自己的引用RpcEndpointRef,Master通过Ref通worker通信。
RegisterWorker.scala的源码如下:
Worker通过sendRegisterMessageToMaster向Master发送了RegisterWorker消息,Master收到RegisterWorker请求后,进行相应的处理。
Master.scala的receive的源码如下:
RegisterWorker中,Master接收到Worker的注册请求后,首先判断当前的Master是否是Standby的模式,如果是,就不处理;Master的idToWorker包含了所有已经注册的Worker的信息,然后会判断当前Master的内存数据结构idToWorker中是否已经有该Worker的注册,如果有,此时不会重复注册;其中idToWorker是一个HashMap,Key是String代表Worker的字符描述,Value是WorkerInfo。
1. private val idToWorker = new HashMap[String, WorkerInfo]
WorkerInfo包括id、host、port、cores、memory、endpoint等内容。
Master如果决定接收注册的Worker,首先会创建WorkerInfo对象来保存注册的Worker的信息,然后调用registerWorker执行具体的注册的过程,如果Worker的状态是DEAD的状态,则直接过滤掉。对于UNKNOWN的内容,调用removeWorker进行清理(包括清理该Worker下的Executors和Drivers)。其中,UNKNOWN的情况:Master进行切换时,先对Worker发UNKNOWN消息,只有当Master收到Worker正确的回复消息,才将状态标识为正常。
Spark 2.2.1版本的Master.scala的registerWorker的源码如下:
Spark 2.4.3版本的Master.scala源码与Spark 2.2.1版本相比具有如下特点。
上段代码中第14行removeWorker(oldWorker)新增一个消息说明信息。
1. removeWorker(oldWorker, "Worker replaced by a new worker with same address")
在registerWorker方法中,Worker注册完成后,把注册的Worker加入到Master的内存数据结构中。
回到Master.scala的receiveAndReply方法,Worker注册完成后,调用persistenceEngine.addWorker(worker),PersistenceEngine是持久化引擎,在Zookeeper下就是Zookeeper的持久化引擎,把注册的数据进行持久化。
PersistenceEngine.scala的addWorker方法如下:
1. final def addWorker(worker: WorkerInfo): Unit = { 2. persist("worker_" + worker.id, worker) 3. }
ZooKeeperPersistenceEngine是PersistenceEngine的一个具体实现子类,其persist方法如下:
回到Master.scala的receiveAndReply方法,注册的Worker数据持久化后,进行schedule()。至此,Worker的注册完成。
同样,Driver的注册过程:Driver提交给Master进行注册,Master会将Driver的信息放入内存缓存中,加入等待调度的队列,通过持久化引擎(如ZooKeeper)把注册信息持久化,然后进行Schedule。
Application的注册过程:Application提交给Master进行注册,Driver启动后会执行SparkContext的初始化,进而导致StandaloneSchedulerBackend的产生,其内部有StandaloneAppClient。StandaloneAppClient内部有ClientEndpoint。ClientEndpoint来发送RegisterApplication信息给Master。Master会将Application的信息放入内存缓存中,把Application加入等待调度的Application队列,通过持久化引擎(如ZooKeeper)把注册信息持久化,然后进行Schedule。
2.Master对Driver和Executor状态变化的处理
1)对Driver状态变化的处理
如果Driver的各个状态是DriverState.ERROR | DriverState.FINISHED | DriverState.KILLED |DriverState.FAILED,就将其清理掉。其他情况则报异常。
removeDriver清理掉Driver后,再次调用schedule方法,removeDriver的源码如下:
2)对Executor状态变化的处理
ExecutorStateChanged的源码如下:
Executor挂掉时系统会尝试一定次数的重启(最多重启10次)。