第七色在线视频,2021少妇久久久久久久久久,亚洲欧洲精品成人久久av18,亚洲国产精品特色大片观看完整版,孙宇晨将参加特朗普的晚宴

為了賬號安全,請及時綁定郵箱和手機立即綁定

DAGScheduler之Job的提交劃分Stage

標簽:
Spark

整体流程图

webp

流程图

源码分析 spark 2.3

getOrCreateParentStages 创建所有祖先Stage
/**
   * Get or create the list of parent stages for a given RDD.  The new Stages will be created with
   * the provided firstJobId.
   */
  private def getOrCreateParentStages(rdd: RDD[_], firstJobId: Int): List[Stage] = {    // getShuffleDependencies 获取RDD的第一层直接宽依赖
    getShuffleDependencies(rdd).map { shuffleDep =>      //getOrCreateShuffleMapStage 创建rdd对应的所有祖先Stage
      getOrCreateShuffleMapStage(shuffleDep, firstJobId)
    }.toList
  }
getShuffleDependencies 获取RDD的第一层直接宽依赖
  /**
   * Returns shuffle dependencies that are immediate parents of the given RDD.
   *
   * This function will not return more distant ancestors.  For example, if C has a shuffle
   * dependency on B which has a shuffle dependency on A:
   *
   * A <-- B <-- C
   *
   * calling this function with rdd C will only return the B <-- C dependency.
   *
   * This function is scheduler-visible for the purpose of unit testing.
   */
  private[scheduler] def getShuffleDependencies(
      rdd: RDD[_]): HashSet[ShuffleDependency[_, _, _]] = {    val parents = new HashSet[ShuffleDependency[_, _, _]]    val visited = new HashSet[RDD[_]]    val waitingForVisit = new ArrayStack[RDD[_]]
    waitingForVisit.push(rdd)    while (waitingForVisit.nonEmpty) {      val toVisit = waitingForVisit.pop()      if (!visited(toVisit)) {
        visited += toVisit
        toVisit.dependencies.foreach {          // 返回 所有的第一层宽依赖
          case shuffleDep: ShuffleDependency[_, _, _] =>
            parents += shuffleDep          case dependency =>
            waitingForVisit.push(dependency.rdd)
        }
      }
    }
    parents
  }
getOrCreateShuffleMapStage 创建rdd对应的所有祖先Stage
/**
   * Gets a shuffle map stage if one exists in shuffleIdToMapStage. Otherwise, if the
   * shuffle map stage doesn't already exist, this method will create the shuffle map stage in
   * addition to any missing ancestor shuffle map stages.
   */
  private def getOrCreateShuffleMapStage(
      shuffleDep: ShuffleDependency[_, _, _],
      firstJobId: Int): ShuffleMapStage = {
    shuffleIdToMapStage.get(shuffleDep.shuffleId) match {      case Some(stage) =>
        stage      case None =>        // Create stages for all missing ancestor shuffle dependencies.
        // 深度遍历获取所有祖先宽依赖,按照祖先->子辈的顺序 处理宽依赖
        getMissingAncestorShuffleDependencies(shuffleDep.rdd).foreach { dep =>          // Even though getMissingAncestorShuffleDependencies only returns shuffle dependencies
          // that were not already in shuffleIdToMapStage, it's possible that by the time we
          // get to a particular dependency in the foreach loop, it's been added to
          // shuffleIdToMapStage by the stage creation process for an earlier dependency. See
          // SPARK-13902 for more information.
          if (!shuffleIdToMapStage.contains(dep.shuffleId)) {            // 创建宽依赖
            createShuffleMapStage(dep, firstJobId)
          }
        }        // Finally, create a stage for the given shuffle dependency.
        createShuffleMapStage(shuffleDep, firstJobId)
    }
  }
getMissingAncestorShuffleDependencies  深度遍历获取所有祖先宽依赖
/** Find ancestor shuffle dependencies that are not registered in shuffleToMapStage yet */
  private def getMissingAncestorShuffleDependencies(
      rdd: RDD[_]): ArrayStack[ShuffleDependency[_, _, _]] = {    val ancestors = new ArrayStack[ShuffleDependency[_, _, _]]    val visited = new HashSet[RDD[_]]    // We are manually maintaining a stack here to prevent StackOverflowError
    // caused by recursively visiting
    val waitingForVisit = new ArrayStack[RDD[_]]
    waitingForVisit.push(rdd)    while (waitingForVisit.nonEmpty) {      val toVisit = waitingForVisit.pop()      if (!visited(toVisit)) {
        visited += toVisit
        getShuffleDependencies(toVisit).foreach { shuffleDep =>          if (!shuffleIdToMapStage.contains(shuffleDep.shuffleId)) {           // 子辈宽依赖先压栈
            ancestors.push(shuffleDep)
            waitingForVisit.push(shuffleDep.rdd)
          } // Otherwise, the dependency and its ancestors have already been registered.
        }
      }
    }    // 返回宽依赖 堆栈
    ancestors
  }

例子

RDDs原始依赖图

webp

RDDs原始依赖图

getShuffleDependencies

webp

获取RDD的第一层直接宽依赖

getMissingAncestorShuffleDependencies

webp

深度遍历顺序获取所有祖先的宽依赖

最后划分结果

webp

最后划分结果



作者:阿武z
链接:https://www.jianshu.com/p/14355e250e2f


點擊查看更多內(nèi)容
TA 點贊

若覺得本文不錯,就分享一下吧!

評論

作者其他優(yōu)質(zhì)文章

正在加載中
  • 推薦
  • 評論
  • 收藏
  • 共同學習,寫下你的評論
感謝您的支持,我會繼續(xù)努力的~
掃碼打賞,你說多少就多少
贊賞金額會直接到老師賬戶
支付方式
打開微信掃一掃,即可進行掃碼打賞哦
今天注冊有機會得

100積分直接送

付費專欄免費學

大額優(yōu)惠券免費領

立即參與 放棄機會
微信客服

購課補貼
聯(lián)系客服咨詢優(yōu)惠詳情

幫助反饋 APP下載

慕課網(wǎng)APP
您的移動學習伙伴

公眾號

掃描二維碼
關注慕課網(wǎng)微信公眾號

舉報

0/150
提交
取消