您好,登錄后才能下訂單哦!
這篇文章主要介紹“Flink數據流DataStream和DataSet怎么使用”,在日常操作中,相信很多人在Flink數據流DataStream和DataSet怎么使用問題上存在疑惑,小編查閱了各式資料,整理出簡單好用的操作方法,希望對大家解答”Flink數據流DataStream和DataSet怎么使用”的疑惑有所幫助!接下來,請跟著小編一起來學習吧!
Flink主要用來處理數據流,所以從抽象上來看就是對數據流的處理,正如前面大數據開發-Flink-體系結構 && 運行架構提到寫Flink程序實際上就是在寫DataSource、Transformation、Sink.
DataSource是程序的數據源輸入,可以通過StreamExecutionEnvironment.addSource(sourceFuntion)為程序 添加一個數據源
Transformation是具體的操作,它對一個或多個輸入數據源進行計算處理,比如Map、FlatMap和Filter等操作
Sink是程序的輸出,它可以把Transformation處理之后的數據輸出到指定的存儲介質中
Flink針對DataStream提供了兩種實現方式的數據源,可以歸納為以下四種:
基于文件
readTextFile(path)
讀取文本文件,文件遵循TextInputFormat逐行讀取規則并返回
基于Socket
socketTextStream
從Socket中讀取數據,元素可以通過一個分隔符分開
基于集合
fromCollection(Collection)
通過Java的Collection集合創建一個數據流,集合中的所有元素必須是相同類型的,需要注意的是,如果集合里面的元素要識別為POJO,需要滿足下面的條件
總結:上面的要求其實就是為了讓Flink可以方便地序列化和反序列化這些對象為數據流
該類有共有的無參構造方法
該類是共有且獨立的(沒有非靜態內部類)
類(及父類)中所有的不被static、transient修飾的屬性要么有公有的(且不被final修飾),要么是包含公有的getter和setter方法,這些方法遵循java bean命名規范
自定義Source
使用StreamExecutionEnvironment.addSource(sourceFunction)
將一個流式數據源加到程序中,具體這個sourceFunction
是為非并行源implements SourceFunction
,或者為并行源 implements ParallelSourceFunction
接口,或者extends RichParallelSourceFunction
,對于自定義Source,Sink, Flink內置了下面幾種Connector
連接器 | 是否提供Source支持 | 是否提供Sink支持 |
---|---|---|
Apache Kafka | 是 | 是 |
ElasticSearch | 否 | 是 |
HDFS | 否 | 是 |
Twitter Streaming PI | 是 | 否 |
對于Source的使用,其實較簡單,這里給一個較常用的自定義Source的KafaSource的使用例子。更多相關源碼可以查看:
package com.hoult.stream; public class SourceFromKafka { public static void main(String[] args) throws Exception { StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); String topic = "animalN"; Properties props = new Properties(); props.put("bootstrap.servers", "linux121:9092"); FlinkKafkaConsumer<String> consumer = new FlinkKafkaConsumer<>(topic, new SimpleStringSchema(), props); DataStreamSource<String> data = env.addSource(consumer); SingleOutputStreamOperator<Tuple2<Long, Long>> maped = data.map(new MapFunction<String, Tuple2<Long, Long>>() { @Override public Tuple2<Long, Long> map(String value) throws Exception { System.out.println(value); Tuple2<Long,Long> t = new Tuple2<Long,Long>(0l,0l); String[] split = value.split(","); try{ t = new Tuple2<Long, Long>(Long.valueOf(split[0]), Long.valueOf(split[1])); } catch (Exception e) { e.printStackTrace(); } return t; } }); KeyedStream<Tuple2<Long,Long>, Long> keyed = maped.keyBy(value -> value.f0); //按照key分組策略,對流式數據調用狀態化處理 SingleOutputStreamOperator<Tuple2<Long, Long>> flatMaped = keyed.flatMap(new RichFlatMapFunction<Tuple2<Long, Long>, Tuple2<Long, Long>>() { ValueState<Tuple2<Long, Long>> sumState; @Override public void open(Configuration parameters) throws Exception { //在open方法中做出State ValueStateDescriptor<Tuple2<Long, Long>> descriptor = new ValueStateDescriptor<>( "average", TypeInformation.of(new TypeHint<Tuple2<Long, Long>>() { }), Tuple2.of(0L, 0L) ); sumState = getRuntimeContext().getState(descriptor); // super.open(parameters); } @Override public void flatMap(Tuple2<Long, Long> value, Collector<Tuple2<Long, Long>> out) throws Exception { //在flatMap方法中,更新State Tuple2<Long, Long> currentSum = sumState.value(); currentSum.f0 += 1; currentSum.f1 += value.f1; sumState.update(currentSum); out.collect(currentSum); /*if (currentSum.f0 == 2) { long avarage = currentSum.f1 / currentSum.f0; out.collect(new Tuple2<>(value.f0, avarage)); sumState.clear(); }*/ } }); flatMaped.print(); env.execute(); } }
對于Transformation ,Flink提供了很多的算子,
map
DataStream → DataStream Takes one element and produces one element. A map function that doubles the values of the input stream:
DataStream<Integer> dataStream = //... dataStream.map(new MapFunction<Integer, Integer>() { @Override public Integer map(Integer value) throws Exception { return 2 * value; } });
flatMap
DataStream → DataStream Takes one element and produces zero, one, or more elements. A flatmap function that splits sentences to words:
dataStream.flatMap(new FlatMapFunction<String, String>() { @Override public void flatMap(String value, Collector<String> out) throws Exception { for(String word: value.split(" ")){ out.collect(word); } } });
filter
DataStream → DataStream Evaluates a boolean function for each element and retains those for which the function returns true. A filter that filters out zero values:
dataStream.filter(new FilterFunction<Integer>() { @Override public boolean filter(Integer value) throws Exception { return value != 0; } });
keyBy
DataStream → KeyedStream Logically partitions a stream into disjoint partitions. All records with the same key are assigned to the same partition. Internally, keyBy() is implemented with hash partitioning. There are different ways to specify keys. This transformation returns a KeyedStream, which is, among other things, required to use keyed state.
Attention A type cannot be a key if:
fold
aggregation
window/windowAll/window.apply/window.reduce/window.fold/window.aggregation
dataStream.keyBy(value -> value.getSomeKey()) // Key by field "someKey" dataStream.keyBy(value -> value.f0) // Key by the first element of a Tuple
更多算子操作可以查看官網,官網寫的很好:https://ci.apache.org/projects/flink/flink-docs-release-1.13/docs/dev/datastream/operators/overview/
Flink針對DataStream提供了大量的已經實現的數據目的地(Sink),具體如下所示
writeAsText():講元素以字符串形式逐行寫入,這些字符串通過調用每個元素的toString()方法來獲取
print()/printToErr():打印每個元素的toString()方法的值到標準輸出或者標準錯誤輸出流中
自定義輸出:addSink可以實現把數據輸出到第三方存儲介質中, Flink提供了一批內置的Connector,其中有的Connector會提供對應的Sink支持
這里舉一個常見的例子,下層到Kafka
import org.apache.flink.api.common.serialization.SimpleStringSchema; import org.apache.flink.streaming.api.datastream.DataStreamSource; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer; public class StreamToKafka { public static void main(String[] args) throws Exception { StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); DataStreamSource<String> data = env.socketTextStream("teacher2", 7777); String brokerList = "teacher2:9092"; String topic = "mytopic2"; FlinkKafkaProducer producer = new FlinkKafkaProducer(brokerList, topic, new SimpleStringSchema()); data.addSink(producer); env.execute(); } }
對DataSet批處理而言,較為頻繁的操作是讀取HDFS中的文件數據,因為這里主要介紹兩個DataSource組件
基于集合 ,用來測試和DataStream類似
基于文件 readTextFile....
Flink針對DataStream提供了大量的已經實現的數據目的地(Sink),具體如下所示
writeAsText():將元素以字符串形式逐行寫入,這些字符串通過調用每個元素的toString()方法來獲取
writeAsCsv():將元組以逗號分隔寫入文件中,行及字段之間的分隔是可配置的,每個字段的值來自對象的
toString()方法
print()/pringToErr():打印每個元素的toString()方法的值到標準輸出或者標準錯誤輸出流中 Flink提供了一批內置的Connector,其中有的Connector會提供對應的Sink支持。
到此,關于“Flink數據流DataStream和DataSet怎么使用”的學習就結束了,希望能夠解決大家的疑惑。理論與實踐的搭配能更好的幫助大家學習,快去試試吧!若想繼續學習更多相關知識,請繼續關注億速云網站,小編會繼續努力為大家帶來更多實用的文章!
免責聲明:本站發布的內容(圖片、視頻和文字)以原創、轉載和分享為主,文章觀點不代表本網站立場,如果涉及侵權請聯系站長郵箱:is@yisu.com進行舉報,并提供相關證據,一經查實,將立刻刪除涉嫌侵權內容。