Spark大数据商业实战三部曲:内核解密、商业案例、性能调优(第2版)
上QQ阅读APP看书,第一时间看更新

6.1 Spark Application是如何提交给集群的

本节讲解Application提交参数配置、Application提交给集群原理、Application提交给集群源码等内容,将彻底解密Spark Application到底是如何提交给集群的。

6.1.1 Application提交参数配置详解

用户应用程序可以使用bin/spark-submit脚本来启动。spark-submit脚本负责使用Spark及其依赖关系设置类路径,并可支持Spark支持的不同群集管理器和部署模式。

bin/spark-submit脚本示例如下:

spark-submit脚本提交参数配置中一些常用的选项。

--class:应用程序的入口点(如org.apache.spark.examples.SparkPi)。

--master:集群的主URL(如spark://23.195.26.187:7077)。

--deploy-mode:将Driver程序部署在集群Worker节点(cluster);或作为外部客户端(client)部署在本地(默认值:client)。

--conf:任意Spark配置属性,使用key = value格式。对于包含空格的值,用引号括起来,如“key = value”。

application-jar:包含应用程序和所有依赖关系Jar包的路径。该URL必须在集群内全局可见。例如,所有节点上存在的hdfs://路径或file://路径。

application-arguments:传递给主类的main方法的参数。

6.1.2 Application提交给集群原理详解

在Spark官网部署页面(http://spark.apache.org/docs/latest/cluster-overview.html),可以看到当前集群支持以下4种集群管理器(cluster manager)。

(1)Standalone:Spark原生的简单集群管理器。使用Standalone可以很方便地搭建一个集群。

(2)Apache Mesos:一个通用的集群管理器,可以在上面运行HadoopMapReduce和一些服务型的应用。

(3)Hadoop YARN:在Hadoop 2中提供的资源管理器。

(4)Kubernetes:一个开源系统,用于自动化容器化应用程序的部署、扩张和管理。

另外,Spark提供的EC2启动脚本,可以很方便地在Amazon EC2上启动一个Standalone集群。

实际上,除了上面这些通用的集群管理器外,Spark内部也提供一些方便我们测试、学习的简单集群部署模式。为了更全面地理解,我们会从Spark应用程序部署点切入,也就是从提交一个Spark应用程序开始,引出并详细解析各种部署模式。

说明:下面涉及类的描述时,如果可以通过类名唯一确定一个类,将直接给出类名,如果不能,会先给出全路径的类名,然后在不出现歧义的地方再简写为类名。

为了简化应用程序提交的复杂性,Spark提供了各种应用程序提交的统一入口,即spark-submit脚本,应用程序的提交都间接或直接地调用了该脚本。下面简单分析几个脚本,包含./bin/spark-shell、./bin/pyspark、./bin/sparkR、./bin/spark-sql、./bin/run-example、./bin/speak-submit,以及所有脚本最终都调用到的一个执行Java类的脚本./bin/spark-class。

1.脚本./bin/spark-shell

通过该脚本可以打开使用Scala语言进行开发、调试的交互式界面,脚本的代码如下:

对应在第4行和第8行处,调用了应用程序提交脚本./bin/spark-submit。脚本./bin/spark-shell的基本用法如下:

1.  "Usage: ./bin/spark-shell [options]"

其他脚本类似。下面分别针对各个脚本的用法(具体用法可查看脚本的帮助信息,如通过--help选项来获取)与关键执行语句等进行简单解析。了解工具(如脚本)如何使用,最根本的是先查看其帮助信息,然后在此基础上进行扩展。

2.脚本./bin/pyspark

通过该脚本可以打开使用Python语言开发、调试的交互式界面。

(1)该脚本的用法如下:

1.  "Usage: ./bin/pyspark [options]"

(2)该脚本的执行语句如下:

3.脚本./bin/sparkR

通过该脚本可以打开使用sparkR开发、调试的交互式界面。

(1)该脚本的用法如下:

1.  "Usage: ./bin/sparkR [options]"

(2)该脚本的执行语句如下:

1.  exec "${SPARK_HOME}"/bin/spark-submit sparkr-shell-main "$@"
4.脚本./bin/spark-sql

通过该脚本可以打开使用SparkSql开发、调试的交互式界面。

(1)该脚本的用法如下:

1.  "Usage: ./bin/spark-sql [options] [cli option]"

(2)该脚本的执行语句如下:

5.脚本./bin/run-example

可以通过该脚本运行Spark 2.4.3自带的案例代码。该脚本中会自动补全案例类的路径。

(1)该脚本的用法如下:

(2)该脚本的执行语句如下:

1. exec "${SPARK_HOME}"/bin/spark-submit run-example "$@"
6.脚本./bin/spark-submit

./bin/spark-submit是提交Spark应用程序最常用的一个脚本。从前面各个脚本的解析可以看出,各个脚本最终都调用了./bin/spark-submit脚本。

(1)该脚本的用法。

该脚本的用法需要从源码中获取,具体源码位置参考SparkSubmitArguments类的方法printUsageAndExit,代码如下:

(2)该脚本的执行语句如下:

7.脚本./bin/spark-class

该脚本是Spark 2.4.3所有其他脚本最终都调用到的一个执行Java类的脚本。其中关键的执行语句如下:

其中,负责运行的RUNNER变量设置如下:

在脚本中,LAUNCH_CLASSPATH变量对应Java命令运行时所需的classpath信息。最终Java命令启动的类是org.apache.spark.launcher.Main。Main类的入口函数main,会根据输入参数构建出最终执行的命令,即这里返回的${CMD[@]}信息,然后通过exec执行。

6.1.3 Application提交给集群源码详解

本节从应用部署的角度解析相关的源码,主要包括脚本提交时对应JVM进程启动的主类org.apache.spark.launcher.Main、定义应用程序提交的行为类型的类org.apache.spark.deploy.SparkSubmitAction、应用程序封装底层集群管理器和部署模式的类org.apache.spark.deploy.SparkSubmit,以及代表一个应用程序的驱动程序的类org.apache.spark.SparkContext。

1.Main解析

从前面的脚本分析,得出最终都是通过org.apache.spark.launcher.Main类(下面简称Main类)启动应用程序的。因此,首先解析一下Main类。

在Main类的源码中,类的注释如下:

对应地,在Main对象的入口方法main的注释如下。

Main.java源码如下:

Main类主要有两种工作模式,分别描述如下。

(1)spark-submit

启动器要启动的类为org.apache.spark.deploy.SparkSubmit时,对应为spark-submit工作模式。此时,使用SparkSubmitCommandBuilder类来构建启动命令。

(2)spark-class

启动器要启动的类是除SparkSubmit之外的其他类时,对应为spark-class工作模式。此时使用SparkClassCommandBuilder类的buildCommand方法来构建启动命令。

Spark 2.2.1版本的Main.java源码如下:

Spark 2.4.3版本的Main.java源码与Spark 2.2.1版本相比具有如下特点。

 上段代码中第5行删掉AbstractCommandBuilder builder的定义。

 上段代码中第8行、第11行builder变量调整为AbstractCommandBuilder builder。

以spark-submit工作模式为例,对应的在构建启动命令的SparkSubmitCommandBuilder类中,上述调用的SparkClassCommandBuilder构造函数定义如下。

Spark 2.2.1版本的SparkSubmitCommandBuilder.java的源码如下:

Spark 2.4.3版本的SparkSubmitCommandBuilder.java源码与Spark 2.2.1版本相比具有如下特点。

 上段代码中第5行后新增加一行代码this.userArgs = Collections.emptyList()。

 上段代码中第22行后新增加一行代码appResource = SparkLauncher.NO_RESOURCE。

 上段代码中第27行构建OptionParser实例时,新增一个参数True。

 上段代码中第29行将this.isAppResourceReq = parser.isAppResourceReq调整为this.isSpecialCommand = parser.isSpecialCommand。

 上段代码中第32行将this.isAppResourceReq = false调整为this.isSpecialCommand =true。

从这些初步的参数解析可以看出,前面脚本中的参数与最终对应的主资源间的对应关系见表6-1。

表6-1 脚本中的参数与主资源间的对应关系

如果继续跟踪appResource赋值的源码,可以跟踪到一些特殊类的类名与最终对应的主资源间的对应关系,见表6-2。

表6-2 特殊类的类名与主资源间的对应关系

如果有兴趣,可以继续跟踪SparkClassCommandBuilder类的buildCommand方法的源码,查看构建的命令具体有哪些。

通过Main类的简单解析,可以将前面的脚本分析结果与后面即将进行分析的SparkSubmit类关联起来,以便进一步解析与应用程序提交相关的其他源码。

从前面的脚本分析可以看到,提交应用程序时,Main启动的类,也就是用户最终提交执行的类是org.apache.spark.deploy.SparkSubmit。因此,下面开始解析SparkSubmit相关的源码,包括提交行为的定义、提交时的参数解析以及最终提交运行的代码解析。

2.SparkSubmitAction解析

SparkSubmitAction定义了提交应用程序的行为类型。SparkSubmit.scala的源码如下:

从源码中可以看到,分别定义了SUBMIT、KILL、REQUEST_STATUS这3种行为类型,对应提交应用、停止应用、查询应用的状态。

3.SparkSubmit解析

SparkSubmit的全路径为org.apache.spark.deploy.SparkSubmit。从SparkSubmit类的注释可以看出,SparkSubmit是启动一个Spark应用程序的主入口点,这和前面从脚本分析得到的结论一致。首先看一下SparkSubmit类的注释,格式如下:

SparkSubmit会帮助我们设置Spark相关依赖包的classpath,同时,为了帮助用户简化提交应用程序的复杂性,SparkSubmit提供了一个抽象层,封装了底层复杂的集群管理器与部署模式的各种差异点,即通过SparkSubmit的封装,集群管理器与部署模式对用户是透明的。

在SparkSubmit中体现透明性的集群管理器定义的源码如下所示。

Spark 2.2.1版本的SparkSubmit.scala的源码如下:

Spark 2.4.3版本的SparkSubmit.scala源码与Spark 2.2.1版本相比具有如下特点。

 上段代码中第6行之后新增一行代码private val KUBERNETES = 16。

 上段代码中第7行新增加一个变量KUBERNETES。

在SparkSubmit中体现透明性的部署模式定义的源码如下:

作为提交应用程序的入口点,SparkSubmit中根据具体的集群管理器进行参数转换、参数校验等操作,如对模式的检查,代码中给出了针对特定情况,不支持的集群管理器与部署模式,在这些模式下提交应用程序会直接报错退出。

Spark 2.2.1版本的SparkSubmit.scala的源码如下:

Spark 2.4.3版本的SparkSubmit.scala源码与Spark 2.2.1版本相比具有如下特点。

 上段代码中第4、6、8、10、12、14行的printErrorAndExit方法调整为error方法。

首先,一个程序运行的入口点对应单例对象的main函数,因此在执行SparkSubmit时,对应的入口点是objectSparkSubmit的main函数。

Spark 2.2.1版本的SparkSubmit.scala的源码如下:

Spark 2.4.3版本的SparkSubmit.scala源码与Spark 2.2.1版本相比具有如下特点。

 上段代码中第3行代码调整为parseArguments方法。

 上段代码中第9行之后新增一行代码case SparkSubmitAction.PRINT_VERSION =>printVersion()。

printVersion()方法用于打印Spark的版本信息。

其中,SparkSubmitArguments类对应用户调用提交脚本spark-submit时传入的参数信息。对应的脚本的帮助信息(./bin/spark-submit --help),也是由该类的printUsageAndExit方法提供的。

找到上面的入口点代码之后,就可以开始分析其内部的源码。对应参数信息的SparkSubmitArguments可以参考脚本的帮助信息,来查看具体参数对应的含义。参数分析后,便是对各种提交行为的具体处理。SparkSubmit支持SparkSubmitAction包含的3种行为,下面以行为SparkSubmitAction.SUBMIT为例进行分析,其他行为也可以通过各自的具体处理代码进行分析。

对应处理SparkSubmitAction.SUBMIT行为的代码入口点为submit(appArgs),进入该方法,即进入提交应用程序的处理方法。

Spark 2.2.1版本的SparkSubmit.scala的源码如下:

Spark 2.4.3版本的SparkSubmit.scala源码与Spark 2.2.1版本相比具有如下特点。

 上段代码中第1行submit方法新增加一个参数uninitLog。

 上段代码中第3行、12行、28行的变量名称sysProps调整为sparkConf。

 上段代码中第20行的日志打印语句printStream.println调整为error方法。

 上段代码中第30行之后新增加代码,如果uninitLog为True,则让主类在日志系统启动后重新初始化。

 上段代码中第38行的日志打印语句printStream.println调整为logInfo方法。

 上段代码中第44行的日志打印语句printWarning调整为logWarning方法。

 上段代码中第48行的submit(args)新增一个传入参数false,调整为submit(args, false)。

其中,最终运行所需的参数都由prepareSubmitEnvironment方法负责解析、转换,然后根据其结果执行。解析的结果包含以下4部分。

 子进程运行所需的参数。

 子进程运行时的classpath列表。

 系统属性的映射。

 子进程运行时的主类。

解析之后调用runMain方法,该方法中除了一些环境设置等操作外,最终会调用解析得到的childMainClass的main方法。下面简单分析一下prepareSubmitEnvironment方法,通过该方法来了解SparkSubmit是如何帮助底层的集群管理器和部署模式的封装的。里面涉及的各种细节比较多,这里以不同集群管理器和部署模式下最终运行的childMainClass类的解析为主线进行分析。

(1)当部署模式为CLIENT时,将childMainClass设置为传入的mainClass,对应代码如下:

(2)当集群管理器为STANDALONE、部署模式为CLUSTER时,根据提交的两种方式将childMainClass分别设置为不同的类,同时将传入的args.mainClass(提交应用程序时设置的主类)及其参数根据不同集群管理器与部署模式进行转换,并封装到新的主类所需的参数中,对应的设置见表6-3。

表6-3 STANDALONE+CLUSTER时两种不同提交方式下的childMainClass封装

其中,表述性状态传递(Representational State Transfer,REST)是Roy Fielding博士在2000年他的博士论文中提出来的一种软件架构风格。

这些设置的主类相当于封装了应用程序提交时的主类,运行后负责向Master节点申请启动提交的应用程序。

(3)当集群管理器为YARN、部署模式为CLUSTER时,childMainClass以及对应的mainClass的设置见表6-4。

表6-4 YARN+CLUSTER时childMainClass下的childMainClass封装

(4)当集群管理器为MESOS、部署模式为CLUSTER时,childMainClass以及对应的mainClass的设置见表6-5。

表6-5 MESOS+CLUSTER时childMainClass下的childMainClass封装

从上面的分析中可以看到,使用CLIENT部署模式进行提交时,由于设置的childMainClass为应用程序提交时的主类,因此是直接在提交点执行设置的主类,即mainClass,当使用CLUSTER部署模式进行提交时,则会根据具体集群管理器等信息,使用相应的封装类。这些封装类会向集群申请提交应用程序的请求,然后在由集群调度分配得到的节点上,启动所申请的应用程序。

以封装类设置为org.apache.spark.deploy.Client为例,从该类主入口main方法查看,可以看到构建了一个ClientEndpoint实例,该实例构建时,会将提交应用程序时设置的mainClass等信息封装到DriverDescription实例中,然后发送到Master,申请执行用户提交的应用程序。

对应各种集群管理器与部署模式的组合,实际代码中的处理细节非常多。这里仅给出一种源码阅读的方式,和对应的大数据处理一样,通常采用化繁为简的方式去阅读复杂的源码。例如,这里在理解整个大框架的调用过程后,以childMainClass的设置作为主线去解读源码,对应地,在扩展阅读其他源码时,也可以采用这种方式,以某种集群管理器与部署模式为主线,详细阅读相关的代码。最后,在了解各种组合的处理细节之后,通过对比、抽象等方法,对整个SparkSubmit进行归纳总结。

提交的应用程序的驱动程序(Driver Program)部分对应包含了一个SparkContext实例。因此,接下来从该实例出发,解析驱动程序在不同的集群管理器的部署细节。

4.SparkContext解析

在详细解析SparkContext实例前,首先查看一下SparkContext类的注释部分,具体如下:

SparkContext类是Spark功能的主入口点。一个SparkContext实例代表了与一个Spark集群的连接,并且通过该实例,可以在集群中构建RDDs、累加器以及广播变量。SparkContext实例的构建参数config描述了应用程序的Spark配置。在该参数中指定的配置属性会覆盖默认的配置属性以及系统属性。

在SparkContext类文件中定义了一个描述集群管理器类型的单例对象SparkMasterRegex,在该对象中详细给出了当前Spark支持的各种集群管理器类型。

SparkContext.scala的源码如下:

在SparkContext类中的主要流程可以归纳如下。

(1)createSparkEnv:创建Spark的执行环境对应的SparkEnv实例。

对应代码如下:

(2)createTaskScheduler:创建作业调度器实例。

对应代码如下:

其中,TaskScheduler是低层次的任务调度器,负责任务的调度。通过该接口提供可插拔的任务调度器。每个TaskScheduler负责调度一个SparkContext实例中的任务,负责调度上层DAG调度器中每个Stage提交的任务集(TaskSet),并将这些任务提交到集群中运行,在任务提交执行时,可以使用失败重试机制设置失败重试的次数。上述对应高层的DAG调度器的实例构建参见下一步。

(3)new DAGScheduler:创建高层Stage调度的DAG调度器实例。

对应代码如下:

1.  _dagScheduler = new DAGScheduler(this)

DAGScheduler是高层调度模块,负责作业(Job)的Stage拆分,以及最终将Stage对应的任务集提交到低层次的任务调度器上。

下面基于这些主要流程,针对SparkMasterRegex单例对象中给出的各种集群部署模式进行解析。对应不同集群模式,这些流程中构建了包括TaskScheduler与SchedulerBackend的不同的具体子类,所构建的相关实例具体见表6-6。

表6-6 各种情况下TaskScheduler与SchedulerBackend的不同的具体子类

与TaskScheduler和SchedulerBackend不同的是,在不同集群模式中,应用程序的高层调度器DAGScheduler的实例是相同的,即对应在Spark on YARN与Mesos等集群管理器中,应用程序内部的高层Stage调度是相同的。