中文字幕av专区_日韩电影在线播放_精品国产精品久久一区免费式_av在线免费观看网站

溫馨提示×

溫馨提示×

您好,登錄后才能下訂單哦!

密碼登錄×
登錄注冊×
其他方式登錄
點擊 登錄注冊 即表示同意《億速云用戶服務條款》

三、flink--DataStreamAPI原理以及用法

發布時間:2020-06-29 18:20:45 來源:網絡 閱讀:909 作者:隔壁小白 欄目:大數據

一、DataStream基本概述

1.1 datastream是什么?

datastream是flink提供給用戶使用的用于進行流計算和批處理的api,是對底層流式計算模型的api封裝,便于用戶編程。

1.2 datastream運行模型

一個完整的datastream運行模型一般由三部分組成,分別為Source、Transformation、Sink。DataSource主要負責數據的讀取(也就是從數據源讀取,可以批數據源,也可以是流式數據數據源),Transformation主要負責對屬于的轉換操作(也就是正常的業務處邏輯),Sink負責最終數據的輸出(計算結果的導出)。

1.3 datastream程序架構

一般來說,使用datastream api編寫flink程序,包括以下流程:
1、獲得一個執行環境;(Execution Environment)
2、加載/創建初始數據;(Source)
3、指定轉換這些數據;(Transformation)
4、指定放置計算結果的位置;(Sink)
5、觸發程序執行(這是流式計算必須的操作,如果是批處理則不需要)

二、DataStream api的使用

2.1 maven依賴配置

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>SparkDemo</groupId>
    <artifactId>SparkDemoTest</artifactId>
    <version>1.0-SNAPSHOT</version>

    <properties>
        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>

        <scala.version>2.11.8</scala.version>
        <hadoop.version>2.7.3</hadoop.version>
        <scala.binary.version>2.11</scala.binary.version>
        <flink.version>1.6.1</flink.version>

    </properties>

    <dependencies>

        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-client</artifactId>
            <version>${hadoop.version}</version>
        </dependency>

        <dependency>
            <groupId>mysql</groupId>
            <artifactId>mysql-connector-java</artifactId>
            <version>8.0.12</version>
        </dependency>
        <dependency>
            <groupId>junit</groupId>
            <artifactId>junit</artifactId>
            <version>4.12</version>
        </dependency>

        <dependency>
            <groupId>org.apache.logging.log4j</groupId>
            <artifactId>log4j-core</artifactId>
            <version>2.9.0</version>
        </dependency>

        <!--flink-->
        <!-- https://mvnrepository.com/artifact/org.apache.flink/flink-java -->
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-java</artifactId>
            <version>1.6.1</version>
        </dependency>

        <!-- https://mvnrepository.com/artifact/org.apache.flink/flink-streaming-java -->
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-streaming-java_2.11</artifactId>
            <version>1.6.1</version>
            <!--<scope>provided</scope>-->
        </dependency>

        <!-- https://mvnrepository.com/artifact/org.apache.flink/flink-streaming-scala -->
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-streaming-scala_2.11</artifactId>
            <version>1.6.1</version>
        </dependency>

        <!-- https://mvnrepository.com/artifact/org.apache.flink/flink-scala -->
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-scala_2.11</artifactId>
            <version>1.6.1</version>
        </dependency>

        <!-- https://mvnrepository.com/artifact/org.apache.flink/flink-clients -->
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-clients_2.11</artifactId>
            <version>1.6.1</version>
        </dependency>

        <!-- https://mvnrepository.com/artifact/org.apache.flink/flink-table -->
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-table_2.11</artifactId>
            <version>1.6.1</version>
            <scope>provided</scope>
        </dependency>

        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-client</artifactId>
            <version>${hadoop.version}</version>
        </dependency>

        <dependency>
            <groupId>com.alibaba</groupId>
            <artifactId>fastjson</artifactId>
            <version>1.2.22</version>
        </dependency>

        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-connector-kafka-0.10_${scala.binary.version}</artifactId>
            <version>${flink.version}</version>
        </dependency>

    </dependencies>

    <!--下面這是maven打包scala的插件,一定要,否則直接忽略scala代碼-->
    <build>
        <plugins>

            <plugin>
                <groupId>org.scala-tools</groupId>
                <artifactId>maven-scala-plugin</artifactId>
                <version>2.15.2</version>
                <executions>
                    <execution>
                        <goals>
                            <goal>compile</goal>
                            <goal>testCompile</goal>
                        </goals>
                    </execution>
                </executions>
            </plugin>

            <plugin>
                <artifactId>maven-compiler-plugin</artifactId>
                <version>3.6.0</version>
                <configuration>
                    <source>1.8</source>
                    <target>1.8</target>
                </configuration>
            </plugin>

            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-surefire-plugin</artifactId>
                <version>2.19</version>
                <configuration>
                    <skip>true</skip>
                </configuration>
            </plugin>

        </plugins>
    </build>
</project>

2.2 獲取執行環境(Execution Environment)

有三種類型的執行環境:

1、StreamExecutionEnvironment.getExecutionEnvironment()
創建一個執行環境,表示當前執行程序的上下文。 如果程序是獨立調用的,則此方法返回本地執行環境;如果從命令行客戶端調用程序以提交到集群,則此方法返回此集群的執行環境,也就是說,getExecutionEnvironment會根據查詢運行的方式決定返回什么樣的運行環境,是最常用的一種創建執行環境的方式。

2、StreamExecutionEnvironment.createLocalEnvironment()
返回本地執行環境,需要在調用時指定默認的并行度。

3、StreamExecutionEnvironment.createRemoteEnvironment()
返回集群執行環境,將Jar提交到遠程服務器。需要在調用時指定JobManager的IP和端口號,并指定要在集群中運行的Jar包。

2.3 常用數據源(source)

2.3.1 基于file的數據源

1、env.readTextFile(path)
一列一列的讀取遵循TextInputFormat規范的文本文件,并將結果作為String返回。

package flinktest;

import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class ExampleDemo {
    public static void main(String[] args) throws Exception {
        //1、創建環境對象
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        //2、讀取文件作為數據源
        DataStreamSource<String> fileSource = env.readTextFile("/tmp/test.txt");
        //3、打印數據
        fileSource.print();
        //4、啟動任務執行
        env.execute("test file source");
    }
}

2、env.readFile(fileInputFormat,path)
按照指定的fileinputformat格式來讀取文件。這里的fileinputformat可以自定義類

package flinktest;

import org.apache.flink.api.java.io.TextInputFormat;
import org.apache.flink.core.fs.Path;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class ExampleDemo {
    public static void main(String[] args) throws Exception {
        //1、創建環境對象
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        //2、讀取文件作為數據源
        DataStreamSource<String> fileSource = env.readFile(new TextInputFormat(new Path("/tmp/test.txt")), "/tmp/test.txt");
        //3、打印數據
        fileSource.print();
        //4、啟動任務執行
        env.execute("test file source");
    }
}

2.3.2 基于socket數據源

socketTextStream(host,port)

package flinktest;

import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class ExampleDemo {
    public static void main(String[] args) throws Exception {
        //1、創建環境對象
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        //2、讀取socket作為數據源
        DataStreamSource<String> sourceSocket = env.socketTextStream("127.0.0.1", 1000);
        //3、打印數據
        sourceSocket.print();
        //4、啟動任務執行
        env.execute("test socket source");
    }
}

2.3.3 基于集合collection的數據源

1、fromCollection(Collection)
從集合中創建一個數據流,集合中所有元素的類型是一致的。

List<String> list = new ArrayList<>();
DataStreamSource<String> sourceCollection = env.fromCollection(list);       

2、fromCollection(Iterator)
從迭代(Iterator)中創建一個數據流,指定元素數據類型的類由iterator返回。

3、fromElements(Object)
從一個給定的對象序列中創建一個數據流,所有的對象必須是相同類型的

4、generateSequence(from, to)
從給定的間隔中并行地產生一個數字序列。讀取一定范圍的sequnce對象

2.3.4 自定義數據源

env.addSource(SourceFuntion)
自定義一個數據源實現類,然后 addSource 到到env中。比如場景的從kafka讀取數據,從mysql讀取數據

2.4 常用輸出(sink)

Data Sink 消費DataStream中的數據,并將它們轉發到文件、套接字、外部系統或者打印出。Flink有許多封裝在DataStream操作里的內置輸出格式。
1、 writeAsText
將元素以字符串形式逐行寫入(TextOutputFormat),這些字符串通過調用每個元素的toString()方法來獲取。

2、WriteAsCsv
將元組以逗號分隔寫入文件中(CsvOutputFormat),行及字段之間的分隔是可配置的。每個字段的值來自對象的toString()方法。

3、print/printToErr
打印每個元素的toString()方法的值到標準輸出或者標準錯誤輸出流中。或者也可以在輸出流中添加一個前綴,這個可以幫助區分不同的打印調用,如果并行度大于1,那么輸出也會有一個標識由哪個任務產生的標志。

4、 writeUsingOutputFormat
自定義文件輸出的方法和基類(FileOutputFormat),支持自定義對象到字節的轉換。

5、writeToSocket
根據SerializationSchema 將元素寫入到socket中。

6、stream.addSink(SinkFunction)
使用自定義的sink類

2.5 常用算子(transformation operator)

2.5.1 map

DataStream → DataStream:輸入一個參數經過處理產生一個新的參數

DataStream<Integer> dataStream = //...
dataStream.map(new MapFunction<Integer, Integer>() {
    @Override
    //這里將每個參數 * 2,然后返回
    public Integer map(Integer value) throws Exception {
        return 2 * value;
    }
});

2.5.2 flatMap

DataStream → DataStream:輸入一個參數,產生0個、1個或者多個輸出。

dataStream.flatMap(new FlatMapFunction<String, String>() {
    @Override
    public void flatMap(String value, Collector<String> out)
        throws Exception {
        //切割字符串,將處理之后的數據放到 collector 中。
        for(String word: value.split(" ")){
            out.collect(word);
        }
    }
});

2.5.3 filter

DataStream → DataStream:計算每個元素的布爾值,并返回布爾值為true的元素。下面這個例子是過濾出非0的元素:

dataStream.filter(new FilterFunction<Integer>() {
    @Override
    public boolean filter(Integer value) throws Exception {
        return value != 0;
    }
});

2.5.4 keyBy

DataStream → KeyedStream:要求輸入是tuple,或者是一個復合對象,里面有多個屬性(例如student類,里面有name、age等2個以上的屬性),反正就是必須有作為key和value的數據。根據key進行分區,相同key的在同一個分區,在內部使用hash實現。

//有不同方式指定key
dataStream.keyBy("someKey") // 指定key的字段名稱,常用于復合對象中
dataStream.keyBy(0) // 指定key的位置,常用于tuple中

2.5.5 reduce

KeyedStream → DataStream:一個分組數據流的聚合操作,合并當前的元素和上次聚合的結果,產生一個新的值,返回的流中包含每一次聚合的結果,而不是只返回最后一次聚合的最終結果,也就是每一次聚合的結果都會返回,直到最后一次聚合結束,所以不是只返回最后一個聚合結果。

keyedStream.reduce(new ReduceFunction<Integer>() {
    @Override
    public Integer reduce(Integer value1, Integer value2)
    throws Exception {
        return value1 + value2;
    }
});

2.5.6 fold

KeyedStream → DataStream
一個有初始值的分組數據流的滾動折疊操作,合并當前元素和前一次折疊操作的結果,并產生一個新的值,返回的流中包含每一次折疊的結果,而不是只返回最后一次折疊的最終結果。

DataStream<String> result =
  keyedStream.fold("start", new FoldFunction<Integer, String>() {
    @Override
    public String fold(String current, Integer value) {
        return current + "-" + value;
    }
  });

運行結果為:
假設數據源為 (1,2,3,4,5)
結果為:start-1,start-1-2...... 

2.5.7 aggregations

KeyedStream →DataStream:分組數據流上的滾動聚合操作。min和minBy的區別是min返回的是一個最小值,而minBy返回的是其字段中包含最小值的元素(同樣原理適用于max和maxBy),返回的流中包含每一次聚合的結果,而不是只返回最后一次聚合的最終結果。

keyedStream.sum(0);
keyedStream.sum("key");
keyedStream.min(0);
keyedStream.min("key");
keyedStream.max(0);
keyedStream.max("key");
keyedStream.minBy(0);
keyedStream.minBy("key");
keyedStream.maxBy(0);
keyedStream.maxBy("key");

注意:在2.3.10之前的算子都是可以直接作用在Stream上的,因為他們不是聚合類型的操作,但是到2.3.10后你會發現,我們雖然可以對一個無邊界的流數據直接應用聚合算子,但是它會記錄下每一次的聚合結果,這往往不是我們想要的,其實,reduce、fold、aggregation這些聚合算子都是和Window配合使用的,只有配合Window,才能得到想要的結果。

2.5.8 connect、coMap、coFlatMap

1、connect:
DataStream,DataStream → ConnectedStreams:連接兩個保持他們類型的數據流,兩個數據流被Connect之后,只是被放在了一個同一個流中,內部依然保持各自的數據和形式不發生任何變化,兩個流相互獨立。

DataStream<Integer> someStream = //...
DataStream<String> otherStream = //...

ConnectedStreams<Integer, String> connectedStreams = someStream.connect(otherStream);

2、coMap、coFlatMap
ConnectedStreams → DataStream:專門用于connect之后的stream操作的map和flatmap算子。

connectedStreams.map(new CoMapFunction<Integer, String, Boolean>() {
    @Override
    public Boolean map1(Integer value) {
        return true;
    }

    @Override
    public Boolean map2(String value) {
        return false;
    }
});
connectedStreams.flatMap(new CoFlatMapFunction<Integer, String, String>() {

   @Override
   public void flatMap1(Integer value, Collector<String> out) {
       out.collect(value.toString());
   }

   @Override
   public void flatMap2(String value, Collector<String> out) {
       for (String word: value.split(" ")) {
         out.collect(word);
       }
   }
});

2.5.9 split和select

split:
DataStream → SplitStream:將一個數據流拆分成兩個或者多個數據流.并且會給每個數據流起一個別名

select:SplitStream→DataStream:從一個SplitStream中獲取一個或者多個DataStream。

SplitStream<Integer> split = someDataStream.split(new OutputSelector<Integer>() {
    @Override
    public Iterable<String> select(Integer value) {
        List<String> output = new ArrayList<String>();
        if (value % 2 == 0) {
            output.add("even");
        }
        else {
            output.add("odd");
        }
        return output;
    }
});

split.select("even").print();
split.select("odd").print();

2.5.10 union

DataStream → DataStream:對兩個或者兩個以上的DataStream進行union操作,產生一個包含所有DataStream元素的新DataStream。注意:如果你將一個DataStream跟它自己做union操作,在新的DataStream中,你將看到每一個元素都出現兩次。這和connect不一樣,connect并沒有合并多個stream

dataStream.union(otherStream1, otherStream2, ...);
向AI問一下細節

免責聲明:本站發布的內容(圖片、視頻和文字)以原創、轉載和分享為主,文章觀點不代表本網站立場,如果涉及侵權請聯系站長郵箱:is@yisu.com進行舉報,并提供相關證據,一經查實,將立刻刪除涉嫌侵權內容。

AI

扬中市| 武义县| 乌兰浩特市| 兴化市| 冀州市| 沅江市| 宜兴市| 怀集县| 白沙| 大丰市| 防城港市| 淳化县| 济南市| 资中县| 庆城县| 凤翔县| 湟中县| 密山市| 鹿泉市| 台北市| 长顺县| 桃源县| 林甸县| 铜川市| 鹿邑县| 教育| 乐山市| 奉节县| 通州市| 体育| 扶沟县| 嘉义市| 乾安县| 六枝特区| 东至县| 麟游县| 彰武县| 韶山市| 东城区| 乌拉特前旗| 兴文县|