中文字幕av专区_日韩电影在线播放_精品国产精品久久一区免费式_av在线免费观看网站

溫馨提示×

溫馨提示×

您好,登錄后才能下訂單哦!

密碼登錄×
登錄注冊×
其他方式登錄
點擊 登錄注冊 即表示同意《億速云用戶服務條款》

kafka發送消息的方式有哪些

發布時間:2023-05-08 17:25:59 來源:億速云 閱讀:190 作者:iii 欄目:開發技術

今天小編給大家分享一下kafka發送消息的方式有哪些的相關知識點,內容詳細,邏輯清晰,相信大部分人都還太了解這方面的知識,所以分享這篇文章給大家參考一下,希望大家閱讀完這篇文章后有所收獲,下面我們一起來了解一下吧。

    kafka發送消息的方式

    package com.zl.kafkademo;
     
    import org.apache.kafka.clients.producer.Callback;
    import org.apache.kafka.clients.producer.KafkaProducer;
    import org.apache.kafka.clients.producer.ProducerRecord;
    import org.apache.kafka.clients.producer.RecordMetadata;
    import org.quartz.*;
    import org.quartz.impl.StdSchedulerFactory;
     
    import java.util.Properties;
     
    /**
     * @Auther: le
     * @Date: 2019/4/23 22:05
     * @Description:
     */
    public class MyProducer implements Job {
        private static KafkaProducer<String,String> producer;
     
        static {
            Properties properties = new Properties();
            properties.put("bootstrap.servers","127.0.0.1:9092");
            properties.put("key.serializer",
                    "org.apache.kafka.common.serialization.StringSerializer");
            properties.put("value.serializer",
                    "org.apache.kafka.common.serialization.StringSerializer");
            producer = new KafkaProducer<String, String>(properties);
        }
     
        /**
         * 第一種直接發送,不管結果
         */
        private static void sendMessageForgetResult(){
            ProducerRecord<String,String> record = new ProducerRecord<String,String>(
                    "kafka-study","name","Forget_result"
            );
            producer.send(record);
            producer.close();
        }
     
        /**
         * 第二種同步發送,等待執行結果
         * @return
         * @throws Exception
         */
        private static RecordMetadata sendMessageSync() throws Exception{
            ProducerRecord<String,String> record = new ProducerRecord<String,String>(
                    "kafka-study","name","sync"
            );
            RecordMetadata result = producer.send(record).get();
            System.out.println(result.topic());
            System.out.println(result.partition());
            System.out.println(result.offset());
            return result;
        }
     
        /**
         * 第三種執行回調函數
         */
        private static void sendMessageCallback(){
            ProducerRecord<String,String> record = new ProducerRecord<String,String>(
                    "kafka-study","name","callback"
            );
            producer.send(record,new MyProducerCallback());
        }
     
        //定時任務
        @Override
        public void execute(JobExecutionContext jobExecutionContext) throws JobExecutionException {
            try {
                sendMessageSync();
            }catch (Exception e){
                System.out.println("error:"+e);
            }
     
        }
     
        private static class MyProducerCallback implements Callback{
     
            @Override
            public void onCompletion(RecordMetadata recordMetadata, Exception e) {
                if (e !=null){
                    e.printStackTrace();
                    return;
                }
                System.out.println(recordMetadata.topic());
                System.out.println(recordMetadata.partition());
                System.out.println(recordMetadata.offset());
                System.out.println("Coming in MyProducerCallback");
            }
        }
     
     
        public static void main(String[] args){
            //sendMessageForgetResult();
            //sendMessageCallback();
            JobDetail job = JobBuilder.newJob(MyProducer.class).build();
     
            Trigger trigger = TriggerBuilder.newTrigger()
                    .withSchedule(SimpleScheduleBuilder.repeatSecondlyForever()).build();
     
            try {
                Scheduler scheduler = StdSchedulerFactory.getDefaultScheduler();
                scheduler.scheduleJob(job,trigger);
                scheduler.start();
            }catch (SchedulerException e){
                e.printStackTrace();
            } catch (Exception e) {
                e.printStackTrace();
            }
     
        }
     
     
    }

    需要引入文件

            <dependency>
                <groupId>org.apache.kafka</groupId>
                <artifactId>kafka-clients</artifactId>
                <version>0.10.0.1</version>
            </dependency>
     
            <dependency>
                <groupId>org.quartz-scheduler</groupId>
                <artifactId>quartz</artifactId>
                <version>2.3.0</version>
            </dependency>

    測試方法

    MAC下操作指令

    1、創建主題:

    ./kafka-topics.sh --create --topic kafka-study --zookeeper 127.0.0.1:2181 --config max.message.bytes=12800000 --config flush.messages=1 --partitions 5 --replication-factor 1

    2、運行上述程序,執行定時任務

    3、查看消費情況

    ./kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic kafka-study --from-beginning

    windows操作指令

     1、進入  D:\zookeeper-3.4.14\bin   打開新的cmd,輸入“zkServer“,運行Zookeeper

     2、進入 D:\kafka_2.11-0.11.0.0 運行cmd

    .\bin\windows\kafka-server-start.bat .\config\server.properties

    3、 創建主題

    進入D:\kafka_2.11-0.11.0.0運行cmd,輸入:

    .\bin\windows\kafka-topics.bat --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic test

    查看已創建主題:

    .\bin\windows\kafka-topics.bat --list --zookeeper localhost:2181

    查看指定主題的詳細信息:

    .\bin\windows\kafka-topics.bat --describe --zookeeper localhost:2181 --topic test

    查看主題消費詳情:

    .\bin\windows\kafka-console-consumer.bat --zookeeper localhost:2181 --topic kafka-study --from-beginning

    以上就是“kafka發送消息的方式有哪些”這篇文章的所有內容,感謝各位的閱讀!相信大家閱讀完這篇文章都有很大的收獲,小編每天都會為大家更新不同的知識,如果還想學習更多的知識,請關注億速云行業資訊頻道。

    向AI問一下細節

    免責聲明:本站發布的內容(圖片、視頻和文字)以原創、轉載和分享為主,文章觀點不代表本網站立場,如果涉及侵權請聯系站長郵箱:is@yisu.com進行舉報,并提供相關證據,一經查實,將立刻刪除涉嫌侵權內容。

    AI

    贵港市| 阳东县| 鲁山县| 深水埗区| 赤城县| 靖远县| 玉树县| 郴州市| 明水县| 海伦市| 上高县| 富平县| 营口市| 上思县| 皮山县| 黑龙江省| 怀集县| 拉萨市| 南涧| 内丘县| 安吉县| 太和县| 徐汇区| 濉溪县| 锡林郭勒盟| 长垣县| 金山区| 北京市| 长宁县| 长汀县| 鸡泽县| 峨眉山市| 贵港市| 静宁县| 德清县| 嘉黎县| 永丰县| 清水县| 桃园县| 蕲春县| 札达县|