您好,登錄后才能下訂單哦!
這篇文章給大家分享的是有關Flink中Connectors如何連接RabbitMq的內容。小編覺得挺實用的,因此分享給大家做個參考,一起跟隨小編過來看看吧。
通過使用Flink DataStream Connectors 數據流連接器連接到RabbitMq消息隊列中間件,并提供數據流輸入與輸出操作;
示例環境
java.version: 1.8.xflink.version: 1.11.1rabbitMq:3.5.7
示例數據源 (項目碼云下載)
Flink 系例 之 搭建開發環境與數據
示例模塊 (pom.xml)
Flink 系例 之 DataStream Connectors 與 示例模塊
數據流輸入
DataStreamSource.java
package com.flink.examples.rabbitmq; import org.apache.flink.api.common.serialization.SimpleStringSchema; import org.apache.flink.streaming.api.datastream.DataStream; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.streaming.connectors.rabbitmq.RMQSource; import org.apache.flink.streaming.connectors.rabbitmq.common.RMQConnectionConfig; /** * @Description 從MQ中獲取數據并輸出到DataStream流中 */ public class DataStreamSource { /** * 官方文檔:https://ci.apache.org/projects/flink/flink-docs-release-1.11/zh/dev/connectors/rabbitmq.html */ public static void main(String[] args) throws Exception { final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); RMQConnectionConfig connectionConfig = new RMQConnectionConfig.Builder() .setHost("127.0.0.1") .setPort(5672) .setUserName("admin") .setPassword("admin") .setVirtualHost("datastream") .build(); final DataStream<String> stream = env .addSource(new RMQSource<String>( connectionConfig, "test", true, new SimpleStringSchema())) .setParallelism(1); stream.print(); env.execute("flink rabbitMq source"); } }
數據流輸出
DataStreamSink.java
package com.flink.examples.rabbitmq; import org.apache.flink.api.common.serialization.SimpleStringSchema; import org.apache.flink.streaming.api.datastream.DataStream; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.streaming.connectors.rabbitmq.RMQSink; import org.apache.flink.streaming.connectors.rabbitmq.common.RMQConnectionConfig; /** * @Description 將DataStream流中的數據輸出到rabbitMq隊列中 */ public class DataStreamSink { /** * 官方文檔:https://ci.apache.org/projects/flink/flink-docs-release-1.11/zh/dev/connectors/rabbitmq.html */ public static void main(String[] args) throws Exception { final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); RMQConnectionConfig connectionConfig = new RMQConnectionConfig.Builder() .setHost("127.0.0.1") .setPort(5672) .setUserName("admin") .setPassword("admin") .setVirtualHost("datastream") .build(); String [] words = new String[]{"props","student","build","name","execute"}; final DataStream<String> stream = env.fromElements(words); stream.addSink(new RMQSink<String>(connectionConfig,"test",new SimpleStringSchema())); env.execute("flink rabbitMq sink"); } }
數據展示
感謝各位的閱讀!關于“Flink中Connectors如何連接RabbitMq”這篇文章就分享到這里了,希望以上內容可以對大家有一定的幫助,讓大家可以學到更多知識,如果覺得文章不錯,可以把它分享出去讓更多的人看到吧!
免責聲明:本站發布的內容(圖片、視頻和文字)以原創、轉載和分享為主,文章觀點不代表本網站立場,如果涉及侵權請聯系站長郵箱:is@yisu.com進行舉報,并提供相關證據,一經查實,將立刻刪除涉嫌侵權內容。