SparkContext是Spark函数的主入口,一个SparkContext代表与Spark集群的连接,可以用来在集群创建RDD、累加器、广播变量等。每一个JVM只能有一个活跃的SparkContext实例,在创建新的SparkContext实例之前,必须stop正活跃的SparkContext实例。不过这个限制最终可能会移除掉。
1. SparkContext类初始化
纵观SpartContext类的初始化过程,主要创建了LiveListenerBus、SparkStatusTracker、HeartbeatReceiver、SchedulerBackend、TaskScheduler和DAGScheduler实例。这些实例,在后期会完成event监听、status追踪、heartbeat接受、后台调度、任务调度、DAG调度等工作,是Spark Application得以运行的基石。
1 | private var _conf: SparkConf = _ |
- 一些变量的初始化,包括新建
LiveListenerBus类实例listerBus。 - 复制SparkConf config到_conf,并对其中的设置进行验证,检查是否存在非法或弃用的参数。
- 若_conf中不包含
spark.master或spark.app.name则抛出异常。 - 在yarn cluster模式下,_conf必须包含
spark.yarn.app.id,否则抛出异常。 - 在_conf中设置
spark.driver.host和spark.executor.id=driver。 - 新建
JobProgressListener类实例_jobProgressListener,并执行listenerBus.addListener(jobProgressListener)。 - 调用
createSparkEnv(_conf,isLocal,listenerBus)方法,创建Driver的SparkEnv实例。SparkEnv类包含了运行Spark实例(master/worker)的运行时环境,包含serializer、block manager等。
1 | class SparkEnv ( |
- 创建
SparkStatusTracker实例。 - 初始化hadoop配置变量
_hadoopConfiguration = SparkHadoopUtil.get.newConfiguration(_conf)。 - 调用
jars.foreach(addJar),为所有的任务添加 jar dependency。 - 调用
files.foreach(addFIle),为每个node 加载文件。 - 设置
_executorMemory,为executor设置内存。 _heartbeatReceiver=env.rpcEnv.setupEndpoint(HeartbeatReceiver.ENDPOINT_NAME, new HeartbeatReceiver(this)),创建HeartbeatReceiver类实例。- 创建
SchedulerBackend、TaskScheduler、DAGScheduler类实例。
1 | val (sched, ts) = SparkContext.createTaskScheduler(this, master, deployMode) |
- 启动
_taskScheduler.start()。 - 设置
spark.app.id。 - 启动
_env.metricsSystem.start(),并将driver metrics servlet handler 附加到 web ui。 - 创建
ContextCleaner类实例,启动cleaner。 - 调用
postEnvironmentUpdate()方法,通过listenerBus发送envirenmentUpdate event。 - 调用
postApplicationStart()方法,通过listenerBus发送application start event。该方法假设TaskScheduler已经初始化并已经让cluster manager获取到了application ID。
1 | /** Post the application start event */ |
- 调用
_taskScheduler.postStartHook()方法,当系统初始化成功后,会调用该方法,Yarn使用该方法引导基于机架感知、等待从机注册的资源分配。
2. createTaskScheduler(sc, master, deployMode)方法
那么,SparkContext是在哪一步分配Executor呢,在步骤14中,val (sched, ts)=SparkContext.createTaskScheduler(this, master, deployMode)命令,会根据不同的master, deployMode,启动不同的SchedulerBackend和TaskScheduler。
1 | /** |
从代码中我们可以发现,该方法会根据master和depolyMode的不同,来构造不同的SchedulerBackend和TaskScheduler。从而实现了,Spark可以适配local、standalone、yarn等多种不同模式。Yarn Cluster模式下会实例化CoarseGrainedSchedulerBackend类。该类会持有Executor资源。
3. CoarseGrainedSchedulerBackend类
CoarseGrainedSchedulerBackend类的start()方法会被TaskSchedulerImpl类调用,在start()方法中,会实例化DriverEndpoint。
在DriverEndpoint类的receiverAndReply方法中,driver会根据收到RegisterExecutor、StopDriver、StopExecutors、RemoveExecutor、RetrieveSparkAppConfig等不同的命令,进行相应的操作。如会根据接收到的RegisterExecutor RPC命令,完成Executor的注册。
1 | override def receiveAndReply(context: RpcCallContext): PartialFunction[Any, Unit] = { |