1. org.apache.spark.deploy.yarn.Client类
在《Spark源码阅读1——spark-submit应用提交整体流程》文章中,我们已经了解到YarnCluster模式下会将用户的jar包加入到类加载路径中,并通过 childArgs += ("--class", args.mainClass)
将用户的主类作为参数传入,最后利用反射机制调用org.apache.spark.deploy.yarn.Client
类的main方法。那么下面我们就来看一下YarnCluster模式下,在调用yarn/Client
类的main方法后,是如何在Yarn集群中启动我们的Spark Application的。
1.1 main方法
该方法的内容比较精简:
- 判断系统参数中是否包含
SPARK_SUBMIT
,若不包含则打印告警信息; - 设置系统参数
SPARK_YARN_MODE
为true; - 构建SparkConf对象;
- 由于Yarn模式下,SparkSubmit将会使用Yarn来分布式缓存file和jar包,故从sparkConf中删除
saprk.jars
和spark.files
; - 构建
ClientArguments
对象; - 构建Client对象,并调用run()方法。
1 | def main(argStrings: Array[String]) { |
1.2 org.apache.spark.SparkConf类
该类是Spark Application的配置类。通过new SparkConf()
构建该类对象时,会从系统properties中提取出spark.
开头的配置项,并赋值给SparkConf的setting字段,setting
字段为ConcurrentHashMap[String,String]
类型。用户在应用程序通过构建SparkConf对象,并配置的相关参数,会覆盖掉系统参数的相同配置项。
1 | class SparkConf(loadDefaults: Boolean) extends Cloneable with Logging with Serializable { |
1.3 org.apache.spark.deploy.ClientArguments类
该类用于driver端的命令解析。参数的解析工作主要由该类的parse(args:List[String])
方法完成。该方法通过模式匹配,完成cores、memory、jarUrl、master、mainClass、_driverOptions
相关字段的解析。
1 |
|
1.4 new Client(args, sparkConf)方法
该方法会从SparkConf中提取出spark.hadoop.
开头的相关参数,构建成Configuration对象,然后调用
this(clientArgs, SparkHadoopUtil.get.newConfiguration(spConf), spConf)
方法。
1 | def this(clientArgs: ClientArguments, spConf: SparkConf) = |
1.5 new Client(clientArgs, SparkHadoopUtil.get.newConfiguration(spConf), spConf)方法
- 构建
YarnClient
对象,YarnConfiguration
对象; - 根据
deployMode
的值设置Yarn ApplicationMaster的相关配置; - 设置Executor的相关配置;
- 构建
ClientDistributedCacheManager
对象。
1 | private val yarnClient = YarnClient.createYarnClient |
1.6 run()方法
该方法主要是向Yarn ResourceManager提交应用程序。如果spark.yarn.submit.waitAppCompletion=true
(默认值为true),该方法将会一直存活,报告application的状态,直到application因为某些原因退出;否则提交application后将会退出。如果application最后以失败、被杀掉或其他未知状态退出,将会抛出适当的SparkException。
1 | /** |
1.7 submitApplication()方法
该方法向Yarn ResourceManager提交Application,运行我们的ApplicationMaster。
- 首先启动一个后台服务线程,通过
LunncherProtocol
协议,使用Socket通信,通过_SPARK_LAUNCHER_PORT
指定的端口与要启动的application保持通信。 - 初始化yarnClient,并启动。
- yarnClient.createApplication()方法从ResourceManager创建新的application。
- 获取新application的appId,并报告状态。
- 验证Yarn集群是否有足够的资源启动ApplicationMaster。
- 调用
createContainerLaunchContext(newAppResponse)和createApplicationSubmissionContext(newApp,containerContext)
,为启动ApplicationMaster启动合适的context,包括containerContext和appContext。 - 调用
yarnClient.submitApplication(appContext)
启动并监控application。
1 | /** |
1.7.1 createContainerLaunchContext方法
该方法用来为启动我们的ApplicationMaster container准备ContainerLaunchContext。主要包括设置启动环境、java options、启动命令等,也包括将用户的app.jar分发到yarn集群,也包含设置用户的主类名等。
1.7.2 createApplicationSubmissionContext方法
该方法主要为提交Application准备ApplicationSubmissionContext。
1.7.3 YarnClient的submitApplication方法
该方法由YarnClientImpl
类实现,主要是调用ApplicationClientProtocol rmClient
的submitApplication
方法实现,完成最终的application提交工作。
1 | public ApplicationId submitApplication(ApplicationSubmissionContext appContext) throws YarnException, IOException { |