Spark大数据商业实战三部曲:内核解密、商业案例、性能调优(第2版)
上QQ阅读APP看书,第一时间看更新

4.1 Spark Driver Program剖析

SparkContext是通往Spark集群的唯一入口,是整个Application运行调度的核心。本节将深度剖析SparkContext。

4.1.1 Spark Driver Program

Spark Driver Program(以下简称Driver)是运行Application的main函数并且新建SparkContext实例的程序。其实,初始化SparkContext是为了准备Spark应用程序的运行环境,在Spark中,由SparkContext负责与集群进行通信、资源的申请、任务的分配和监控等。当Worker节点中的Executor运行完毕Task后,Driver同时负责将SparkContext关闭。通常也可以使用SparkContext来代表驱动程序(Driver)。

Driver(SparkContext)整体架构图如图4-1所示。

图4-1 Driver(SparkContext)整体架构图

4.1.2 SparkContext深度剖析

SparkContext是通往Spark集群的唯一入口,可以用来在Spark集群中创建RDDs、累加器(Accumulators)和广播变量(Broadcast Variables)。SparkContext也是整个Spark应用程序(Application)中至关重要的一个对象,可以说是整个Application运行调度的核心(不是指资源调度)。

SparkContext的核心作用是初始化Spark应用程序运行所需要的核心组件,包括高层调度器(DAGScheduler)、底层调度器(TaskScheduler)和调度器的通信终端(SchedulerBackend),同时还会负责Spark程序向Master注册程序等。

一般而言,通常为了测试或者学习Spark开发一个Application,在Application的main方法中,最开始几行编写的代码一般是这样的:首先,创建SparkConf实例,设置SparkConf实例的属性,以便覆盖Spark默认配置文件spark-env.sh,spark-default.sh和log4j.properties中的参数;然后,SparkConf实例作为SparkContext类的唯一构造参数来实例化SparkContext实例对象。SparkContext在实例化的过程中会初始化DAGScheduler、TaskScheduler和SchedulerBackend,而当RDD的action触发了作业(Job)后,SparkContext会调用DAGScheduler将整个Job划分成几个小的阶段(Stage),TaskScheduler会调度每个Stage的任务(Task)进行处理。还有,SchedulerBackend管理整个集群中为这个当前的Application分配的计算资源,即Executor。

如果用一个车来比喻Spark Application,那么SparkContext就是车的引擎,而SparkConf是关于引擎的配置参数。说明:只可以有一个SparkContext实例运行在一个JVM内存中,所以在创建新的SparkContext实例前,必须调用stop方法停止当前JVM唯一运行的SparkContext实例。

Spark程序在运行时分为Driver和Executor两部分:Spark程序编写是基于SparkContext的,具体包含两方面。

 Spark编程的核心基础RDD是由SparkContext最初创建的(第一个RDD一定是由SparkContext创建的)。

 Spark程序的调度优化也是基于SparkContext,首先进行调度优化。

 Spark程序的注册是通过SparkContext实例化时生产的对象来完成的(其实是SchedulerBackend来注册程序)。

 Spark程序在运行时要通过Cluster Manager获取具体的计算资源,计算资源获取也是通过SparkContext产生的对象来申请的(其实是SchedulerBackend来获取计算资源的)。

 SparkContext崩溃或者结束的时候,整个Spark程序也结束。

4.1.3 SparkContext源码解析

SparkContext是Spark应用程序的核心。我们运行WordCount程序,通过日志来深入了解SparkContext。

WordCount.scala的代码如下:

在IDEA中运行WordCount.scala代码,日志显示如下:

程序一开始,日志里显示的是:INFO SparkContext: Running Spark version 2.4.3,日志中间部分是一些随着SparkContext创建而创建的对象,另一条比较重要的日志信息,作业启动了并正在运行:INFO SparkContext: Starting job: runJob at SparkHadoopWriter.scala:78。

在程序运行的过程中会创建TaskScheduler、DAGScheduler和SchedulerBackend,它们有各自的功能。DAGScheduler是面向Job的Stage的高层调度器;TaskScheduler是底层调度器。SchedulerBackend是一个接口,根据具体的ClusterManager的不同会有不同的实现。程序打印结果后便开始结束。日志显示:INFO SparkContext: Successfully stopped SparkContext。

通过这个例子可以感受到Spark程序的运行到处都可以看到SparkContext的存在,我们将SparkContext作为Spark源码阅读的入口,来理解Spark的所有内部机制。

图4-2是从一个整体去看SparkContext创建的实例对象。首先,SparkContext构建的顶级三大核心为DAGScheduler、TaskScheduler、SchedulerBackend,其中,DAGScheduler是面向Job的Stage的高层调度器;TaskScheduler是一个接口,是底层调度器,根据具体的ClusterManager的不同会有不同的实现,Standalone模式下具体的实现是TaskSchedulerImpl。SchedulerBackend是一个接口,根据具体的ClusterManager的不同会有不同的实现。Standalone模式下具体的实现是StandaloneSchedulerBackend。

图4-2 SparkContext整体运行图

从整个程序运行的角度讲,SparkContext包含四大核心对象:DAGScheduler、TaskScheduler、SchedulerBackend、MapOutputTrackerMaster。StandaloneSchedulerBackend有三大核心功能:负责与Master连接,注册当前程序RegisterWithMaster;接收集群中为当前应用程序分配的计算资源Executor的注册并管理Executors;负责发送Task到具体的Executor执行。

第一步:程序一开始运行时会实例化SparkContext里的对象,所有不在方法里的成员都会被实例化!一开始实例化时第一个关键的代码是createTaskScheduler,它位于SparkContext的PrimaryConstructor中,当它实例化时会直接被调用,这个方法返回的是taskScheduler和dagScheduler的实例,然后基于这个内容又构建了DAGScheduler,最后调用taskScheduler的start()方法。要先创建taskScheduler,然后再创建dagScheduler,因为taskScheduler是受dagScheduler管理的。

SparkContext.scala的源码如下:

第二步:调用createTaskScheduler,这个方法创建了TaskSchedulerImpl和StandaloneSchedulerBackend,createTaskScheduler方法的第一个入参是SparkContext,传入的this对象是在应用程序中创建的sc,第二个入参是master的地址。

以下是WordCount.scala创建SparkConf和SparkContext的上下文信息。

当SparkContext调用createTaskScheduler方法时,根据集群的条件创建不同的调度器,例如,createTaskScheduler第二个入参master如传入local参数,SparkContext将创建TaskSchedulerImpl实例及LocalSchedulerBackend实例,在测试代码的时候,可以尝试传入local[*]或者是local[2]的参数,然后跟踪代码,看看创建了什么样的实例对象。

SparkContext中的SparkMasterRegex对象定义不同的正则表达式,从master字符串中根据正则表达式适配master信息。

SparkContext.scala的源码如下:

这是设计模式中的策略模式,它会根据实际需要创建出不同的SchedulerBackend的子类。

SparkContext.scala的createTaskScheduler方法的源码如下:

在实际生产环境下,我们都是用集群模式,即以spark://开头,此时在程序运行时,框架会创建一个TaskSchedulerImpl和StandaloneSchedulerBackend的实例,在这个过程中也会初始化taskscheduler,把StandaloneSchedulerBackend的实例对象作为参数传入。StandaloneSchedulerBackend被TaskSchedulerImpl管理,最后返回TaskScheduler和StandaloneSchdeulerBackend。

SparkContext.scala的源码如下:

createTaskScheduler方法执行完毕后,调用了taskscheduler.start()方法来正式启动taskscheduler,这里虽然调用了taskscheduler.start方法,但实际上是调用了taskSchedulerImpl的start方法,因为taskSchedulerImpl是taskScheduler的子类。

Task默认失败重试次数是4次,如果任务不容许失败,就可以调大这个参数。调大spark.task.maxFailures参数有助于确保重要的任务失败后可以重试多次。

初始化TaskSchedulerImpl:调用createTaskScheduler方法时会初始化TaskSchedulerImpl,然后把StandaloneSchedulerBackend当作参数传进去,初始化TaskSchedulerImpl时首先是创建一个Pool来初定义资源分布的模式Scheduling Mode,默认是先进先出(FIFO)的模式。

Spark 2.1.1版本的TaskSchedulerImpl.scala的initialize的源码如下:

rootPool作为TaskSchedulerImpl类的成员变量,在构建TaskSchedulerImpl时初始化。

1.  val rootPool: Pool = new Pool("", schedulingMode, 0, 0)
2.  ......

可以设置spark.scheduler.mode参数来定义资源调度池,例如FAIR、FIFO,默认资源调度池是先进先出(FIFO)模式。

TaskSchedulerImpl.scala的源码如下:

SchedulingMode.scala的源代码如下:

回到taskScheduler start方法,taskScheduler.start方法调用时会再调用schedulerbackend的start方法。

TaskSchedulerImpl.scala的start方法的源码如下:

SchedulerBackend包含多个子类,分别是LocalSchedulerBackend、CoarseGrainedScheduler-Backend和StandaloneSchedulerBackend、MesosCoarseGrainedSchedulerBackend、YarnScheduler-Backend。

StandaloneSchedulerBackend的start方法调用了CoarseGraninedSchedulerBackend的start方法,通过StandaloneSchedulerBackend注册程序把command提交给Master:Command("org.apache.spark.executor.CoarseGrainedExecutorBackend", args, sc.executorEnvs, classPathEntries++ testingClassPath, libraryPathEntries, javaOpts)来创建一个StandaloneAppClient的实例。

StandaloneSchedulerBackend.scala的start方法的源码如下:

Master发指令给Worker去启动Executor所有的进程时加载的Main方法所在的入口类就是command中的CoarseGrainedExecutorBackend,在CoarseGrainedExecutorBackend中启动Executor(Executor是先注册,再实例化),Executor通过线程池并发执行Task,然后再调用它的run方法。

CoarseGrainedExecutorBackend.scala的源码如下:

CoarseGrainedExecutorBackend的main入口方法中调用了run方法。

Spark 2.2.1版本的CoarseGrainedExecutorBackend的run入口方法的源码如下:

Spark 2.4.3版本的CoarseGrainedExecutorBackend.scala源码与Spark 2.2.1版本相比具有如下特点。

 删除上段代码中第18行val port = executorConf.getInt("spark.executor.port", 0)。

 上段代码中第22行将port修改为–1。

 删除上段代码中第41~45行代码。

 上段代码中第40行之后新增确认设置正确的Hadoop配置的代码。

 上段代码中第48行去掉port端口的参数。

 删除上段代码中第55行代码。

CoarseGrainedExecutorBackend通过消息循环体向driver发送RetrieveSparkAppConfig消息,RetrieveSparkAppConfig是一个case object。Driver端的CoarseGrainedSchedulerBackend消息循环体收到消息以后,将Spark的属性信息sparkProperties及加密key等内容封装成SparkAppConfig消息,将SparkAppConfig消息再回复给CoarseGrainedExecutorBackend。

回到StandaloneSchedulerBackend.scala的start方法:其中创建了一个很重要的对象,即StandaloneAppClient对象,然后调用它的client.start()方法。

在start方法中创建一个ClientEndpoint对象。

StandaloneAppClient.scala的star方法的源码如下:

ClientEndpoint是一个RpcEndPoint,首先调用自己的onStart方法,接下来向Master注册。

StandaloneAppClient.scala的ClientEndpoint类的源码如下:

调用registerWithMaster方法,从registerWithMaster调用tryRegisterAllMasters,开一条新的线程来注册,然后发送一条信息(RegisterApplication的case class)给Master。

StandaloneAppClient.scala的registerWithMaster的源码如下:

StandaloneAppClient.scala的tryRegisterAllMasters的源码如下:

Master收到RegisterApplication信息后便开始注册,注册后再次调用schedule()方法。

Master.scala的receive方法的源码如下:

总结:从SparkContext创建taskSchedulerImpl初始化不同的实例对象来完成最终向Master注册的任务,中间包括调用scheduler的start方法和创建StandaloneAppClient来间接创建ClientEndPoint完成注册工作。

我们把SparkContext称为天堂之门,SparkContext开启天堂之门:Spark程序是通过SparkContext发布到Spark集群的;SparkContext导演天堂世界:Spark程序的运行都是在SparkContext为核心的调度器的指挥下进行的;SparkContext关闭天堂之门:SparkContext崩溃或者结束的时候整个Spark程序也结束。