简介
spark执行操作时,可以使用驱动器程序Driver中定义的变量,但有时这种默认的使用方式却并不理想。
集群中运行的每个任务都会连接驱动器获取变量。如果获取的变量比较大,执行效率会非常低下。
每个任务都会得到这些变量的一份新的副本,更新这些副本的值不会影响驱动器中的对应变量。如果驱动器需要获取变量的结果值,这种方式是不可行的。
spark为了解决这两个问题,提供了两种类型的共享变量:广播变量(broadcast variable)和累加器(accumulator)。
广播变量用于高效分发较大的对象。会在每个执行器本地缓存一份大对象,而避免每次都连接驱动器获取。
累加器用于在驱动器中对数据结果进行聚合。
广播变量
原理
广播变量.png
广播变量只能在Driver端定义,不能在Executor端定义。
在Driver端可以修改广播变量的值,在Executor端无法修改广播变量的值。
如果不使用广播变量在Executor有多少task就有多少Driver端的变量副本;如果使用广播变量在每个Executor中只有一份Driver端的变量副本。
用法
通过对一个类型T的对象调用SparkContext.broadcast创建出一个BroadCast[T]对象,任何可序列化的类型都可以这么实现。
通过value属性访问该对象的值
变量只会被发到各个节点一次,应作为只读值处理。(修改这个值不会影响到别的节点)
实例
查询每个国家的呼号个数
python
# 将呼号前缀(国家代码)作为广播变量signPrefixes = sc.broadcast(loadCallSignTable())def processSignCount(sign_count, signPrefixes): country = lookupCountry(sign_count[0], signPrefixes.value) count = sign_count[1] return (country, count) countryContactCounts = (contactCounts.map(processSignCount).reduceByKey((lambda x, y: x+y))) countryContactCounts.saveAsTextFile(outputDir + "/countries.txt")
scala
// 将呼号前缀(国家代码)作为广播变量val signPrefixes = sc.broadcast(loadCallSignTable()) def processSignCount(sign_count, signPrefixes): country = lookupCountry(sign_count[0], signPrefixes.value) count = sign_count[1] return (country, count) val countryContactCounts = contactCounts.map{case (sign, count) => { val country = lookupInArray(sign, signPrefixes.value) (country, count) }}.reduceByKey((x, y) => x+y) countryContactCounts.saveAsTextFile(outputDir + "/countries.txt")
java
// 将呼号前缀(国家代码)作为广播变量final Broadcast<String[]> signPrefixes = sc.broadcast(loadCallSignTable()); JavaPairRDD<String, Integer> countryContactCounts = contactCounts.mapToPair(new PairFunction<Tuple2<String, Integer>, String, Integer>() { public Tuple2<String, Integer> call(Tuple2<String, Integer> callSignCount) { String sign = callSignCount._1(); String country = lookupCountry(sign, signPrefixes.value()); return new Tuple2(country, callSignCount._2()); } }).reduceByKey(new SumInts()); countryContactCounts.saveAsTextFile(outputDir + "/countries.txt");
累加器
原理
累加器.png
累加器在Driver端定义赋初始值。
累加器只能在Driver端读取最后的值,在Excutor端更新。
用法
通过调用sc.accumulator(initivalValue)方法,创建出存有初始值的累加器。返回值为org.apache.spark.Accumulator[T]对象,其中T是初始值initialValue的类型。
Spark闭包里的执行器代码可以使用累加器的+=方法增加累加器的值
驱动器程序可以调用累加器的value属性来访问累加器的值
实例
累加空行
python
file = sc.textFile(inputFile)# 创建Accumulator[Int]并初始化为0blankLines = sc.accumulator(0)def extractCallSigns(line): global blankLines # 访问全局变量 if (line == ""): blankLines += 1 return line.split(" ") callSigns = file.flatMap(extractCallSigns) callSigns.saveAsTextFile(outputDir + "/callsigns")print "Blank lines: %d" % blankLines.value
scala
val file = sc.textFile("file.txt") val blankLines = sc.accumulator(0) //创建Accumulator[Int]并初始化为0val callSigns = file.flatMap(line => { if (line == "") { blankLines += 1 //累加器加1 } line.split(" ") }) callSigns.saveAsTextFile("output.txt") println("Blank lines:" + blankLines.value)
java
JavaRDD<String> rdd = sc.textFile(args[1]); final Accumulator<Integer> blankLines = sc.accumulator(0); JavaRDD<String> callSigns = rdd.flatMap(new FlatMapFunction<String, String>() { public Iterable<String> call(String line) { if ("".equals(line)) { blankLines.add(1); } return Arrays.asList(line.split(" ")); } }); callSigns.saveAsTextFile("output.text"); System.out.println("Blank lines:" + blankLines.value());
作者:java大数据编程
链接:https://www.jianshu.com/p/e5fb8c221635
共同學(xué)習(xí),寫下你的評論
評論加載中...
作者其他優(yōu)質(zhì)文章