您好,登錄后才能下訂單哦!
這篇文章主要介紹Flink中Connectors如何連接Kafka,文中介紹的非常詳細,具有一定的參考價值,感興趣的小伙伴們一定要看完!
通過使用Flink DataStream Connectors 數據流連接器連接到ElasticSearch搜索引擎的文檔數據庫Index,并提供數據流輸入與輸出操作;
示例環境
java.version: 1.8.xflink.version: 1.11.1kafka:2.11
數據流輸入
DataStreamSource.java
package com.flink.examples.kafka; import com.flink.examples.TUser; import com.google.gson.Gson; import org.apache.commons.lang3.StringUtils; import org.apache.flink.api.common.functions.FilterFunction; import org.apache.flink.api.common.functions.MapFunction; import org.apache.flink.api.common.serialization.SimpleStringSchema; import org.apache.flink.streaming.api.CheckpointingMode; import org.apache.flink.streaming.api.datastream.DataStream; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer; import org.apache.kafka.clients.consumer.ConsumerConfig; import java.util.Properties; /** * @Description 從Kafka中消費數據 */ public class DataStreamSource { /** * 官方文檔:https://ci.apache.org/projects/flink/flink-docs-release-1.11/zh/dev/connectors/kafka.html */ public static void main(String[] args) throws Exception { final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); //設置并行度(使用幾個CPU核心) env.setParallelism(1); //每隔2000ms進行啟動一個檢查點 env.enableCheckpointing(2000); //設置模式為exactly-once env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE); // 確保檢查點之間有進行500 ms的進度 env.getCheckpointConfig().setMinPauseBetweenCheckpoints(500); //1.消費者客戶端連接到kafka Properties props = new Properties(); props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.110.35:9092"); props.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, 5000); props.put(ConsumerConfig.GROUP_ID_CONFIG, "consumer-45"); props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "latest"); FlinkKafkaConsumer<String> consumer = new FlinkKafkaConsumer<>("test", new SimpleStringSchema(), props); //setStartFromEarliest()會從最早的數據開始進行消費,忽略存儲的offset信息 //consumer.setStartFromEarliest(); //Flink從topic中指定的時間點開始消費,指定時間點之前的數據忽略 //consumer.setStartFromTimestamp(1559801580000L); //Flink從topic中最新的數據開始消費 //consumer.setStartFromLatest(); //Flink從topic中指定的group上次消費的位置開始消費,所以必須配置group.id參數 //consumer.setStartFromGroupOffsets(); //2.在算子中進行處理 DataStream<TUser> sourceStream = env.addSource(consumer) .filter((FilterFunction<String>) value -> StringUtils.isNotBlank(value)) .map((MapFunction<String, TUser>) value -> { System.out.println("print:" + value); //注意,因已開啟enableCheckpointing容錯定期檢查狀態機制,當算子出現錯誤時, //會導致數據流恢復到最新checkpoint的狀態,并從存儲在checkpoint中的offset開始重新消費Kafka中的消息。 //因此會有可能導制數據重復消費,重復錯誤,陷入死循環。加上try|catch,捕獲錯誤后再正確輸出。 Gson gson = new Gson(); try { TUser user = gson.fromJson(value, TUser.class); return user; }catch(Exception e){ System.out.println("error:" + e.getMessage()); } return new TUser(); }) .returns(TUser.class); sourceStream.print(); //3.執行 env.execute("flink kafka source"); } }
數據流輸出
DataStreamSink.java
package com.flink.examples.kafka; import com.flink.examples.TUser; import com.google.gson.Gson; import org.apache.flink.api.common.functions.MapFunction; import org.apache.flink.api.common.serialization.SimpleStringSchema; import org.apache.flink.streaming.api.CheckpointingMode; import org.apache.flink.streaming.api.datastream.DataStream; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer; import org.apache.kafka.clients.producer.ProducerConfig; import java.util.Properties; /** * @Description 將生產者數據寫入到kafka */ public class DataStreamSink { /** * 官方文檔:https://ci.apache.org/projects/flink/flink-docs-release-1.11/zh/dev/connectors/kafka.html */ public static void main(String[] args) throws Exception { StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); //必需設置setParallelism并行度,否則不會輸出 env.setParallelism(1); //每隔2000ms進行啟動一個檢查點 env.enableCheckpointing(2000); //設置模式為exactly-once env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE); // 確保檢查點之間有進行500 ms的進度 env.getCheckpointConfig().setMinPauseBetweenCheckpoints(500); // 檢查點必須在一分鐘內完成,或者被丟棄 env.getCheckpointConfig().setCheckpointTimeout(60000); // 同一時間只允許進行一個檢查點 env.getCheckpointConfig().setMaxConcurrentCheckpoints(1); //1.連接kafka Properties props = new Properties(); props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.110.35:9092"); FlinkKafkaProducer<String> producer = new FlinkKafkaProducer<String>("test", new SimpleStringSchema(), props); //2.創建數據,并寫入數據到流中 TUser user = new TUser(); user.setId(8); user.setName("liu3"); user.setAge(22); user.setSex(1); user.setAddress("CN"); user.setCreateTimeSeries(1598889600000L); DataStream<String> sourceStream = env.fromElements(user).map((MapFunction<TUser, String>) value -> new Gson().toJson(value)); //3.將數據流輸入到kafka sourceStream.addSink(producer); sourceStream.print(); env.execute("flink kafka sink"); } }
在kafka上創建名稱為test的topic
先啟動DataStreamSource.java獲取輸出流,在啟動DataStreamSink.java輸入流
數據展示
以上是“Flink中Connectors如何連接Kafka”這篇文章的所有內容,感謝各位的閱讀!希望分享的內容對大家有幫助,更多相關知識,歡迎關注億速云行業資訊頻道!
免責聲明:本站發布的內容(圖片、視頻和文字)以原創、轉載和分享為主,文章觀點不代表本網站立場,如果涉及侵權請聯系站長郵箱:is@yisu.com進行舉報,并提供相關證據,一經查實,將立刻刪除涉嫌侵權內容。