中文字幕av专区_日韩电影在线播放_精品国产精品久久一区免费式_av在线免费观看网站

溫馨提示×

溫馨提示×

您好,登錄后才能下訂單哦!

密碼登錄×
登錄注冊×
其他方式登錄
點擊 登錄注冊 即表示同意《億速云用戶服務條款》

Kafka中如何將數據導入到Elasticsearch

發布時間:2021-12-15 16:00:30 來源:億速云 閱讀:805 作者:柒染 欄目:大數據

本篇文章為大家展示了Kafka中如何將數據導入到Elasticsearch,內容簡明扼要并且容易理解,絕對能使你眼前一亮,通過這篇文章的詳細介紹希望你能有所收獲。

Elasticsearch作為當前主流的 全文檢索 引擎,除了強大的全文檢索能力和高擴展性之外,對多種數據源的兼容能力也是其成功的秘訣之一。而Elasticsearch強大的數據源兼容能力,主要來源于其核心組件之一的Logstash, Logstash通過插件的形式實現了對多種數據源的輸入和輸出。Kafka是一種高吞吐量的分布式發布訂閱消息系統,是一種常見的數據源,也是Logstash支持的眾多輸入輸出源的其中一個。將從實踐的角度,研究使用Logstash Kafka Input插件實現將Kafka中數據導入到Elasticsearch的過程。

Kafka中如何將數據導入到Elasticsearch

使用Logstash Kafka插件連接Kafka和Elasticsearch

1 Logstash Kafka input插件簡介

Logstash Kafka Input插件使用Kafka API從Kafka topic中讀取數據信息,使用時需要注意Kafka的版本及對應的插件版本是否一致。該插件支持通過SSL和Kerveros SASL方式連接Kafka。另外該插件提供了group管理,并使用默認的offset管理策略來操作Kafka topic。

Logstash默認情況下會使用一個單獨的group來訂閱Kafka消息,每個Logstash Kafka Consumer會使用多個線程來增加吞吐量。當然也可以多個Logstash實例使用同一個group_id,來均衡負載。另外建議把Consumer的個數設置為Kafka分區的大小,以提供更好的性能。

2 測試環境準備 2.1 創建Elasticsearch集群

為了簡化搭建過程,本文使用了騰訊云Elasticsearch service。騰訊云Elasticsearch service不僅可以實現Elasticsearch集群的快速搭建,還提供了內置Kibana,集群監控,專用主節點,Ik分詞插件等功能,極大的簡化了Elasticsearch集群的創建和管理工作。

2.2 創建Kafka服務

Kafka服務的搭建采用騰訊云CKafka來完成。與Elasticsearch  Service 一樣,騰訊云CKafka可以實現Kafka服務的快速創建,100%兼容開源Kafka API(0.9版本)。

2.3 服務器

除了準備Elasticsearch和Kafka,另外還需要準備一臺服務器,用于運行Logstash以連接Elasticsearch和Kafka。本文采用騰訊云CVM服務器

2.4 注意事項

1) 需要將Elasticsearch、Kafka和服務器創建在同一個網絡下,以便實現網絡互通。由于本文采用的是騰訊云相關的技術服務,因此只需要將Elasticsearch service,CKafka和CVM創建在同一個私有網路(VPC)下即可。

2) 注意獲取Elasticsearch serivce,CKafka和CVM的內網地址和端口,以便后續服務使用

本次測試中:

服務 ip port




Elasticsearch service192.168.0.89200
Ckafka192.168.13.109092
CVM192.168.0.13-

3 使用Logstash連接Elasticsearch和Kafka 3.1 Kafka準備

可以參考[CKafka 使用入門]

按照上面的教程

1) 創建名為kafka_es_test的topic

2) 安裝JDK

3) 安裝Kafka工具包

4) 創建producer和consumer驗證kafka功能

3.2 安裝Logstash

Logstash的安裝和使用可以參考[一文快速上手Logstash]

3.3 配置Logstash Kafka input插件

創建kafka_test_pipeline.conf文件內容如下:

input{
        kafka{
                bootstrap_servers=>"192.168.13.10:9092"
                topics=>["kafka_es_test"]
                group_id=>"logstash_kafka_test"
        }
}
output{
        elasticsearch{
                hosts=>["192.168.0.8:9200"]
        }
}

其中定義了一個kafka的input和一個elasticsearch的output

對于Kafka input插件上述三個參數為必填參數,除此之外還有一些對插件行為進行調整的一些參數如:

auto_commit_interval_ms 用于設置Consumer提交offset給Kafka的時間間隔

consumer_threads 用于設置Consumer的線程數,默認為1,實際中應設置與Kafka Topic分區數一致

fetch_max_wait_ms 用于指定Consumer等待一個fetch請求達到fetch_min_bytes的最長時間

fetch_min_bytes 用于指定Consumer fetch請求應返回的最小數據量

topics_pattern 用于通過正則訂閱符合某一規則的一組topic

更多參數參考:[Kafka Input Configuration Options]

3.4 啟動Logstash

以下操作在Logstash根目錄中進行

1) 驗證配置

./bin/logstash -f kafka_test_pipeline.conf --config.test_and_exit

如有錯誤,根據提示修改配置文件。若配置正確會得到如下結果

Sending Logstash's logs to /root/logstash-5.6.13/logs which is now configured via log4j2.properties
[2018-11-11T15:24:01,598][INFO ][logstash.modules.scaffold] Initializing module {:module_name=>"netflow", :directory=>"/root/logstash-5.6.13/modules/netflow/configuration"}
[2018-11-11T15:24:01,603][INFO ][logstash.modules.scaffold] Initializing module {:module_name=>"fb_apache", :directory=>"/root/logstash-5.6.13/modules/fb_apache/configuration"}
Configuration OK
[2018-11-11T15:24:01,746][INFO ][logstash.runner          ] Using config.test_and_exit mode. Config Validation Result: OK. Exiting Logstash

2) 啟動Logstash

./bin/logstash -f kafka_test_pipeline.conf --config.reload.automatic

觀察日志是否有錯誤提示,并及時處理

3.4 啟動Kafka Producer

以下操作在Kafka工具包根目錄下進行

./bin/kafka-console-producer.sh --broker-list 192.168.13.10:9092 --topic kafka_es_test

寫入測試數據

This is a message

3.5 Kibana驗證結果

登錄Elasticsearch對應Kibana, 在Dev Tools中進行如下操作

1) 查看索引

GET _cat/indices

可以看到一個名為logstash-xxx.xx.xx的索引被創建成功

green open .kibana             QUw45tN0SHqeHbF9-QVU6A 1 1 1 0 5.5kb 2.7kb
green open logstash-2018.11.11 DejRdNJVQ1e1MwbyJjJjLw 5 1 1 0 8.7kb 4.3kb

2) 查看寫入的數據

GET logstash-2018.11.11/_search

可以看到數據已經被成功寫入

{
  "took": 0,
  "timed_out": false,
  "_shards": {
    "total": 5,
    "successful": 5,
    "skipped": 0,
    "failed": 0
  },
  "hits": {
    "total": 1,
    "max_score": 1,
    "hits": [
      {
        "_index": "logstash-2018.11.11",
        "_type": "logs",
        "_id": "AWcBsEegMu-Dkjm1ap3H",
        "_score": 1,
        "_source": {
          "message": "This is a message",
          "@version": "1",
          "@timestamp": "2018-11-11T07:33:09.079Z"
        }
      }
    ]
  }
}

Logstash作為Elastic Stack中數據采集和處理的核心組件,為Elasticsearch提供了強大的數據源兼容能力。從測試過程可以看出,使用Logstash實現kafka和Elaticsearch的連接過程相當簡單方便。另外Logstash的數據處理功能,也使得采用該架構的系統對數據映射和處理有天然的優勢。

上述內容就是Kafka中如何將數據導入到Elasticsearch,你們學到知識或技能了嗎?如果還想學到更多技能或者豐富自己的知識儲備,歡迎關注億速云行業資訊頻道。

向AI問一下細節

免責聲明:本站發布的內容(圖片、視頻和文字)以原創、轉載和分享為主,文章觀點不代表本網站立場,如果涉及侵權請聯系站長郵箱:is@yisu.com進行舉報,并提供相關證據,一經查實,將立刻刪除涉嫌侵權內容。

AI

延庆县| 西畴县| 临西县| 博兴县| 新闻| 兴业县| 安乡县| 洞头县| 偃师市| 曲阳县| 平罗县| 绿春县| 和龙市| 延安市| 通榆县| 沙雅县| 郎溪县| 阳东县| 古浪县| 澎湖县| 呼伦贝尔市| 富宁县| 理塘县| 桑日县| 兴海县| 伽师县| 庆阳市| 类乌齐县| 白银市| 收藏| 尉犁县| 雷波县| 南阳市| 威宁| 衡阳县| 苏尼特左旗| 黄山市| 临泽县| 双牌县| 唐山市| 叙永县|