第七色在线视频,2021少妇久久久久久久久久,亚洲欧洲精品成人久久av18,亚洲国产精品特色大片观看完整版,孙宇晨将参加特朗普的晚宴

為了賬號安全,請及時綁定郵箱和手機(jī)立即綁定

【Spark學(xué)習(xí)筆記】Scheduler模塊

標(biāo)簽:
Spark

webp

spark 调度模块详解.png

webp

调度流程

源码分析

webp

Spark作业调度源码跟踪.png

第一步:准备工作

  • SparkContext中创建DAGScheduler、TaskScheduler和SchedulerBackend对象

    // Create and start the scheduler
    val (sched, ts) = SparkContext.createTaskScheduler(this, master, deployMode)
    _schedulerBackend = sched
    _taskScheduler = ts
    _dagScheduler = new DAGScheduler(this)
    _heartbeatReceiver.ask[Boolean](TaskSchedulerIsSet)    // start TaskScheduler after taskScheduler sets DAGScheduler reference in DAGScheduler's
    // constructor
    _taskScheduler.start()

    _applicationId = _taskScheduler.applicationId()
    _applicationAttemptId = taskScheduler.applicationAttemptId()

由些可看在创建SparkContext的时候会调用createTaskScheduler生成SchedulerBackend和TaskScheduler对象

DAGScheduler对象也是在这个时候直接new出来

第二步:提交作业

1.RDD中调用runJob执行作业

  • 步骤1:Rdd#runJob

  • 步骤2:SparkConext#runJob

  • 步骤3:DagScheduler#runJob

  • 步骤4:DagScheduler#submitJob

  • 步骤5:DAGSchedulerEventProcessLoop#post
    将Job提交到一个队列中,等待处理。这是一个典型的生产者消费者模式。这些消息都是通过handleJobSubmitted来处理。

  • 步骤6:DAGSchedulerEventProcessLoop#doOnReceive中接收任务(EventLoop的子类)

  • 步骤7:DAGSchedulerEventProcessLoop#handleJobSubmitted 将Job划分成不同的stage,创建一个activeJob,生成一个任务

  • 步骤8:DAGScheduler#handleJobSubmitted

  • 步骤9-1:DAGScheduler#createResultStage

  • 步骤9-2:DAGScheduler#submitStage

  • 步骤10:DAGScheduler#submitMissingTasks
    会完成DAGScheduler最后的工作:它判断出哪些Partition需要计算,为每个Partition生成Task,然后这些Task就会封闭到TaskSet,最后提交给TaskScheduler进行处理。

以count这个action为例子跟踪源码

def count(): Long = sc.runJob(this, Utils.getIteratorSize _).sum

步骤2源码:SparkConext#runJob

  /**
   * Run a function on a given set of partitions in an RDD and pass the results to the given
   * handler function. This is the main entry point for all actions in Spark.
   *
   * @param rdd target RDD to run tasks on
   * @param func a function to run on each partition of the RDD
   * @param partitions set of partitions to run on; some jobs may not want to compute on all
   * partitions of the target RDD, e.g. for operations like `first()`
   * @param resultHandler callback to pass each result to
   */
  def runJob[T, U: ClassTag](
      rdd: RDD[T],
      func: (TaskContext, Iterator[T]) => U,
      partitions: Seq[Int],
      resultHandler: (Int, U) => Unit): Unit = {    if (stopped.get()) {      throw new IllegalStateException("SparkContext has been shutdown")
    }
    val callSite = getCallSite
    val cleanedFunc = clean(func)
    logInfo("Starting job: " + callSite.shortForm)    if (conf.getBoolean("spark.logLineage", false)) {
      logInfo("RDD's recursive dependencies:\n" + rdd.toDebugString)
    }
    dagScheduler.runJob(rdd, cleanedFunc, partitions, callSite, resultHandler, localProperties.get)
    progressBar.foreach(_.finishAll())
    rdd.doCheckpoint()
  }

步骤3源码:DagScheduler#runJob

/**
   * Run an action job on the given RDD and pass all the results to the resultHandler function as
   * they arrive.
   *
   * @param rdd target RDD to run tasks on
   * @param func a function to run on each partition of the RDD
   * @param partitions set of partitions to run on; some jobs may not want to compute on all
   *   partitions of the target RDD, e.g. for operations like first()
   * @param callSite where in the user program this job was called
   * @param resultHandler callback to pass each result to
   * @param properties scheduler properties to attach to this job, e.g. fair scheduler pool name
   *
   * @note Throws `Exception` when the job fails
   */
  def runJob[T, U](
      rdd: RDD[T],
      func: (TaskContext, Iterator[T]) => U,
      partitions: Seq[Int],
      callSite: CallSite,
      resultHandler: (Int, U) => Unit,
      properties: Properties): Unit = {
    val start = System.nanoTime
    val waiter = submitJob(rdd, func, partitions, callSite, resultHandler, properties)
    ThreadUtils.awaitReady(waiter.completionFuture, Duration.Inf)
    waiter.completionFuture.value.get match {      case scala.util.Success(_) =>
        logInfo("Job %d finished: %s, took %f s".format
          (waiter.jobId, callSite.shortForm, (System.nanoTime - start) / 1e9))      case scala.util.Failure(exception) =>
        logInfo("Job %d failed: %s, took %f s".format
          (waiter.jobId, callSite.shortForm, (System.nanoTime - start) / 1e9))        // SPARK-8644: Include user stack trace in exceptions coming from DAGScheduler.
        val callerStackTrace = Thread.currentThread().getStackTrace.tail        exception.setStackTrace(exception.getStackTrace ++ callerStackTrace)        throw exception
    }
  }

步骤4源码:DagScheduler#submitJob

  def submitJob[T, U](
      rdd: RDD[T],      func: (TaskContext, Iterator[T]) => U,      partitions: Seq[Int],      callSite: CallSite,      resultHandler: (Int, U) => Unit,      properties: Properties): JobWaiter[U] = {    // Check to make sure we are not launching a task on a partition that does not exist.
    val maxPartitions = rdd.partitions.length
    partitions.find(p => p >= maxPartitions || p < 0).foreach { p =>      throw new IllegalArgumentException(        "Attempting to access a non-existent partition: " + p + ". " +          "Total number of partitions: " + maxPartitions)
    }

    val jobId = nextJobId.getAndIncrement()    if (partitions.size == 0) {      // Return immediately if the job is running 0 tasks
      return new JobWaiter[U](this, jobId, 0, resultHandler)
    }

    assert(partitions.size > 0)
    val func2 = func.asInstanceOf[(TaskContext, Iterator[_]) => _]
    val waiter = new JobWaiter(this, jobId, partitions.size, resultHandler)
    eventProcessLoop.post(JobSubmitted(
      jobId, rdd, func2, partitions.toArray, callSite, waiter,
      SerializationUtils.clone(properties)))
    waiter
  }
  • 获取一个新的jobId

  • 生成一个JobWaiter,它会监听Job的执行状态,而Job是由多个Task组成的,因此只有当Job的所有Task均已完成,Job才会标记成功

  • 最后调用eventProcessLoop.post()将Job提交到一个队列中,等待处理。这是一个典型的生产者消费者模式。这些消息都是通过handleJobSubmitted来处理。

看看handleJobSubmitted是如何被调用

  /**
   * The main event loop of the DAG scheduler.
   */
  override def onReceive(event: DAGSchedulerEvent): Unit = {
    val timerContext = timer.time()    try {
      doOnReceive(event)
    } finally {
      timerContext.stop()
    }
  }

DAGSchedulerEventProcessLoop是EventLoop的子类,它重写了EventLoop的onReceive方法。
doOnReceive会调用handleJobSubmitted。

stage的划分

handleJobSubmitted会从eventProcessLoop中取出Job来进行处理,处理的第一步就是将Job划分成不同的stage。handleJobSubmitted主要2个工作,
一是进行stage的划分;

DAGScheduler代码

  private[scheduler] def handleJobSubmitted(jobId: Int,      finalRDD: RDD[_],      func: (TaskContext, Iterator[_]) => _,      partitions: Array[Int],      callSite: CallSite,      listener: JobListener,      properties: Properties) {    var finalStage: ResultStage = null
    try {      // New stage creation may throw an exception if, for example, jobs are run on a
      // HadoopRDD whose underlying HDFS files have been deleted.
      finalStage = createResultStage(finalRDD, func, partitions, jobId, callSite)
    } catch {      case e: Exception =>
        logWarning("Creating new stage failed due to exception - job: " + jobId, e)
        listener.jobFailed(e)        return
    }

    val job = new ActiveJob(jobId, finalStage, callSite, listener, properties)
    clearCacheLocs()
    logInfo("Got job %s (%s) with %d output partitions".format(
      job.jobId, callSite.shortForm, partitions.length))
    logInfo("Final stage: " + finalStage + " (" + finalStage.name + ")")
    logInfo("Parents of final stage: " + finalStage.parents)
    logInfo("Missing parents: " + getMissingParentStages(finalStage))

    val jobSubmissionTime = clock.getTimeMillis()
    jobIdToActiveJob(jobId) = job
    activeJobs += job
    finalStage.setActiveJob(job)
    val stageIds = jobIdToStageIds(jobId).toArray
    val stageInfos = stageIds.flatMap(id => stageIdToStage.get(id).map(_.latestInfo))
    listenerBus.post(
      SparkListenerJobStart(job.jobId, jobSubmissionTime, stageInfos, properties))
    submitStage(finalStage)
  }  /**
   * Create a ResultStage associated with the provided jobId.
   */
  private def createResultStage(
      rdd: RDD[_],      func: (TaskContext, Iterator[_]) => _,      partitions: Array[Int],      jobId: Int,      callSite: CallSite): ResultStage = {
    val parents = getOrCreateParentStages(rdd, jobId)
    val id = nextStageId.getAndIncrement()
    val stage = new ResultStage(id, rdd, func, partitions, parents, jobId, callSite)
    stageIdToStage(id) = stage
    updateJobIdStageIdMaps(jobId, stage)
    stage
  }

newResultStage()经过多层调用后,最终会调用getParentStages()。
因为是从最终的stage往回推算的,这需要计算最终stage所依赖的各个stage。

二是创建一个activeJob,并生成一个任务。

submitStage(finalStage)

submitStage会提交finalStage,如果这个stage的某些parentStage未提交,则递归调用submitStage(),直至所有的stage均已计算完成。

第三步:执行作业

由上面DAGScheduler#submitMissingTasks执行了这个方法后,会执行以下代码

taskScheduler.submitTasks(new TaskSet(
        tasks.toArray, stage.id, stage.latestInfo.attemptId, jobId, properties))
      stage.latestInfo.submissionTime = Some(clock.getTimeMillis())

从这里开始往下的执行逻辑:
(1)taskScheduler#submitTasks()
(2) schedulableBuilder#addTaskSetManager()
(3)CoarseGrainedSchedulerBackend#reviveOffers()
(4)CoarseGrainedSchedulerBackend#makeOffers()
(5)TaskSchedulerImpl#resourceOffers
(6)CoarseGrainedSchedulerBackend#launchTasks
(7)executorData.executorEndpoint.send(LaunchTask(new SerializableBuffer(serializedTask)))

步骤一、二中主要将这组任务的TaskSet加入到一个TaskSetManager中。TaskSetManager会根据数据就近原则为task分配计算资源,监控task的执行状态等,比如失败重试,推测执行等。
步骤三、四逻辑较为简单。
步骤五为每个task具体分配资源,它的输入是一个Executor的列表,输出是TaskDescription的二维数组。TaskDescription包含了TaskID, Executor ID和task执行的依赖信息等。
步骤六、七就是将任务真正的发送到executor中执行了,并等待executor的状态返回。



作者:代码足迹
链接:https://www.jianshu.com/p/69e616b9f7c5


點擊查看更多內(nèi)容
TA 點贊

若覺得本文不錯,就分享一下吧!

評論

作者其他優(yōu)質(zhì)文章

正在加載中
  • 推薦
  • 評論
  • 收藏
  • 共同學(xué)習(xí),寫下你的評論
感謝您的支持,我會繼續(xù)努力的~
掃碼打賞,你說多少就多少
贊賞金額會直接到老師賬戶
支付方式
打開微信掃一掃,即可進(jìn)行掃碼打賞哦
今天注冊有機(jī)會得

100積分直接送

付費專欄免費學(xué)

大額優(yōu)惠券免費領(lǐng)

立即參與 放棄機(jī)會
微信客服

購課補(bǔ)貼
聯(lián)系客服咨詢優(yōu)惠詳情

幫助反饋 APP下載

慕課網(wǎng)APP
您的移動學(xué)習(xí)伙伴

公眾號

掃描二維碼
關(guān)注慕課網(wǎng)微信公眾號

舉報

0/150
提交
取消