中文字幕av专区_日韩电影在线播放_精品国产精品久久一区免费式_av在线免费观看网站

溫馨提示×

溫馨提示×

您好,登錄后才能下訂單哦!

密碼登錄×
登錄注冊×
其他方式登錄
點擊 登錄注冊 即表示同意《億速云用戶服務條款》

怎么利用docker和docker-compose部署單機kafka

發布時間:2021-07-29 23:15:27 來源:億速云 閱讀:406 作者:chen 欄目:云計算

本篇內容介紹了“怎么利用docker和docker-compose部署單機kafka”的有關知識,在實際案例的操作過程中,不少人都會遇到這樣的困境,接下來就讓小編帶領大家學習一下如何處理這些情況吧!希望大家仔細閱讀,能夠學有所成!

前提

  1. docker

  2. docker-compose

其中docker-compose不是必須的,單單使用docker也是可以的,這里主要介紹docker和docker-compose兩種方式

docker部署

docker部署kafka非常簡單,只需要兩條命令即可完成kafka服務器的部署。

docker run -d --name zookeeper -p 2181:2181  wurstmeister/zookeeper
docker run -d --name kafka -p 9092:9092 -e KAFKA_BROKER_ID=0 -e KAFKA_ZOOKEEPER_CONNECT=zookeeper:2181 --link zookeeper -e KAFKA_ADVERTISED_LISTENERS=PLAINTEXT://192.168.1.60(機器IP):9092 -e KAFKA_LISTENERS=PLAINTEXT://0.0.0.0:9092 -t wurstmeister/kafka

由于kafka是需要和zookeeper共同工作的,所以需要部署一個zookeeper,但有了docker這對部署來說非常輕松.
可以通過docker ps查看到兩個容器的狀態,這里不再展示.

接下來可以進行生產者和消費者的嘗試

通過kafka自帶工具生產消費消息測試

  1. 首先,進入到kafka的docker容器中

docker exec -it kafka sh
  1. 運行消費者,進行消息的監聽

kafka-console-consumer.sh --bootstrap-server 192.168.1.60:9094 --topic kafeidou --from-beginning
  1. 打開一個新的ssh窗口,同樣進入kafka的容器中,執行下面這條命令生產消息

kafka-console-producer.sh --broker-list 192.168.1.60(機器IP):9092 --topic kafeidou

輸入完這條命令后會進入到控制臺,可以輸入任何想發送的消息,這里發送一個hello

>>
>hello
>
>
>
  1. 可以看到,在生產者的控制臺中輸入消息后,消費者的控制臺立刻看到了消息

到目前為止,一個kafka完整的hello world就完成了.kafka的部署加上生產者消費者測試.

通過java代碼進行測試

  1. 新建一個maven項目并加入以下依賴

<dependency>
      <groupId>org.apache.kafka</groupId>
      <artifactId>kafka-clients</artifactId>
      <version>2.1.1</version>
    </dependency>
    <dependency>
      <groupId>org.apache.kafka</groupId>
      <artifactId>kafka_2.11</artifactId>
      <version>0.11.0.2</version>
    </dependency>
  1. 生產者代碼
    producer.java

import org.apache.kafka.clients.producer.*;

import java.util.Date;
import java.util.Properties;
import java.util.Random;

public class HelloWorldProducer {
  public static void main(String[] args) {
    long events = 30;
    Random rnd = new Random();

    Properties props = new Properties();
    props.put("bootstrap.servers", "192.168.1.60:9092");
    props.put("acks", "all");
    props.put("retries", 0);
    props.put("batch.size", 16384);
    props.put("linger.ms", 1);
    props.put("buffer.memory", 33554432);
    props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
    props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
    props.put("message.timeout.ms", "3000");

    Producer<String, String> producer = new KafkaProducer<>(props);

    String topic = "kafeidou";

    for (long nEvents = 0; nEvents < events; nEvents++) {
      long runtime = new Date().getTime();
      String ip = "192.168.2." + rnd.nextInt(255);
      String msg = runtime + ",www.example.com," + ip;
      System.out.println(msg);
      ProducerRecord<String, String> data = new ProducerRecord<String, String>(topic, ip, msg);
      producer.send(data,
          new Callback() {
            public void onCompletion(RecordMetadata metadata, Exception e) {
              if(e != null) {
                e.printStackTrace();
              } else {
                System.out.println("The offset of the record we just sent is: " + metadata.offset());
              }
            }
          });
    }
    System.out.println("send message done");
    producer.close();
    System.exit(-1);
  }
}
  1. 消費者代碼
    consumer.java

import java.util.Arrays;
import java.util.Properties;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;

public class HelloWorldConsumer2 {

  public static void main(String[] args) {
    Properties props = new Properties();

    props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.1.60:9092");
    props.put(ConsumerConfig.GROUP_ID_CONFIG ,"kafeidou_group") ;
    props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "true");
    props.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, "1000");
    props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
    props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
    props.put("auto.offset.reset", "earliest");

    Consumer<String, String> consumer = new KafkaConsumer<>(props);
    consumer.subscribe(Arrays.asList("kafeidou"));

    while (true) {
      ConsumerRecords<String, String> records = consumer.poll(1000);
      for (ConsumerRecord<String, String> record : records) {
        System.out.printf("offset = %d, key = %s, value = %s%n", record.offset(), record.key(), record.value());
      }
    }
  }
}
  1. 分別運行生產者和消費者即可
    生產者打印消息

1581651496176,www.example.com,192.168.2.219
1581651497299,www.example.com,192.168.2.112
1581651497299,www.example.com,192.168.2.20

消費者打印消息

offset = 0, key = 192.168.2.202, value = 1581645295298,www.example.com,192.168.2.202
offset = 1, key = 192.168.2.102, value = 1581645295848,www.example.com,192.168.2.102
offset = 2, key = 192.168.2.63, value = 1581645295848,www.example.com,192.168.2.63

源碼地址:FISHStack/kafka-demo

通過docker-compose部署kafka

首先創建一個docker-compose.yml文件

version: '3.7'
services:
  zookeeper:
    image: wurstmeister/zookeeper
    volumes:
      - ./data:/data
    ports:
      - 2182:2181
       
  kafka9094:
    image: wurstmeister/kafka
    ports:
      - 9092:9092
    environment:
      KAFKA_BROKER_ID: 0
      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://192.168.1.60:9092
      KAFKA_CREATE_TOPICS: "kafeidou:2:0"   #kafka啟動后初始化一個有2個partition(分區)0個副本名叫kafeidou的topic 
      KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
      KAFKA_LISTENERS: PLAINTEXT://0.0.0.0:9092
    volumes:
      - ./kafka-logs:/kafka
    depends_on:
      - zookeeper

部署起來很簡單,在docker-compose.yml文件的目錄下執行docker-compose up -d就可以了,測試方式和上面的一樣。
這個docker-compose做的東西比上面docker方式部署的東西要多一些

  1. 數據持久化,在當前目錄下掛在了兩個目錄分別存儲zookeeper和kafka的數據,當然在docker run 命令中添加 -v 選項也是可以做到這樣的效果的

  2. kafka在啟動后會初始化一個有分區的topic,同樣的,docker run的時候添加-e KAFKA_CREATE_TOPICS=kafeidou:2:0也是可以做到的。

總結:優先推薦docker-compose方式部署

為什么呢?

因為單純使用docker方式部署的話,如果有改動(例如:修改對外開放的端口號)的情況下,docker需要把容器停止docker stop 容器ID/容器NAME,然后刪除容器docker rm 容器ID/容器NAME,最后啟動新效果的容器docker run ...

而如果在docker-compose部署的情況下如果修改內容只需要修改docker-compose.yml文件對應的地方,例如2181:2181改成2182:2182,然后再次在docker-compose.yml文件對應的目錄下執行docker-compose up -d就能達到更新后的效果。

“怎么利用docker和docker-compose部署單機kafka”的內容就介紹到這里了,感謝大家的閱讀。如果想了解更多行業相關的知識可以關注億速云網站,小編將為大家輸出更多高質量的實用文章!

向AI問一下細節

免責聲明:本站發布的內容(圖片、視頻和文字)以原創、轉載和分享為主,文章觀點不代表本網站立場,如果涉及侵權請聯系站長郵箱:is@yisu.com進行舉報,并提供相關證據,一經查實,將立刻刪除涉嫌侵權內容。

AI

金沙县| 平潭县| 永靖县| 乐至县| 盐池县| 洪江市| 西林县| 石棉县| 永顺县| 武城县| 许昌县| 昌黎县| 靖边县| 丹凤县| 紫金县| 昌邑市| 靖安县| 汤原县| 清远市| 明水县| 黔江区| 达孜县| 稷山县| 韶山市| 饶河县| 万山特区| 新密市| 昭觉县| 阿拉善左旗| 永年县| 巴彦县| 吉水县| 易门县| 六盘水市| 剑川县| 大竹县| 泸水县| 蒲江县| 江油市| 安阳市| 新郑市|