
        HBase in Practice: A Spark-Read-HBase-Snapshot Demo

        Tags:
        Big Data

        **Preface:** In an earlier post I shared a small demo of Spark reading HBase directly through the client API: HBase-Spark-Read-Demo. When the data volume is very large, however, a direct Spark scan of an HBase table inevitably puts considerable pressure on the HBase cluster. With that in mind, today I'd like to share how Spark can instead read an HBase table's HFiles directly through a snapshot.

        First, create an HBase table named test and insert a few rows:

        hbase(main):003:0> scan 'test'
        ROW                COLUMN+CELL
         r1                column=f:name, timestamp=1583318512414, value=zpb
         r2                column=f:name, timestamp=1583318517079, value=lisi
         r3                column=f:name, timestamp=1583318520839, value=wang
        

        Next, create a snapshot of the table; its location on HDFS is shown below:

        hbase(main):005:0> snapshot 'test', 'test-snapshot'
        0 row(s) in 0.3690 seconds
        
        $ hdfs dfs -ls /apps/hbase/data/.hbase-snapshot
        Found 1 items
        drwxr-xr-x   - hbase hdfs          0 2020-03-21 21:24 /apps/hbase/data/.hbase-snapshot/test-snapshot
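
        As an aside, a snapshot can also be taken programmatically through the HBase Admin API instead of the shell. Here is a minimal sketch using the same table and snapshot names as above (it assumes the connection settings come from an hbase-site.xml on the classpath):

        import org.apache.hadoop.hbase.client.{Admin, Connection, ConnectionFactory}
        import org.apache.hadoop.hbase.{HBaseConfiguration, TableName}

        // open a connection using hbase-site.xml from the classpath
        val conn: Connection = ConnectionFactory.createConnection(HBaseConfiguration.create())
        val admin: Admin = conn.getAdmin
        admin.snapshot("test-snapshot", TableName.valueOf("test")) // take the snapshot
        admin.close()
        conn.close()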
        

        The code is as follows:

        import org.apache.hadoop.fs.Path
        import org.apache.hadoop.conf.Configuration
        import org.apache.hadoop.hbase._
        import org.apache.hadoop.mapreduce.Job
        import org.apache.hadoop.hbase.client.Scan
        import org.apache.hadoop.hbase.mapreduce.{TableInputFormat, TableSnapshotInputFormat}
        import org.apache.hadoop.hbase.protobuf.ProtobufUtil
        import org.apache.hadoop.hbase.util.{Base64, Bytes}
        import org.apache.spark.{SparkConf, SparkContext}
        
        
        object SparkReadHBaseSnapshotDemo {
        
          // main entry point
          def main(args: Array[String]) {
        
            // configure the Spark entry point
            val conf = new SparkConf().setAppName("SparkReadHBaseSnapshotDemo")
              .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
              .setMaster("local") // local mode, for debugging only
        
            val sc = new SparkContext(conf)
            // build the HBase RDD from the snapshot
            val job = Job.getInstance(getHbaseConf())
            TableSnapshotInputFormat.setInput(job, "test-snapshot", new Path("/user/tmp"))
        
            val hbaseRDD = sc.newAPIHadoopRDD(job.getConfiguration, classOf[TableSnapshotInputFormat],
              classOf[org.apache.hadoop.hbase.io.ImmutableBytesWritable],
              classOf[org.apache.hadoop.hbase.client.Result])
            hbaseRDD.map(_._2).map(getRes(_)).count()
          }
        
        
          def getRes(result: org.apache.hadoop.hbase.client.Result): String = {
            val rowkey = Bytes.toString(result.getRow())
            val name = Bytes.toString(result.getValue("f".getBytes, "name".getBytes))
            println(rowkey+"---"+name)
            name
          }
          // build the HBase configuration
          def getHbaseConf(): Configuration = {
            val conf: Configuration = HBaseConfiguration.create()
            conf.set(TableInputFormat.SCAN, getScanStr())
            conf
          }
        
          // build the Scan and serialize it to a Base64 string
          def getScanStr(): String = {
            val scan = new Scan()
            // scan.set...  add row ranges / filters here as needed
            val proto = ProtobufUtil.toScan(scan)
            Base64.encodeBytes(proto.toByteArray())
          }
        }
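
        The empty Scan above reads every cell in the snapshot. Before it is serialized, the Scan can be narrowed to a rowkey range or a single column. Below is a small sketch of an alternative getScanStr for the demo object; the rowkeys and the f:name column are just the sample data from the test table, and setStartRow/setStopRow are the HBase 1.x API:

          // restrict the scan before serializing it
          def getScanStr(): String = {
            val scan = new Scan()
            scan.setStartRow(Bytes.toBytes("r1"))                       // inclusive start rowkey
            scan.setStopRow(Bytes.toBytes("r3"))                        // exclusive stop rowkey
            scan.addColumn(Bytes.toBytes("f"), Bytes.toBytes("name"))   // only read f:name
            val proto = ProtobufUtil.toScan(scan)
            Base64.encodeBytes(proto.toByteArray())
          }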
        

        **Note:** the code above assumes that core-site.xml, hdfs-site.xml, and hbase-site.xml are on the classpath (e.g. in the resources directory). If they are not, the same settings must be supplied in code, as follows:

        package com.xcar.etl
        
        import org.apache.hadoop.fs.Path
        import org.apache.hadoop.conf.Configuration
        import org.apache.hadoop.hbase._
        import org.apache.hadoop.mapreduce.Job
        import org.apache.hadoop.hbase.client.Scan
        import org.apache.hadoop.hbase.mapreduce.{TableInputFormat, TableSnapshotInputFormat}
        import org.apache.hadoop.hbase.protobuf.ProtobufUtil
        import org.apache.hadoop.hbase.util.{Base64, Bytes}
        import org.apache.spark.{SparkConf, SparkContext}
        
        
        object SparkReadHBaseSnapshotDemo2 {
        
          val HBASE_ZOOKEEPER_QUORUM = "xxxx.com.cn"
        
          // main entry point
          def main(args: Array[String]) {
        
            // configure the Spark entry point
            val conf = new SparkConf().setAppName("SparkReadHBaseSnapshotDemo2")
              .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
              .setMaster("local") // local mode, for debugging only
        
            val sc = new SparkContext(conf)
            // build the HBase RDD from the snapshot
            val job = Job.getInstance(getHbaseConf())
            TableSnapshotInputFormat.setInput(job, "test-snapshot", new Path("/user/tmp"))
        
            val hbaseRDD = sc.newAPIHadoopRDD(job.getConfiguration, classOf[TableSnapshotInputFormat],
              classOf[org.apache.hadoop.hbase.io.ImmutableBytesWritable],
              classOf[org.apache.hadoop.hbase.client.Result])
            hbaseRDD.map(_._2).map(getRes(_)).count()
          }
        
          def getRes(result: org.apache.hadoop.hbase.client.Result): String = {
            val rowkey = Bytes.toString(result.getRow())
            val name = Bytes.toString(result.getValue("f".getBytes, "name".getBytes))
            println(rowkey+"---"+name)
            name
          }
        
          // build the HBase configuration in code
          def getHbaseConf(): Configuration = {
            val conf: Configuration = HBaseConfiguration.create()
            conf.set("hbase.zookeeper.property.clientPort", "2181")
            conf.set("zookeeper.znode.parent", "/hbase")
            conf.set("hbase.zookeeper.quorum", HBASE_ZOOKEEPER_QUORUM)
            conf.set("hbase.rootdir", "/apps/hbase")
            // table to query
            conf.set(TableInputFormat.INPUT_TABLE, "test")
            conf.set("fs.defaultFS","hdfs://xxxxxx:8020") 
            conf.set(TableInputFormat.SCAN, getScanStr())
            conf
          }
        
          // build the Scan and serialize it to a Base64 string
          def getScanStr(): String = {
            val scan = new Scan()
            // scan.set...  add row ranges / filters here as needed
            val proto = ProtobufUtil.toScan(scan)
            Base64.encodeBytes(proto.toByteArray())
          }
        }
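
        Once the RDD is built, the Result objects can of course be turned into plain Scala values instead of just being counted. A short sketch (same f:name column as in the demos above) that collects (rowkey, name) pairs to the driver:

            // map each Result to a (rowkey, name) tuple and bring it back to the driver
            val pairs = hbaseRDD.map { case (_, result) =>
              val rowkey = Bytes.toString(result.getRow)
              val name = Bytes.toString(result.getValue(Bytes.toBytes("f"), Bytes.toBytes("name")))
              (rowkey, name)
            }
            pairs.collect().foreach { case (k, v) => println(s"$k ---> $v") }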
        

        Breakdown of the TableSnapshotInputFormat.setInput parameters:

        public static void setInput(org.apache.hadoop.mapreduce.Job job,
                                    String snapshotName,
                                    org.apache.hadoop.fs.Path restoreDir)
                                    throws IOException
        Parameters:
        job - the job to configure
        snapshotName - the name of the snapshot to read from
        restoreDir - a temporary directory to restore the snapshot into. 
        Current user should have write permissions to this directory, and this should not be a subdirectory of rootdir. 
        After the job is finished, restoreDir can be deleted.
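
        Since restoreDir is only scratch space for the restored snapshot metadata, it is reasonable to delete it once the job has finished. A minimal cleanup sketch using the standard HDFS FileSystem API (the /user/tmp path matches the demos above):

            import org.apache.hadoop.fs.{FileSystem, Path}

            // remove the temporary restore directory after the job completes
            val restoreDir = new Path("/user/tmp")
            val fs = FileSystem.get(job.getConfiguration)
            if (fs.exists(restoreDir)) {
              fs.delete(restoreDir, true) // recursive delete
            }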
        

        The pom.xml used by the project:

        <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
                 xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
            <modelVersion>4.0.0</modelVersion>
        
            <groupId>com.zpb.test</groupId>
            <artifactId>spark-read-hbase-snapshot-demo</artifactId>
            <version>1.0-SNAPSHOT</version>
            <packaging>jar</packaging>
        
            <name>spark-read-hbase-snapshot-demo</name>
            <url>http://maven.apache.org</url>
        
            <repositories>
                <repository>
                    <id>cloudera</id>
                    <url>https://repository.cloudera.com/artifactory/cloudera-repos/</url>
                </repository>
            </repositories>
        
            <properties>
                <cdh.hbase.version>1.2.0-cdh5.7.0</cdh.hbase.version>
                <cdh.spark.version>1.6.0-cdh5.7.0</cdh.spark.version>
                <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
            </properties>
        
            <dependencies>
                <dependency>
                    <groupId>junit</groupId>
                    <artifactId>junit</artifactId>
                    <version>3.8.1</version>
                    <scope>test</scope>
                </dependency>
                <dependency>
                    <groupId>com.alibaba</groupId>
                    <artifactId>fastjson</artifactId>
                    <version>1.2.62</version>
                </dependency>
                <dependency>
                    <groupId>org.apache.spark</groupId>
                    <artifactId>spark-core_2.10</artifactId>
                    <version>${cdh.spark.version}</version>
                    <!--<scope>provided</scope>-->
                </dependency>
                <dependency>
                    <groupId>org.apache.hbase</groupId>
                    <artifactId>hbase-server</artifactId>
                    <version>${cdh.hbase.version}</version>
                </dependency>
            </dependencies>
        </project>
        