3 回答

TA貢獻1772條經(jīng)驗 獲得超5個贊
foldApache Spark中的內(nèi)容與fold未分發(fā)的集合中的內(nèi)容不同。實際上,它需要交換函數(shù)才能產(chǎn)生確定性的結(jié)果:
這與以Scala之類的功能語言為非分布式集合實現(xiàn)的折疊操作有些不同。該折疊操作可以單獨應(yīng)用于分區(qū),然后將那些結(jié)果折疊為最終結(jié)果,而不是以某些定義的順序?qū)⒄郫B應(yīng)用于每個元素。對于非交換函數(shù),結(jié)果可能與應(yīng)用于非分布式集合的折疊結(jié)果不同。
Mishael Rosenthal 已證明了這一點,Make42在其評論中建議了這一點。
有人建議觀察到的行為與HashPartitioner何時parallelize不洗牌和不使用有關(guān)HashPartitioner。
import org.apache.spark.sql.SparkSession
/* Note: standalone (non-local) mode */
val master = "spark://...:7077"
val spark = SparkSession.builder.master(master).getOrCreate()
/* Note: deterministic order */
val rdd = sc.parallelize(Seq("a", "b", "c", "d"), 4).sortBy(identity[String])
require(rdd.collect.sliding(2).forall { case Array(x, y) => x < y })
/* Note: all posible permutations */
require(Seq.fill(1000)(rdd.fold("")(_ + _)).toSet.size == 24)
解釋:
foldRDD的結(jié)構(gòu)
def fold(zeroValue: T)(op: (T, T) => T): T = withScope {
var jobResult: T
val cleanOp: (T, T) => T
val foldPartition = Iterator[T] => T
val mergeResult: (Int, T) => Unit
sc.runJob(this, foldPartition, mergeResult)
jobResult
}
與RDD的結(jié)構(gòu)reduce相同:
def reduce(f: (T, T) => T): T = withScope {
val cleanF: (T, T) => T
val reducePartition: Iterator[T] => Option[T]
var jobResult: Option[T]
val mergeResult = (Int, Option[T]) => Unit
sc.runJob(this, reducePartition, mergeResult)
jobResult.getOrElse(throw new UnsupportedOperationException("empty collection"))
}
在runJob不考慮分區(qū)順序的情況下執(zhí)行,導致需要交換功能。
foldPartition并且reducePartition在處理順序上有效,reduceLeft并且foldLeft在上有效執(zhí)行(通過繼承和委派)TraversableOnce。
結(jié)論:foldRDD不能依賴于塊的順序,而是需要可交換性和關(guān)聯(lián)性。
- 3 回答
- 0 關(guān)注
- 1088 瀏覽
添加回答
舉報