第七色在线视频,2021少妇久久久久久久久久,亚洲欧洲精品成人久久av18,亚洲国产精品特色大片观看完整版,孙宇晨将参加特朗普的晚宴

為了賬號(hào)安全,請及時(shí)綁定郵箱和手機(jī)立即綁定

當(dāng)MongoDB遇見Spark

標(biāo)簽:
Spark

适宜读者人群

  • 正在使用Mongodb的开发者

传统Spark生态系统 和 MongoDB在Spark生态的角色

传统Spark生态系统

Spark生态系统

Spark生态系统

那么Mongodb作为一个database, 可以担任什么样的角色呢? 就是数据存储这部分, 也就是图中的黑色圈圈HDFS的部分, 如下图

用MongoDB替换HDFS后的Spark生态系统

Spark+Mongodb生态系统

Spark+Mongodb生态系统

为什么要用MongoDB替换HDFS

  1. 存储方式上, HDFS以文件为单位,每个文件64MB~128MB不等, 而MongoDB作为文档数据库则表现得更加细颗粒化

  2. MongoDB支持HDFS所没有的索引的概念, 所以在读取上更加快

  3. MongoDB支持的增删改功能比HDFS更加易于修改写入后的数据

  4. HDFS的响应级别为分钟, 而MongoDB通常是毫秒级别

  5. 如果现有数据库已经是MongoDB的话, 那就不用再转存一份到HDFS上了

  6. 可以利用MongoDB强大的Aggregate做数据的筛选或预处理

MongoDB Spark Connector介绍

  1. 支持读取和写入,即可以将计算后的结果写入MongoDB

  2. 将查询拆分为n个子任务, 如Connector会将一次match,拆分为多个子任务交给spark来处理, 减少数据的全量读取

MongoDB Spark 示例代码

计算用类型Type=1的message字符数并按userid进行分组
开发Maven dependency配置

这里用的是mongo-spark-connector_2.11 的2.0.0版本和spark的spark-core_2.11的2.0.2版本

    <dependency>
        <groupId>org.mongodb.spark</groupId>
        <artifactId>mongo-spark-connector_2.11</artifactId>
        <version>2.0.0</version>
    </dependency>

    <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-core_2.11</artifactId>
        <version>2.0.2</version>
    </dependency>
示例代码
    import com.mongodb.spark._    import org.apache.spark.{SparkConf, SparkContext}    import org.bson._    val conf = new SparkConf()
      .setMaster("local")
      .setAppName("Mingdao-Score")      //同时还支持mongo驱动的readPreference配置, 可以只从secondary读取数据
      .set("spark.mongodb.input.uri", "mongodb://xxx.xxx.xxx.xxx:27017,xxx.xxx.xxx:27017,xxx.xxx.xxx:27017/inputDB.collectionName")
      .set("spark.mongodb.output.uri", "mongodb://xxx.xxx.xxx.xxx:27017,xxx.xxx.xxx:27017,xxx.xxx.xxx:27017/outputDB.collectionName")    val sc = new SparkContext(conf)    // 创建rdd
    val originRDD = MongoSpark.load(sc)    // 构造查询
    val dateQuery = new BsonDocument()
      .append("$gte", new BsonDateTime(start.getTime))
      .append("$lt", new BsonDateTime(end.getTime))    val matchQuery = new Document("$match", BsonDocument.parse("{\"type\":\"1\"}"))    // 构造Projection
    val projection1 = new BsonDocument("$project", BsonDocument.parse("{\"userid\":\"$userid\",\"message\":\"$message\"}")    val aggregatedRDD = originRDD.withPipeline(Seq(matchQuery, projection1))    //比如计算用户的消息字符数
    val rdd1 = aggregatedRDD.keyBy(x=>{      Map(        "userid" -> x.get("userid")
      )
    })    val rdd2 = rdd1.groupByKey.map(t=>{
      (t._1, t._2.map(x => {
        x.getString("message").length
      }).sum)
    })
    
    rdd2.collect().foreach(x=>{
        println(x)
    })    //保持统计结果至MongoDB outputurl 所指定的数据库
    MongoSpark.save(rdd2)

总结

MongoDB Connector 的文档只有基础的示例代码, 具体详情需要看GitHub中的example和部分源码



作者:RavenZZ
链接:https://www.jianshu.com/p/dbac491317cc


點(diǎn)擊查看更多內(nèi)容
TA 點(diǎn)贊

若覺得本文不錯(cuò),就分享一下吧!

評論

作者其他優(yōu)質(zhì)文章

正在加載中
  • 推薦
  • 評論
  • 收藏
  • 共同學(xué)習(xí),寫下你的評論
感謝您的支持,我會(huì)繼續(xù)努力的~
掃碼打賞,你說多少就多少
贊賞金額會(huì)直接到老師賬戶
支付方式
打開微信掃一掃,即可進(jìn)行掃碼打賞哦
今天注冊有機(jī)會(huì)得

100積分直接送

付費(fèi)專欄免費(fèi)學(xué)

大額優(yōu)惠券免費(fèi)領(lǐng)

立即參與 放棄機(jī)會(huì)
微信客服

購課補(bǔ)貼
聯(lián)系客服咨詢優(yōu)惠詳情

幫助反饋 APP下載

慕課網(wǎng)APP
您的移動(dòng)學(xué)習(xí)伙伴

公眾號(hào)

掃描二維碼
關(guān)注慕課網(wǎng)微信公眾號(hào)

舉報(bào)

0/150
提交
取消