您好,登錄后才能下訂單哦!
如何入門ApacheFlink中的Flinksink,相信很多沒有經驗的人對此束手無策,為此本文總結了問題出現的原因和解決方法,通過這篇文章希望你能解決這個問題。
將DataSet中的數據Sink到哪里去。使用的是對應的OutPutFormat,也可以使用自定義的sink,有可能寫到hbase中,hdfs中。
writeAsText() / TextOutputFormat ,以String的形式寫入
writeAsCsv(...) / CsvOutputFormat,以CSV的方式寫進去
print() / printToErr() / print(String msg) / printToErr(String msg)以標準輸出
object DataSetSinkApp { def main(args: Array[String]): Unit = { val environment = ExecutionEnvironment.getExecutionEnvironment val data = 1.to(10) val text = environment.fromCollection(data) val filePath = "E:/test" text.writeAsText(filePath) environment.execute("DataSetSinkApp") } }
如果E:/test文件或者文件夾存在,將無法執行成功。除非增加一個WriteMode.OVERWRITE
text.writeAsText(filePath, WriteMode.OVERWRITE)
這樣就在E盤下新建了一個test文件,內容是1到10。
那么如何保存到文件夾中?
text.writeAsText(filePath, WriteMode.OVERWRITE).setParallelism(2)
設置并行度為2,這樣就存到test文件夾下,兩個文件1和2
默認情況下,不設置并行度,會把結果寫到一個文件中,如果設置并行度,那么每一個并行度都對應一個輸出。
public static void main(String[] args) throws Exception { ExecutionEnvironment executionEnvironment = ExecutionEnvironment.getExecutionEnvironment(); List<Integer> info = new ArrayList<>(); for(int i = 1;i <=10; i++) { info.add(i); } DataSource<Integer> data1 = executionEnvironment.fromCollection(info); String filePath = "E:/test2"; data1.writeAsText(filePath, FileSystem.WriteMode.OVERWRITE); executionEnvironment.execute("JavaDataSetSinkApp"); }
看完上述內容,你們掌握如何入門ApacheFlink中的Flinksink的方法了嗎?如果還想學到更多技能或想了解更多相關內容,歡迎關注億速云行業資訊頻道,感謝各位的閱讀!
免責聲明:本站發布的內容(圖片、視頻和文字)以原創、轉載和分享為主,文章觀點不代表本網站立場,如果涉及侵權請聯系站長郵箱:is@yisu.com進行舉報,并提供相關證據,一經查實,將立刻刪除涉嫌侵權內容。