Spark Runtime Message Communication
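This section walks through what happens after you launch an Application and its main method starts: SparkContext initialization, Application registration, Executor registration, and the hand-off of Tasks to Executors once TaskSchedulerImpl has assigned them. I read the code for Standalone mode only; for Yarn, Mesos, or Local, see the corresponding parts of the code. The sequence diagram is as follows: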
[Figure: message communication sequence diagram (消息通信.png)]
Flow
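(1) The application first runs its main method and starts SparkContext. For Standalone mode, SparkContext initialization creates a TaskSchedulerImpl together with a StandaloneSchedulerBackend, which is declared as
StandaloneSchedulerBackend extends CoarseGrainedSchedulerBackend(scheduler, sc.env.rpcEnv) with StandaloneAppClientListener.
Starting this backend sets up two endpoints: the DriverEndpoint, which it inherits from its parent class, and the ClientEndpoint of the AppClient (actually StandaloneAppClient). When TaskSchedulerImpl is started it calls backend.start(), and StandaloneSchedulerBackend.start() in turn begins by calling the start method of CoarseGrainedSchedulerBackend. That method is: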
    override def start() {
      val properties = new ArrayBuffer[(String, String)]
      for ((key, value) <- scheduler.sc.conf.getAll) {
        if (key.startsWith("spark.")) {
          properties += ((key, value))
        }
      }
      // TODO (prashant) send conf instead of properties
      driverEndpoint = createDriverEndpointRef(properties)
    }

    protected def createDriverEndpointRef(
        properties: ArrayBuffer[(String, String)]): RpcEndpointRef = {
      rpcEnv.setupEndpoint(ENDPOINT_NAME, createDriverEndpoint(properties))
    }

    protected def createDriverEndpoint(properties: Seq[(String, String)]): DriverEndpoint = {
      new DriverEndpoint(rpcEnv, properties)
    }
So the Driver endpoint is created and registered with the RpcEnv here. Because StandaloneSchedulerBackend extends CoarseGrainedSchedulerBackend, it inherits the driverEndpoint field.
StandaloneSchedulerBackend.start() then executes

    client = new StandaloneAppClient(sc.env.rpcEnv, masters, appDesc, this, conf)
    client.start()

which creates and starts the StandaloneAppClient.
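The start-up order in step (1) can be sketched with a toy model. This is only an illustration under stated assumptions: ToyRpcEnv, CoarseBackend, StandaloneBackend and the endpoint names "Driver" and "AppClient" are hypothetical stand-ins, not Spark's classes. It shows the subclass calling super.start() so the driver endpoint is registered before the client endpoint.

```scala
import scala.collection.mutable

// Hypothetical, simplified stand-ins; none of these are Spark's real classes.
object EndpointSetupSketch {
  trait Endpoint { def name: String }
  final case class EndpointRef(name: String)

  // Toy registry playing the role of an RPC environment.
  class ToyRpcEnv {
    private val endpoints = mutable.LinkedHashMap.empty[String, Endpoint]
    def setupEndpoint(name: String, endpoint: Endpoint): EndpointRef = {
      endpoints(name) = endpoint
      EndpointRef(name)
    }
    // Endpoint names in registration order.
    def registered: List[String] = endpoints.keys.toList
  }

  // Parent backend: start() registers the driver endpoint.
  class CoarseBackend(rpcEnv: ToyRpcEnv) {
    var driverEndpoint: Option[EndpointRef] = None
    def start(): Unit = {
      driverEndpoint =
        Some(rpcEnv.setupEndpoint("Driver", new Endpoint { val name = "Driver" }))
    }
  }

  // Child backend: inherits driverEndpoint, then adds its own client endpoint.
  class StandaloneBackend(rpcEnv: ToyRpcEnv) extends CoarseBackend(rpcEnv) {
    var clientEndpoint: Option[EndpointRef] = None
    override def start(): Unit = {
      super.start()
      clientEndpoint =
        Some(rpcEnv.setupEndpoint("AppClient", new Endpoint { val name = "AppClient" }))
    }
  }

  def demo(): List[String] = {
    val env = new ToyRpcEnv
    new StandaloneBackend(env).start()
    env.registered
  }
}
```

Calling EndpointSetupSketch.demo() returns the endpoint names in the order they were registered, which makes the super.start() ordering visible.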
(2) StandaloneAppClient registers the Application with the Master through tryRegisterAllMasters.
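(3) When the Master receives the registration request it processes it, sends a registration-success message (RegisteredApplication) back to the StandaloneAppClient, and then calls startExecutorsOnWorkers (via schedule()) to run the application.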
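Steps (2) and (3) amount to a request/reply exchange, which the following sketch imitates with plain method calls. It assumes a hypothetical ToyMaster with an "active" flag and a made-up application id format; only the message names RegisterApplication and RegisteredApplication and the method name tryRegisterAllMasters mirror the text.

```scala
// Hypothetical toy model of Application registration; not Spark's Master.
object AppRegistrationSketch {
  sealed trait Message
  final case class RegisterApplication(appName: String) extends Message
  final case class RegisteredApplication(appId: String) extends Message

  class ToyMaster(val url: String, val active: Boolean) {
    private var nextId = 0
    // Applications this master has accepted and would schedule executors for.
    var launched: List[String] = Nil

    // Only an active master answers; a standby one stays silent.
    def receive(msg: Message): Option[Message] = msg match {
      case RegisterApplication(name) if active =>
        val appId = s"app-$nextId-$name" // made-up id format
        nextId += 1
        launched = appId :: launched     // stands in for startExecutorsOnWorkers
        Some(RegisteredApplication(appId))
      case _ => None
    }
  }

  // Send the request to every known master and keep the first success reply.
  def tryRegisterAllMasters(masters: Seq[ToyMaster], appName: String): Option[String] =
    masters
      .flatMap(m => m.receive(RegisterApplication(appName)))
      .collectFirst { case RegisteredApplication(id) => id }
}
```

The sketch sends to all masters because the client does not know in advance which one is active; in this toy version the standby master ignores the request.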
(4) Executor registration
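a) startExecutorsOnWorkers allocates resources for the application and calls allocateWorkerResourceToExecutors to start Executors on the Workers. Inside allocateWorkerResourceToExecutors, launchExecutor sends a LaunchExecutor message to the Worker. On receiving it, the Worker instantiates an ExecutorRunner, which builds a ProcessBuilder from the command in the application description; the resulting process runs CoarseGrainedExecutorBackend, the container in which the Executor runs. Finally the Worker sends ExecutorStateChanged to the Master to report that the Executor container has been created.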
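b) When the process builder starts the CoarseGrainedExecutorBackend process, its companion object registers the Executor endpoint, which triggers onStart. onStart sends a RegisterExecutor message to the DriverEndpoint; if registration succeeds, the DriverEndpoint replies with RegisteredExecutor. When the ExecutorEndpoint (which is the CoarseGrainedExecutorBackend itself) receives this reply, it creates the Executor object.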
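The handshake in b) can be illustrated with a small toy model. ToyDriver and ToyExecutorBackend are hypothetical names, the call is synchronous rather than an RPC, and the duplicate-id rejection is an assumption added to show a failure path.

```scala
import scala.collection.mutable

// Hypothetical toy model of the executor registration handshake.
object ExecutorRegistrationSketch {
  sealed trait Message
  final case class RegisterExecutor(executorId: String, host: String, cores: Int) extends Message
  case object RegisteredExecutor extends Message
  final case class RegisterExecutorFailed(reason: String) extends Message

  class ToyDriver {
    // executorId -> cores of every executor that registered successfully
    val executorCores = mutable.Map.empty[String, Int]

    def receive(msg: RegisterExecutor): Message =
      if (executorCores.contains(msg.executorId)) {
        RegisterExecutorFailed(s"Duplicate executor ID: ${msg.executorId}")
      } else {
        executorCores(msg.executorId) = msg.cores
        RegisteredExecutor
      }
  }

  class ToyExecutorBackend(id: String, host: String, cores: Int) {
    // Becomes true only after the driver confirms the registration.
    var executorCreated = false

    def onStart(driver: ToyDriver): Unit =
      driver.receive(RegisterExecutor(id, host, cores)) match {
        case RegisteredExecutor => executorCreated = true
        case _                  => executorCreated = false
      }
  }
}
```

The point of the sketch is the ordering: the Executor object exists only after the driver has acknowledged the backend.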
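c) In onStart, DriverEndpoint creates a daemon thread that periodically (every spark.scheduler.revive.interval, 1s by default) sends ReviveOffers to itself, so that pending TaskSets get a chance to be scheduled: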
    private val reviveThread =
      ThreadUtils.newDaemonSingleThreadScheduledExecutor("driver-revive-thread")

    override def onStart() {
      // Periodically revive offers to allow delay scheduling to work
      val reviveIntervalMs = conf.getTimeAsMs("spark.scheduler.revive.interval", "1s")

      reviveThread.scheduleAtFixedRate(new Runnable {
        override def run(): Unit = Utils.tryLogNonFatalError {
          Option(self).foreach(_.send(ReviveOffers))
        }
      }, 0, reviveIntervalMs, TimeUnit.MILLISECONDS)
    }
On receiving ReviveOffers, the DriverEndpoint calls makeOffers().
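The periodic revive in c) relies on the standard java.util.concurrent scheduler. Below is a minimal standalone sketch of the same pattern; the object name, the thread name, and the tick-counting latch are assumptions made so that the example terminates on its own.

```scala
import java.util.concurrent.{CountDownLatch, Executors, ThreadFactory, TimeUnit}

// Hypothetical sketch of a daemon timer that fires a callback at a fixed rate.
object ReviveTimerSketch {
  // Calls onRevive every intervalMs on a daemon thread and returns true
  // once it has run `ticks` times (false if that takes more than 5 seconds).
  def reviveTimes(ticks: Int, intervalMs: Long)(onRevive: () => Unit): Boolean = {
    val daemonFactory = new ThreadFactory {
      override def newThread(r: Runnable): Thread = {
        val t = new Thread(r, "toy-revive-thread")
        t.setDaemon(true) // a daemon thread does not keep the JVM alive
        t
      }
    }
    val timer = Executors.newSingleThreadScheduledExecutor(daemonFactory)
    val done = new CountDownLatch(ticks)
    timer.scheduleAtFixedRate(new Runnable {
      override def run(): Unit = {
        onRevive()
        done.countDown()
      }
    }, 0L, intervalMs, TimeUnit.MILLISECONDS)
    try done.await(5, TimeUnit.SECONDS)
    finally timer.shutdownNow()
  }
}
```

In the driver the callback would be the send of ReviveOffers; here it is left as a parameter so the timer can be exercised in isolation.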
d) makeOffers calls launchTasks:
    // Make fake resource offers on all executors
    private def makeOffers() {
      // Filter out executors under killing
      val activeExecutors = executorDataMap.filterKeys(executorIsAlive)
      val workOffers = activeExecutors.map { case (id, executorData) =>
        new WorkerOffer(id, executorData.executorHost, executorData.freeCores)
      }.toSeq
      launchTasks(scheduler.resourceOffers(workOffers))
    }
launchTasks then sends each serialized task to its Executor:

    executorData.executorEndpoint.send(LaunchTask(new SerializableBuffer(serializedTask)))
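The offer-and-assign idea in d) can be sketched as follows. This is a toy placement, not what scheduler.resourceOffers does: it assumes one core per task, fills offers in executor-id order, and ignores locality. ExecutorData, Assignment and the two-argument resourceOffers are hypothetical simplifications.

```scala
// Hypothetical toy model of building offers and assigning tasks to them.
object MakeOffersSketch {
  final case class ExecutorData(host: String, freeCores: Int, alive: Boolean)
  final case class WorkerOffer(executorId: String, host: String, cores: Int)
  final case class Assignment(taskId: Long, executorId: String)

  // One offer per live executor, sorted by id to keep the result deterministic.
  def makeOffers(executors: Map[String, ExecutorData]): Seq[WorkerOffer] =
    executors.toSeq
      .collect { case (id, d) if d.alive => WorkerOffer(id, d.host, d.freeCores) }
      .sortBy(_.executorId)

  // Toy placement: every free core is one slot; tasks take slots in order.
  // Tasks that find no slot stay unassigned until a later round of offers.
  def resourceOffers(offers: Seq[WorkerOffer], pending: Seq[Long]): Seq[Assignment] = {
    val slots = offers.flatMap(o => Seq.fill(o.cores)(o.executorId))
    pending.zip(slots).map { case (taskId, execId) => Assignment(taskId, execId) }
  }
}
```

Tasks left over after a round are what the periodic ReviveOffers is for: a later call to makeOffers can place them once cores free up.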
The CoarseGrainedExecutorBackend endpoint (the ExecutorEndpoint) then receives LaunchTask:
    case LaunchTask(data) =>
      if (executor == null) {
        exitExecutor(1, "Received LaunchTask command but executor was null")
      } else {
        val taskDesc = ser.deserialize[TaskDescription](data.value)
        logInfo("Got assigned task " + taskDesc.taskId)
        executor.launchTask(this, taskId = taskDesc.taskId, attemptNumber = taskDesc.attemptNumber,
          taskDesc.name, taskDesc.serializedTask)
      }
launchTask creates a TaskRunner (which is a Runnable) and submits it to threadPool for execution:
    def launchTask(
        context: ExecutorBackend,
        taskId: Long,
        attemptNumber: Int,
        taskName: String,
        serializedTask: ByteBuffer): Unit = {
      val tr = new TaskRunner(context, taskId = taskId, attemptNumber = attemptNumber, taskName,
        serializedTask)
      runningTasks.put(taskId, tr)
      threadPool.execute(tr)
    }
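The register-then-execute pattern of launchTask can be tried out in isolation. The sketch below is an assumption-laden miniature: ToyExecutor, its results map, and shutdownAndWait are hypothetical, and a task is just a function returning a String.

```scala
import java.util.concurrent.{ConcurrentHashMap, Executors, TimeUnit}

// Hypothetical miniature of an executor that runs tasks on a thread pool.
object LaunchTaskSketch {
  class ToyExecutor {
    private val threadPool = Executors.newCachedThreadPool()
    val runningTasks = new ConcurrentHashMap[Long, Runnable]()
    val results = new ConcurrentHashMap[Long, String]()

    def launchTask(taskId: Long, body: () => String): Unit = {
      val runner = new Runnable {
        override def run(): Unit = {
          try {
            results.put(taskId, body())
          } finally {
            // Whether the task succeeded or threw, it is no longer running.
            runningTasks.remove(taskId)
          }
        }
      }
      // Record the task before handing it to the pool, as the original does.
      runningTasks.put(taskId, runner)
      threadPool.execute(runner)
    }

    // Stops accepting tasks and waits up to 5 seconds for running ones.
    def shutdownAndWait(): Boolean = {
      threadPool.shutdown()
      threadPool.awaitTermination(5, TimeUnit.SECONDS)
    }
  }
}
```

Putting the runner into runningTasks before execute means a task is always visible as running by the time it can finish and remove itself.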
The run method of TaskRunner executes the Task; when it finishes, it reports to the Driver that the Task has completed on this Executor:
    // In TaskRunner.run, after the task has finished:
    execBackend.statusUpdate(taskId, TaskState.FINISHED, serializedResult)

    // In CoarseGrainedExecutorBackend, which forwards the update to the driver:
    override def statusUpdate(taskId: Long, state: TaskState, data: ByteBuffer) {
      val msg = StatusUpdate(executorId, taskId, state, data)
      driver match {
        case Some(driverRef) => driverRef.send(msg)
        case None => logWarning(s"Drop $msg because has not yet connected to driver")
      }
    }
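The Option match in statusUpdate is what keeps a not-yet-connected backend from failing. A small sketch of that behaviour, with hypothetical ToyDriverRef and ToyBackend classes and a "dropped" buffer standing in for the warning log:

```scala
import scala.collection.mutable.ArrayBuffer

// Hypothetical toy model of reporting task status to an optional driver.
object StatusUpdateSketch {
  final case class StatusUpdate(executorId: String, taskId: Long, state: String)

  class ToyDriverRef {
    val inbox = ArrayBuffer.empty[StatusUpdate]
    def send(msg: StatusUpdate): Unit = inbox += msg
  }

  class ToyBackend(executorId: String, var driver: Option[ToyDriverRef]) {
    // Messages that could not be delivered; the original only logs a warning.
    val dropped = ArrayBuffer.empty[StatusUpdate]

    def statusUpdate(taskId: Long, state: String): Unit = {
      val msg = StatusUpdate(executorId, taskId, state)
      driver match {
        case Some(ref) => ref.send(msg)
        case None      => dropped += msg
      }
    }
  }
}
```

With a driver present the update lands in its inbox; without one it is recorded as dropped rather than raising an error.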
Author: kason_zhang
Link: https://www.jianshu.com/p/d7dcbc6ff1d6