您好,登錄后才能下訂單哦!
[TOC]
主要是監聽網絡端口中的數據,并實時進行wc的計算。
測試代碼如下:
package cn.xpleaf.bigdata.spark.java.streaming.p1;
import org.apache.log4j.Level;
import org.apache.log4j.Logger;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.function.FlatMapFunction;
import org.apache.spark.api.java.function.Function2;
import org.apache.spark.streaming.Durations;
import org.apache.spark.streaming.api.java.JavaDStream;
import org.apache.spark.streaming.api.java.JavaPairDStream;
import org.apache.spark.streaming.api.java.JavaReceiverInputDStream;
import org.apache.spark.streaming.api.java.JavaStreamingContext;
import scala.Tuple2;
import java.util.Arrays;
/**
* 使用Java開發SparkStreaming的第一個應用程序
*
* 用于監聽網絡socket中的一個端口,實時獲取對應的文本內容
* 計算文本內容中的每一個單詞出現的次數
*/
public class _01SparkStreamingNetWorkWCOps {
public static void main(String[] args) {
if(args == null || args.length < 2) {
System.err.println("Parameter Errors! Usage: <hostname> <port>");
System.exit(-1);
}
Logger.getLogger("org.apache.spark").setLevel(Level.OFF);
SparkConf conf = new SparkConf()
.setAppName(_01SparkStreamingNetWorkWCOps.class.getSimpleName())
/*
* 設置為local是無法計算數據,但是能夠接收數據
* 設置為local[2]是既可以計算數據,也可以接收數據
* 當master被設置為local的時候,只有一個線程,且只能被用來接收外部的數據,所以不能夠進行計算,如此便不會做對應的輸出
* 所以在使用的本地模式時,同時是監聽網絡socket數據,線程個數必須大于等于2
*/
.setMaster("local[2]");
/**
* 第二個參數:Duration是SparkStreaming用于進行采集多長時間段內的數據將其拆分成一個個batch
* 該例表示每隔2秒采集一次數據,將數據打散成一個個batch(其實就是SparkCore中的一個個RDD)
*/
JavaStreamingContext jsc = new JavaStreamingContext(conf, Durations.seconds(2));
String hostname = args[0].trim();
int port = Integer.valueOf(args[1].trim());
JavaReceiverInputDStream<String> lineDStream = jsc.socketTextStream(hostname, port);// 默認的持久化級別StorageLevel.MEMORY_AND_DISK_SER_2
JavaDStream<String> wordsDStream = lineDStream.flatMap(new FlatMapFunction<String, String>() {
@Override
public Iterable<String> call(String line) throws Exception {
return Arrays.asList(line.split(" "));
}
});
JavaPairDStream<String, Integer> pairsDStream = wordsDStream.mapToPair(word -> {
return new Tuple2<String, Integer>(word, 1);
});
JavaPairDStream<String, Integer> retDStream = pairsDStream.reduceByKey(new Function2<Integer, Integer, Integer>() {
@Override
public Integer call(Integer v1, Integer v2) throws Exception {
return v1 + v2;
}
});
retDStream.print();
// 啟動流式計算
jsc.start();
// 等待執行結束
jsc.awaitTermination();
System.out.println("結束了沒有呀,哈哈哈~");
jsc.close();
}
}
啟動程序,同時在主機上使用nc命令進行操作:
[uplooking@uplooking01 ~]$ nc -lk 4893
hello youe hello he hello me
輸出結果如下:
-------------------------------------------
Time: 1525929096000 ms
-------------------------------------------
(youe,1)
(hello,3)
(me,1)
(he,1)
同時也可以在Spark UI上查看相應的作業執行情況:
可以看到,每2秒就會執行一次計算,即每隔2秒采集一次數據,將數據打散成一個個batch(其實就是SparkCore中的一個個RDD)。
測試代碼如下:
package cn.xpleaf.bigdata.spark.scala.streaming.p1
import org.apache.log4j.{Level, Logger}
import org.apache.spark.SparkConf
import org.apache.spark.streaming.dstream.{DStream, ReceiverInputDStream}
import org.apache.spark.streaming.{Seconds, StreamingContext}
object _01SparkStreamingNetWorkOps {
def main(args: Array[String]): Unit = {
if (args == null || args.length < 2) {
System.err.println(
"""Parameter Errors! Usage: <hostname> <port>
|hostname: 監聽的網絡socket的主機名或ip地址
|port: 監聽的網絡socket的端口
""".stripMargin)
System.exit(-1)
}
Logger.getLogger("org.apache.spark").setLevel(Level.OFF)
val conf = new SparkConf()
.setAppName(_01SparkStreamingNetWorkOps.getClass.getSimpleName)
.setMaster("local[2]")
val ssc = new StreamingContext(conf, Seconds(2))
val hostname = args(0).trim
val port = args(1).trim.toInt
val linesDStream:ReceiverInputDStream[String] = ssc.socketTextStream(hostname, port)
val wordsDStream:DStream[String] = linesDStream.flatMap({case line => line.split(" ")})
val pairsDStream:DStream[(String, Integer)] = wordsDStream.map({case word => (word, 1)})
val retDStream:DStream[(String, Integer)] = pairsDStream.reduceByKey{case (v1, v2) => v1 + v2}
retDStream.print()
ssc.start()
ssc.awaitTermination()
ssc.stop() // stop中的boolean參數,設置為true,關閉該ssc對應的SparkContext,默認為false,只關閉自身
}
}
啟動程序,同時在主機上使用nc命令進行操作:
[uplooking@uplooking01 ~]$ nc -lk 4893
hello youe hello he hello me
輸出結果如下:
-------------------------------------------
Time: 1525929574000 ms
-------------------------------------------
(youe,1)
(hello,3)
(me,1)
(he,1)
1、在Spark中有兩種創建StreamingContext的方式
1)根據SparkConf進行創建
val conf = new SparkConf().setAppName(appname).setMaster(master);
val ssc = new StreamingContext(conf, Seconds(10));
2)還可以根據SparkContext進行創建
val sc = new SparkContext(conf)
val ssc = new StreamingContext(sc, Seconds(10));
appname,是用來在Spark UI上顯示的應用名稱。master,是一個Spark、Mesos或者Yarn集群的URL,或者是local[*]。
2、batch interval:Seconds(10)可以根據我們自己應用程序的情況進行不同的設置。
一、一個StreamingContext定義之后,必須執行以下程序進行實時計算的執行
1、創建輸入DStream來創建輸入不同的數據源。
2、對DStream定義transformation和output等各種算子操作,來定義我們需要的各種實時計算邏輯。
3、調用StreamingContext的start()方法,進行啟動我們的實時處理數據。
4、調用StreamingContext的awaitTermination()方法,來等待應用程序的終止。可以使用CTRL+C手動停止,或者就是讓它持續不斷的運行進行計算。
5、也可以通過調用StreamingContext的stop()方法,來停止應用程序。
二、備注(十分重要)
1、只要我們一個StreamingContext啟動之后,我們就不能再往這個Application其中添加任何計算邏輯了。比如執行start()方法之后,還給某個DStream執行一個算子,這是不允許的。
2、一個StreamingContext停止之后,是肯定不能夠重啟的。調用stop()之后,不能再調用start()
3、必須保證一個JVM同時只能有一個StreamingContext啟動。在你的應用程序中,不能創建兩個StreamingContext。
4、調用stop()方法時,會同時停止內部的SparkContext,如果不希望如此,還希望后面繼續使用SparkContext創建其他類型的Context,比如SQLContext,那么就用stop(false)。
5、一個SparkContext可以創建多個StreamingContext,只要上一個先用stop(false)停止,再創建下一個即可。(注意與第2點的區別,這里是再創建了一個StreamingContext)
輸入DStream代表了來自數據源的輸入數據流。我們之前做過了一些例子,比如從文件讀取、從TCP、從HDFS讀取等。每個DSteam都會綁定一個Receiver對象,該對象是一個關鍵的核心組件,用來從我們的各種數據源接受數據,并將其存儲在Spark的內存當中,這個內存的StorageLevel,我們可以自己進行指定,老師在以后的例子中會講解這部分。
Spark Streaming提供了兩種內置的數據源支持:
1、基礎數據源:SSC API中直接提供了對這些數據源的支持,比如文件、tcp socket、Akka Actor等。
2、高級數據源:比如Kafka、Flume、Kinesis和Twitter等數據源,要引入第三方的JAR來完成我們的工作。
3、自定義數據源:比如我們的ZMQ、RabbitMQ、ActiveMQ等任何格式的自定義數據源。關于自定義數據源,老師在講解最后一個項目的時候,會使用此自定義數據源如果從ZMQ中讀取數據。官方提供的Spark-ZMQ是基于zmq2.0版本的,因為現在的 生產環境都是基于ZMQ4以上的版本了,所以必須自己定義并實現了一個自定義的receiver機制。
1、如果我們想要在我們的Spark Streaming應用中并行讀取N多數據的話,我們可以啟動創建多個DStream。這樣子就會創建多個Receiver,老師最多的一個案例是啟動了128個Receive,每個receiver每秒的數據是1000W以上。
2、但是要注意的是,我們Spark Streaming Application的Executor進程,是個長時間運行的一個進程,因此它會獨占分給他的cpu core。所以它只能自己處理這件事情了,不能再干其他活了。
3、使用本地模式local運行我們的Spark Streaming程序時,絕對不能使用local或者 local[1]的模式。因為Spark Streaming運行的時候,必須要至少要有2個線程。如果只給了一條的話,Spark Streaming Application程序會直接hang在哪兒。 兩條線程的一條用來分配給Receiver接收數據,另外一條線程用來處理接受到的數據。因此我們想要進行本地測試的話,必須滿足local[N],這個N一定要大于2
4、如果我們想要在我們的Spark進群上運行的話,那么首先,必須要求我們的集群每個節點上,有>1個cpu core。其次,給Spark Streaming的每個executor分配的core,必須>1,這樣,才能保證分配到executor上運行的輸入DStream,兩條線程并行,一條運行Receiver,接收數據;一條處理數據。否則的話,只會接收數據,不會處理數據。
基于HDFS文件的實時計算,其實就是監控我們的一個HDFS目錄,只要其中有新文件出現,就實時處理。相當于處理實時的文件流。
===》Spark Streaming會監視指定的HDFS目錄,并且處理出現在目錄中的文件。
1)在HDFS中的所有目錄下的文件,必須滿足相同的格式,不然的話,不容易處理。必須使用移動或者重命名的方式,將文件移入目錄。一旦處理之后,文件的內容及時改變,也不會再處理了。
2)基于HDFS的數據結源讀取是沒有receiver的,因此不會占用一個cpu core。
3)實際上在下面的測試案例中,一直也沒有效果,也就是監聽不到HDFS中的文件,本地文件也沒有效果;
測試代碼如下:
package cn.xpleaf.bigdata.spark.scala.streaming.p1
import org.apache.log4j.{Level, Logger}
import org.apache.spark.SparkConf
import org.apache.spark.streaming.dstream.DStream
import org.apache.spark.streaming.{Seconds, StreamingContext}
/**
* SparkStreaming監聽hdfs的某一個目錄的變化(新增文件)
*/
object _02SparkStreamingHDFSOps {
def main(args: Array[String]): Unit = {
Logger.getLogger("org.apache.spark").setLevel(Level.OFF)
val conf = new SparkConf()
.setAppName(_02SparkStreamingHDFSOps.getClass.getSimpleName)
.setMaster("local[2]")
val ssc = new StreamingContext(conf, Seconds(5))
val linesDStream:DStream[String] = ssc.textFileStream("hdfs://ns1/input/spark/streaming/")
// val linesDStream:DStream[String] = ssc.textFileStream("D:/data/spark/streaming")
linesDStream.flatMap(_.split(" ")).map((_, 1)).reduceByKey(_+_).print()
ssc.start()
ssc.awaitTermination()
ssc.stop()
}
}
1、利用Kafka的Receiver方式進行集成
2、利用Kafka的Direct方式進行集成
Spark-Streaming獲取kafka數據的兩種方式-Receiver與Direct的方式,可以從代碼中簡單理解成Receiver方式是通過zookeeper來連接kafka隊列,Direct方式是直接連接到kafka的節點上獲取數據了。
這種方式使用Receiver來獲取數據。Receiver是使用Kafka的高層次Consumer API來實現的。receiver從Kafka中獲取的數據都是存儲在Spark Executor的內存中的,然后Spark Streaming啟動的job會去處理那些數據。然而,在默認的配置下,這種方式可能會因為底層的失敗而丟失數據。如果要啟用高可靠機制,讓數據零丟失,就必須啟用Spark Streaming的預寫日志機制(Write Ahead Log,WAL)。該機制會同步地將接收到的Kafka數據寫入分布式文件系統(比如HDFS)上的預寫日志中。所以,即使底層節點出現了失敗,也可以使用預寫日志中的數據進行恢復。
補充說明:
(1)、Kafka中的topic的partition,與Spark中的RDD的partition是沒有關系的。所以,在KafkaUtils.createStream()中,提高partition的數量,只會增加一個Receiver中,讀取partition的線程的數量。不會增加Spark處理數據的并行度。
(2)、可以創建多個Kafka輸入DStream,使用不同的consumer group和topic,來通過多個receiver并行接收數據。
(3)、如果基于容錯的文件系統,比如HDFS,啟用了預寫日志機制,接收到的數據都會被復制一份到預寫日志中。因此,在KafkaUtils.createStream()中,設置的持久化級別是StorageLevel.MEMORY_AND_DISK_SER。
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-streaming-kafka_2.10</artifactId>
<version>1.6.2</version>
</dependency>
啟動kafka服務
kafka-server-start.sh -daemon config/server.properties
創建topic
kafka-topics.sh --create --topic spark-kafka --zookeeper uplooking01:2181,uplooking02:2181,uplooking03:2181 --partitions 3 --replication-factor 3
列舉kafka中已經創建的topic
kafka-topics.sh --list --zookeeper uplooking01:2181,uplooking02:2181,uplooking03:2181
列舉每個節點都保護那些topic、Partition
kafka-topics.sh --describe --zookeeper uplooking01:2181, uplooking02:2181, uplooking03:21821 --topic spark-kafka
leader:負責處理消息的讀和寫,leader是從所有節點中隨機選擇的.
replicas:列出了所有的副本節點,不管節點是否在服務中.
isr:是正在服務中的節點.
產生數據
kafka-console-producer.sh --topic spark-kafka --broker-list uplooking01:9092,uplooking02:9092,uplooking03:9092
消費數據
kafka-console-consumer.sh --topic spark-kafka --zookeeper uplooking01:2181,uplooking02:2181,uplooking03:2181
測試代碼如下:
package cn.xpleaf.bigdata.spark.scala.streaming.p1
import org.apache.log4j.{Level, Logger}
import org.apache.spark.SparkConf
import org.apache.spark.streaming.dstream.ReceiverInputDStream
import org.apache.spark.streaming.kafka.KafkaUtils
import org.apache.spark.streaming.{Seconds, StreamingContext}
/**
* Kafka和SparkStreaming基于Receiver的模式集成
*/
object _03SparkStreamingKafkaReceiverOps {
def main(args: Array[String]): Unit = {
Logger.getLogger("org.apache.spark").setLevel(Level.OFF)
val conf = new SparkConf()
.setAppName(_03SparkStreamingKafkaReceiverOps.getClass.getSimpleName)
.setMaster("local[2]")
val ssc = new StreamingContext(conf, Seconds(5))
// ssc.checkpoint("hdfs://ns1/checkpoint/streaming/kafka") // checkpoint文件保存到hdfs中
ssc.checkpoint("file:///D:/data/spark/streaming/checkpoint/streaming/kafka") // checkpoint文件保存到本地文件系統
/**
* 使用Kafka Receiver的方式,來創建的輸入DStream,需要使用SparkStreaming提供的Kafka整合API
* KafkaUtils
*/
val zkQuorum = "uplooking01:2181,uplooking02:2181,uplooking03:2181"
val groupId = "kafka-receiver-group-id"
val topics:Map[String, Int] = Map("spark-kafka"->3)
// ReceiverInputDStream中的key就是當前一條數據在kafka中的key,value就是該條數據對應的value
val linesDStream:ReceiverInputDStream[(String, String)] = KafkaUtils.createStream(ssc, zkQuorum, groupId, topics)
val retDStream = linesDStream.map(t => t._2).flatMap(_.split(" ")).map((_, 1)).reduceByKey(_+_)
retDStream.print()
ssc.start()
ssc.awaitTermination()
ssc.stop()
}
}
在kafka中生產數據:
[uplooking@uplooking02 kafka]$ kafka-console-producer.sh --topic spark-kafka --broker-list uplooking01:9092,uplooking02:9092,uplooking03:9092
hello you hello he hello me
輸出結果如下:
-------------------------------------------
Time: 1525965130000 ms
-------------------------------------------
(hello,3)
(me,1)
(you,1)
(he,1)
在上面的代碼中,還啟用了Spark Streaming的預寫日志機制(Write Ahead Log,WAL)。
如果數據保存在本地文件系統,則如下:
如果數據保存在HDFS中,則如下:
(1)Direct的方式是會直接操作kafka底層的元數據信息,這樣如果計算失敗了,可以把數據重新讀一下,重新處理。即數據一定會被處理。拉數據,是RDD在執行的時候直接去拉數據。
(2)由于直接操作的是kafka,kafka就相當于你底層的文件系統。這個時候能保證嚴格的事務一致性,即一定會被處理,而且只會被處理一次。而Receiver的方式則不能保證,因為Receiver和ZK中的數據可能不同步,Spark Streaming可能會重復消費數據,這個調優可以解決,但顯然沒有Direct方便。而Direct api直接是操作kafka的,spark streaming自己負責追蹤消費這個數據的偏移量或者offset,并且自己保存到checkpoint,所以它的數據一定是同步的,一定不會被重復。即使重啟也不會重復,因為checkpoint了,但是程序升級的時候,不能讀取原先的checkpoint,面對升級checkpoint無效這個問題,怎么解決呢?升級的時候讀取我指定的備份就可以了,即手動的指定checkpoint也是可以的,這就再次完美的確保了事務性,有且僅有一次的事務機制。那么怎么手動checkpoint呢?構建SparkStreaming的時候,有getorCreate這個api,它就會獲取checkpoint的內容,具體指定下這個checkpoint在哪就好了。
(3)由于底層是直接讀數據,沒有所謂的Receiver,直接是周期性(Batch Intervel)的查詢kafka,處理數據的時候,我們會使用基于kafka原生的Consumer api來獲取kafka中特定范圍(offset范圍)中的數據。這個時候,Direct Api訪問kafka帶來的一個顯而易見的性能上的好處就是,如果你要讀取多個partition,Spark也會創建RDD的partition,這個時候RDD的partition和kafka的partition是一致的。而Receiver的方式,這2個partition是沒任何關系的。這個優勢是你的RDD,其實本質上講在底層讀取kafka的時候,kafka的partition就相當于原先hdfs上的一個block。這就符合了數據本地性。RDD和kafka數據都在這邊。所以讀數據的地方,處理數據的地方和驅動數據處理的程序都在同樣的機器上,這樣就可以極大的提高性能。不足之處是由于RDD和kafka的patition是一對一的,想提高并行度就會比較麻煩。提高并行度還是repartition,即重新分區,因為產生shuffle,很耗時。這個問題,以后也許新版本可以自由配置比例,不是一對一。因為提高并行度,可以更好的利用集群的計算資源,這是很有意義的。
(4)不需要開啟wal機制,從數據零丟失的角度來看,極大的提升了效率,還至少能節省一倍的磁盤空間。從kafka獲取數據,比從hdfs獲取數據,因為zero copy的方式,速度肯定更快。
從高層次的角度看,之前的和Kafka集成方案(reciever方法)使用WAL工作方式如下:
1)運行在Spark workers/executors上的Kafka Receivers連續不斷地從Kafka中讀取數據,其中用到了Kafka中高層次的消費者API。
2)接收到的數據被存儲在Spark workers/executors中的內存,同時也被寫入到WAL中。只有接收到的數據被持久化到log中,Kafka Receivers才會去更新Zookeeper中Kafka的偏移量。
3)接收到的數據和WAL存儲位置信息被可靠地存儲,如果期間出現故障,這些信息被用來從錯誤中恢復,并繼續處理數據。
為了構建這個系統,新引入的Direct API采用完全不同于Receivers和WALs的處理方式。它不是啟動一個Receivers來連續不斷地從Kafka中接收數據并寫入到WAL中,而是簡單地給出每個batch區間需要讀取的偏移量位置,最后,每個batch的Job被運行,那些對應偏移量的數據在Kafka中已經準備好了。這些偏移量信息也被可靠地存儲(checkpoint),在從失敗中恢復
測試代碼如下:
package cn.xpleaf.bigdata.spark.scala.streaming.p1
import kafka.serializer.StringDecoder
import org.apache.log4j.{Level, Logger}
import org.apache.spark.SparkConf
import org.apache.spark.streaming.dstream.{InputDStream, ReceiverInputDStream}
import org.apache.spark.streaming.kafka.KafkaUtils
import org.apache.spark.streaming.{Seconds, StreamingContext}
/**
* Kafka和SparkStreaming基于Direct的模式集成
*
* 在公司中使用Kafka-Direct方式
*/
object _04SparkStreamingKafkaDirectOps {
def main(args: Array[String]): Unit = {
Logger.getLogger("org.apache.spark").setLevel(Level.OFF)
val conf = new SparkConf()
.setAppName(_04SparkStreamingKafkaDirectOps.getClass.getSimpleName)
.setMaster("local[2]")
val ssc = new StreamingContext(conf, Seconds(5))
// ssc.checkpoint("hdfs://ns1/checkpoint/streaming/kafka") // checkpoint文件也是可以保存到hdfs中的,不過必要性不大了,對于direct的方式來說
val kafkaParams:Map[String, String] = Map("metadata.broker.list"-> "uplooking01:9092,uplooking02:9092,uplooking03:9092")
val topics:Set[String] = Set("spark-kafka")
val linesDStream:InputDStream[(String, String)] = KafkaUtils.
// 參數分別為:key類型,value類型,key的×××,value的×××
createDirectStream[String, String, StringDecoder, StringDecoder](ssc, kafkaParams, topics)
val retDStream = linesDStream.map(t => t._2).flatMap(_.split(" ")).map((_, 1)).reduceByKey(_+_)
retDStream.print()
ssc.start()
ssc.awaitTermination()
ssc.stop()
}
}
生產數據:
[uplooking@uplooking02 kafka]$ kafka-console-producer.sh --topic spark-kafka --broker-list uplooking01:9092,uplooking02:9092,uplooking03:9092
hello you hello he hello me
輸出結果如下:
-------------------------------------------
Time: 1525966750000 ms
-------------------------------------------
(hello,3)
(me,1)
(you,1)
(he,1)
測試代碼如下:
package cn.xpleaf.bigdata.spark.scala.streaming.p1
import java.io.{BufferedReader, InputStreamReader}
import java.net.Socket
import org.apache.log4j.{Level, Logger}
import org.apache.spark.SparkConf
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.dstream.{DStream, ReceiverInputDStream}
import org.apache.spark.streaming.receiver.Receiver
import org.apache.spark.streaming.{Seconds, StreamingContext}
/**
* SparkStreaming自定義Receiver
* 通過模擬Network來學習自定義Receiver
*
* 自定義的步驟:
* 1.創建一個類繼承一個類或者實現某個接口
* 2.復寫啟動的個別方法
* 3.進行注冊調用
*/
object _05SparkStreamingCustomReceiverOps {
def main(args: Array[String]): Unit = {
if (args == null || args.length < 2) {
System.err.println(
"""Parameter Errors! Usage: <hostname> <port>
|hostname: 監聽的網絡socket的主機名或ip地址
|port: 監聽的網絡socket的端口
""".stripMargin)
System.exit(-1)
}
Logger.getLogger("org.apache.spark").setLevel(Level.OFF)
val conf = new SparkConf()
.setAppName(_05SparkStreamingCustomReceiverOps.getClass.getSimpleName)
.setMaster("local[2]")
val ssc = new StreamingContext(conf, Seconds(5))
val hostname = args(0).trim
val port = args(1).trim.toInt
val linesDStream:ReceiverInputDStream[String] = ssc.receiverStream[String](new MyNetWorkReceiver(hostname, port))
val retDStream:DStream[(String, Int)] = linesDStream.flatMap(_.split(" ")).map((_, 1)).reduceByKey(_+_)
retDStream.print()
ssc.start()
ssc.awaitTermination()
ssc.stop()
}
}
/**
* 自定義receiver
*/
class MyNetWorkReceiver(storageLevel:StorageLevel) extends Receiver[String](storageLevel) {
private var hostname:String = _
private var port:Int = _
def this(hostname:String, port:Int, storageLevel:StorageLevel = StorageLevel.MEMORY_AND_DISK_SER_2) {
this(storageLevel)
this.hostname = hostname
this.port = port
}
/**
* 啟動及其初始化receiver資源
*/
override def onStart(): Unit = {
val thread = new Thread() {
override def run(): Unit = {
receive()
}
}
thread.setDaemon(true) // 設置成為后臺線程
thread.start()
}
// 接收數據的核心api 讀取網絡socket中的數據
def receive(): Unit = {
val socket = new Socket(hostname, port)
val ins = socket.getInputStream()
val br = new BufferedReader(new InputStreamReader(ins))
var line:String = null
while((line = br.readLine()) != null) {
store(line)
}
ins.close()
socket.close()
}
override def onStop(): Unit = {
}
}
啟動nc,并輸入數據:
[uplooking@uplooking01 ~]$ nc -lk 4893
hello you hello he hello me
輸出結果如下:
(hello,3)
(me,1)
(you,1)
(he,1)
免責聲明:本站發布的內容(圖片、視頻和文字)以原創、轉載和分享為主,文章觀點不代表本網站立場,如果涉及侵權請聯系站長郵箱:is@yisu.com進行舉報,并提供相關證據,一經查實,將立刻刪除涉嫌侵權內容。