您好,登錄后才能下訂單哦!
Flink開發如何批處理應用程序,很多新手對此不是很清楚,為了幫助大家解決這個難題,下面小編將為大家詳細講解,有這方面需求的人可以來學習下,希望你能有所收獲。
詞頻統計,即給一個文件,統計文件中每個單詞出現的次數,分隔符是\t。這個文件內容如下:
hello world welcome hello welcome
統計結果直接打印在控制臺。生產環境下一般Sink到目的地。
JDK:1.8
Maven:3.6.1(最低Maven 3.0.4)
mvn archetype:generate -DarchetypeGroupId=org.apache.flink -DarchetypeArtifactId=flink-quickstart-java -DarchetypeVersion=1.8.1 -DarchetypeCatalog=local
groupId: com.vincent artifactId: springboot-flink-train version:1.0 這樣就創建了一個項目,使用Idea導入這個項目,項目結構如下:
里面有兩個自動為我們準備好的java類。
第一步:創建批處理上下文環境
// set up the batch execution environment final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
第二步:讀取數據
env.readTextFile(textPath);
第三步:transform operations,例如 filter() flatMap() join() coGroup(),這是開發的核心所在,一般就是業務邏輯
第四步:execute program
第一步:讀取數據
hello welcome
第二步:每一行的數據按照指定的分隔符拆分
hello welcome
第三步:為每一個單詞賦上次數為1
(hello,1) (welcome,1)
第四步:合并操作
/** * 使用Java API來開發Flink的批處理應用程序 */ public class BatchWCJavaApp { public static void main(String[] args) throws Exception { String input = "E:/test/input/test.txt"; // step1: 獲取運行環境 ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); // step2: 讀取數據 DataSource<String> text = env.readTextFile(input); // step3: transform // FlatMapFunction<String, Tuple2<String, Integer>表示進來一個String, 轉換成一個<String, Integer>類型 text.flatMap(new FlatMapFunction<String, Tuple2<String, Integer>>() { /** * * @param value 就是一行一行的字符串 * @param out 轉換成(單詞,次數) * @throws Exception */ @Override public void flatMap(String value, Collector<Tuple2<String, Integer>> out) throws Exception { String[] tokens = value.toLowerCase().split("\t"); for(String token: tokens) { if(token.length() > 0) { out.collect(new Tuple2<String, Integer>(token, 1)); } } } }).groupBy(0).sum(1).print(); } }
(world,1) (hello,2) (welcome,2)
JDK:1.8
Maven:3.6.1(最低Maven 3.0.4)
mvn archetype:generate -DarchetypeGroupId=org.apache.flink -DarchetypeArtifactId=flink-quickstart-scala -DarchetypeVersion=1.8.1 -DarchetypeCatalog=local
groupId: com.vincent artifactId: springboot-flink-train-scala version:1.0 這樣就創建了一個項目,使用Idea導入這個項目:
接下來的開發步驟與使用java實現的開發步驟是一樣的:這里給出
import org.apache.flink.api.scala.ExecutionEnvironment /** * 使用Scala開發Flink的批處理應用程序 */ object BatchWCScalaApp { def main(args: Array[String]): Unit = { val input = "E:/test/input/test.txt" val env = ExecutionEnvironment.getExecutionEnvironment val text = env.readTextFile(input) // 引入隱式轉換 import org.apache.flink.api.scala._ text.flatMap(_.toLowerCase.split("\t")) .filter(_.nonEmpty) .map((_, 1)) .groupBy(0) .sum(1) .print() } }
也就是transform部分雖然原理是一樣的,但是實現的方式不一樣,scala更加簡潔
看完上述內容是否對您有幫助呢?如果還想對相關知識有進一步的了解或閱讀更多相關文章,請關注億速云行業資訊頻道,感謝您對億速云的支持。
免責聲明:本站發布的內容(圖片、視頻和文字)以原創、轉載和分享為主,文章觀點不代表本網站立場,如果涉及侵權請聯系站長郵箱:is@yisu.com進行舉報,并提供相關證據,一經查實,將立刻刪除涉嫌侵權內容。