Spark大数据商业实战三部曲:内核解密、商业案例、性能调优(第2版)
上QQ阅读APP看书,第一时间看更新

5.1 Master启动原理和源码详解

本节讲解Master启动的原理和源码;Master HA双机切换;Master的注册机制和状态管理解密等内容。

5.1.1 Master启动的原理详解

Spark应用程序作为独立的集群进程运行,由主程序中的SparkContext对象(称为驱动程序)协调。Spark集群部署组件如图5-1所示。

图5-1 Spark集群部署组件图

其中各个术语及相关术语的描述如下。

(1)Driver Program:运行Application的main函数并新建SparkContext实例的程序,称为驱动程序(Driver Program)。通常可以使用SparkContext代表驱动程序。

(2)Cluster Manager:集群管理器(Cluster Manager)是集群资源管理的外部服务。Spark上现在主要有Standalone、YARN、Mesos 3种集群资源管理器。Spark自带的Standalone模式能够满足绝大部分纯粹的Spark计算环境中对集群资源管理的需求,基本上只有在集群中运行多套计算框架的时候才建议考虑YARN和Mesos。

(3)Worker Node:集群中可以运行Application代码的工作节点(Worker Node),相当于Hadoop的Slave节点。

(4)Executor:在Worker Node上为Application启动的一个工作进程,在进程中负责任务(Task)的运行,并且负责将数据存放在内存或磁盘上,在Executor内部通过多线程的方式(即线程池)并发处理应用程序的具体任务。

每个Application都有各自独立的Executors,因此应用程序之间是相互隔离的。

(5)Task:任务(Task)是指被Driver送到Executor上的工作单元。通常,一个任务会处理一个Partition的数据,每个Partition一般是一个HDFS的Block块的大小。

(6)Application:是创建了SparkContext实例对象的Spark用户程序,包含了一个Driver program和集群中多个Worker上的Executor。

(7)Job:和Spark的action对应,每个action,如count、savaAsTextFile等都会对应一个Job实例,每个Job会拆分成多个Stages,一个Stage中包含一个任务集(TaskSet),任务集中的各个任务通过一定的调度机制发送到工作单位(Executor)上并行执行。

Spark Standalone集群的部署采用典型的Master/Slave架构。其中,Master节点负责整个集群的资源管理与调度,Worker节点(也可以称Slave节点)在Master节点的调度下启动Executor,负责执行具体工作(包括应用程序以及应用程序提交的任务)。

5.1.2 Master启动的源码详解

Spark中各个组件是通过脚本来启动部署的。下面以脚本为入口点开始分析Master的部署。每个组件对应提供了启动的脚本,同时也会提供停止的脚本。停止脚本比较简单,在此仅分析启动脚本。

1.Master部署的启动脚本解析

首先看一下Master的启动脚本./sbin/start-master.sh,内容如下:

通过脚本的简单分析,可以看出Master组件是以后台守护进程的方式启动的,对应的后台守护进程的启动脚本spark-daemon.sh,在后台守护进程的启动脚本spark-daemon.sh内部,通过脚本spark-class启动一个指定主类的JVM进程,相关代码如下:

通过脚本的分析,可以知道最终执行的是Master类(对应的代码为前面的CLASS="org.apache.spark.deploy.master.Master"),对应的入口点是Master伴生对象中的main方法。下面以该方法作为入口点进一步解析Master部署框架。

部署Master组件时,最简单的方式是直接启动脚本,不带任何选项参数,命令如下:

1.  ./sbin/start-master.sh

如需设置选项参数,可以查看帮助信息,根据自己的需要进行设置。

2.Master的源码解析

首先查看Master伴生对象中的main方法。

Spark 2.2.1版本的Master.scala的源码如下:

Spark 2.4.3版本的源码与Spark 2.2.1版本相比具有如下特点。

 上段代码中第2行之前新增SparkUncaughtExceptionHandler处理的代码。

和其他类(如SparkSubmit)一样,Master类的入口点处也包含了对应的参数类MasterArguments。MasterArguments类包括Spark属性配置相关的一些解析。

MasterArguments.scala的源码如下:

MasterArguments中的printUsageAndExit方法对应的就是命令行中的帮助信息。

MasterArguments.scala的源码如下:

解析完Master的参数后,调用startRpcEnvAndEndpoin方法启动RPC通信环境以及Master的RPC通信终端。

Master.scala的startRpcEnvAndEndpoint的源码如下:

startRpcEnvAndEndpoint方法中定义了ENDPOINT_NAME。

Master.scala的源码如下:

startRpcEnvAndEndpoint方法中通过masterEndpoint.askWithRetry[BoundPortsResponse](BoundPortsRequest)给Master自己发送一个消息BoundPortsRequest,是一个case object。发送消息BoundPortsRequest给自己,确保masterEndpoint正常启动起来。返回消息的类型是BoundPortsResponse,是一个case class。

MasterMessages.scala的源码如下:

Master收到消息BoundPortsRequest,发送返回消息BoundPortsResponse。

Master.scala的源码如下:

在BoundPortsResponse传入的参数restServerBoundPort是在Master的onStart方法中定义的。

Master.scala的源码如下:

而restServerBoundPort是通过restServer进行map操作启动赋值。下面看一下restServer。

Master.scala的源码如下:

其中调用new()函数创建一个StandaloneRestServer。StandaloneRestServer服务器响应请求提交的[RestSubmissionClient],将被嵌入到standalone Master中,仅用于集群模式。服务器根据不同的情况使用不同的HTTP代码进行响应。

 200 OK-请求已成功处理。

 400错误请求:请求格式错误,未成功验证,或意外类型。

 468未知协议版本:请求指定了此服务器不支持的协议。

 500内部服务器错误:服务器在处理请求时引发内部异常。

服务器在HTTP主体中总包含一个JSON表示的[SubmitRestProtocolResponse]。如果发生错误,服务器将包括一个[ErrorResponse]。如果构造了这个错误响应内部失败时,响应将由一个空体组成。响应体指示内部服务器错误。

StandaloneRestServer.scala的源码如下:

下面看一下RestSubmissionClient客户端。客户端提交申请[RestSubmissionServer]。在协议版本V1中,REST URL以表单形式出现http://[host:port]/v1/submissions/[action],[action]可以是create、kill或状态中的其中一种。每种请求类型都表示为发送到以下前缀的HTTP消息。

(1)submit-POST to /submissions/create;

(2)kill-POST /submissions/kill/[submissionId];

(3)status-GET /submissions/status/[submissionId]。

在(1)情况下,参数以JSON字段的形式发布到HTTP主体中。否则,URL指定按客户端的预期操作。由于该协议预计将在Spark版本中保持稳定,因此现有字段不能添加或删除,但可以添加新的可选字段。如在少见的事件中向前或向后兼容性被破坏,Spark须引入一个新的协议版本(如V2)。客户机和服务器必须使用协议的同一版本进行通信。如果不匹配,服务器将用它支持的最高协议版本进行响应。此客户机的实现可以用指定的版本使用该信息重试。

RestSubmissionClient.scala的源码如下:

Restful把一切都看成是资源。利用Restful API可以对Spark进行监控。程序运行的每一个步骤、Task的计算步骤都可以可视化,对Spark的运行进行详细监控。

回到startRpcEnvAndEndpoint方法中,新创建了一个Master实例。Master实例化时会对所有的成员进行初始化,如默认的Cores个数等。

Master.scala的源码如下:

Master继承了ThreadSafeRpcEndpoint和LeaderElectable,其中继承LeaderElectable涉及Master的高可用性(High Availability,HA)机制。这里先关注ThreadSafeRpcEndpoint,继承该类后,Master作为一个RpcEndpoint,实例化后首先会调用onStart方法。

Spark 2.2.1版本的Master.scala的源码如下:

Spark 2.4.3版本的源码与Spark 2.2.1版本相比具有如下特点。

 上段代码中第9行之后新增webUi.addProxy()的代码。

1.  webUi.addProxy()

其中在Master的onStart方法中用new()函数创建MasterWebUI,启动一个webServer。

Master.scala的源码如下:

如MasterWebUI的spark.ui.killEnabled设置为True,可以通过WebUI页面把Spark的进程杀掉。

Spark 2.2.1版本的MasterWebUI.scala的源码如下:

Spark 2.4.3版本的源码与Spark 2.2.1版本相比具有如下特点。

 上段代码中第17行将attachHandler方法调整为addStaticHandler方法。

1.  addStaticHandler(MasterWebUI.STATIC_RESOURCE_DIR)

MasterWebUI中在初始化时用new()函数创建MasterPage,在MasterPage中通过代码去写Web页面。

MasterPage.scala的源码如下:

回到MasterWebUI.scala的initialize()方法,其中调用了attachPage方法,在WebUI中增加Web页面。

Spark 2.2.1版本的WebUI.scala的源码如下:

Spark 2.4.3版本的WebUI.scala的源码与Spark 2.2.1版本相比具有如下特点。

 上段代码中第1行进行修改,增加attachPage方法的返回类型。

1.  def attachPage(page: WebUIPage): Unit = {

在WebUI的bind方法中启用了JettyServer。

WebUI.scala的bind的源码如下:

JettyUtils.scala的startJettyServer尝试将Jetty服务器绑定到所提供的主机名、端口。

startJettyServer的源码如下:

5.1.3 Master HA双机切换

Spark生产环境下一般采用ZooKeeper作HA,且建议为3台Master,ZooKeeper会自动化管理Masters的切换。

采用ZooKeeper作HA时,ZooKeeper会保存整个Spark集群运行时的元数据,包括Workers、Drivers、Applications、Executors。

ZooKeeper遇到当前Active级别的Master出现故障时会从Standby Masters中选取出一台作为Active Master,但是要注意,被选举后到成为真正的Active Master之前需要从ZooKeeper中获取集群当前运行状态的元数据信息并进行恢复。

在Master切换的过程中,所有已经在运行的程序皆正常运行。因为Spark Application在运行前就已经通过Cluster Manager获得了计算资源,所以在运行时,Job本身的调度和处理和Master是没有任何关系的。

在Master的切换过程中唯一的影响是不能提交新的Job:一方面不能够提交新的应用程序给集群,因为只有Active Master才能接收新的程序提交请求;另一方面,已经运行的程序中也不能因为Action操作触发新的Job提交请求。

ZooKeeper下Master HA的基本流程如图5-2所示。

ZooKeeper下Master HA的基本流程如下。

(1)使用ZooKeeperPersistenceEngine读取集群的状态数据,包括Drivers、Applications、Workers、Executors等信息。

(2)判断元数据信息是否有空的内容。

(3)把通过ZooKeeper持久化引擎获得了Drivers、Applications、Workers、Executors等信息,重新注册到Master的内存中缓存起来。

(4)验证获得的信息和当前正在运行的集群状态的一致性。

(5)将Application和Workers的状态标识为Unknown,然后向Application中的Driver以及Workers发送现在是Leader的Standby模式的Master的地址信息。

(6)当Driver和Workers收到新的Master的地址信息后会响应该信息。

(7)Master接收到来自Drivers和Workers响应的信息后会使用一个关键的方法:completeRecovery()来对没有响应的Applications (Drivers)、Workers (Executors)进行处理。处理完毕后,Master的State会变成RecoveryState.ALIVE,从而开始对外提供服务。

图5-2 ZooKeeper下Master HA的基本流程

(8)此时Master使用自己的Schedule方法对正在等待的Application和Drivers进行资源调度。

Master HA的4大方式分别是ZOOKEEPER、FILESYSTEM、CUSTOM、NONE。

需要说明的是:

(1)ZOOKEEPER是自动管理Master。

(2)FILESYSTEM的方式在Master出现故障后需要手动重新启动机器,机器启动后会立即成为Active级别的Master来对外提供服务(接收应用程序提交的请求、接收新的Job运行的请求)。

(3)CUSTOM的方式允许用户自定义Master HA的实现,这对高级用户特别有用。

(4)NONE,这是默认情况,Spark集群中就采用这种方式,该方式不会持久化集群的数据,Master启动后立即管理集群。

Master.scala的HA的源码如下:

Spark默认的HA方式是NONE。

1.  private val RECOVERY_MODE = conf.get("spark.deploy.recoveryMode", "NONE")

如使用ZOOKEEPER的HA方式,RecoveryModeFactory.scala的源码如下:

通过调用zkFactory.createPersistenceEngine()用new()函数创建一个ZooKeeper-PersistenceEngine。

ZooKeeperPersistenceEngine.scala的源码如下:

PersistenceEngine中有至关重要的方法persist来实现数据持久化,readPersistedData来恢复集群中的元数据。

PersistenceEngine.scala的源码如下:

下面来看createdLeaderElectionAgent方法。在createdLeaderElectionAgent方法中调用new()函数创建ZooKeeperLeaderElectionAgent实例。

StandaloneRecoveryModeFactory.scala的源码如下:

ZooKeeperLeaderElectionAgent的源码如下:

FILESYSTEM和NONE的方式采用MonarchyLeaderAgent的方式来完成Leader的选举,其实现是直接把传入的Master作为Leader。

LeaderElectionAgent.scala的源码如下:

FileSystemRecoveryModeFactory.scala的源码如下:

如果WorkerState状态为UNKNOWN(Worker不响应),就把它删除,如果以集群方式运行,driver失败后可以重新启动,最后把状态变回ALIVE。注意,这里要加入--supervise这个参数。

Master.scala的源码如下:

5.1.4 Master的注册机制和状态管理解密
1.Master对其他组件注册的处理

Master接收注册的对象主要是Driver、Application、Worker;Executor不会注册给Master,Executor是注册给Driver中的SchedulerBackend的。

Worker是在启动后主动向Master注册的,所以如果在生产环境下加入新的Worker到正在运行的Spark集群上,此时不需要重新启动Spark集群就能够使用新加入的Worker,以提升处理能力。假如在生产环境中的集群中有500台机器,可能又新加入100台机器,这时不需要重新启动整个集群,就可以将100台新机器加入到集群。

Spark 2.2.1版本的Worker的源码如下:

Spark 2.4.3版本的Worker.scala源码与Spark 2.2.1版本相比具有如下特点。

 上段代码中第10行之后新增externalShuffleServiceSupplier的成员变量,externalShuffleServiceSupplier是ExternalShuffleService类型,ExternalShuffleService提供一个server,Executors可以从server中读取shuffle文件(而不是彼此之间直接读取),在executors被关闭或杀死的情况下提供对文件的不间断访问,需要SASL身份验证才能读取。

Worker是一个消息循环体,继承自ThreadSafeRpcEndpoint,可以收消息,也可以发消息。Worker的onStart方法如下:

Worker的onStart方法中调用了registerWithMaster()。

registerWithMaster方法中调用了tryRegisterAllMasters,向所有的Master进行注册。

Worker.scala的源码如下:

tryRegisterAllMasters方法中,由于实际运行时有很多Master,因此使用线程池的线程进行提交,然后获取masterEndpoint。masterEndpoint是一个RpcEndpointRef,通过sendRegister-MessageToMaster (masterEndpoint)进行注册。

sendRegisterMessageToMaster方法仅将RegisterWorker消息发送给Master消息循环体。sendRegisterMessageToMaster方法内部不作其他处理。

sendRegisterMessageToMaster方法中的masterEndpoint.send传进去的是RegisterWorker。RegisterWorker是一个case class,包括id、host、port、worker、cores、memory等信息,这里Worker是自己的引用RpcEndpointRef,Master通过Ref通worker通信。

RegisterWorker.scala的源码如下:

Worker通过sendRegisterMessageToMaster向Master发送了RegisterWorker消息,Master收到RegisterWorker请求后,进行相应的处理。

Master.scala的receive的源码如下:

RegisterWorker中,Master接收到Worker的注册请求后,首先判断当前的Master是否是Standby的模式,如果是,就不处理;Master的idToWorker包含了所有已经注册的Worker的信息,然后会判断当前Master的内存数据结构idToWorker中是否已经有该Worker的注册,如果有,此时不会重复注册;其中idToWorker是一个HashMap,Key是String代表Worker的字符描述,Value是WorkerInfo。

1.  private val idToWorker = new HashMap[String, WorkerInfo]

WorkerInfo包括id、host、port、cores、memory、endpoint等内容。

Master如果决定接收注册的Worker,首先会创建WorkerInfo对象来保存注册的Worker的信息,然后调用registerWorker执行具体的注册的过程,如果Worker的状态是DEAD的状态,则直接过滤掉。对于UNKNOWN的内容,调用removeWorker进行清理(包括清理该Worker下的Executors和Drivers)。其中,UNKNOWN的情况:Master进行切换时,先对Worker发UNKNOWN消息,只有当Master收到Worker正确的回复消息,才将状态标识为正常。

Spark 2.2.1版本的Master.scala的registerWorker的源码如下:

Spark 2.4.3版本的Master.scala源码与Spark 2.2.1版本相比具有如下特点。

 上段代码中第14行removeWorker(oldWorker)新增一个消息说明信息。

1.  removeWorker(oldWorker, "Worker replaced by a new worker with same address")

在registerWorker方法中,Worker注册完成后,把注册的Worker加入到Master的内存数据结构中。

回到Master.scala的receiveAndReply方法,Worker注册完成后,调用persistenceEngine.addWorker(worker),PersistenceEngine是持久化引擎,在Zookeeper下就是Zookeeper的持久化引擎,把注册的数据进行持久化。

PersistenceEngine.scala的addWorker方法如下:

1.      final def addWorker(worker: WorkerInfo): Unit = {
2.    persist("worker_" + worker.id, worker)
3.  }

ZooKeeperPersistenceEngine是PersistenceEngine的一个具体实现子类,其persist方法如下:

回到Master.scala的receiveAndReply方法,注册的Worker数据持久化后,进行schedule()。至此,Worker的注册完成。

同样,Driver的注册过程:Driver提交给Master进行注册,Master会将Driver的信息放入内存缓存中,加入等待调度的队列,通过持久化引擎(如ZooKeeper)把注册信息持久化,然后进行Schedule。

Application的注册过程:Application提交给Master进行注册,Driver启动后会执行SparkContext的初始化,进而导致StandaloneSchedulerBackend的产生,其内部有StandaloneAppClient。StandaloneAppClient内部有ClientEndpoint。ClientEndpoint来发送RegisterApplication信息给Master。Master会将Application的信息放入内存缓存中,把Application加入等待调度的Application队列,通过持久化引擎(如ZooKeeper)把注册信息持久化,然后进行Schedule。

2.Master对Driver和Executor状态变化的处理

1)对Driver状态变化的处理

如果Driver的各个状态是DriverState.ERROR | DriverState.FINISHED | DriverState.KILLED |DriverState.FAILED,就将其清理掉。其他情况则报异常。

removeDriver清理掉Driver后,再次调用schedule方法,removeDriver的源码如下:

2)对Executor状态变化的处理

ExecutorStateChanged的源码如下:

Executor挂掉时系统会尝试一定次数的重启(最多重启10次)。