中文字幕av专区_日韩电影在线播放_精品国产精品久久一区免费式_av在线免费观看网站

溫馨提示×

溫馨提示×

您好,登錄后才能下訂單哦!

密碼登錄×
登錄注冊×
其他方式登錄
點擊 登錄注冊 即表示同意《億速云用戶服務條款》

消息隊列之kafka(API)

發布時間:2020-07-30 05:50:05 來源:網絡 閱讀:270 作者:原生zzy 欄目:大數據

1.模擬實現kafka的生產者消費者(原生API)

解決相關依賴:
<!-- https://mvnrepository.com/artifact/org.apache.kafka/kafka -->
<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka_2.12</artifactId>
    <version>2.1.0</version>
</dependency>

生產者:

packagecom.zy.kafka;

importjava.util.Properties;

importorg.apache.kafka.clients.producer.KafkaProducer;
importorg.apache.kafka.clients.producer.Producer;
importorg.apache.kafka.clients.producer.ProducerRecord;
publicclassKafkaTest {
    publicstaticvoidmain(String[] args) {
        //1.加載配置文件
        //1.1封裝配置文件對象
        Properties prps=newProperties();
        //配置broker地址
        prps.put("bootstrap.servers", "hadoop02:9092");
        //配置ack級別:0 1 -1(all)
        prps.put("acks", "all");
        //重試次數
        prps.put("retries", 3);

        prps.put("batch.size", 16384);
        prps.put("linger.ms",1);
        prps.put("buffer.memory", 33554432);

        //指定(message的K-V)的序列化
        prps.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        prps.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

        //2.創建生產者對象(指定的key和value的泛型)
        Producer<String, String>producer=new KafkaProducer<>(prps);
        //生產者發送消息
        for(inti=0;i<100;i++) {
            /**
             * ProducerRecord<String, String>(topic, value)
             * topic:主題名稱
             * key:
             * value:
             */
            //消息的封裝對象
            ProducerRecord<String, String>pr=newProducerRecord<String, String>("test_topic", "key"+i, "value"+i);
            producer.send(pr);
        }
producer.close();
    }
}

消費者:

packagecom.zy.kafka;

importjava.util.Arrays;
importjava.util.Properties;

importorg.apache.kafka.clients.consumer.ConsumerRecord;
importorg.apache.kafka.clients.consumer.ConsumerRecords;
importorg.apache.kafka.clients.consumer.KafkaConsumer;
importorg.apache.kafka.clients.producer.KafkaProducer;
importorg.apache.kafka.clients.producer.Producer;
importorg.apache.kafka.clients.producer.ProducerRecord;
publicclassKafkaTest {
    publicstaticvoidmain(String[] args) {
        //1.加載配置文件
        //1.1封裝配置文件對象
        Properties prps=newProperties();
        //配置broker地址
        prps.put("bootstrap.servers", "hadoop02:9092");
        //指定消費的組的ID
        prps.put("group.id", "test");
        //是否啟動自動提交(是否自動提交反饋信息,向zookeeper提交)
        prps.put("enable.auto.commit", "true");
        //自動提交的時間間隔
        prps.put("auto.commit.interval.ms", "1000");

        //指定(message的K-V)的序列化
        prps.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        prps.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");

        //創建kafka的消費者
        KafkaConsumer<String, String>consumer=newKafkaConsumer<>(prps);
        //添加消費主題
        consumer.subscribe(Arrays.asList("kafka_test"));
        //開始消費
        while(true) {
            //設置從哪里開始消費,返回的是一個消費記錄
            ConsumerRecords<String, String>poll = consumer.poll(10);
            for(ConsumerRecord<String, String>p:poll) {
                System.out.printf("offset=%d,key=%s,value=%s\n",p.offset(),p.key(),p.value());
            }
        }
    }
}

2.以shell命令的方式API

import java.io.IOException;
import java.io.InputStream;
import java.util.Properties;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;

import kafka.admin.TopicCommand;

public class KafkaAPI {
    public static void main(String[] args) throws IOException {
        /* 
            kafka-topics.sh \
            --create \
            --zookeeper hadoop02:2181,hadoop03:2181,hadoop04:2181 \
            --replication-factor 3 \
            --partitions 10 \
            --topic kafka_test11
         */
        //創建一個topic
        String ops[]=new String []{
            "--create",
            "--zookeeper","hadoop01:2181,hadoop02:2181,hadoop03:2181",
            "--replication-factor","3",
            "--topic","zy_topic","--partitions","5"
        };
        String list[]=new String[] {
                "--list",
                "--zookeeper",
                "hadoop01:2181,hadoop02:2181,hadoop03:2181"
        };
        //以命令的方式提交
        TopicCommand.main(list);
    }
}

3. 高級API操作

shell中常用操作:

#!/usr/bin/env bash
#查看kafka的topic
kafka-topics.sh --list --zookeeper hadoop01:2181,hadoop02:2181,hadoop03:2181
#查看kafkatopic的偏移量
kafka-run-class.sh kafka.tools.GetOffsetShell --broker-list hadoop01:9092,hadoop02:9092,hadoop03:9092 --topic kafka_api_r1p1
#創建topic
kafka-topics.sh --create --zookeeper hadoop01:2181,hadoop02:2181,hadoop03:2181 --partitions 3  --replication-factor 1  --topic kafka_api_r1p3
#刪除topic
kafka-topics.sh --delete --zookeeper hadoop01:2181,hadoop02:2181,hadoop03:2181 --topic act_inventory_r1p1_test1
#查看具體的group 的偏移量
kafka-consumer-groups.sh 

①簡單實現,kafka的消費者,并且將由kafka自動管理偏移量(單分區消費)

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

import java.util.Arrays;
import java.util.Properties;

/**
 * ?* Created with IntelliJ IDEA.
 * ?* User: ZZY
 * ?* Date: 2019/9/9
 * ?* Time: 19:44
 * ?* Description:? 簡單實現,kafka的消費者,并且將由kafka自動管理偏移量(單分區消費)
 */
public class MyConsumer01 {
    private static Properties props = new Properties();

    static {
        props.put("group.id", "kafka_api_group_2");
        //設置kafka集群的地址
        props.put("bootstrap.servers", "hadoop01:9092,hadoop02:9092,hadoop03:9092");
        //開啟offset自動提交
        props.put("enable.auto.commit", "true");
        //手動提交偏移量
        //props.put("enable.auto.commit", "false");
        //設置自動提交時間
        props.put("auto.commit.interval.ms", "100");
        //設置消費方式
        props.put("auto.offset.reset","earliest");
        //序列化器
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
    }

    public static void main(String[] args) throws InterruptedException {
        String topic = "kafka_api_r1p1";
        //實例化一個消費者
        KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
        //消費者訂閱主題,可以訂閱多個主題
//        consumer.subscribe(Collections.singleton(topic));
        consumer.subscribe(Arrays.asList(topic));
        //死循環不停的從broker中拿數據
        while(true){
            ConsumerRecords<String, String> records = consumer.poll(10);
            for(ConsumerRecord<String, String> record : records){
                System.out.printf("offset=%d,key=%s,value=%s",record.offset(),
                        record.key(),record.value());
            }
            Thread.sleep(2000);
        }
        //consumer.commitAsync(); 提交偏移量信息
    }
}

②實現多分區消費

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.TopicPartition;

import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Properties;

/**
 * ?* Created with IntelliJ IDEA.
 * ?* User: ZZY
 * ?* Date: 2019/9/10
 * ?* Time: 8:55
 * ?* Description:?實現多分區消費
 */
public class MyConsumer02 {
    private static Properties props = new Properties();
    static{
        //設置kafka集群的地址
        props.put("bootstrap.servers", "hadoop01:9092,hadoop02:9092,hadoop03:9092");
        //設置消費者組,組名字自定義,組名字相同的消費者在一個組
        props.put("group.id", "kafka_api_group_1");
        //開啟offset自動提交
        props.put("enable.auto.commit", "false");
        //序列化器
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
    }

    public static void main(String[] args) {
        String topicName="kafka_api_r1p3";
        //實例化一個消費者
        KafkaConsumer<String,String> consumer =new KafkaConsumer<>(props);
        //消費者訂閱主題,可以訂閱多個主題
        consumer.subscribe(Arrays.asList(topicName));
        while(true){
            ConsumerRecords<String, String> records  = consumer.poll(Long.MAX_VALUE);
            //獲取每個分區的數據
            for(TopicPartition partition :records.partitions()){
                System.out.println("開始消費第"+partition.partition()+"分區數據!");
                List<ConsumerRecord<String, String>> partitionRecords  = records.records(partition);
                //獲取每個分區里的records
                for(ConsumerRecord<String, String> partitionRecord:partitionRecords){
                    System.out.println("partition:"+partition.partition()+",key:"+partitionRecord.key()+",value"
                    +partitionRecord.value()+",offset:"+partitionRecord.offset());
                }
                //更新每個分區的偏移量(取分區中最后一個record的偏移量,就是這個分區的偏移量)
                long lastOffset =partitionRecords.get(partitionRecords.size()-1).offset();
                consumer.commitSync(Collections.singletonMap(partition,new OffsetAndMetadata(lastOffset +1)));
            }
            try {
                Thread.sleep(2000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }
}

③實現消費者從指定分區拉取數據
注意:
  (1)kafka提供的消費者組內的協調功能就不再有效
  (2)樣的寫法可能出現不同消費者分配了相同的分區,為了避免偏移量提交沖突,每個消費者實例的group_id要不重復

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.TopicPartition;

import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Properties;

/**
 * ?* Created with IntelliJ IDEA.
 * ?* User: ZZY
 * ?* Date: 2019/9/10
 * ?* Time: 10:10
 * ?* Description:?消費者從指定分區拉取數據
 *      一旦指定特定的分區消費需要注意:
 *          (1)kafka提供的消費者組內的協調功能就不再有效
 *          (2)樣的寫法可能出現不同消費者分配了相同的分區,為了避免偏移量提交沖突,每個消費者實例的group_id要不重復
 */
public class MyConsumer03 {
    private static Properties props = new Properties();
    //實例化一個消費者
    static KafkaConsumer<String, String> consumer;
    static {
        //設置kafka集群的地址
        props.put("bootstrap.servers", "hadoop01:9092,hadoop02:9092,hadoop03:9092");
        //設置消費者組,組名字自定義,組名字相同的消費者在一個組
        props.put("group.id", "kafka_api_group_1");
        //開啟offset自動提交
        props.put("enable.auto.commit", "false");
        //序列化器
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        consumer = new KafkaConsumer<>(props);
    }

    public static void main(String[] args) {
        //消費者訂閱主題,并設置要拉取的分區
        String topic="kafka_api_r1p3";
        int partitionNum=0;
        //消費者訂閱主題,并設置要拉取的分區
        TopicPartition partition0 =new TopicPartition(topic,partitionNum);
        consumer.assign(Arrays.asList(partition0));
        while(true){
            ConsumerRecords<String, String> records  = consumer.poll(Long.MAX_VALUE);
            for(TopicPartition partition : records.partitions()){
                List<ConsumerRecord<String, String>> partitionRecords  = records.records(partition);
                for(ConsumerRecord<String, String> partitionRecord:partitionRecords){
                    System.out.println("分區:"+partitionRecord.partition()+",key:"+partitionRecord.key()+",value:"
                            +partitionRecord.value()+"offset:"+partitionRecord.offset());
                }
                long lastOffset =partitionRecords.get(partitionRecords.size()-1).offset();
                consumer.commitSync(Collections.singletonMap(partition,new OffsetAndMetadata(lastOffset+1)));
            }
        }
    }
}

④重置kafka組的offset

import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.TopicPartition;

import java.text.SimpleDateFormat;
import java.util.Arrays;
import java.util.Date;
import java.util.Properties;

/**
 * ?* Created with IntelliJ IDEA.
 * ?* User: ZZY
 * ?* Date: 2019/9/10
 * ?* Time: 9:46
 * ?* Description:? 該API用于重置kafka組的offset
 */
public class ReSetOffset {
    //用于重置的offset
    final private static String group="kafka_api_group_1";
    final private static Properties props = new Properties();
    static KafkaConsumer<String,String> consumer;
    static{
        props.put("bootstrap.servers", "hadoop01:9092,hadoop02:9092,hadoop03:9092");
        props.put("group.id",group);
        props.put("enable.auto.commit", "true");
    //props.put("auto.offset.reset","earliest");
        props.put("auto.commit.interval.ms", "1000");
        props.put("key.deserializer",
                "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer",
                "org.apache.kafka.common.serialization.StringDeserializer");
        consumer=new KafkaConsumer<String, String>(props);
    }
    public static String resetOffset(String topic,long offset){
        int partitionNums=getTopicPartitionNum(topic);
        for(int i=0;i<partitionNums;i++){
            TopicPartition tp=new TopicPartition(topic,i);
            //這里每重置一個分區的offset,就需要重新創建一個新的KafkaConsumer
            KafkaConsumer consumer_temp= new KafkaConsumer<String, String>(props);
            consumer_temp.assign(Arrays.asList(tp));
            consumer_temp.seek(tp,offset);
            consumer_temp.close();
        }
        consumer.close();
        SimpleDateFormat dateFormat = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss ");
        return dateFormat.format(new Date())+ group +" ResetOffset Succeed!!";
    }
    private  static int  getTopicPartitionNum(String topic){
        int partitionNums=consumer.partitionsFor(topic).size();
        return partitionNums;
    }

    public static void main(String[] args) {
        String topic="kafka_api_r1p1";
        System.out.println(ReSetOffset.resetOffset(topic,0));
    }
}

⑤多線程版本的消費者

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.errors.WakeupException;

import java.util.Arrays;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicBoolean;

/**
 * ?* Created with IntelliJ IDEA.
 * ?* User: ZZY
 * ?* Date: 2019/9/10
 * ?* Time: 10:45
 * ?* Description:?這是一個consumer的線程
 */
public class ConsumerRunner implements Runnable {

    private final AtomicBoolean closed = new AtomicBoolean(false);
    private final KafkaConsumer<String, String> consumer;
    private final CountDownLatch latch;

    public ConsumerRunner(KafkaConsumer<String, String> consumer, CountDownLatch latch) {
        this.consumer = consumer;
        this.latch = latch;
    }

    @Override
    public void run() {
        System.out.println("threadName....." + Thread.currentThread().getName());
        try {
            consumer.subscribe(Arrays.asList("kafka_api_r1p1"));
            while (!closed.get()) {
                ConsumerRecords<String, String> records = consumer.poll(150);
                for (ConsumerRecord<String, String> record : records)
                    System.out.printf("threadName= %s, offset = %d, key = %s, value = %s%n", Thread.currentThread().getName(), record.offset(), record.key(), record.value());
            }
            try {
                Thread.sleep(1000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        } catch (WakeupException e) {
            if(!closed.get()){
                throw e;
            }
        }finally {
            consumer.close();
            latch.countDown();
        }
    }
    public void shutdown(){
        System.out.println("close ConsumerRunner");
        closed.set(true);
        consumer.wakeup();
    }
}
import org.apache.kafka.clients.consumer.KafkaConsumer;
import java.util.ArrayList;
import java.util.List;
import java.util.Properties;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

/**
 * ?* Created with IntelliJ IDEA.
 * ?* User: ZZY
 * ?* Date: 2019/9/10
 * ?* Time: 10:52
 * ?* Description:? 這里主要測試多線程下的Consumer
 */
public class RunConsumer {
    private static Properties props = new Properties();
    static{
        //設置kafka集群的地址
        props.put("bootstrap.servers", "hadoop01:9092,hadoop02:9092,hadoop03:9092");
        //設置消費者組,組名字自定義,組名字相同的消費者在一個組
        props.put("group.id", "kafka_api_group_1");
        //開啟offset自動提交
        props.put("enable.auto.commit", "true");
        //自動提交時間間隔
        props.put("auto.commit.interval.ms", "1000");
        //序列化器
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
    }

    public static void main(String[] args) {
        //實例化一個消費者
        final List<ConsumerRunner> consumers = new ArrayList<>();
        final List<KafkaConsumer<String, String>> kafkaConsumers = new ArrayList<>();
        for(int i=0;i<2;i++){
            kafkaConsumers.add(new KafkaConsumer<String, String>(props));
        }
        //倒計時,利用await方法使主線程阻塞,利用countDown遞減,當遞減到0時,喚醒主線程,功能類似于join
        final CountDownLatch latch = new CountDownLatch(2);
        ExecutorService executor = Executors.newFixedThreadPool(2);
        for(int i=0;i<2;i++){
            ConsumerRunner  c= new ConsumerRunner(kafkaConsumers.get(i),latch);
            consumers.add(c);
            executor.submit(c);
        }
        /**
         * 這個方法的意思就是在jvm中增加一個關閉的鉤子,當JVM關閉時,會執行系統中已經設置的所有
         * 方法addShutdownHook添加的鉤子,當系統執行完成這些鉤子后,jvm才會關閉,
         *  所以這些鉤子可以在jvm關閉的時候進行內存清理、對象銷毀、關閉連接等操作。
         */
        Runtime.getRuntime().addShutdownHook(new Thread(){
            @Override
            public void run() {
                System.out.println("....................");
                for(ConsumerRunner consumer:consumers){
                    consumer.shutdown();
                }
                executor.shutdown();
                try {
                    executor.awaitTermination(5000, TimeUnit.MICROSECONDS);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        });
        try {
            latch.await();
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
}
向AI問一下細節

免責聲明:本站發布的內容(圖片、視頻和文字)以原創、轉載和分享為主,文章觀點不代表本網站立場,如果涉及侵權請聯系站長郵箱:is@yisu.com進行舉報,并提供相關證據,一經查實,將立刻刪除涉嫌侵權內容。

AI

台山市| 铅山县| 广丰县| 连州市| 东港市| 仙桃市| 泰顺县| 南和县| 麻栗坡县| 清新县| 武鸣县| 穆棱市| 江都市| 泽库县| 新蔡县| 文山县| 壶关县| 芦溪县| 修文县| 阿克苏市| 万宁市| 乌海市| 新营市| 内乡县| 基隆市| 哈密市| 安达市| 西畴县| 盱眙县| 新津县| 宁波市| 南靖县| 阜阳市| 讷河市| 鄱阳县| 洪泽县| 尚志市| 明水县| 乃东县| 日喀则市| 彭泽县|