中文字幕av专区_日韩电影在线播放_精品国产精品久久一区免费式_av在线免费观看网站

溫馨提示×

溫馨提示×

您好,登錄后才能下訂單哦!

密碼登錄×
登錄注冊×
其他方式登錄
點擊 登錄注冊 即表示同意《億速云用戶服務條款》

Spark 讀取Hbase表數據并實現類似groupByKe

發布時間:2020-02-25 07:16:37 來源:網絡 閱讀:2960 作者:CaramelLatte 欄目:關系型數據庫


一、概述 
程序運行環境很重要,本次測試基于: 
hadoop-2.6.5 
spark-1.6.2 
hbase-1.2.4 
zookeeper-3.4.6 
jdk-1.8 
廢話不多說了,直接上需求


Andy column=baseINFO:age,  value=21

Andy column=baseINFO:gender,  value=0

Andy column=baseINFO:telphone_number, value=110110110

Tom  column=baseINFO:age, value=18

Tom  column=baseINFO:gender, value=1

Tom  column=baseINFO:telphone_number, value=120120120

如上表所示,將之用spark進行分組,達到這樣的效果:

[Andy,(21,0,110110110)] 
[Tom,(18,1,120120120)] 
需求比較簡單,主要是熟悉一下程序運行過程

二、具體代碼


package com.union.bigdata.spark.hbase;import org.apache.hadoop.hbase.HBaseConfiguration;import org.apache.hadoop.hbase.mapreduce.TableSplit;import org.apache.hadoop.hbase.util.Base64;import org.apache.hadoop.hbase.util.Bytes;import org.apache.spark.api.java.JavaSparkContext;import org.apache.spark.api.java.JavaRDD;import org.apache.spark.SparkConf;import org.apache.spark.api.java.function.Function;import org.apache.spark.api.java.function.Function2;import org.apache.hadoop.conf.Configuration;import org.apache.hadoop.hbase.client.Scan;import org.apache.hadoop.hbase.client.Result;import org.apache.hadoop.hbase.io.ImmutableBytesWritable;import org.apache.hadoop.hbase.mapreduce.TableInputFormat;import org.apache.spark.api.java.JavaPairRDD;import org.apache.hadoop.hbase.protobuf.ProtobufUtil;import org.apache.hadoop.hbase.protobuf.generated.ClientProtos;import org.apache.spark.api.java.function.PairFunction;import scala.Tuple10;import scala.Tuple2;import java.io.IOException;import java.util.ArrayList;import java.util.List;public class ReadHbase {

    private static String appName = "ReadTable";

    public static void main(String[] args) {
        SparkConf sparkConf = new SparkConf();

    //we can also run it at local:"local[3]"  the number 3 means 3 threads
        sparkConf.setMaster("spark://master:7077").setAppName(appName);
        JavaSparkContext jsc = new JavaSparkContext(sparkConf);

        Configuration conf = HBaseConfiguration.create();
        conf.set("hbase.zookeeper.quorum", "master"); 
        conf.set("hbase.zookeeper.property.clientPort", "2181"); 
        Scan scan = new Scan();
        scan.addFamily(Bytes.toBytes("baseINFO"));
        scan.addColumn(Bytes.toBytes("baseINFO"), Bytes.toBytes("telphone_number"));
        scan.addColumn(Bytes.toBytes("baseINFO"), Bytes.toBytes("age"));
        scan.addColumn(Bytes.toBytes("baseINFO"), Bytes.toBytes("gender"));

        String scanToString = "";
        try {
            ClientProtos.Scan proto = ProtobufUtil.toScan(scan);
            scanToString = Base64.encodeBytes(proto.toByteArray());
        } catch (IOException io) {
            System.out.println(io);
        }

        for (int i = 0; i < 2; i++) {
            try {
                String tableName = "VIPUSER";
                conf.set(TableInputFormat.INPUT_TABLE, tableName);
                conf.set(TableInputFormat.SCAN, scanToString);

                //get the Result of query from the Table of Hbase
                JavaPairRDD<ImmutableBytesWritable, Result> hBaseRDD = jsc.newAPIHadoopRDD(conf,
                        TableInputFormat.class, ImmutableBytesWritable.class,
                        Result.class);

                //group by row key like : [(Andy,110,21,0),(Tom,120,18,1)]
                JavaPairRDD<String, List<Integer>> art_scores = hBaseRDD.mapToPair(
                        new PairFunction<Tuple2<ImmutableBytesWritable, Result>, String, List<Integer>>() {
                            @Override
                            public Tuple2<String, List<Integer>> call(Tuple2<ImmutableBytesWritable, Result> results) {

                                List<Integer> list = new ArrayList<Integer>();

                                byte[] telphone_number = results._2().getValue(Bytes.toBytes("baseINFO"), Bytes.toBytes("telphone_number"));
                                byte[] age = results._2().getValue(Bytes.toBytes("baseINFO"), Bytes.toBytes("age"));
                                byte[] gender = results._2().getValue(Bytes.toBytes("baseINFO"), Bytes.toBytes("gender"));

                //the type of storage at Hbase is Byte Array,so we must let it be normal like Int,String and so on 
                                list.add(Integer.parseInt(Bytes.toString(telphone_number)));
                                list.add(Integer.parseInt(Bytes.toString(age)));
                                list.add(Integer.parseInt(Bytes.toString(gender)));

                                return new Tuple2<String, List<Integer>>(Bytes.toString(results._1().get()), list);
                            }
                        }
                );

                //switch to Cartesian product 
                JavaPairRDD<Tuple2<String, List<Integer>>, Tuple2<String, List<Integer>>> cart = art_scores.cartesian(art_scores);

                //use Row Key to delete the repetition from the last step "Cartesian product"  
                JavaPairRDD<Tuple2<String, List<Integer>>, Tuple2<String, List<Integer>>> cart2 = cart.filter(
                        new Function<Tuple2<Tuple2<String, List<Integer>>, Tuple2<String, List<Integer>>>, Boolean>() {
                            public Boolean call(Tuple2<Tuple2<String, List<Integer>>, Tuple2<String, List<Integer>>> tuple2Tuple2Tuple2) throws Exception {

                                return tuple2Tuple2Tuple2._1()._1().compareTo(tuple2Tuple2Tuple2._2()._1()) < 0;
                            }
                        }
                );

                System.out.println("Create the List 'collect'...");

        //get the result we need
                 List<Tuple2<Tuple2<String, List<Integer>>, Tuple2<String, List<Integer>>>> collect = cart2.collect();
                 System.out.println("Done..");
                 System.out.println(collect.size() > i ? collect.get(i):"STOP");

                 if (collect.size() > i ) break;
            } catch (Exception e) {
                System.out.println(e);
            }
        }
    }
}


三、程序運行過程分析 
1、spark自檢以及Driver和excutor的啟動過程 
實例化一個SparkContext(若在spark2.x下,這里初始化的是一個SparkSession對象),這時候啟動SecurityManager線程去檢查用戶權限,OK之后創建sparkDriver線程,spark底層遠程通信模塊(akka框架實現)啟動并監聽sparkDriver,之后由sparkEnv對象來注冊BlockManagerMaster線程,由它的實現類對象去監測運行資源 
2、zookeeper與Hbase的自檢和啟動 
第一步順利完成之后由sparkContext對象去實例去啟動程序訪問Hbase的入口,觸發之后zookeeper完成自己的一系列自檢活動,包括用戶權限、操作系統、數據目錄等,一切OK之后初始化客戶端連接對象,之后由Hbase的ClientCnxn對象來建立與master的完整連接 
3、spark job 的運行 
程序開始調用spark的action類方法,比如這里調用了collect,會觸發job的執行,這個流程網上資料很詳細,無非就是DAGScheduler搞的一大堆事情,連帶著出現一大堆線程,比如TaskSetManager、TaskScheduler等等,最后完成job,返回結果集 
4、結束程序 
正確返回結果集之后,sparkContext利用反射調用stop()方法,這之后也會觸發一系列的stop操作,主要線程有這些:BlockManager,ShutdownHookManager,后面還有釋放actor的操作等等,最后一切結束,臨時數據和目錄會被刪除,資源會被釋放

向AI問一下細節

免責聲明:本站發布的內容(圖片、視頻和文字)以原創、轉載和分享為主,文章觀點不代表本網站立場,如果涉及侵權請聯系站長郵箱:is@yisu.com進行舉報,并提供相關證據,一經查實,將立刻刪除涉嫌侵權內容。

AI

南木林县| 边坝县| 台州市| 娄烦县| 松原市| 孝义市| 花莲市| 桂阳县| 乐昌市| 泽州县| 长武县| 沁源县| 云浮市| 棋牌| 桓台县| 东莞市| 青海省| 宁晋县| 衢州市| 德化县| 上高县| 五河县| 苏州市| 赤水市| 南靖县| 浦江县| 津南区| 类乌齐县| 察隅县| 九江县| 福安市| 和田市| 涞水县| 蒙自县| 康平县| 陆丰市| 芜湖市| 驻马店市| 南投市| 垦利县| 开远市|