本文通过一个demon向读者展示,如何用spark 实现word count 功能。
创建项目
创建maven项目,添加spark核心依赖
<dependency> <groupId>org.apache.spark</groupId> <artifactId>spark-core_2.12</artifactId> <version>2.4.0</version> </dependency>
如果使用java8版本还需要加入
<dependency> <groupId>com.thoughtworks.paranamer</groupId> <artifactId>paranamer</artifactId> <version>2.8</version> </dependency>
否则读取文件时候出现异常
JavaRDD rdd1 = sc.textFile("/Users/riverfan/mytest/spark/hello.txt");
java.lang.ArrayIndexOutOfBoundsException: 10582
word count 代码实现
main 方法
public static void main(String[] args) { SparkConf conf = new SparkConf(); conf.setAppName("WordCountDemon"); //设置master属性 conf.setMaster("local"); JavaSparkContext sc = new JavaSparkContext(conf); wordCount1(sc); }
实现计数方法
public static void wordCount1(JavaSparkContext sc) { JavaRDD<String> rdd1 = sc.textFile("/Users/riverfan/mytest/spark/hello.txt"); //压扁 JavaRDD<String> rdd2 = rdd1.flatMap(new FlatMapFunction<String, String>() { @Override public Iterator<String> call(String s) throws Exception { List<String> list = new ArrayList<String>(); String[] arr = s.split(" "); for(String ss : arr){ list.add(ss) ; } return list.iterator() ; } }); //映射 JavaPairRDD<String,Integer> rdd3 = rdd2.mapToPair(new PairFunction<String, String, Integer>() { @Override public Tuple2<String, Integer> call(String s) throws Exception { return new Tuple2<String, Integer>(s,1); } }); //聚合 JavaPairRDD<String,Integer> rdd4 = rdd3.reduceByKey(new Function2<Integer, Integer, Integer>() { @Override public Integer call(Integer v1, Integer v2) throws Exception { return v1 + v2; } }); //收集,打印输出 for(Object o : rdd4.collect()){ System.out.println(o); } }
也可以采用lambda 表达式更优雅的实现
public void wordCount2(JavaSparkContext sc){ JavaPairRDD<String,Integer> rdd1 = sc.textFile("/Users/riverfan/mytest/spark/hello.txt") .flatMap( s -> Arrays.asList(s.split(" ")).iterator()) .mapToPair(s -> new Tuple2<>(s, 1)) .reduceByKey((v1,v2)-> (v1+v2)); System.out.println(); rdd1.collect().forEach(t-> System.out.println(t)); }
结果如下
(are,1)
(you,1)
(how,1)
(,1)
(river,3)
(hello,3)
(boy,1)
(good,1)
发现将key 值为 blank 的也统计了,我们可以用filter去掉不想要的结果
public static void wordCount2(JavaSparkContext sc){ JavaPairRDD<String,Integer> rdd1 = sc.textFile("/Users/riverfan/mytest/spark/hello.txt") .flatMap( s -> Arrays.asList(s.split(" ")).iterator()) .mapToPair(s -> new Tuple2<>(s, 1)) .filter(t-> StringUtils.isNoneBlank(t._1)) .reduceByKey((v1,v2)-> (v1+v2)); System.out.println(); rdd1.collect().forEach(t-> System.out.println(t)); }
看到结果已经ok啦
(are,1)
(you,1)
(how,1)
(river,3)
(hello,3)
(boy,1)
(good,1)
谢谢你的阅读。
作者:良人与我
链接:https://www.jianshu.com/p/2d04e52e3c16
。
點(diǎn)擊查看更多內(nèi)容
為 TA 點(diǎn)贊
評(píng)論
評(píng)論
共同學(xué)習(xí),寫(xiě)下你的評(píng)論
評(píng)論加載中...
作者其他優(yōu)質(zhì)文章
正在加載中
感謝您的支持,我會(huì)繼續(xù)努力的~
掃碼打賞,你說(shuō)多少就多少
贊賞金額會(huì)直接到老師賬戶(hù)
支付方式
打開(kāi)微信掃一掃,即可進(jìn)行掃碼打賞哦
今天注冊(cè)有機(jī)會(huì)得
100積分直接送
付費(fèi)專(zhuān)欄免費(fèi)學(xué)
大額優(yōu)惠券免費(fèi)領(lǐng)