SparkContext是Spark函数的主入口,一个SparkContext代表与Spark集群的连接,可以用来在集群创建RDD、累加器、广播变量等。每一个JVM只能有一个活跃的SparkContext实例,在创建新的SparkContext实例之前,必须stop正活跃的SparkContext实例。不过这个限制最终可能会移除掉。
1. SparkContext类初始化
纵观SpartContext类的初始化过程,主要创建了LiveListenerBus
、SparkStatusTracker
、HeartbeatReceiver
、SchedulerBackend
、TaskScheduler
和DAGScheduler
实例。这些实例,在后期会完成event监听、status追踪、heartbeat接受、后台调度、任务调度、DAG调度等工作,是Spark Application得以运行的基石。
1 | private var _conf: SparkConf = _ |
- 一些变量的初始化,包括新建
LiveListenerBus
类实例listerBus
。 - 复制SparkConf config到_conf,并对其中的设置进行验证,检查是否存在非法或弃用的参数。
- 若_conf中不包含
spark.master
或spark.app.name
则抛出异常。 - 在yarn cluster模式下,_conf必须包含
spark.yarn.app.id
,否则抛出异常。 - 在_conf中设置
spark.driver.host
和spark.executor.id=driver
。 - 新建
JobProgressListener
类实例_jobProgressListener
,并执行listenerBus.addListener(jobProgressListener)
。 - 调用
createSparkEnv(_conf,isLocal,listenerBus)
方法,创建Driver的SparkEnv
实例。SparkEnv
类包含了运行Spark实例(master/worker)的运行时环境,包含serializer
、block manager
等。
1 | class SparkEnv ( |
- 创建
SparkStatusTracker
实例。 - 初始化hadoop配置变量
_hadoopConfiguration = SparkHadoopUtil.get.newConfiguration(_conf)
。 - 调用
jars.foreach(addJar)
,为所有的任务添加 jar dependency。 - 调用
files.foreach(addFIle)
,为每个node 加载文件。 - 设置
_executorMemory
,为executor设置内存。 _heartbeatReceiver=env.rpcEnv.setupEndpoint(HeartbeatReceiver.ENDPOINT_NAME, new HeartbeatReceiver(this))
,创建HeartbeatReceiver
类实例。- 创建
SchedulerBackend
、TaskScheduler
、DAGScheduler
类实例。
1 | val (sched, ts) = SparkContext.createTaskScheduler(this, master, deployMode) |
- 启动
_taskScheduler.start()
。 - 设置
spark.app.id
。 - 启动
_env.metricsSystem.start()
,并将driver metrics servlet handler 附加到 web ui。 - 创建
ContextCleaner
类实例,启动cleaner。 - 调用
postEnvironmentUpdate()
方法,通过listenerBus
发送envirenmentUpdate event。 - 调用
postApplicationStart()
方法,通过listenerBus
发送application start event。该方法假设TaskScheduler已经初始化并已经让cluster manager获取到了application ID。
1 | /** Post the application start event */ |
- 调用
_taskScheduler.postStartHook()
方法,当系统初始化成功后,会调用该方法,Yarn使用该方法引导基于机架感知、等待从机注册的资源分配。
2. createTaskScheduler(sc, master, deployMode)方法
那么,SparkContext是在哪一步分配Executor呢,在步骤14中,val (sched, ts)=SparkContext.createTaskScheduler(this, master, deployMode)
命令,会根据不同的master, deployMode,启动不同的SchedulerBackend
和TaskScheduler
。
1 | /** |
从代码中我们可以发现,该方法会根据master
和depolyMode
的不同,来构造不同的SchedulerBackend
和TaskScheduler
。从而实现了,Spark可以适配local、standalone、yarn等多种不同模式。Yarn Cluster模式下会实例化CoarseGrainedSchedulerBackend
类。该类会持有Executor资源。
3. CoarseGrainedSchedulerBackend类
CoarseGrainedSchedulerBackend
类的start()方法会被TaskSchedulerImpl
类调用,在start()方法中,会实例化DriverEndpoint
。
在DriverEndpoint
类的receiverAndReply
方法中,driver会根据收到RegisterExecutor
、StopDriver
、StopExecutors
、RemoveExecutor
、RetrieveSparkAppConfig
等不同的命令,进行相应的操作。如会根据接收到的RegisterExecutor
RPC命令,完成Executor的注册。
1 | override def receiveAndReply(context: RpcCallContext): PartialFunction[Any, Unit] = { |