中文字幕av专区_日韩电影在线播放_精品国产精品久久一区免费式_av在线免费观看网站

溫馨提示×

溫馨提示×

您好,登錄后才能下訂單哦!

密碼登錄×
登錄注冊×
其他方式登錄
點擊 登錄注冊 即表示同意《億速云用戶服務條款》

Flink數據流DataStream和DataSet怎么使用

發布時間:2021-12-31 14:35:10 來源:億速云 閱讀:169 作者:iii 欄目:大數據

這篇文章主要介紹“Flink數據流DataStream和DataSet怎么使用”,在日常操作中,相信很多人在Flink數據流DataStream和DataSet怎么使用問題上存在疑惑,小編查閱了各式資料,整理出簡單好用的操作方法,希望對大家解答”Flink數據流DataStream和DataSet怎么使用”的疑惑有所幫助!接下來,請跟著小編一起來學習吧!

Flink主要用來處理數據流,所以從抽象上來看就是對數據流的處理,正如前面大數據開發-Flink-體系結構 && 運行架構提到寫Flink程序實際上就是在寫DataSource、Transformation、Sink.

  • DataSource是程序的數據源輸入,可以通過StreamExecutionEnvironment.addSource(sourceFuntion)為程序 添加一個數據源

  • Transformation是具體的操作,它對一個或多個輸入數據源進行計算處理,比如Map、FlatMap和Filter等操作

  • Sink是程序的輸出,它可以把Transformation處理之后的數據輸出到指定的存儲介質中

DataStream的三種流處理Api

DataSource

Flink針對DataStream提供了兩種實現方式的數據源,可以歸納為以下四種:

  • 基于文件

    readTextFile(path) 讀取文本文件,文件遵循TextInputFormat逐行讀取規則并返回

  • 基于Socket

    socketTextStream 從Socket中讀取數據,元素可以通過一個分隔符分開

  • 基于集合

    fromCollection(Collection) 通過Java的Collection集合創建一個數據流,集合中的所有元素必須是相同類型的,需要注意的是,如果集合里面的元素要識別為POJO,需要滿足下面的條件

    總結:上面的要求其實就是為了讓Flink可以方便地序列化和反序列化這些對象為數據流

    • 該類有共有的無參構造方法

    • 該類是共有且獨立的(沒有非靜態內部類)

    • 類(及父類)中所有的不被static、transient修飾的屬性要么有公有的(且不被final修飾),要么是包含公有的getter和setter方法,這些方法遵循java bean命名規范

  • 自定義Source

    使用StreamExecutionEnvironment.addSource(sourceFunction)將一個流式數據源加到程序中,具體這個sourceFunction 是為非并行源implements SourceFunction,或者為并行源 implements ParallelSourceFunction接口,或者extends RichParallelSourceFunction,對于自定義Source,Sink, Flink內置了下面幾種Connector

連接器是否提供Source支持是否提供Sink支持
Apache Kafka
ElasticSearch
HDFS
Twitter Streaming PI

對于Source的使用,其實較簡單,這里給一個較常用的自定義Source的KafaSource的使用例子。更多相關源碼可以查看:

package com.hoult.stream;


public class SourceFromKafka {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        String topic = "animalN";
        Properties props = new Properties();
        props.put("bootstrap.servers", "linux121:9092");

        FlinkKafkaConsumer<String> consumer = new FlinkKafkaConsumer<>(topic, new SimpleStringSchema(), props);

        DataStreamSource<String> data = env.addSource(consumer);

        SingleOutputStreamOperator<Tuple2<Long, Long>> maped = data.map(new MapFunction<String, Tuple2<Long, Long>>() {
            @Override
            public Tuple2<Long, Long> map(String value) throws Exception {
                System.out.println(value);

                Tuple2<Long,Long> t = new Tuple2<Long,Long>(0l,0l);
                String[] split = value.split(",");

                try{
                    t = new Tuple2<Long, Long>(Long.valueOf(split[0]), Long.valueOf(split[1]));
                } catch (Exception e) {
                    e.printStackTrace();
                }
                return t;


            }
        });
        KeyedStream<Tuple2<Long,Long>, Long> keyed = maped.keyBy(value -> value.f0);
        //按照key分組策略,對流式數據調用狀態化處理
        SingleOutputStreamOperator<Tuple2<Long, Long>> flatMaped = keyed.flatMap(new RichFlatMapFunction<Tuple2<Long, Long>, Tuple2<Long, Long>>() {
            ValueState<Tuple2<Long, Long>> sumState;

            @Override
            public void open(Configuration parameters) throws Exception {
                //在open方法中做出State
                ValueStateDescriptor<Tuple2<Long, Long>> descriptor = new ValueStateDescriptor<>(
                        "average",
                        TypeInformation.of(new TypeHint<Tuple2<Long, Long>>() {
                        }),
                        Tuple2.of(0L, 0L)
                );

                sumState = getRuntimeContext().getState(descriptor);
//                super.open(parameters);
            }

            @Override
            public void flatMap(Tuple2<Long, Long> value, Collector<Tuple2<Long, Long>> out) throws Exception {
                //在flatMap方法中,更新State
                Tuple2<Long, Long> currentSum = sumState.value();

                currentSum.f0 += 1;
                currentSum.f1 += value.f1;

                sumState.update(currentSum);
                out.collect(currentSum);


                /*if (currentSum.f0 == 2) {
                    long avarage = currentSum.f1 / currentSum.f0;
                    out.collect(new Tuple2<>(value.f0, avarage));
                    sumState.clear();
                }*/

            }
        });

        flatMaped.print();

        env.execute();
    }
}

Transformation

對于Transformation ,Flink提供了很多的算子,

  • map

    DataStream → DataStream Takes one element and produces one element. A map function that doubles the values of the input stream:

DataStream<Integer> dataStream = //...
dataStream.map(new MapFunction<Integer, Integer>() {
    @Override
    public Integer map(Integer value) throws Exception {
      return 2 * value;
    }
});
  • flatMap

    DataStream → DataStream Takes one element and produces zero, one, or more elements. A flatmap function that splits sentences to words:

dataStream.flatMap(new FlatMapFunction<String, String>() {
  @Override
  public void flatMap(String value, Collector<String> out) throws Exception {
    for(String word: value.split(" ")){
      out.collect(word);
    }
  }
});
  • filter

    DataStream → DataStream Evaluates a boolean function for each element and retains those for which the function returns true. A filter that filters out zero values:

dataStream.filter(new FilterFunction<Integer>() {
  @Override
  public boolean filter(Integer value) throws Exception {
    return value != 0;
  }
});
  • keyBy

    DataStream → KeyedStream Logically partitions a stream into disjoint partitions. All records with the same key are assigned to the same partition. Internally, keyBy() is implemented with hash partitioning. There are different ways to specify keys. This transformation returns a KeyedStream, which is, among other things, required to use keyed state.

    Attention A type cannot be a key if:

  • fold

  • aggregation

  • window/windowAll/window.apply/window.reduce/window.fold/window.aggregation

dataStream.keyBy(value -> value.getSomeKey()) // Key by field "someKey"
dataStream.keyBy(value -> value.f0) // Key by the first element of a Tuple

更多算子操作可以查看官網,官網寫的很好:https://ci.apache.org/projects/flink/flink-docs-release-1.13/docs/dev/datastream/operators/overview/

Sink

Flink針對DataStream提供了大量的已經實現的數據目的地(Sink),具體如下所示

  • writeAsText():講元素以字符串形式逐行寫入,這些字符串通過調用每個元素的toString()方法來獲取

  • print()/printToErr():打印每個元素的toString()方法的值到標準輸出或者標準錯誤輸出流中

  • 自定義輸出:addSink可以實現把數據輸出到第三方存儲介質中, Flink提供了一批內置的Connector,其中有的Connector會提供對應的Sink支持

這里舉一個常見的例子,下層到Kafka

import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer;
public class StreamToKafka {
  public static void main(String[] args) throws Exception {
    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    DataStreamSource<String> data = env.socketTextStream("teacher2", 7777);
    String brokerList = "teacher2:9092";
    String topic = "mytopic2";
    FlinkKafkaProducer producer = new FlinkKafkaProducer(brokerList, topic, new SimpleStringSchema());
    data.addSink(producer);
    env.execute();
  }
}

DataSet的常用Api

DataSource

對DataSet批處理而言,較為頻繁的操作是讀取HDFS中的文件數據,因為這里主要介紹兩個DataSource組件

  • 基于集合 ,用來測試和DataStream類似

  • 基于文件 readTextFile....

Transformation

Flink數據流DataStream和DataSet怎么使用 

Sink

Flink針對DataStream提供了大量的已經實現的數據目的地(Sink),具體如下所示

  • writeAsText():將元素以字符串形式逐行寫入,這些字符串通過調用每個元素的toString()方法來獲取

  • writeAsCsv():將元組以逗號分隔寫入文件中,行及字段之間的分隔是可配置的,每個字段的值來自對象的

  • toString()方法

  • print()/pringToErr():打印每個元素的toString()方法的值到標準輸出或者標準錯誤輸出流中 Flink提供了一批內置的Connector,其中有的Connector會提供對應的Sink支持。

到此,關于“Flink數據流DataStream和DataSet怎么使用”的學習就結束了,希望能夠解決大家的疑惑。理論與實踐的搭配能更好的幫助大家學習,快去試試吧!若想繼續學習更多相關知識,請繼續關注億速云網站,小編會繼續努力為大家帶來更多實用的文章!

向AI問一下細節

免責聲明:本站發布的內容(圖片、視頻和文字)以原創、轉載和分享為主,文章觀點不代表本網站立場,如果涉及侵權請聯系站長郵箱:is@yisu.com進行舉報,并提供相關證據,一經查實,將立刻刪除涉嫌侵權內容。

AI

黄梅县| 曲松县| 九龙坡区| 兴安县| 建湖县| 齐河县| 兰考县| 桦甸市| 寿阳县| 古蔺县| 永泰县| 合阳县| 新乡县| 鹿泉市| 车险| 宁乡县| 怀柔区| 常德市| 凤冈县| 东乌珠穆沁旗| 九江市| 墨江| 汪清县| 南丹县| 仁寿县| 铜川市| 勐海县| 赫章县| 乐山市| 安乡县| 定远县| 南和县| 葵青区| 隆化县| 车险| 毕节市| 桃源县| 房产| 凤冈县| 兴城市| 高台县|