6.2 Spark Application是如何向集群申请资源的
本节讲解Application申请资源的两种类型:第一种是尽可能在集群的所有Worker上分配Executor;第二种是运行在尽可能少的Worker上。本节讲解Application申请资源的源码内容,将彻底解密Spark Application是如何向集群申请资源的。
6.2.1 Application申请资源的两种类型详解
Master负责资源管理和调度。资源调度的方法schedule位于Master.scala类中,当注册程序或者资源发生改变时,都会导致schedule的调用。Schedule调用的时机:每次有新的应用程序提交或者集群资源状况发生改变时(包括Executor增加或者减少、Worker增加或者减少等)。
Spark默认为应用程序启动Executor的方式是FIFO的方式,也就是所有提交的应用程序都放在调度的等待队列中,先进先出,只有在满足了前面应用程序的资源分配的基础上,才能够满足下一个应用程序资源的分配;在FIFO的情况下,默认是spreadOutApps来让应用程序尽可能多地运行在所有的Node上。为应用程序分配Executors有两种方式:第一种方式是尽可能在集群的所有Worker上分配Executor,这种方式往往会带来潜在的、更好的数据本地性;第二种方式是尝试运行在尽可能少的Worker上。
为了更形象地描述Master的调度机制,下面通过图6-1介绍抽象的资源调度框架。
图6-1 Master中抽象的资源调度框架
其中,Worker1到WorkerN是集群中全部的Workers节点,调度时,会根据应用程序请求的资源信息,从全部Workers节点中过滤出资源足够的节点,假设可以得到Worker1到WorkerM的节点。当前过滤的需求是内核数和内存大小足够启动一个Executor,因为Executor是集群执行应用程序的单位组件(注意:和任务(Task)不是同一个概念,对应的任务是在Executor中执行的)。
选出可用Workers之后,会根据内核大小进行排序,这可以理解成是一种基于可用内核排序的、简单的负载均衡策略。然后根据设置的spreadOutApps参数,对应指定两种资源分配策略。
(1)当spreadOutApps=true:使用轮流均摊的策略,也就是采用圆桌(round-robin)算法,图中的虚线表示第一次轮流摊派的资源不足以满足申请的需求,因此开始第二轮摊派,依次轮流均摊,直到符合资源需求。
(2)当spreadOutApps=false:使用依次全占策略,依次从可用Workers上获取该Worker上可用的全部资源,直到符合资源需求。
对应图中Worker内部的小方块,在此表示分配的资源的抽象单位。对应资源的条件,理解的关键点在于资源是分配给Executor的,因此最终启动Executor时,占用的资源必须满足启动所需的条件。
前面描述了Workers上的资源是如何分配给应用程序的,之后正式开始为Executor分配资源,并向Worker发送启动Executor的命令了。根据申请时是否明确指定需要为每个Executor分配确定的内核个数,有:
(1)明确指定每个Executor需要分配的内核个数时:每次分配的是一个Executor所需的内核数和内存数,对应在某个Worker分配到的总的内核数可能是Executor的内核数的倍数,此时,该Worker节点上会启动多个Executor,每个Executor需要指定的内核数和内存数(注意该Worker节点上分配到的总的内存大小)。
(2)未明确指定每个Executor需要分配的内核个数时:每次分配一个内核,最后所有在某Worker节点上分配到的内核都会放到一个Executor内(未明确指定内核个数,因此可以一起放入一个Executor)。因此,最终该应用程序在一个Worker上只有一个Executor(这里指的是针对一个应用程序,当该Worker节点上存在多个应用程序时,仍然会为每个应用程序分别启动相应的Executor)。
在此强调、补充一下调度机制中使用的三个重要的配置属性。
①指定为所有Executors分配的总内核个数:在spark-submit脚本提交参数时进行配置。所有Executors分配的总内核个数的控制属性在类SparkSubmitArguments的方法printUsageAndExit中。
②指定需要为每个Executor分配的内核个数:在spark-submit脚本提交参数时进行配置。每个Executor分配的内核个数的控制属性在类SparkSubmitArguments的方法printUsageAndExit中。
SparkSubmitArguments.scala的源码如下:
③资源分配策略:数据本地性(数据密集)与计算密集的控制属性,对应的配置属性在Master类中,代码如下:
6.2.2 Application申请资源的源码详解
1.任务调度与资源调度的区别
任务调度是通过DAGScheduler、TaskScheduler、SchedulerBackend等进行的作业调度。
资源调度是指应用程序如何获得资源。
任务调度是在资源调度的基础上进行的,如果没有资源调度,任务调度就成为无源之水,无本之木。
2.资源调度内幕
(1)因为Master负责资源管理和调度,所以资源调度的方法schedule位于Master.scala类中,注册程序或者资源发生改变时都会导致schedule的调用,如注册程序时:
(2)Schedule调用的时机:每次有新的应用程序提交或者集群资源状况发生改变的时候(包括Executor增加或者减少、Worker增加或者减少等)。
进入schedule(),schedule为当前等待的应用程序分配可用的资源。每当一个新的应用程序进来时,schedule都会被调用。或者资源发生变化时(如Executor挂掉,Worker挂掉,或者新增加机器),schedule都会被调用。
(3)当前Master必须以ALIVE的方式进行资源调度,如果不是ALIVE的状态,就会直接返回,也就是Standby Master不会进行Application的资源调用。
(4)接下来通过workers.toSeq.filter(_.state == WorkerState.ALIVE)过滤判断所有Worker中哪些是ALIVE级别的Worker,ALIVE才能够参与资源的分配工作。
(5)使用Random.shuffle把Master中保留的集群中所有ALIVE级别的Worker的信息随机打乱;Master的schedule()方法中:workers是一个数据结构,打乱workers有利于负载均衡。例如,不是以固定的顺序启动launchDriver。WorkerInfo是Worker注册时将信息注册过来。
WorkerInfo.scala的源码如下:
随机打乱的算法:将Worker的信息传进来,先调用new()函数创建一个ArrayBuffer,将所有的信息放进去。然后将两个索引位置的内容进行交换。例如,如果有4个Worker,依次分别为第一个Worker至第四个Worker,第一个位置是第1个Worker,第2个位置是第2个Worker,第3个位置是第3个Worker,第4个位置是第4个Worker;通过Shuffle以后,现在第一个位置可能是第3个Worker,第2个位置可能是第1个Worker,第3个位置可能是第4个Worker,第4个位置可能是第2个Worker,位置信息打乱。
Random.scala中的shuffle方法,其算法内部是循环随机交换所有Worker在Master缓存数据结构中的位置。
(6)Master的schedule()方法中:循环遍历等待启动的Driver,如果是Client模式,就不需要waitingDrivers等待;如果是Cluster模式,此时Driver会加入waitingDrivers等待列表。
当SparkSubmit指定Driver在Cluster模式的情况下,此时Driver会加入waitingDrivers等待列表中,在每个DriverInfo的DriverDescription中有要启动Driver时对Worker的内存及Cores的要求等内容。
DriverInfo包括启动时间、ID、描述信息、提交时间等内容。
DriverInfo.scala的源码如下:
其中,DriverInfo的DriverDescription描述信息中包括jarUrl、内存、Cores、supervise、command等内容。如果在Cluster模式中指定supervise为True,那么Driver挂掉时就会自动重启。
DriverDescription.scala的源码如下:
在符合资源要求的情况下,采用随机打乱后的一个Worker来启动Driver,worker是Master中对Worker的一个描述。
Master.scala的launchDriver方法如下:
Master通过worker.endpoint.send(LaunchDriver)发指令给Worker,让远程的Worker启动Driver,Driver启动以后,Driver的状态就变成DriverState.RUNNING。
(7)先启动Driver,才会发生后续的一切资源调度的模式。
(8)Spark默认为应用程序启动Executor的方式是FIFO方式,也就是所有提交的应用程序都是放在调度的等待队列中的,先进先出,只有满足了前面应用程序的资源分配的基础,才能够满足下一个应用程序资源的分配。
Master的schedule()方法中,调用startExecutorsOnWorkers()为当前的程序调度和启动Worker的Executor,默认情况下排队的方式是FIFO。
Spark 2.2.1版本的Master.scala的startExecutorsOnWorkers的源码如下:
Spark 2.4.3版本的Master.scala源码与Spark 2.2.1版本相比具有如下特点。
上段代码中第3行将for循环遍历语句调整为for (app <- waitingApps)。
上段代码中第4行构建coresPerExecutor变量调整为app.desc.coresPerExecutor.getOrElse(1),如果剩余的核心小于coresPerExecutor,则不会分配剩余的核心。
上段代码中第8行coresPerExecutor.getOrElse(1)调整为coresPerExecutor。
上段代码中第15行coresPerExecutor调整为app.desc.coresPerExecutor。
(9)为应用程序具体分配Executor前要判断应用程序是否还需要分配Core,如果不需要,则不会为应用程序分配Executor。
startExecutorsOnWorkers中的coresLeft是请求的requestedCores和可用的coresGranted的相减值。例如,如果整个程序要求1000个Cores,但是目前集群可用的只有100个Cores,如果coresLeft不为0,就放入等待队列中;如果coresLeft是0,那么就不需要调度。
1. private[master] def coresLeft: Int = requestedCores - coresGranted
(10)Master.scala的startExecutorsOnWorkers中,具体分配Executor之前,要求Worker必须是ALIVE的状态且必须满足Application对每个Executor的内存和Cores的要求,并且在此基础上进行排序,产生计算资源由大到小的usableWorkers数据结构。
然后调用scheduleExecutorsOnWorkers,在FIFO的情况下,默认spreadOutApps让应用程序尽可能多地运行在所有的Node上。
scheduleExecutorsOnWorker中,minCoresPerExecutor表示每个Executor最小分配的core个数。scheduleExecutorsOnWorker的源码如下:
(11)为应用程序分配Executors有两种方式:第一种方式是尽可能在集群的所有Worker上分配Executor,这种方式往往会带来潜在的、更好的数据本地性;第二种方式是尝试运行在尽可能少的Worker上。
(12)具体在集群上分配Cores时会尽可能地满足我们的要求。math.min用于计算最小值。coresToAssig用于计算app.coresLeft与可用的Worker中可用的Cores的和的最小值。例如,应用程序要求1000个Cores,但整个集群中只有100个Cores,所以只能先分配100个Cores。
scheduleExecutorsOnWorkers方法如下:
(13)如果每个Worker下面只能为当前的应用程序分配一个Executor,那么每次只分配一个Core。scheduleExecutorsOnWorkers方法如下:
总结为两种情况:一种情况是尽可能在一台机器上运行程序的所有功能;另一种情况是尽可能在所有节点上运行程序的所有功能。无论是哪种情况,每次给Executor增加Cores,是增加一个,如果是spreadOutApps的方式,循环一轮再下一轮。例如,有4个Worker,第一次为每个Executor启动一个线程,第二次循环分配一个线程,第三次循环再分配一个线程……
scheduleExecutorsOnWorkers方法如下:
回到Master.scala的startExecutorsOnWorkers,现在已经决定为每个worker分配多少个cores,然后进行资源分配。
allocateWorkerResourceToExecutors的源码如下:
allocateWorkerResourceToExecutors中的app.addExecutor增加一个Executor,记录Executor的相关信息。
回到allocateWorkerResourceToExecutors方法中,launchExecutor(worker, exec)启动Executor。
1. launchExecutor(worker, exec)
(14)准备具体要为当前应用程序分配的Executor信息后,Master要通过远程通信发指令给Worker来具体启动ExecutorBackend进程。
launchExecutor方法如下:
(15)紧接着给应用程序的Driver发送一个ExecutorAdded的信息。
launchExecutor方法如下: