1. spark-submit脚本
- 判断${SPARK_HOME}环境变量是否存在,不存在的话调用当前目录下的find-spark-home脚本设置该环境变量。
- 执行
exec "${SPARK_HOME}"/bin/spark-class org.apache.spark.deploy.SparkSubmit "$@"
命令,调用spark-class脚本。
2. spark-class脚本
- 判断${SPARK_HOME}环境变量是否存在,不存在的话调用当前目录下的find-spark-home脚本设置该环境变量。
- 执行load-spark-env.sh脚本,设置一些环境变量。
- 检查java环境变量。
- 检查${SPARK_HOME}中的jars。
- 定义如下函数,解析spark-submit脚本传递的参数,并打印解析后的参数。函数中$RUNNER变量是java
1 | build_command() { |
- 执行如下命令,将build_command函数解析后返回的参数赋值给CMD。
1 | set +o posix |
- 执行CMD中的摸个参数类,若是提交应用程序,则参数类为:org.apache.spark.deploy.SparkSubmit。SparkSubmit参数类会完成启动Spark应用程序的工作。
1 | CMD=("${CMD[@]:0:$LAST}") |
3. org.apache.spark.launcher.Main类
该类为Spark启动器的命令行接口,被Spark脚本内部使用,主要实现对传入参数的解析工作。最终打印出解析后的参数格式,类似于java -cp 各种option 各种路径 org.apache.spark.deploy.SparkSubmit --master .. --deploy-mode .. --class 用户主类 其他参数
3.1 main方法
- main方法中将传入的第一个参数解析为className,若className为
org.apache.spark.deploy.SparkSubmit
类,则构建builder = new SparkSubmitCommandBuilder(args)
;否则构建builder = new SparkClassCommandBUilder(className, args)
。 - 构建builder后,执行
List<String> cmd = builder.buildCommand(env)
方法,获取解析后的命令行cmd。 - 将cmd根据不同的运行环境(Windows/Linux)打印出来。spark-class脚本会根据
$?
获取到main方法最后打印的数据。
1 | if (isWindows()) { |
3.2 org.apache.spark.launcher.SparkSubmitCommandBuilder类
- 该类的构造器中会调用
OptionParser
类的parse(submitArgs)
方法,将传入的参数通过正则表达式Pattern eqSeparatedOpt = Pattern.compile("(--[^=]+)=(.+)")
匹配后,调用OptionParser的handle
方法,这个即可解析出master、deployMode、propertiesFile、driveMermory等参数。
1 | protected boolean handle(String opt, String value) { |
buildSparkSubmitCommand(Map<String, String> env)
方法。
1 | private List<String> buildSparkSubmitCommand(Map<String, String> env) |
- 该方法首先根据propertiesFile参数加载配置文件中的对应配置项,若propertiesFile为null,则从默认的配置文件
spark-defaults.conf
中加载。其中spark-defaults.conf
从SPARK_CONF_DIR
或SPARK_HOME/conf
中获取。 - 调用
buildJavaCommand(extraClassPath)
方法,该方法会构建一个运行java程序的参数列表。
1 | List<String> buildJavaCommand(String extraClassPath) throws IOException { |
List<String> cmd
构建好以后,cmd中已经有了java -cp等运行java的命令参数,后面添加-Xmx memory
、spark.driver.extraJavaOptions
、spark.driver.extraLibraryPath
等额外参数,最后添加org.apache.spark.deploy.SparkSubmit
和调用buildSparkSubmitArgs()方法,添加SparkSubmit相应的参数,包括–master、–deployMode、–conf、–class和用户程序对应的传入参数。
1 | List<String> buildSparkSubmitArgs() { |
其中sparkArgs
是SparkSubmitCommandBuilder
类未解析的spark相应配置,如–executor-memory等参数。appArgs
为用户主类需要用到的参数。
4. org.apache.spark.deploy.SparkSubmit类
spark-class脚本中最后会执行exec java -cp ... org.apache.spark.deploy.SparkSubmit ...
来运行SparkSubmit类。该类是启动Spark Application的主入口。
4.1 main方法
该方法首先会根据传入的args
创建SparkSubmitArguments对象appArgs
,然后根据appArgs.action
的不同值调用相应的submit(appArgs)
、kill(appArgs)
、requestStatus(appArgs)
方法,后两种方法只standalone
和Mesos cluster
模式下支持。
1 | def main(args: Array[String]): Unit = { |
4.2 SparkSubmitArguments类
该类包含了运行Spark Application所需要的参数。
1 | var master: String = null |
SparkSubmitArguments
类的构造器中,一次调用了下面五种方法,最终将所有需要使用到的参数都解析到了SparkSubmitArguments
类相应的字段中:
1 | // Set parameters from command line arguments |
4.2.1 parse(args.asJava)方法
SparkSubmitOptionParser类的parse(submitArgs)
方法,将传入的参数通过正则表达式Pattern eqSeparatedOpt = Pattern.compile("(--[^=]+)=(.+)")
匹配后,调用SparkSubmitArguments的handle
方法,使用Scala的模式匹配从submitArgs
中解析出相应参数并赋值给SparkSubmitArguments
的相应字段,其中用户主类对应的参数存放在SparkSubmitArguments的childArgs
字段中。
1 | /** Fill in values by parsing user options. */ |
4.2.2 mergeDefaultSparkProperties()方法
sparkProperties
已经在前面的handle方法中将用户--conf
命令提交的相关配置加载过,mergeDefaultSparkProperties()
主要完成加载propertiesFile
配置文件中的相关配置,若propertiesFile
没有配置则使用默认的spark-default.conf
。需要注意的是sparkProperties
已经在—conf
配置过的数据不会被覆盖,即--conf
比配置文件的优先级高。
1 | /** |
4.2.3 ignoreNonSparkProperties()
该方法主要是去除掉sparkProperties
中不是spark.
开头的无效配置项。
1 | /** |
4.2.4 loadEnvironmentArguments()
该方法主要对SparkSubmitArguments
类中的一些必备的字段进行验证,若该字段未设置,则从sparkProperties
中提取,若sparkProperties
中不存在,则从环境变量中取,若仍然不存在,则取Null或相应的默认值。
1 | /** |
4.2.5 validateArguments()方法
该方法用来确保所有要求的字段存在。
1 | /** Ensure that required fields exists. Call this only once all defaults are loaded. */ |
1 | private def validateSubmitArguments(): Unit = { |
4.3 submit(args:SparkSubmitArguments)方法
在将传入的args字符串数组构造成SparkSubmitArguments对象appArgs
后调用submit(appArgs)方法。该方法根据提供的参数,提交Spark Application。主要分为两步:第一步是为main class准备启动环境,包含设置相应的classpath、sysytem properties、application arguments等,第二步是使用反射机制运行main class的main方法。
1 | /** |
4.3.1 prepareSubmitEnvironment(args)方法
该方法是为提交spark application准备环境,返回4种数据:1. childArgs——子进程参数;2.childClasspath——子进程类路径列表;3. sysProps——sysytem properties map;4. childMainClass——子进程主类。
该方法会根据args判断用户设定的master
、deployMode
等,根据用户要求的运行模式准备相应的运行环境。其中也会根据不同的master设置不同的childMainClass。比如Client模式下会直接将childMainClass=args.mainClass
;YarnCluster模式下,会将childMainClass = "org.apache.spark.deploy.yarn.Client"
,将用户的主类通过childArgs += ("--class", args.mainClass)
设置。同时也会把用户传入的jar包,添加到chlidClasspath
中。
1 | ...... |
4.3.2 runMain(childArgs, childClasspath, sysProps, childMainClass, verbose)方法
该方法使用反射机制启动childMainClass
中的main方法。需要特别注意的是,从上一步也可以了解到,当运行在cluster deploy mode或python application时,childMainClass
并不是用户提供的主类。如YarnCluster
模式下childMainClass = "org.apache.spark.deploy.yarn.Client"
。
1 | // 设置类加载器 |