中文字幕av专区_日韩电影在线播放_精品国产精品久久一区免费式_av在线免费观看网站

溫馨提示×

溫馨提示×

您好,登錄后才能下訂單哦!

密碼登錄×
登錄注冊×
其他方式登錄
點擊 登錄注冊 即表示同意《億速云用戶服務條款》

消息隊列之kafka(集群搭建及shell操作)

發布時間:2020-07-19 08:23:29 來源:網絡 閱讀:429 作者:原生zzy 欄目:大數據

1.kafka集群搭建

? kafka安裝包下載地址:
?官網網址:http://kafka.apache.org/quickstart
?中文官網:http://kafka.apachecn.org/quickstart.html
?在 windows 平臺,從官網下載:http://mirrors.hust.edu.cn/apache/kafka/1.1.0/
?在 centos 平臺:wgethttp://mirrors.hust.edu.cn/apache/kafka/1.1.0/kafka_2.11-1.1.0.tgz

(1)集群部署的基礎環境準備:

?A: 安裝 JDK 1.8
?A: 安裝 zookeeper 集群(也可以使用自帶 ZooKeeper,但是不推薦)

(2)集體搭建:

? 版本:kafka_2.11-1.1.0
?集群規劃:hadoop01、hadoop02、hadoop03 (三個節點)
① 解壓安裝包到對應的目錄
?tar zxvfkafka_2.11-1.1.0.tgz -C /application/
② 修改配置文件
?[hadoop@hadoop01 ~]$ cd /application/kafka_2.11-1.1.0/config/
?[hadoop@hadoop01 ~]$ vim server.properties
??broker.id=5 ## 當前集群中的每個 broker 節點的一個唯一編號,每個節點都不一樣
消息隊列之kafka(集群搭建及shell操作)
?
??listeners=PLAINTEXT://:9092
??listeners=PLAINTEXT://hadoop01:9092
??host.name=hadoop01## 每個節點指定為當前主機名,上面也是
消息隊列之kafka(集群搭建及shell操作)
?
??log.dirs=/home/hadoop/data/kafka-logs ## kafkabroke工作節點數據存儲目錄
??num.partitions=1 ## kafka 的 topic 的默認分區數
消息隊列之kafka(集群搭建及shell操作)
?
??log.retention.hours=168 ## 日志的最長保存時間
消息隊列之kafka(集群搭建及shell操作)
?
??zookeeper.connect=hadoop01:2181,hadoop02:2181,hadoop03:2181 ## zookeeper 地址
消息隊列之kafka(集群搭建及shell操作)
③ 批量發送
[hadoop@hadoop01 application]$scp -r /application/kafka_2.11-1.1.0/ hadoop02:$PWD
[hadoop@hadoop01 application]$scp -r /application/kafka_2.11-1.1.0/ hadoop03:$PWD
千萬注意:要修改$KAFKA_HOME/config/server.properties 文件中的對應 broker 節點的信息

broker.id=your broker id 
host.name=your broker hostname 
advertised.listeners=PLAINTEXT:// your broker hostname:9092

④ 配置環境變量
[hadoop@hadoop01 application]$ sudo etc/profile
export KAFKA_HOME=/application/kafka_2.11-1.1.0
[hadoop@hadoop01 application]$source/etc/profile

⑤ 啟動集群,進行驗證(每一個節點都要啟動)

nohup kafka-server-start.sh \
/application/kafka_2.11-1.1.0/config/server.properties \
1>~/logs/kafka_std.log \
2>~/logs/kafka_err.log &

2.kafka相關shell

(1)啟動集群每個節點的進程

nohup kafka-server-start.sh \
 /home/hadoop/apps/kafka_2.11-1.1.0/config/server.properties \
1>~/logs/kafka_std.log \ 2>~/logs/kafka_err.log &

(2) 創建 topic

kafka-topics.sh \
--create \
--zookeeper hadoop01:2181,hadoop02:2181,hadoop03:2181 \
--replication-factor 1 \
--partitions 1 \
--topic kafka_test
**參數介紹**
--create 創建 kafka topic
--zookeeper 指定 kafka 的 zookeeper 地址
--partitions 指定分區的個數
--replication-factor 指定每個分區的副本個數

(3) 查看已經創建的所有 kafka topic

kafka-topics.sh \
--zookeeper hadoop01:2181,hadoop02:2181,hadoop03:2181 \
--list \

(4) 查看某個指定的 kafka topic 的詳細信息

kafka-topics.sh \
--zookeeper hadoop01:2181,hadoop02:2181,hadoop03:2181 \
--describe \   #查看詳信息
--topic kafka_test  #指定需要查看的topic

消息隊列之kafka(集群搭建及shell操作)
Topic:topic的名稱
Partition:topic的分區編號
Leader:負責處理消息和讀寫,leader是從所有節點中隨機選出
Replicas:列出了所有的副本節點,不管節點是否在服務中。
isr:正在服務中的節點。
(5) 開啟生產者模擬生成數據:

kafka-console-producer.sh \
--broker-list hadoop01:9092 \  # broker的節點列表
--topic kafka_test

(6) 開啟消費者模擬消費數據:

kafka-console-consumer.sh \
--zookeeper hadoop01:2181,hadoop02:2181,hadoop03:2181 \
--from-beginning \  #從哪里開始消費
--topic kafka_test

(7) 查看某 topic 某個分區的偏移量最大值和最小值

kafka-run-class.sh \
kafka.tools.GetOffsetShell \
--topic kafka_test \
--time -1 \
--broker-list hadoop01:9092 \
--partitions 1

(8) 增加 topic 分區數(這個操作是不被允許的)

kafka-topics.sh \
--alter \
--zookeeper hadoop01:2181,hadoop02:2181,hadoop03:2181 \
--topic kafka_test \
--partitions 5 /
--replication-factor 2

(9) 刪除 Topic

kafka-topics.sh \
--delete \
--zookeeper hadoop01:2181,hadoop02:2181,hadoop03:2181 \
--topic kafka_test \

3.kafka的consumer group 位移offset重設

   這一節,說來也巧,最近接收的項目是準實時(OOG+kafka+stream),并同事也跟我聊了聊小編博客的建議,大體上說,就是博客寫的太過簡單,雖然通俗易懂,但是真正實際應用的地方并不多,建議我能加上一些復雜并且經常用到的東西;說白了就是懶的找資料,想看小編現成的;沒辦法,那就加上一些比較常用的。
  首先小編就介紹下,針對kafka通過group 消費了topic的數據后,如何自定義kafka數據消費的位置,之前的操作都是“--from-beginning”,每次都是從頭開始消費,如果消費語句為精準一次,那么該如何操作呢?
  這里通過如何使用Kafka自帶的kafka-consumer-groups.sh腳本隨意設置消費者組(consumer group)的位移。需要特別強調的是,這是2.11-0.11.0.0版本提供的新功能且只適用于新版本consumer。在新版本之前,如果要為已有的consumer group調整位移必須要手動編寫Java程序調用KafkaConsumer#seek方法,費時費力不說還容易出錯。0.11.0.0版本豐富了kafka-consumer-groups腳本的功能,用戶可以直接使用該腳本很方便地為已有的consumer group重新設置位移,但前提必須是consumer group必須是inactive的,即不能是處于正在工作中的狀態
  這里首先介紹一下重設位移的三個步驟:
    - 確定consumer group在topic下的作用域
      → --all-topics :為consumer group下所有topic的所有分區調整位移
      → --topic t1 --topic t2:為指定的若干個topic的所有分區調整位移
      → --topic t1:0,1,2:為指定的topic分區調整位移

    - 確定位移重設策略
      → --to-earliest:把位移調整到分區當前最小位移
      → --to-latest: 把位移調整到分區當前最新位移
      → --to-current:把位移調整到分區當前位移
      → --to-offset <offset>:把位移調整到指定位移處
      → --shift-by N:把位移調整到當前位移 + N處,注意N可以是負數,表示向前移動
      → --to-datetime <datetime>:把位移調整到大于給定時間的最早位移處,datetime格式是yyyy-MM-ddTHH:mm:ss.xxx,比如2017-08-04T00:00:00.000
      → --by-duration <duration>:把位移調整到距離當前時間指定間隔的位移處,duration格式是PnDTnHnMnS,比如PT0H5M0S
    - 確定執行方案
      → 什么參數都不加:只是打印出位移調整方案,不具體執行
      → --execute:執行真正的位移調整
      → -export:把位移調整方案按照CSV格式打印,方便用戶成csv文件,供后續直接使用

   具體案例演示:
#創建topic,并向其中生產數據

①   創建topic
kafka-topics.sh \
--create \
--zookeeper hadoop01:2181,hadoop02:2181,hadoop03:2181 \
--replication-factor 1 \
--partitions 3 \
--topic 'test-group'

②   創建生產者,生產數據
kafka-producer-perf-test.sh \
--topic 'test-group' \
--num-records 500 \
--throughput -1 \
--record-size 100 \
--producer-props bootstrap.servers=hadoop01:9092,hadoop02:9092,hadoop03:9092 acks=-1
③   啟動一個console consumer程序,組名設置為test-group:
kafka-console-consumer.sh \
--bootstrap-server hadoop01:9092,hadoop02:9092,hadoop03:9092 \
--topic 'test-group' \
--from-beginning \
--consumer-property group.id=test-group

④   查看當前消費者組消費topic 的細節
kafka-consumer-groups.sh \
--bootstrap-server hadoop01:9092,hadoop02:9092,hadoop03:9092 \
--group test-group \
--describe

由圖可知,當前消費者中將topic中的數據完全消費。 LAG表示剩余未消費的message。

#案例演示

## --to-earliest:將偏移量設置為partition開頭 
kafka-consumer-groups.sh \
--bootstrap-server hadoop01:9092,hadoop02:9092,hadoop03:9092 \
--group test-group \
--reset-offsets \
--all-topics \
--to-earliest \
--execute
## --to-latest:把位移調整到分區當前最新位移
kafka-consumer-groups.sh \
--bootstrap-server hadoop01:9092,hadoop02:9092,hadoop03:9092 \
--group test-group \
--reset-offsets \
--all-topics \
--to-latest \
--execute
## --to-offset:把位移調整到指定位移處
kafka-consumer-groups.sh \
--bootstrap-server hadoop01:9092,hadoop02:9092,hadoop03:9092 \
--group test-group \
--reset-offsets \
--all-topics \
--to-offset 100 \
--execute
    ## --to-current:把位移調整到分區當前位移
    kafka-consumer-groups.sh \
    --bootstrap-server  hadoop01:9092,hadoop02:9092,hadoop03:9092 \
    --group test-group \
    --reset-offsets \
    --all-topics \
    --to-current \
    --execute

    ## --shift-by N  把位移調整到當前位移 + N處,注意N可以是負數,表示向前移動
        kafka-consumer-groups.sh \
        --bootstrap-server hadoop01:9092,hadoop02:9092,hadoop03:9092 \
        --group test-group \
        --reset-offsets \
        --all-topics \
        --shift-by -100 \
        --execute
    ##  --to-datetime :將offset調整到大于XX日期最早的位移出
    kafka-consumer-groups.sh \
    --bootstrap-server hadoop01:9092,hadoop02:9092,hadoop03:9092 \
    --group test-group \
    --reset-offsets \
    --all-topics \
    --to-datetime 2019-07-31T03:40:33.000
    ## --by-duration  把位移調整到距離當前時間指定間隔的位移處
    kafka-consumer-groups.sh \
    --bootstrap-server hadoop01:9092,hadoop02:9092,hadoop03:9092 \
    --group test-group \
    --reset-offsets \
    --all-topics \
    --by-duration PT0H20M0S
向AI問一下細節

免責聲明:本站發布的內容(圖片、視頻和文字)以原創、轉載和分享為主,文章觀點不代表本網站立場,如果涉及侵權請聯系站長郵箱:is@yisu.com進行舉報,并提供相關證據,一經查實,將立刻刪除涉嫌侵權內容。

AI

垫江县| 久治县| 高雄市| 衢州市| 治县。| 老河口市| 任丘市| 绩溪县| 司法| 镇平县| 海安县| 拉萨市| 大庆市| 荣昌县| 陇南市| 香河县| 喀喇沁旗| 当雄县| 中宁县| 平安县| 崇文区| 阿图什市| 定西市| 迁西县| 广平县| 大关县| 米林县| 黄石市| 商河县| 临安市| 阿尔山市| 南皮县| 武城县| 通河县| 德昌县| 蒙山县| 山西省| 内江市| 怀安县| 正安县| 玛纳斯县|