Spark Source Code Reading 2: Application Submission Flow in Yarn Cluster Mode

2018-05-24

1. The org.apache.spark.deploy.yarn.Client class

In the previous post, "Spark Source Code Reading 1: The Overall spark-submit Application Submission Flow", we saw that in Yarn cluster mode the user's jar is added to the classpath, the user's main class is passed along via childArgs += ("--class", args.mainClass), and the main method of org.apache.spark.deploy.yarn.Client is finally invoked through reflection. Let's now look at how, once yarn/Client's main method has been called, our Spark application is launched on the Yarn cluster.
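To make the hand-off concrete, here is a minimal sketch (not the actual SparkSubmit code; the application class and jar path are made-up examples) of how a child main class such as yarn/Client gets invoked through reflection with the assembled childArgs:

// A minimal reflection sketch of the hand-off described above.
object LaunchSketch {
  def main(sysArgs: Array[String]): Unit = {
    val childMainClass = "org.apache.spark.deploy.yarn.Client"
    val childArgs = Array("--class", "com.example.MyApp", "--jar", "file:///tmp/my-app.jar")

    val clientClass = Class.forName(childMainClass)
    val mainMethod = clientClass.getMethod("main", classOf[Array[String]])
    // main() is static, so the receiver is null; the whole array is the single argument
    mainMethod.invoke(null, childArgs)
  }
}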

1.1 The main method

This method is fairly compact:

  1. Check whether the system properties contain SPARK_SUBMIT; if not, log a warning;
  2. Set the system property SPARK_YARN_MODE to true;
  3. Build a SparkConf object;
  4. Since in Yarn mode SparkSubmit uses the Yarn distributed cache to ship files and jars, remove spark.jars and spark.files from sparkConf;
  5. Build a ClientArguments object;
  6. Build a Client object and call its run() method.
def main(argStrings: Array[String]) {
  if (!sys.props.contains("SPARK_SUBMIT")) {
    logWarning("WARNING: This client is deprecated and will be removed in a " +
      "future version of Spark. Use ./bin/spark-submit with \"--master yarn\"")
  }

  // Set an env variable indicating we are running in YARN mode.
  // Note that any env variable with the SPARK_ prefix gets propagated to all (remote) processes
  System.setProperty("SPARK_YARN_MODE", "true")
  val sparkConf = new SparkConf
  // SparkSubmit would use yarn cache to distribute files & jars in yarn mode,
  // so remove them from sparkConf here for yarn mode.
  sparkConf.remove("spark.jars")
  sparkConf.remove("spark.files")
  val args = new ClientArguments(argStrings)
  new Client(args, sparkConf).run()
}

1.2 The org.apache.spark.SparkConf class

This is the configuration class for a Spark application. When an object is built via new SparkConf(), it extracts all configuration entries starting with spark. from the system properties and stores them in the settings field, a ConcurrentHashMap[String, String]. Parameters that the user sets on a SparkConf object in application code override system-property entries with the same key.

class SparkConf(loadDefaults: Boolean) extends Cloneable with Logging with Serializable {

  import SparkConf._

  /** Create a SparkConf that loads defaults from system properties and the classpath */
  def this() = this(true)

  private val settings = new ConcurrentHashMap[String, String]()

  @transient private lazy val reader: ConfigReader = {
    val _reader = new ConfigReader(new SparkConfigProvider(settings))
    _reader.bindEnv(new ConfigProvider {
      override def get(key: String): Option[String] = Option(getenv(key))
    })
    _reader
  }

  if (loadDefaults) {
    loadFromSystemProperties(false)
  }

  private[spark] def loadFromSystemProperties(silent: Boolean): SparkConf = {
    // Load any spark.* system properties
    for ((key, value) <- Utils.getSystemProperties if key.startsWith("spark.")) {
      set(key, value, silent)
    }
    this
  }
  ......
  ......
}
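A tiny example of the precedence described above (the key and values are arbitrary): the constructor picks up spark.*-prefixed system properties, and any set() call made later in user code overwrites the entry with the same key.

import org.apache.spark.SparkConf

object ConfPrecedenceSketch {
  def main(args: Array[String]): Unit = {
    System.setProperty("spark.app.name", "fromSystemProperties")

    val conf = new SparkConf()                    // loadDefaults = true, reads spark.* properties
    println(conf.get("spark.app.name"))           // fromSystemProperties

    conf.set("spark.app.name", "fromUserCode")    // the user's setting wins for the same key
    println(conf.get("spark.app.name"))           // fromUserCode
  }
}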

1.3 The org.apache.spark.deploy.ClientArguments class

This class parses driver-side command-line arguments. The parsing is done mainly by its parse(args: List[String]) method, which uses pattern matching to fill in the cores, memory, jarUrl, master, mainClass and _driverOptions fields. (Note that in Yarn mode the Client shown above actually constructs org.apache.spark.deploy.yarn.ClientArguments, which parses options such as --jar, --class and --arg; the deploy.ClientArguments shown here is the standalone-mode counterpart, but it follows the same pattern-matching approach.)

@tailrec
private def parse(args: List[String]): Unit = args match {
  case ("--cores" | "-c") :: IntParam(value) :: tail =>
    cores = value
    parse(tail)

  case ("--memory" | "-m") :: MemoryParam(value) :: tail =>
    memory = value
    parse(tail)

  case ("--supervise" | "-s") :: tail =>
    supervise = true
    parse(tail)

  case ("--help" | "-h") :: tail =>
    printUsageAndExit(0)

  case ("--verbose" | "-v") :: tail =>
    logLevel = Level.INFO
    parse(tail)

  case "launch" :: _master :: _jarUrl :: _mainClass :: tail =>
    cmd = "launch"

    if (!ClientArguments.isValidJarUrl(_jarUrl)) {
      // scalastyle:off println
      println(s"Jar url '${_jarUrl}' is not in valid format.")
      println(s"Must be a jar file path in URL format " +
        "(e.g. hdfs://host:port/XX.jar, file:///XX.jar)")
      // scalastyle:on println
      printUsageAndExit(-1)
    }
    // The fields below are used in standalone mode
    jarUrl = _jarUrl
    masters = Utils.parseStandaloneMasterUrls(_master)
    mainClass = _mainClass
    _driverOptions ++= tail

  case "kill" :: _master :: _driverId :: tail =>
    cmd = "kill"
    masters = Utils.parseStandaloneMasterUrls(_master)
    driverId = _driverId

  case _ =>
    printUsageAndExit(1)
}
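The same parsing technique, reduced to a self-contained sketch (option names and fields here are invented for illustration, not Spark's): a @tailrec method pattern-matches on the head of the argument list and recurses on the tail until everything is consumed.

import scala.annotation.tailrec

object MiniArgParser {
  var cores: Int = 1
  var mainClass: String = ""

  @tailrec
  def parse(args: List[String]): Unit = args match {
    case ("--cores" | "-c") :: value :: tail =>
      cores = value.toInt
      parse(tail)

    case "launch" :: clazz :: tail =>
      mainClass = clazz
      parse(tail)

    case Nil => ()                                  // everything consumed

    case _ =>
      sys.error(s"Unrecognized arguments: $args")
  }

  def main(args: Array[String]): Unit = {
    parse(List("--cores", "4", "launch", "com.example.MyApp"))
    println(s"cores=$cores mainClass=$mainClass")   // cores=4 mainClass=com.example.MyApp
  }
}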

1.4 The new Client(args, sparkConf) constructor

This constructor extracts the spark.hadoop.-prefixed entries from the SparkConf to build a Hadoop Configuration object, and then delegates to this(clientArgs, SparkHadoopUtil.get.newConfiguration(spConf), spConf).

def this(clientArgs: ClientArguments, spConf: SparkConf) =
  this(clientArgs, SparkHadoopUtil.get.newConfiguration(spConf), spConf)
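The following is a rough sketch (not the actual SparkHadoopUtil code) of the behaviour just described: every spark.hadoop.*-prefixed entry in the SparkConf is copied into a Hadoop Configuration with the prefix stripped.

import org.apache.hadoop.conf.Configuration
import org.apache.spark.SparkConf

def newConfigurationSketch(conf: SparkConf): Configuration = {
  val hadoopConf = new Configuration()
  conf.getAll.foreach { case (key, value) =>
    if (key.startsWith("spark.hadoop.")) {
      // e.g. spark.hadoop.fs.defaultFS=hdfs://nn:8020 becomes fs.defaultFS=hdfs://nn:8020
      hadoopConf.set(key.stripPrefix("spark.hadoop."), value)
    }
  }
  hadoopConf
}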

1.5 The new Client(clientArgs, SparkHadoopUtil.get.newConfiguration(spConf), spConf) constructor

  1. Build the YarnClient and YarnConfiguration objects;
  2. Set the Yarn ApplicationMaster-related configuration (memory, memory overhead, cores) according to the deployMode value (a worked example of the overhead formula follows the code below);
  3. Set the executor-related configuration;
  4. Build a ClientDistributedCacheManager object.
private val yarnClient = YarnClient.createYarnClient
private val yarnConf = new YarnConfiguration(hadoopConf)

private val isClusterMode = sparkConf.get("spark.submit.deployMode", "client") == "cluster"

// AM related configurations
private val amMemory = if (isClusterMode) {
  sparkConf.get(DRIVER_MEMORY).toInt
} else {
  sparkConf.get(AM_MEMORY).toInt
}
private val amMemoryOverhead = {
  val amMemoryOverheadEntry = if (isClusterMode) DRIVER_MEMORY_OVERHEAD else AM_MEMORY_OVERHEAD
  sparkConf.get(amMemoryOverheadEntry).getOrElse(
    math.max((MEMORY_OVERHEAD_FACTOR * amMemory).toLong, MEMORY_OVERHEAD_MIN)).toInt
}
private val amCores = if (isClusterMode) {
  sparkConf.get(DRIVER_CORES)
} else {
  sparkConf.get(AM_CORES)
}

// Executor related configurations
private val executorMemory = sparkConf.get(EXECUTOR_MEMORY)
private val executorMemoryOverhead = sparkConf.get(EXECUTOR_MEMORY_OVERHEAD).getOrElse(
  math.max((MEMORY_OVERHEAD_FACTOR * executorMemory).toLong, MEMORY_OVERHEAD_MIN)).toInt

private val distCacheMgr = new ClientDistributedCacheManager()

private var loginFromKeytab = false
private var principal: String = null
private var keytab: String = null
private var credentials: Credentials = null
private var amKeytabFileName: String = null

private val launcherBackend = new LauncherBackend() {
  override def onStopRequest(): Unit = {
    if (isClusterMode && appId != null) {
      yarnClient.killApplication(appId)
    } else {
      setState(SparkAppHandle.State.KILLED)
      stop()
    }
  }
}
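As a quick sanity check of the overhead formula above, assuming the usual constants MEMORY_OVERHEAD_FACTOR = 0.10 and MEMORY_OVERHEAD_MIN = 384 MB (check the exact constants in your Spark version):

val MEMORY_OVERHEAD_FACTOR = 0.10
val MEMORY_OVERHEAD_MIN = 384L

val amMemory = 4096                                   // e.g. --driver-memory 4g in cluster mode
val amMemoryOverhead =
  math.max((MEMORY_OVERHEAD_FACTOR * amMemory).toLong, MEMORY_OVERHEAD_MIN).toInt
// => max(409, 384) = 409, so the AM container asks for 4096 + 409 MB in total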

1.6 The run() method

This method submits the application to the Yarn ResourceManager. If spark.yarn.submit.waitAppCompletion=true (the default), it stays alive and reports the application's status until the application exits for any reason; otherwise it returns right after submission. If the application finishes with a failed, killed, or undefined status, an appropriate SparkException is thrown.

/**
 * Submit an application to the ResourceManager.
 * If set spark.yarn.submit.waitAppCompletion to true, it will stay alive
 * reporting the application's status until the application has exited for any reason.
 * Otherwise, the client process will exit after submission.
 * If the application finishes with a failed, killed, or undefined status,
 * throw an appropriate SparkException.
 */
def run(): Unit = {
  this.appId = submitApplication()
  if (!launcherBackend.isConnected() && fireAndForget) {
    val report = getApplicationReport(appId)
    val state = report.getYarnApplicationState
    logInfo(s"Application report for $appId (state: $state)")
    logInfo(formatReportDetails(report))
    if (state == YarnApplicationState.FAILED || state == YarnApplicationState.KILLED) {
      throw new SparkException(s"Application $appId finished with status: $state")
    }
  } else {
    val (yarnApplicationState, finalApplicationStatus) = monitorApplication(appId)
    if (yarnApplicationState == YarnApplicationState.FAILED ||
        finalApplicationStatus == FinalApplicationStatus.FAILED) {
      throw new SparkException(s"Application $appId finished with failed status")
    }
    if (yarnApplicationState == YarnApplicationState.KILLED ||
        finalApplicationStatus == FinalApplicationStatus.KILLED) {
      throw new SparkException(s"Application $appId is killed")
    }
    if (finalApplicationStatus == FinalApplicationStatus.UNDEFINED) {
      throw new SparkException(s"The final status of application $appId is undefined")
    }
  }
}
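The fire-and-forget branch above is controlled by spark.yarn.submit.waitAppCompletion (default true). A minimal configuration sketch, assuming cluster mode: with the flag set to false, run() only fetches a single application report after submitApplication() instead of polling through monitorApplication().

import org.apache.spark.SparkConf

val conf = new SparkConf()
  .set("spark.submit.deployMode", "cluster")
  .set("spark.yarn.submit.waitAppCompletion", "false")   // return right after submission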

1.7 The submitApplication() method

This method submits the application to the Yarn ResourceManager, which then runs our ApplicationMaster.

  1. First start a background service thread that communicates over a socket, using the LauncherProtocol, on the port given by _SPARK_LAUNCHER_PORT, so the launcher process can track the application being started;
  2. Initialize and start yarnClient;
  3. yarnClient.createApplication() creates a new application on the ResourceManager;
  4. Obtain the new application's appId and report its state;
  5. Verify that the Yarn cluster has enough resources to start the ApplicationMaster (see the sketch after the code below);
  6. Call createContainerLaunchContext(newAppResponse) and createApplicationSubmissionContext(newApp, containerContext) to set up the contexts needed to launch the ApplicationMaster, namely the containerContext and the appContext;
  7. Call yarnClient.submitApplication(appContext) to submit the application and monitor it.
/**
 * Submit an application running our ApplicationMaster to the ResourceManager.
 *
 * The stable Yarn API provides a convenience method (YarnClient#createApplication) for
 * creating applications and setting up the application submission context. This was not
 * available in the alpha API.
 */
def submitApplication(): ApplicationId = {
  var appId: ApplicationId = null
  try {
    launcherBackend.connect()
    // Setup the credentials before doing anything else,
    // so we have don't have issues at any point.
    setupCredentials()
    yarnClient.init(yarnConf)
    yarnClient.start()

    logInfo("Requesting a new application from cluster with %d NodeManagers"
      .format(yarnClient.getYarnClusterMetrics.getNumNodeManagers))

    // Get a new application from our RM
    val newApp = yarnClient.createApplication()
    val newAppResponse = newApp.getNewApplicationResponse()
    appId = newAppResponse.getApplicationId()
    reportLauncherState(SparkAppHandle.State.SUBMITTED)
    launcherBackend.setAppId(appId.toString)

    new CallerContext("CLIENT", Option(appId.toString)).setCurrentContext()

    // Verify whether the cluster has enough resources for our AM
    verifyClusterResources(newAppResponse)

    // Set up the appropriate contexts to launch our AM
    val containerContext = createContainerLaunchContext(newAppResponse)
    val appContext = createApplicationSubmissionContext(newApp, containerContext)

    // Finally, submit and monitor the application
    logInfo(s"Submitting application $appId to ResourceManager")
    yarnClient.submitApplication(appContext)
    appId
  } catch {
    case e: Throwable =>
      if (appId != null) {
        cleanupStagingDir(appId)
      }
      throw e
  }
}
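Step 5 (verifyClusterResources) is essentially a size check. A simplified, self-contained sketch of the idea (the real method reads the maximum container capability from newAppResponse and produces a more detailed error message):

// Simplified sketch of the resource check; all values are in MB and are passed in
// explicitly here instead of being read from the Client's fields.
def verifyClusterResourcesSketch(
    amMemory: Int,
    amMemoryOverhead: Int,
    executorMemory: Int,
    executorMemoryOverhead: Int,
    maxContainerMemory: Int): Unit = {
  val executorMem = executorMemory + executorMemoryOverhead
  require(executorMem <= maxContainerMemory,
    s"Required executor memory ($executorMem MB) is above the max container size ($maxContainerMemory MB)")
  val amMem = amMemory + amMemoryOverhead
  require(amMem <= maxContainerMemory,
    s"Required AM memory ($amMem MB) is above the max container size ($maxContainerMemory MB)")
}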

1.7.1 The createContainerLaunchContext method

This method prepares the ContainerLaunchContext for launching our ApplicationMaster container: it sets up the launch environment, the java options and the launch command, distributes the user's app jar to the Yarn cluster, and records the user's main class name, among other things.
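To give a feel for the result, the command placed into the ContainerLaunchContext in cluster mode looks roughly like the sequence below (a sketch only: the paths, memory size and user class are illustrative, and the exact flags depend on the Spark version):

val amClass = "org.apache.spark.deploy.yarn.ApplicationMaster"   // ExecutorLauncher in client mode
val commands = Seq(
  "{{JAVA_HOME}}/bin/java", "-server",
  "-Xmx4096m",                                                   // amMemory
  amClass,
  "--class", "com.example.MyApp",                                // the user's main class
  "--jar", "file:///tmp/my-app.jar",                             // the user's app jar
  "--properties-file", "{{PWD}}/__spark_conf__/__spark_conf__.properties",
  "1>", "<LOG_DIR>/stdout",
  "2>", "<LOG_DIR>/stderr")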

1.7.2 The createApplicationSubmissionContext method

This method prepares the ApplicationSubmissionContext used to submit the application.
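A rough sketch of the fields it fills in (the YARN API calls are real; the values are illustrative, and newApp, containerContext, amMemory, amMemoryOverhead and amCores are the variables from the earlier sections):

import org.apache.hadoop.yarn.api.records.Resource
import org.apache.hadoop.yarn.util.Records

val appContext = newApp.getApplicationSubmissionContext   // reuse the context the RM handed back
appContext.setApplicationName("my-spark-app")             // spark.app.name
appContext.setQueue("default")                            // spark.yarn.queue
appContext.setAMContainerSpec(containerContext)           // the ContainerLaunchContext from 1.7.1
appContext.setApplicationType("SPARK")

val capability = Records.newRecord(classOf[Resource])
capability.setMemory(amMemory + amMemoryOverhead)         // MB
capability.setVirtualCores(amCores)
appContext.setResource(capability)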

1.7.3 YarnClient's submitApplication method

This method is implemented by YarnClientImpl. It delegates to the submitApplication method of the ApplicationClientProtocol rmClient to perform the actual submission, and then polls the application report until the application leaves the NEW and NEW_SAVING states.

public ApplicationId submitApplication(ApplicationSubmissionContext appContext) throws YarnException, IOException {
    ApplicationId applicationId = appContext.getApplicationId();
    appContext.setApplicationId(applicationId);
    SubmitApplicationRequest request = (SubmitApplicationRequest)Records.newRecord(SubmitApplicationRequest.class);
    request.setApplicationSubmissionContext(appContext);
    this.rmClient.submitApplication(request);
    int pollCount = 0;

    while(true) {
        YarnApplicationState state = this.getApplicationReport(applicationId).getYarnApplicationState();
        if (!state.equals(YarnApplicationState.NEW) && !state.equals(YarnApplicationState.NEW_SAVING)) {
            LOG.info("Submitted application " + applicationId + " to ResourceManager" + " at " + this.rmAddress);
            return applicationId;
        }

        ++pollCount;
        if (pollCount % 10 == 0) {
            LOG.info("Application submission is not finished, submitted application " + applicationId + " is still in " + state);
        }

        try {
            Thread.sleep(this.statePollIntervalMillis);
        } catch (InterruptedException var7) {
            ;
        }
    }
}