第七色在线视频,2021少妇久久久久久久久久,亚洲欧洲精品成人久久av18,亚洲国产精品特色大片观看完整版,孙宇晨将参加特朗普的晚宴

為了賬號(hào)安全,請(qǐng)及時(shí)綁定郵箱和手機(jī)立即綁定

舉例說明Spark RDD的分區(qū)、依賴

標(biāo)簽:
Spark

例子如下:

scala> val textFileRDD = sc.textFile("/Users/zhuweibin/Downloads/hive_04053f79f32b414a9cf5ab0d4a3c9daf.txt")15/08/03 07:00:08 INFO MemoryStore: ensureFreeSpace(57160) called with curMem=0, maxMem=27801944015/08/03 07:00:08 INFO MemoryStore: Block broadcast_0 stored as values in memory (estimated size 55.8 KB, free 265.1 MB)15/08/03 07:00:08 INFO MemoryStore: ensureFreeSpace(17237) called with curMem=57160, maxMem=27801944015/08/03 07:00:08 INFO MemoryStore: Block broadcast_0_piece0 stored as bytes in memory (estimated size 16.8 KB, free 265.1 MB)15/08/03 07:00:08 INFO BlockManagerInfo: Added broadcast_0_piece0 in memory on localhost:51675 (size: 16.8 KB, free: 265.1 MB)15/08/03 07:00:08 INFO SparkContext: Created broadcast 0 from textFile at <console>:21textFileRDD: org.apache.spark.rdd.RDD[String] = MapPartitionsRDD[1] at textFile at <console>:21scala>     println( textFileRDD.partitions.size )15/08/03 07:00:09 INFO FileInputFormat: Total input paths to process : 12scala>     textFileRDD.partitions.foreach { partition =>     |       println("index:" + partition.index + "  hasCode:" + partition.hashCode())
     |     }index:0  hasCode:1681index:1  hasCode:1682scala>     println("dependency size:" + textFileRDD.dependencies)
dependency size:List(org.apache.spark.OneToOneDependency@543669de)

scala>     println( textFileRDD )
MapPartitionsRDD[1] at textFile at <console>:21scala>     textFileRDD.dependencies.foreach { dep =>     |       println("dependency type:" + dep.getClass)
     |       println("dependency RDD:" + dep.rdd)     |       println("dependency partitions:" + dep.rdd.partitions)
     |       println("dependency partitions size:" + dep.rdd.partitions.length)     |     }
dependency type:class org.apache.spark.OneToOneDependency
dependency RDD:/Users/zhuweibin/Downloads/hive_04053f79f32b414a9cf5ab0d4a3c9daf.txt HadoopRDD[0] at textFile at <console>:21
dependency partitions:[Lorg.apache.spark.Partition;@c197f46
dependency partitions size:2

scala> 

scala>     val flatMapRDD = textFileRDD.flatMap(_.split(" "))
flatMapRDD: org.apache.spark.rdd.RDD[String] = MapPartitionsRDD[2] at flatMap at <console>:23

scala>     println( flatMapRDD )
MapPartitionsRDD[2] at flatMap at <console>:23

scala>     flatMapRDD.dependencies.foreach { dep =>
     |       println("dependency type:" + dep.getClass)     |       println("dependency RDD:" + dep.rdd)
     |       println("dependency partitions:" + dep.rdd.partitions)     |       println("dependency partitions size:" + dep.rdd.partitions.length)
     |     }
dependency type:class org.apache.spark.OneToOneDependencydependency RDD:MapPartitionsRDD[1] at textFile at <console>:21dependency partitions:[Lorg.apache.spark.Partition;@c197f46
dependency partitions size:2scala> 

scala>     val mapRDD = flatMapRDD.map(word => (word, 1))mapRDD: org.apache.spark.rdd.RDD[(String, Int)] = MapPartitionsRDD[3] at map at <console>:25scala>     println( mapRDD )
MapPartitionsRDD[3] at map at <console>:25scala>     mapRDD.dependencies.foreach { dep =>     |       println("dependency type:" + dep.getClass)
     |       println("dependency RDD:" + dep.rdd)     |       println("dependency partitions:" + dep.rdd.partitions)
     |       println("dependency partitions size:" + dep.rdd.partitions.length)     |     }
dependency type:class org.apache.spark.OneToOneDependency
dependency RDD:MapPartitionsRDD[2] at flatMap at <console>:23
dependency partitions:[Lorg.apache.spark.Partition;@c197f46
dependency partitions size:2

scala> 

scala> 

scala>     val counts = mapRDD.reduceByKey(_ + _)
counts: org.apache.spark.rdd.RDD[(String, Int)] = ShuffledRDD[4] at reduceByKey at <console>:27

scala>     println( counts )
ShuffledRDD[4] at reduceByKey at <console>:27

scala>     counts.dependencies.foreach { dep =>
     |       println("dependency type:" + dep.getClass)     |       println("dependency RDD:" + dep.rdd)
     |       println("dependency partitions:" + dep.rdd.partitions)     |       println("dependency partitions size:" + dep.rdd.partitions.length)
     |     }
dependency type:class org.apache.spark.ShuffleDependencydependency RDD:MapPartitionsRDD[3] at map at <console>:25dependency partitions:[Lorg.apache.spark.Partition;@c197f46
dependency partitions size:2scala>

从输出我们可以看出,对于任意一个RDD x来说,其dependencies代表了其直接依赖的RDDs(一个或多个)。那dependencies又是怎么能够表明RDD之间的依赖关系呢?假设dependency为dependencies成员

  • dependency的类型(NarrowDependency或ShuffleDependency)说明了该依赖是窄依赖还是宽依赖

  • 通过dependency的def getParents(partitionId: Int): Seq[Int]方法,可以得到子RDD的每个分区依赖父RDD的哪些分区

  • dependency包含RDD成员,即子RDD依赖的父RDD,该RDD的compute函数说明了对该父RDD的分区进行怎么样的计算能得到子RDD的分区

  • 该父RDD中同样包含dependency成员,该dependency同样包含上述特点,同样可以通过该父RDD的dependency成员来确定该父RDD依赖的爷爷RDD。同样可以通过dependency.getParents方法和爷爷RDD.compute来得出如何从父RDD回朔到爷爷RDD,依次类推,可以回朔到第一个RDD

那么,如果某个RDD的partition计算失败,要回朔到哪个RDD为止呢?上例中打印出的dependency.RDD如下:

MapPartitionsRDD[1] at textFile at <console>:21MapPartitionsRDD[2] at flatMap at <console>:23MapPartitionsRDD[3] at map at <console>:25ShuffledRDD[4] at reduceByKey at <console>:27

可以看出每个RDD都有一个编号,在回朔的过程中,每向上回朔一次变回得到一个或多个相对父RDD,这时系统会判断该RDD是否存在(即被缓存),如果存在则停止回朔,如果不存在则一直向上回朔到某个RDD存在或到最初RDD的数据源为止。



作者:牛肉圆粉不加葱
链接:https://www.jianshu.com/p/6b9e4001723d


點(diǎn)擊查看更多內(nèi)容
TA 點(diǎn)贊

若覺得本文不錯(cuò),就分享一下吧!

評(píng)論

作者其他優(yōu)質(zhì)文章

正在加載中
  • 推薦
  • 評(píng)論
  • 收藏
  • 共同學(xué)習(xí),寫下你的評(píng)論
感謝您的支持,我會(huì)繼續(xù)努力的~
掃碼打賞,你說多少就多少
贊賞金額會(huì)直接到老師賬戶
支付方式
打開微信掃一掃,即可進(jìn)行掃碼打賞哦
今天注冊(cè)有機(jī)會(huì)得

100積分直接送

付費(fèi)專欄免費(fèi)學(xué)

大額優(yōu)惠券免費(fèi)領(lǐng)

立即參與 放棄機(jī)會(huì)
微信客服

購(gòu)課補(bǔ)貼
聯(lián)系客服咨詢優(yōu)惠詳情

幫助反饋 APP下載

慕課網(wǎng)APP
您的移動(dòng)學(xué)習(xí)伙伴

公眾號(hào)

掃描二維碼
關(guān)注慕課網(wǎng)微信公眾號(hào)

舉報(bào)

0/150
提交
取消