Spark大数据商业实战三部曲:内核解密、商业案例、性能调优(第2版)
上QQ阅读APP看书,第一时间看更新

5.3 ExecutorBackend启动原理和源码详解

ExecutorBackend是Executor向集群发送更新消息的一个可插拔的接口。ExecutorBackend拥有不同的实现。Standalone模式下ExecutorBackend的默认实现是CoarseGrainedExecutorBackend;在Local模式下,ExecutorBackend的默认实现是LocalBackend。在Mesos调度模式下,ExecutorBackend的默认实现是MesosExecutorBackend。本节主要探索Standalone模式下的ExecutorBackend,通过源码深入理解ExecutorBackend接口设计的精髓。

5.3.1 ExecutorBackend接口与Executor的关系

本节将详细分析Standalone模式下ExecutorBackend和Executor的关系。在StandaloneSchedulerBackend中会实例化一个StandaloneAppClient。StandaloneAppClient中携带了command信息,command信息中指定了要启动的ExecutorBackend的实现类,Standalone模式下,该ExecutorBackend的实现类是org.apache.spark.executor.CoarseGrainedExecutorBackend类。

StandaloneSchedulerBackend.scala的start方法中构建了一个Command对象,该对象的第一个参数是mainClass,即进程的主类。该类在Standalone模式下为org.apache.spark.executor.CoarseGrainedExecutorBackend。分别将sparkJavaopts、javaOpts、command、appUiAddress、coresPerExecutor、appDes传入StandaloneAppClient构造函数。StandaloneAppClient将会向Master发送RegisterApplication注册请求,Master受理后通过launchExecutor方法在Worker节点启动一个ExecutorRunner对象,该对象用于管理一个Executor进程。在ExecutorRunner中将通过CommandUtil构建一个ProcessBuilder,调用ProcessBuilder的start方法将会以进程的方式启动org.apache.spark.executor.CoarseGrainedExecutorBackend。在CoarseGrainedExecotorBackend的onStart方法中,将会向Driver端发送RegisterExecutor(executorId, self, hostPort, cores,extractLogUrls)消息请求注册,完成注册后将立即返回一个RegisteredExecutor(executorAddress.host)消息,CoarseGraiendExecutorBackend收到该消息,马上实例化出一个Executor。

CoarseGrainedExecutorBackend.scala的源码如下:

从这里可以看出,CoarseGrainedExecutorBackend比Executor先实例化。CoarseGrained-ExecutorBackend负责与集群通信,而Executor则专注于任务的处理,它们是一对一的关系,在集群中各司其职。

每个Worker节点上可以启动多个CoarseGrainedExecutorBackend进程,每个进程对应一个Executor。

5.3.2 ExecutorBackend的不同实现

ExecutorBackend是与集群交互的接口,该接口在不同的调度模式下有不同的实现。图5-3是ExecutorBackend及其实现的关系类图。

图5-3 ExecutorBackend及其实现的关系类图

不同模式下,ExecutorRunner启动的进程不一样。在Standalone模式下启动的是org.apache.spark.executor.CoarseGrainedExecutorBackend进程;在Local模式下,启动的是org.apache.spark.executor.LocalExecutorBackend进程;在Mesos模式下,启动的是org.apache.spark.executor.MesosExecutorBackend进程。

下面来看Standalone模式下CoarseGrainedExecutorBackend的启动。在Standalone模式下,会启动org.apache.spark.deploy.Client类,该类将向Master发送RequestSubmitDriver(driverDescription)消息,Master中匹配到RequestSubmitDriver(driverDescription)后,将会调用schedule方法。

Master.scala的receiveAndReply的源码如下:

Master的receiveAndReply收到RequestSubmitDriver消息后,调用schedule方法。

Master的schedule的源码如下:

上面代码中,RecoveryState若不为ALIVE,则直接返回,否则使用Random.shuffle将Workers集合打乱,过滤出ALIVE的Worker,生成新的集合shuffledAliveWorkers,尽量考虑到选择Driver的负载均衡。在for语句中遍历waitingDrivers队列,判断Worker剩余内存和剩余物理核是否满足Driver需求,如满足,则调用launchDriver(worker,driver)方法在选中的Worker上启动Driver进程。

实例化SparkContext时,在SparkContext中将实例化出DAGScheduler、StandaloneSchedulerBackend。Driver在Worker节点上启动之后,在StandaloneSchedulerBackend中将会调用new()函数创建一个StandaloneAppClient。StandaloneAppClient中有一个ClientEndpoint,在其onStart方法中将向Master发送RegisterApplication请求注册application,注册好application后,Master又会调用schedule方法,在满足条件的Worker上为application启动Executor,首先会启动ExecutorRunner,在ExecutorRunner中启动CoarseGrainedExecutor-Backend,启动后将会实例化出Executor。为什么在Standalone模式下会启动CoarseGrained-ExecutorBackend呢?在什么地方设置要启动的CoarseGrainedExecutorBackend进程呢?其实,在实例化StandaloneAppClient的时候就已经传入了。

StandaloneSchedulerBackend.scala的start方法代码中设置了Command对象。Command对象的第一个参数是启动进程的mainClass。因此,ExecutorRunner中启动进程时,启动的是org.apache.spark.executor.CoarseGrainedExecutorBackend。

5.3.3 ExecutorBackend中的通信

ExecutorBackend是一个被Executor使用的可插拔的与集群通信的接口。在ExecutorBackend中有statusUpdate(taskId: Long, state: TaskState, data: ByteBuffer)方法,通过这个方法向集群发送Task执行的各种信息,如果任务执行失败,则返回失败的信息;如果执行成功,则返回任务执行的结果。本节重点讲解在Standalone模式下CoarseGrainedExecutor-Backend中的通信。CoarseGrainedExecutorBackend在整个集群中的通信如图5-4所示。

在图5-4中,Executor与CoarseGrainedExecutorBackend协作,将任务计算的结果通过CoarseGrainedExecutorBackend的statusUpdate方法将taskId、TaskState以及结果数据发送给Driver。Driver收到StatusUpdate(executorId,tasked,state,data)消息,通过判断state的不同状态,进行不同的处理。例如,当state的状态为TaskState.LOST时,Driver端会移除Executor;当state的状态为TaskState.FINISHED时,Driver端会调用enqueueSuccessfulTask进行处理。

图5-4 CoarseGrainedExecutorBackend在整个集群中的通信

这里主要看CoarseGrainedExecutorBackend与Driver之间的通信。当Worker节点中启动ExecutorRunner时,ExecutorRunner中会启动CoarseGrainedExecutorBackend进程,在CoarseGrainedExecutorBackend的onStart方法中,向Driver发出RegisterExecutor注册请求。

CoarseGrainedExecutorBackend的onStart方法的源码如下:

上面的代码中,Some(ref)得到Driver的引用,通过ask方法返回Future[Boolean],然后在Future对象上调用onComplete方法进行额外的处理。Driver端收到注册请求,将会注册Executor的请求,并向ListenerBus中发送SparkListenerExecutorAdded事件。

如果executorDataMap中已经存在该Executor的id,就返回RegisterExecutorFailed,如果不存在该Executor的id,则在executorDataMap中加入该Executor的id,并返回RegisteredExecutor消息且向listenerBus中添加SparkListenerExecutorAdded事件。CoarseGrainedExecutorBackend收到RegisteredExecutor消息后,将会新建一个Executor执行器,并为此Executor充当信使,与Driver通信。CoarseGrainedExecutorBackend收到RegisteredExecutor消息的源码如下所示。

CoarseGrainedExecutorBackend.scala的receive的源码如下:

从上面的代码中可以看到,CoarseGrainedExecutorBackend收到RegisteredExecutor消息后,将会新建一个Executor。由此可见,Executor在CoarseGrainedExecutorBackend后实例化,这与Executor和CoarseGrainedExecutorBackend的不同职责有关,Executor主要负责计算,而CoarseGrainedExecutorBackend主要负责通信,通信环境准备好了,架起同CoarseGrainedSchedulerBackend通信的桥梁,就可以接收CoarseGrainedSchedulerBackend中调用launchTask方法发送的LaunchTask消息了,因此通信在前,计算在后。

Executor中的计算结果是通过CoarseGrainedExecutorBackend的statusUpdate方法返回给CoarseGrainedExecutorBackend的。statusUpdate方法的代码如下所示。

CoarseGrainedExecutorBackend.scala的源码如下:

上面源码中,通过参数taskId、state、data构建一个StatusUpdate对象,该对象将被当作消息发送到Driver端,Driver根据返回结果的需要,将会向CoarseGrainedExecutorBackend发送新的指令消息,如LaunchTask、KillTask、StopExecutors、Shutdown等。

5.3.4 ExecutorBackend的异常处理

若CoarseGrainedExecutorBackend在运行中出现异常,将调用exitExecutor方法进行处理,处理以后,系统退出。exitExecutor函数可以由其他子类重载来处理,Executor执行的退出方式不同。例如,当Executor挂掉了,后台程序可能不会让父进程也挂掉。如果须通知Driver,Driver将清理挂掉的Executor的数据。

Spark 2.2.1版本的CoarseGrainedExecutorBackend的exitExecutor方法的源码如下:

Spark 2.4.3版本的CoarseGrainedExecutorBackend.scala源码与Spark 2.2.1版本相比具有如下特点。

 上段代码中第13~17行替换为driver.get.send代码。

1.  driver.get.send(RemoveExecutor(executorId, new ExecutorLossReason(reason)))

CoarseGrainedExecutorBackend在运行中一旦出现异常情况,将调用exitExecutor方法处理。

 Executor向Driver注册RegisterExecutor失败。

 Executor收到Driver的RegisteredExecutor注册成功消息以后,创建Executor实例失败。

 Driver返回Executor注册失败消息RegisterExecutorFailed。

 Executor收到Driver的LaunchTask启动任务消息,但是Executor为null。

 Executor收到Driver的KillTask消息,但是Executor为null。

 Executor和Driver失去连接。