中文字幕av专区_日韩电影在线播放_精品国产精品久久一区免费式_av在线免费观看网站

溫馨提示×

溫馨提示×

您好,登錄后才能下訂單哦!

密碼登錄×
登錄注冊×
其他方式登錄
點擊 登錄注冊 即表示同意《億速云用戶服務條款》

Spark Streaming筆記整理(二):案例、SSC、數據源與自定義Receiver

發布時間:2020-08-31 13:02:48 來源:網絡 閱讀:7981 作者:xpleaf 欄目:大數據

[TOC]


實時WordCount案例

主要是監聽網絡端口中的數據,并實時進行wc的計算。

Java版

測試代碼如下:

package cn.xpleaf.bigdata.spark.java.streaming.p1;

import org.apache.log4j.Level;
import org.apache.log4j.Logger;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.function.FlatMapFunction;
import org.apache.spark.api.java.function.Function2;
import org.apache.spark.streaming.Durations;
import org.apache.spark.streaming.api.java.JavaDStream;
import org.apache.spark.streaming.api.java.JavaPairDStream;
import org.apache.spark.streaming.api.java.JavaReceiverInputDStream;
import org.apache.spark.streaming.api.java.JavaStreamingContext;
import scala.Tuple2;

import java.util.Arrays;

/**
 * 使用Java開發SparkStreaming的第一個應用程序
 *
 * 用于監聽網絡socket中的一個端口,實時獲取對應的文本內容
 * 計算文本內容中的每一個單詞出現的次數
 */
public class _01SparkStreamingNetWorkWCOps {
    public static void main(String[] args) {
        if(args == null || args.length < 2) {
            System.err.println("Parameter Errors! Usage: <hostname> <port>");
            System.exit(-1);
        }
        Logger.getLogger("org.apache.spark").setLevel(Level.OFF);

        SparkConf conf = new SparkConf()
                .setAppName(_01SparkStreamingNetWorkWCOps.class.getSimpleName())
        /*
         *  設置為local是無法計算數據,但是能夠接收數據
         *  設置為local[2]是既可以計算數據,也可以接收數據
         *      當master被設置為local的時候,只有一個線程,且只能被用來接收外部的數據,所以不能夠進行計算,如此便不會做對應的輸出
         *      所以在使用的本地模式時,同時是監聽網絡socket數據,線程個數必須大于等于2
         */
                .setMaster("local[2]");
        /**
         * 第二個參數:Duration是SparkStreaming用于進行采集多長時間段內的數據將其拆分成一個個batch
         * 該例表示每隔2秒采集一次數據,將數據打散成一個個batch(其實就是SparkCore中的一個個RDD)
         */
        JavaStreamingContext jsc = new JavaStreamingContext(conf, Durations.seconds(2));

        String hostname = args[0].trim();
        int port = Integer.valueOf(args[1].trim());

        JavaReceiverInputDStream<String> lineDStream = jsc.socketTextStream(hostname, port);// 默認的持久化級別StorageLevel.MEMORY_AND_DISK_SER_2

        JavaDStream<String> wordsDStream = lineDStream.flatMap(new FlatMapFunction<String, String>() {
            @Override
            public Iterable<String> call(String line) throws Exception {

                return Arrays.asList(line.split(" "));
            }
        });

        JavaPairDStream<String, Integer> pairsDStream = wordsDStream.mapToPair(word -> {
            return new Tuple2<String, Integer>(word, 1);
        });

        JavaPairDStream<String, Integer> retDStream = pairsDStream.reduceByKey(new Function2<Integer, Integer, Integer>() {
            @Override
            public Integer call(Integer v1, Integer v2) throws Exception {
                return v1 + v2;
            }
        });

        retDStream.print();

        // 啟動流式計算
        jsc.start();
        // 等待執行結束
        jsc.awaitTermination();
        System.out.println("結束了沒有呀,哈哈哈~");
        jsc.close();
    }
}

啟動程序,同時在主機上使用nc命令進行操作:

[uplooking@uplooking01 ~]$ nc -lk 4893
hello youe hello he hello me

輸出結果如下:

-------------------------------------------
Time: 1525929096000 ms
-------------------------------------------
(youe,1)
(hello,3)
(me,1)
(he,1)

同時也可以在Spark UI上查看相應的作業執行情況:

Spark Streaming筆記整理(二):案例、SSC、數據源與自定義Receiver

可以看到,每2秒就會執行一次計算,即每隔2秒采集一次數據,將數據打散成一個個batch(其實就是SparkCore中的一個個RDD)。

Scala版

測試代碼如下:

package cn.xpleaf.bigdata.spark.scala.streaming.p1

import org.apache.log4j.{Level, Logger}
import org.apache.spark.SparkConf
import org.apache.spark.streaming.dstream.{DStream, ReceiverInputDStream}
import org.apache.spark.streaming.{Seconds, StreamingContext}

object _01SparkStreamingNetWorkOps {
    def main(args: Array[String]): Unit = {
        if (args == null || args.length < 2) {
            System.err.println(
                """Parameter Errors! Usage: <hostname> <port>
                |hostname: 監聽的網絡socket的主機名或ip地址
                |port:    監聽的網絡socket的端口
              """.stripMargin)
            System.exit(-1)
        }
        Logger.getLogger("org.apache.spark").setLevel(Level.OFF)

        val conf = new SparkConf()
                            .setAppName(_01SparkStreamingNetWorkOps.getClass.getSimpleName)
                            .setMaster("local[2]")
        val ssc = new StreamingContext(conf, Seconds(2))
        val hostname = args(0).trim
        val port = args(1).trim.toInt

        val linesDStream:ReceiverInputDStream[String] = ssc.socketTextStream(hostname, port)
        val wordsDStream:DStream[String] = linesDStream.flatMap({case line => line.split(" ")})
        val pairsDStream:DStream[(String, Integer)] = wordsDStream.map({case word => (word, 1)})
        val retDStream:DStream[(String, Integer)] = pairsDStream.reduceByKey{case (v1, v2) => v1 + v2}

        retDStream.print()

        ssc.start()
        ssc.awaitTermination()
        ssc.stop()  // stop中的boolean參數,設置為true,關閉該ssc對應的SparkContext,默認為false,只關閉自身
    }
}

啟動程序,同時在主機上使用nc命令進行操作:

[uplooking@uplooking01 ~]$ nc -lk 4893
hello youe hello he hello me

輸出結果如下:

-------------------------------------------
Time: 1525929574000 ms
-------------------------------------------
(youe,1)
(hello,3)
(me,1)
(he,1)

StreamingContext和DStream詳解

Spark Streaming筆記整理(二):案例、SSC、數據源與自定義Receiver

StreamingContext的創建方式

1、在Spark中有兩種創建StreamingContext的方式

1)根據SparkConf進行創建

val conf = new SparkConf().setAppName(appname).setMaster(master);
val ssc = new StreamingContext(conf, Seconds(10));

2)還可以根據SparkContext進行創建

val sc = new SparkContext(conf)
val ssc = new StreamingContext(sc, Seconds(10));

appname,是用來在Spark UI上顯示的應用名稱。master,是一個Spark、Mesos或者Yarn集群的URL,或者是local[*]。

2、batch interval:Seconds(10)可以根據我們自己應用程序的情況進行不同的設置。

StreamingContext的創建、啟動和銷毀

一、一個StreamingContext定義之后,必須執行以下程序進行實時計算的執行

1、創建輸入DStream來創建輸入不同的數據源。

2、對DStream定義transformation和output等各種算子操作,來定義我們需要的各種實時計算邏輯。

3、調用StreamingContext的start()方法,進行啟動我們的實時處理數據。

4、調用StreamingContext的awaitTermination()方法,來等待應用程序的終止。可以使用CTRL+C手動停止,或者就是讓它持續不斷的運行進行計算。

5、也可以通過調用StreamingContext的stop()方法,來停止應用程序。

二、備注(十分重要)

1、只要我們一個StreamingContext啟動之后,我們就不能再往這個Application其中添加任何計算邏輯了。比如執行start()方法之后,還給某個DStream執行一個算子,這是不允許的。

2、一個StreamingContext停止之后,是肯定不能夠重啟的。調用stop()之后,不能再調用start()

3、必須保證一個JVM同時只能有一個StreamingContext啟動。在你的應用程序中,不能創建兩個StreamingContext。

4、調用stop()方法時,會同時停止內部的SparkContext,如果不希望如此,還希望后面繼續使用SparkContext創建其他類型的Context,比如SQLContext,那么就用stop(false)。

5、一個SparkContext可以創建多個StreamingContext,只要上一個先用stop(false)停止,再創建下一個即可。(注意與第2點的區別,這里是再創建了一個StreamingContext)

輸入DStream和Receiver

Spark Streaming筆記整理(二):案例、SSC、數據源與自定義Receiver

輸入DStream代表了來自數據源的輸入數據流。我們之前做過了一些例子,比如從文件讀取、從TCP、從HDFS讀取等。每個DSteam都會綁定一個Receiver對象,該對象是一個關鍵的核心組件,用來從我們的各種數據源接受數據,并將其存儲在Spark的內存當中,這個內存的StorageLevel,我們可以自己進行指定,老師在以后的例子中會講解這部分。

Spark Streaming提供了兩種內置的數據源支持:

1、基礎數據源:SSC API中直接提供了對這些數據源的支持,比如文件、tcp socket、Akka Actor等。

2、高級數據源:比如Kafka、Flume、Kinesis和Twitter等數據源,要引入第三方的JAR來完成我們的工作。

3、自定義數據源:比如我們的ZMQ、RabbitMQ、ActiveMQ等任何格式的自定義數據源。關于自定義數據源,老師在講解最后一個項目的時候,會使用此自定義數據源如果從ZMQ中讀取數據。官方提供的Spark-ZMQ是基于zmq2.0版本的,因為現在的 生產環境都是基于ZMQ4以上的版本了,所以必須自己定義并實現了一個自定義的receiver機制。

Spark Streaming的運行機制local[*]分析

1、如果我們想要在我們的Spark Streaming應用中并行讀取N多數據的話,我們可以啟動創建多個DStream。這樣子就會創建多個Receiver,老師最多的一個案例是啟動了128個Receive,每個receiver每秒的數據是1000W以上。

2、但是要注意的是,我們Spark Streaming Application的Executor進程,是個長時間運行的一個進程,因此它會獨占分給他的cpu core。所以它只能自己處理這件事情了,不能再干其他活了。

3、使用本地模式local運行我們的Spark Streaming程序時,絕對不能使用local或者 local[1]的模式。因為Spark Streaming運行的時候,必須要至少要有2個線程。如果只給了一條的話,Spark Streaming Application程序會直接hang在哪兒。 兩條線程的一條用來分配給Receiver接收數據,另外一條線程用來處理接受到的數據。因此我們想要進行本地測試的話,必須滿足local[N],這個N一定要大于2

4、如果我們想要在我們的Spark進群上運行的話,那么首先,必須要求我們的集群每個節點上,有>1個cpu core。其次,給Spark Streaming的每個executor分配的core,必須>1,這樣,才能保證分配到executor上運行的輸入DStream,兩條線程并行,一條運行Receiver,接收數據;一條處理數據。否則的話,只會接收數據,不會處理數據。

DStream與HDFS集成

輸入DFStream基礎數據源

基于HDFS文件的實時計算,其實就是監控我們的一個HDFS目錄,只要其中有新文件出現,就實時處理。相當于處理實時的文件流。

===》Spark Streaming會監視指定的HDFS目錄,并且處理出現在目錄中的文件。

1)在HDFS中的所有目錄下的文件,必須滿足相同的格式,不然的話,不容易處理。必須使用移動或者重命名的方式,將文件移入目錄。一旦處理之后,文件的內容及時改變,也不會再處理了。

2)基于HDFS的數據結源讀取是沒有receiver的,因此不會占用一個cpu core。

3)實際上在下面的測試案例中,一直也沒有效果,也就是監聽不到HDFS中的文件,本地文件也沒有效果;

基于HDFS的實時WordCounter案例實戰

測試代碼如下:

package cn.xpleaf.bigdata.spark.scala.streaming.p1

import org.apache.log4j.{Level, Logger}
import org.apache.spark.SparkConf
import org.apache.spark.streaming.dstream.DStream
import org.apache.spark.streaming.{Seconds, StreamingContext}

/**
  * SparkStreaming監聽hdfs的某一個目錄的變化(新增文件)
  */
object _02SparkStreamingHDFSOps {
    def main(args: Array[String]): Unit = {
        Logger.getLogger("org.apache.spark").setLevel(Level.OFF)

        val conf = new SparkConf()
            .setAppName(_02SparkStreamingHDFSOps.getClass.getSimpleName)
            .setMaster("local[2]")
        val ssc = new StreamingContext(conf, Seconds(5))

        val linesDStream:DStream[String] = ssc.textFileStream("hdfs://ns1/input/spark/streaming/")
//        val linesDStream:DStream[String] = ssc.textFileStream("D:/data/spark/streaming")
        linesDStream.flatMap(_.split(" ")).map((_, 1)).reduceByKey(_+_).print()

        ssc.start()
        ssc.awaitTermination()
        ssc.stop()
    }
}

DStream與Kafka集成(基于Receiver方式)

Spark與Kafka集成的方式

1、利用Kafka的Receiver方式進行集成

2、利用Kafka的Direct方式進行集成

Spark-Streaming獲取kafka數據的兩種方式-Receiver與Direct的方式,可以從代碼中簡單理解成Receiver方式是通過zookeeper來連接kafka隊列,Direct方式是直接連接到kafka的節點上獲取數據了。

基于Kafka的Receiver方式集成

這種方式使用Receiver來獲取數據。Receiver是使用Kafka的高層次Consumer API來實現的。receiver從Kafka中獲取的數據都是存儲在Spark Executor的內存中的,然后Spark Streaming啟動的job會去處理那些數據。然而,在默認的配置下,這種方式可能會因為底層的失敗而丟失數據。如果要啟用高可靠機制,讓數據零丟失,就必須啟用Spark Streaming的預寫日志機制(Write Ahead Log,WAL)。該機制會同步地將接收到的Kafka數據寫入分布式文件系統(比如HDFS)上的預寫日志中。所以,即使底層節點出現了失敗,也可以使用預寫日志中的數據進行恢復。

補充說明:

(1)、Kafka中的topic的partition,與Spark中的RDD的partition是沒有關系的。所以,在KafkaUtils.createStream()中,提高partition的數量,只會增加一個Receiver中,讀取partition的線程的數量。不會增加Spark處理數據的并行度。

(2)、可以創建多個Kafka輸入DStream,使用不同的consumer group和topic,來通過多個receiver并行接收數據。

(3)、如果基于容錯的文件系統,比如HDFS,啟用了預寫日志機制,接收到的數據都會被復制一份到預寫日志中。因此,在KafkaUtils.createStream()中,設置的持久化級別是StorageLevel.MEMORY_AND_DISK_SER。

與Kafka的集成--Maven

<dependency>
      <groupId>org.apache.spark</groupId>
      <artifactId>spark-streaming-kafka_2.10</artifactId>
      <version>1.6.2</version>
</dependency>

Kafka啟動、驗證和測試

啟動kafka服務

kafka-server-start.sh -daemon config/server.properties

創建topic

kafka-topics.sh --create --topic spark-kafka --zookeeper uplooking01:2181,uplooking02:2181,uplooking03:2181 --partitions 3 --replication-factor 3

列舉kafka中已經創建的topic

kafka-topics.sh --list --zookeeper uplooking01:2181,uplooking02:2181,uplooking03:2181

列舉每個節點都保護那些topic、Partition

kafka-topics.sh --describe --zookeeper uplooking01:2181, uplooking02:2181, uplooking03:21821 --topic spark-kafka
  leader:負責處理消息的讀和寫,leader是從所有節點中隨機選擇的.
  replicas:列出了所有的副本節點,不管節點是否在服務中.
  isr:是正在服務中的節點.

產生數據

kafka-console-producer.sh --topic spark-kafka --broker-list uplooking01:9092,uplooking02:9092,uplooking03:9092

消費數據

kafka-console-consumer.sh --topic spark-kafka --zookeeper uplooking01:2181,uplooking02:2181,uplooking03:2181

案例

測試代碼如下:

package cn.xpleaf.bigdata.spark.scala.streaming.p1

import org.apache.log4j.{Level, Logger}
import org.apache.spark.SparkConf
import org.apache.spark.streaming.dstream.ReceiverInputDStream
import org.apache.spark.streaming.kafka.KafkaUtils
import org.apache.spark.streaming.{Seconds, StreamingContext}

/**
  * Kafka和SparkStreaming基于Receiver的模式集成
  */
object _03SparkStreamingKafkaReceiverOps {
    def main(args: Array[String]): Unit = {
        Logger.getLogger("org.apache.spark").setLevel(Level.OFF)

        val conf = new SparkConf()
            .setAppName(_03SparkStreamingKafkaReceiverOps.getClass.getSimpleName)
            .setMaster("local[2]")
        val ssc = new StreamingContext(conf, Seconds(5))
//        ssc.checkpoint("hdfs://ns1/checkpoint/streaming/kafka")   // checkpoint文件保存到hdfs中
        ssc.checkpoint("file:///D:/data/spark/streaming/checkpoint/streaming/kafka")    // checkpoint文件保存到本地文件系統

        /**
          * 使用Kafka Receiver的方式,來創建的輸入DStream,需要使用SparkStreaming提供的Kafka整合API
          * KafkaUtils
          */
        val zkQuorum = "uplooking01:2181,uplooking02:2181,uplooking03:2181"
        val groupId = "kafka-receiver-group-id"
        val topics:Map[String, Int] = Map("spark-kafka"->3)
        // ReceiverInputDStream中的key就是當前一條數據在kafka中的key,value就是該條數據對應的value
        val linesDStream:ReceiverInputDStream[(String, String)] = KafkaUtils.createStream(ssc, zkQuorum, groupId, topics)

        val retDStream = linesDStream.map(t => t._2).flatMap(_.split(" ")).map((_, 1)).reduceByKey(_+_)

        retDStream.print()

        ssc.start()
        ssc.awaitTermination()
        ssc.stop()
    }
}

在kafka中生產數據:

[uplooking@uplooking02 kafka]$ kafka-console-producer.sh --topic spark-kafka --broker-list uplooking01:9092,uplooking02:9092,uplooking03:9092
hello you hello he hello me

輸出結果如下:

-------------------------------------------
Time: 1525965130000 ms
-------------------------------------------
(hello,3)
(me,1)
(you,1)
(he,1)

在上面的代碼中,還啟用了Spark Streaming的預寫日志機制(Write Ahead Log,WAL)。

如果數據保存在本地文件系統,則如下:

Spark Streaming筆記整理(二):案例、SSC、數據源與自定義Receiver

如果數據保存在HDFS中,則如下:

Spark Streaming筆記整理(二):案例、SSC、數據源與自定義Receiver

DStream與Kafka集成(基于Direct方式)

Spark和Kafka集成Direct的特點

(1)Direct的方式是會直接操作kafka底層的元數據信息,這樣如果計算失敗了,可以把數據重新讀一下,重新處理。即數據一定會被處理。拉數據,是RDD在執行的時候直接去拉數據。

(2)由于直接操作的是kafka,kafka就相當于你底層的文件系統。這個時候能保證嚴格的事務一致性,即一定會被處理,而且只會被處理一次。而Receiver的方式則不能保證,因為Receiver和ZK中的數據可能不同步,Spark Streaming可能會重復消費數據,這個調優可以解決,但顯然沒有Direct方便。而Direct api直接是操作kafka的,spark streaming自己負責追蹤消費這個數據的偏移量或者offset,并且自己保存到checkpoint,所以它的數據一定是同步的,一定不會被重復。即使重啟也不會重復,因為checkpoint了,但是程序升級的時候,不能讀取原先的checkpoint,面對升級checkpoint無效這個問題,怎么解決呢?升級的時候讀取我指定的備份就可以了,即手動的指定checkpoint也是可以的,這就再次完美的確保了事務性,有且僅有一次的事務機制。那么怎么手動checkpoint呢?構建SparkStreaming的時候,有getorCreate這個api,它就會獲取checkpoint的內容,具體指定下這個checkpoint在哪就好了。

(3)由于底層是直接讀數據,沒有所謂的Receiver,直接是周期性(Batch Intervel)的查詢kafka,處理數據的時候,我們會使用基于kafka原生的Consumer api來獲取kafka中特定范圍(offset范圍)中的數據。這個時候,Direct Api訪問kafka帶來的一個顯而易見的性能上的好處就是,如果你要讀取多個partition,Spark也會創建RDD的partition,這個時候RDD的partition和kafka的partition是一致的。而Receiver的方式,這2個partition是沒任何關系的。這個優勢是你的RDD,其實本質上講在底層讀取kafka的時候,kafka的partition就相當于原先hdfs上的一個block。這就符合了數據本地性。RDD和kafka數據都在這邊。所以讀數據的地方,處理數據的地方和驅動數據處理的程序都在同樣的機器上,這樣就可以極大的提高性能。不足之處是由于RDD和kafka的patition是一對一的,想提高并行度就會比較麻煩。提高并行度還是repartition,即重新分區,因為產生shuffle,很耗時。這個問題,以后也許新版本可以自由配置比例,不是一對一。因為提高并行度,可以更好的利用集群的計算資源,這是很有意義的。

(4)不需要開啟wal機制,從數據零丟失的角度來看,極大的提升了效率,還至少能節省一倍的磁盤空間。從kafka獲取數據,比從hdfs獲取數據,因為zero copy的方式,速度肯定更快。

Kafka Direct VS Receiver

從高層次的角度看,之前的和Kafka集成方案(reciever方法)使用WAL工作方式如下:

1)運行在Spark workers/executors上的Kafka Receivers連續不斷地從Kafka中讀取數據,其中用到了Kafka中高層次的消費者API。

2)接收到的數據被存儲在Spark workers/executors中的內存,同時也被寫入到WAL中。只有接收到的數據被持久化到log中,Kafka Receivers才會去更新Zookeeper中Kafka的偏移量。

3)接收到的數據和WAL存儲位置信息被可靠地存儲,如果期間出現故障,這些信息被用來從錯誤中恢復,并繼續處理數據。

Spark Streaming筆記整理(二):案例、SSC、數據源與自定義Receiver

  • 這個方法可以保證從Kafka接收的數據不被丟失。但是在失敗的情況下,有些數據很有可能會被處理不止一次!這種情況在一些接收到的數據被可靠地保存到WAL中,但是還沒有來得及更新Zookeeper中Kafka偏移量,系統出現故障的情況下發生。這導致數據出現不一致性:Spark Streaming知道數據被接收,但是Kafka那邊認為數據還沒有被接收,這樣在系統恢復正常時,Kafka會再一次發送這些數據
  • 這種不一致產生的原因是因為兩個系統無法對那些已經接收到的數據信息保存進行原子操作。為了解決這個問題,只需要一個系統來維護那些已經發送或接收的一致性視圖,而且,這個系統需要擁有從失敗中恢復的一切控制權利。基于這些考慮,社區決定將所有的消費偏移量信息只存儲在Spark Streaming中,并且使用Kafka的低層次消費者API來從任意位置恢復數據

為了構建這個系統,新引入的Direct API采用完全不同于Receivers和WALs的處理方式。它不是啟動一個Receivers來連續不斷地從Kafka中接收數據并寫入到WAL中,而是簡單地給出每個batch區間需要讀取的偏移量位置,最后,每個batch的Job被運行,那些對應偏移量的數據在Kafka中已經準備好了。這些偏移量信息也被可靠地存儲(checkpoint),在從失敗中恢復

Spark Streaming筆記整理(二):案例、SSC、數據源與自定義Receiver

  • 需要注意的是,Spark Streaming可以在失敗以后重新從Kafka中讀取并處理那些數據段。然而,由于僅處理一次的語義,最后重新處理的結果和沒有失敗處理的結果是一致的。
  • 因此,Direct API消除了需要使用WAL和Receivers的情況,而且確保每個Kafka記錄僅被接收一次并被高效地接收。這就使得我們可以將Spark Streaming和Kafka很好地整合在一起。總體來說,這些特性使得流處理管道擁有高容錯性,高效性,而且很容易地被使用。

案例

測試代碼如下:

package cn.xpleaf.bigdata.spark.scala.streaming.p1

import kafka.serializer.StringDecoder
import org.apache.log4j.{Level, Logger}
import org.apache.spark.SparkConf
import org.apache.spark.streaming.dstream.{InputDStream, ReceiverInputDStream}
import org.apache.spark.streaming.kafka.KafkaUtils
import org.apache.spark.streaming.{Seconds, StreamingContext}

/**
  * Kafka和SparkStreaming基于Direct的模式集成
  *
  * 在公司中使用Kafka-Direct方式
  */
object _04SparkStreamingKafkaDirectOps {
    def main(args: Array[String]): Unit = {
        Logger.getLogger("org.apache.spark").setLevel(Level.OFF)

        val conf = new SparkConf()
            .setAppName(_04SparkStreamingKafkaDirectOps.getClass.getSimpleName)
            .setMaster("local[2]")
        val ssc = new StreamingContext(conf, Seconds(5))

//        ssc.checkpoint("hdfs://ns1/checkpoint/streaming/kafka")   // checkpoint文件也是可以保存到hdfs中的,不過必要性不大了,對于direct的方式來說

        val kafkaParams:Map[String, String] = Map("metadata.broker.list"-> "uplooking01:9092,uplooking02:9092,uplooking03:9092")
        val topics:Set[String] = Set("spark-kafka")
        val linesDStream:InputDStream[(String, String)] = KafkaUtils.
            // 參數分別為:key類型,value類型,key的×××,value的×××
            createDirectStream[String, String, StringDecoder, StringDecoder](ssc, kafkaParams, topics)

        val retDStream = linesDStream.map(t => t._2).flatMap(_.split(" ")).map((_, 1)).reduceByKey(_+_)

        retDStream.print()

        ssc.start()
        ssc.awaitTermination()
        ssc.stop()
    }
}

生產數據:

[uplooking@uplooking02 kafka]$ kafka-console-producer.sh --topic spark-kafka --broker-list uplooking01:9092,uplooking02:9092,uplooking03:9092
hello you hello he hello me

輸出結果如下:

-------------------------------------------
Time: 1525966750000 ms
-------------------------------------------
(hello,3)
(me,1)
(you,1)
(he,1)

自定義Receiver

測試代碼如下:

package cn.xpleaf.bigdata.spark.scala.streaming.p1

import java.io.{BufferedReader, InputStreamReader}
import java.net.Socket

import org.apache.log4j.{Level, Logger}
import org.apache.spark.SparkConf
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.dstream.{DStream, ReceiverInputDStream}
import org.apache.spark.streaming.receiver.Receiver
import org.apache.spark.streaming.{Seconds, StreamingContext}

/**
  * SparkStreaming自定義Receiver
  * 通過模擬Network來學習自定義Receiver
  *
  * 自定義的步驟:
  *     1.創建一個類繼承一個類或者實現某個接口
  *     2.復寫啟動的個別方法
  *     3.進行注冊調用
  */
object _05SparkStreamingCustomReceiverOps {
    def main(args: Array[String]): Unit = {
        if (args == null || args.length < 2) {
            System.err.println(
                """Parameter Errors! Usage: <hostname> <port>
                  |hostname: 監聽的網絡socket的主機名或ip地址
                  |port:    監聽的網絡socket的端口
                """.stripMargin)
            System.exit(-1)
        }
        Logger.getLogger("org.apache.spark").setLevel(Level.OFF)

        val conf = new SparkConf()
            .setAppName(_05SparkStreamingCustomReceiverOps.getClass.getSimpleName)
            .setMaster("local[2]")
        val ssc = new StreamingContext(conf, Seconds(5))
        val hostname = args(0).trim
        val port = args(1).trim.toInt

        val linesDStream:ReceiverInputDStream[String] = ssc.receiverStream[String](new MyNetWorkReceiver(hostname, port))
        val retDStream:DStream[(String, Int)] = linesDStream.flatMap(_.split(" ")).map((_, 1)).reduceByKey(_+_)

        retDStream.print()

        ssc.start()
        ssc.awaitTermination()
        ssc.stop()
    }
}

/**
  * 自定義receiver
  */
class MyNetWorkReceiver(storageLevel:StorageLevel) extends Receiver[String](storageLevel) {

    private var hostname:String = _
    private var port:Int = _

    def this(hostname:String, port:Int, storageLevel:StorageLevel = StorageLevel.MEMORY_AND_DISK_SER_2) {
        this(storageLevel)
        this.hostname = hostname
        this.port = port
    }

    /**
      * 啟動及其初始化receiver資源
      */
    override def onStart(): Unit = {
        val thread = new Thread() {
            override def run(): Unit = {
                receive()
            }
        }
        thread.setDaemon(true)  // 設置成為后臺線程
        thread.start()
    }

    // 接收數據的核心api 讀取網絡socket中的數據
    def receive(): Unit = {
        val socket = new Socket(hostname, port)
        val ins = socket.getInputStream()
        val br = new BufferedReader(new InputStreamReader(ins))
        var line:String = null
        while((line = br.readLine()) != null) {
            store(line)
        }
        ins.close()
        socket.close()
    }

    override def onStop(): Unit = {

    }
}

啟動nc,并輸入數據:

[uplooking@uplooking01 ~]$ nc -lk 4893
hello you hello he hello me

輸出結果如下:

(hello,3)
(me,1)
(you,1)
(he,1)
向AI問一下細節

免責聲明:本站發布的內容(圖片、視頻和文字)以原創、轉載和分享為主,文章觀點不代表本網站立場,如果涉及侵權請聯系站長郵箱:is@yisu.com進行舉報,并提供相關證據,一經查實,將立刻刪除涉嫌侵權內容。

AI

弥勒县| 确山县| 宁海县| 田林县| 沙坪坝区| 措美县| 嘉鱼县| 杭锦后旗| 鲜城| 太康县| 武乡县| 隆昌县| 依安县| 洪雅县| 资讯| 岳阳县| 尼玛县| 抚松县| 曲阜市| 呼伦贝尔市| 平南县| 浦北县| 吴川市| 开封县| 株洲市| 青田县| 肇州县| 赤峰市| 香港| 邵阳县| 南靖县| 江都市| 专栏| 泸州市| 东辽县| 元谋县| 建始县| 固安县| 汉川市| 页游| 洞头县|