中文字幕av专区_日韩电影在线播放_精品国产精品久久一区免费式_av在线免费观看网站

溫馨提示×

溫馨提示×

您好,登錄后才能下訂單哦!

密碼登錄×
登錄注冊×
其他方式登錄
點擊 登錄注冊 即表示同意《億速云用戶服務條款》

基于binlog的離線分析平臺的一些初步實踐

發布時間:2020-07-24 17:49:47 來源:網絡 閱讀:625 作者:我的二狗呢 欄目:系統運維

基于binlog的離線分析平臺的一些初步實踐

?

參考文檔:?

http://quarterback.cn/%e9%80%9a%e8%bf%87kafka-nifi%e5%bf%ab%e9%80%9f%e6%9e%84%e5%bb%ba%e5%bc%82%e6%ad%a5%e6%8c%81%e4%b9%85%e5%8c%96mongodb%e6%9e%b6%e6%9e%84/

http://seanlook.com/2018/01/13/maxwell-binlog/

https://yq.aliyun.com/articles/338423


直接上圖

方案1

基于binlog的離線分析平臺的一些初步實踐


方案2

基于binlog的離線分析平臺的一些初步實踐



方案3

基于binlog的離線分析平臺的一些初步實踐




方案1的比較簡單,基本上也是滿足使用,也是不錯的選擇。但是功能上比較單一。

方案2比較復雜,引入了更多的組件,將數據存到MongoDB里面。這種引入了kafka的比較適合有多個異構數據庫或者DW數倉抽數的場景。

方案3也比較復雜,和方案2類似,區別就是將數據存到ES里面,并且graylog自帶了一個web查詢的界面。

?

?

這里我們實驗采用的是方案2,先把binlog采集到kafka,然后就可以任意自由消費binlog,更加靈活些。


?

實驗涉及到的軟件:

OS 版本:CentOS7.5

maxwell 版本:1.22.4

nifi 版本:1.9.2

kafka-eagle 版本:1.3.9

?

?

maxwell部署節點: 192.168.20.10

zk+kafka部署節點: 192.168.2.4

kafka-eagle部署的節點: 192.168.2.4

nifi部署的節點: 192.168.2.4

模擬的業務MySQL數據庫:192.168.2.4:3306

?

?

?

kafkazk

kafka zk的部署,不是這里的重點。我這里的zkkafka都是部署在 192.168.2.4上面的,這里的具體操作我直接跳過。

?

我實驗中, zkkafka都是單機部署的,生產環境下一定要使用集群模式。

?

1、最好將主機名和ip關系,寫到各主機的 /etc/hosts 中,不然可能遇到解析失敗的情況

2、需要注意的是,我這里的zk是高版本的,默認會監聽 8080端口,建議改成其他的,把8080端口留給其它服務使用。

[root@Test-dba01 /usr/local/zookeeper-3.5.5-bin ] # cat conf/zoo.cfg

tickTime=2000

initLimit=10

syncLimit=5

dataDir=./data/

clientPort=2181

admin.serverPort=12345

?

啟動后,可以看到監聽的端口起來了

[root@Test-dba01 /usr/local/kafka ] # ss -lnt| egrep 2181

LISTEN???? 0????? 50????????? :::2181??????????????????? :::*

[root@Test-dba01 /usr/local/kafka ] # ss -lnt| egrep 12345

LISTEN???? 0????? 50????????? :::12345?????????????????? :::*?

?

?

?

kafka-eagle

kafka-eagle 是國內的一個大佬開發出來的, 我這里用到它主要是喜歡它附帶的ksql功能,支持直接查詢kafkatopic里面的數據。

?

此外,這個工具還有很多好用的功能,這里我就不介紹了。

?

貼下我的配置

cd /root/kafka-eagle-bin-1.3.9/kafka-eagle-web-1.3.9

egrep -v '^$|^#' /root/kafka-eagle-bin-1.3.9/kafka-eagle-web-1.3.9/conf/system-config.properties

kafka.eagle.zk.cluster.alias=cluster1

cluster1.zk.list=192.168.2.4:2181

kafka.zk.limit.size=25

kafka.eagle.webui.port=8048

cluster1.kafka.eagle.offset.storage=kafka

cluster2.kafka.eagle.offset.storage=zk

kafka.eagle.metrics.charts=false

kafka.eagle.sql.fix.error=false

kafka.eagle.sql.topic.records.max=5000

kafka.eagle.mail.enable=false

kafka.eagle.mail.sa=alert_sa@163.com

kafka.eagle.mail.username=alert_sa@163.com

kafka.eagle.mail.password=mqslimczkdqabbbh322222

kafka.eagle.mail.server.host=smtp.163.com

kafka.eagle.mail.server.port=25

kafka.eagle.topic.token=keadmin

cluster1.kafka.eagle.sasl.enable=false

cluster1.kafka.eagle.sasl.protocol=SASL_PLAINTEXT

cluster1.kafka.eagle.sasl.mechanism=PLAIN

cluster1.kafka.eagle.sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username="admin" password="kafka-eagle";

cluster2.kafka.eagle.sasl.enable=false

cluster2.kafka.eagle.sasl.protocol=SASL_PLAINTEXT

cluster2.kafka.eagle.sasl.mechanism=PLAIN

cluster2.kafka.eagle.sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username="admin" password="kafka-eagle";

kafka.eagle.driver=org.sqlite.JDBC

kafka.eagle.url=jdbc:sqlite:./db/ke.db

kafka.eagle.username=root

kafka.eagle.password=www.kafka-eagle.org

主要就是修改了下zk的地址和sqlite數據庫的路徑,其它保持默認

?

啟動進程:

export KE_HOME=/root/kafka-eagle-bin-1.3.9/kafka-eagle-web-1.3.9

export PATH=$PATH:$KE_HOME/bin

./bin/ke.sh start

?

?

登錄web頁面

http://192.168.2.4:8048/ke/

用戶名 admin

密碼 123456

?

具體功能,大家自由探索,整個工具還是很強大的。

基于binlog的離線分析平臺的一些初步實踐

基于binlog的離線分析平臺的一些初步實踐



maxwell

?

maxwell 使用的是 1.22.4 版本

?

?

0、在 192.168.2.4mysql開通賬號,便于maxwell連接上去拉取binlog

mysql> CREATE USER 'maxwell'@'%' IDENTIFIED BY 'XXXXXX';

mysql> GRANT ALL ON maxwell.* TO 'maxwell'@'%';

mysql> GRANT SELECT, REPLICATION CLIENT, REPLICATION SLAVE ON *.* TO 'maxwell'@'%';

?

?

1、在192.168.20.10上部署 maxwell

cd /usr/local/

curl -sLo - https://github.com/zendesk/maxwell/releases/download/v1.22.4/maxwell-1.22.4.tar.gz | tar zxvf -

cd maxwell-1.22.4/

?

?

2、輸出到kafka的方式

2.1 拷貝 kafka-clients-2.3.0.jar maxwelllib/kafka-clients/目錄下

?

2.2 修改配置文件

cp config.properties.example config.properties? 然后修改下, 修改后的內容如下:

?

log_level=info

?

producer=kafka

?

# maxwell的元數據存放的MySQL的連接信息

host=localhost

user=maxwell

password=maxwell

producer=kafka

host=127.0.0.1

port=3306

user=maxwell

password=XXXXXX

schema_database=maxwell

?

gtid_mode=true

ssl=DISABLED

replication_ssl=DISABLED

schema_ssl=DISABLED

?

# 上游MySQL的連接信息

replication_host=192.168.2.4

replication_user=maxwell

replication_password=XXXXXX

replication_port=3306

?

# 定義需要輸出哪些數據

output_binlog_position=true

output_gtid_position=true

output_nulls=true

output_server_id=true

output_ddl=true

output_commit_info=true

?

kafka.bootstrap.servers=192.168.2.4:9092?? # 生產環境上,這里需要填多個kafka的連接方式

kafka_topic=maxwell

ddl_kafka_topic=maxwell_ddl

kafka.compression.type=snappy

kafka.retries=5

kafka.acks=1

producer_partition_by=database

?

# 下面是復制的過濾規則,不符合下面條件的binlog不會被保留下來【支持正則表達式】

# filter= exclude: test.*, include: db.*, include: coupons.*, include: testdb.user

?

# 暴露metrics地址用于監控

metrics_type=http

metrics_prefix=MaxwellMetrics

metrics_jvm=true

http_port=8081

?

?

?

2.3? 前臺啟動

啟動前,先去創建2topic

??? bin/kafka-topics.sh --zookeeper 192.168.2.4:2181 --create --topic maxwell --partitions 20 --replication-factor 2

??? bin/kafka-topics.sh --zookeeper 192.168.2.4:2181 --create --topic maxwell_ddl --partitions 6 --replication-factor 2

?

測試期間,我們先前臺啟動maxwell進程

??? bin/maxwell --config config.properties --producer=kafka --kafka_version=2.3.0

?

另外建議:在 192.168.2.4 上我們啟動2個前臺consumer進程,用于觀察數據進入kafka的情況:

??? cd /opt/kafka1/bin/

??? ./kafka-console-consumer.sh --bootstrap-server 192.168.2.4:9092 --topic maxwell

??? ./kafka-console-consumer.sh --bootstrap-server 192.168.2.4:9092 --topic maxwell_ddl

?

maxwell topic里面的數據;類似這樣:

{"database":"test","table":"resourcesinfo","type":"delete","ts":1571644826,"xid":5872872,"xoffset":78,"position":"mysql-bin.0003306,"data":{"id":94,"name":"222","hostname":"33","spec":"","belong":"","createtime":"0000-00-00 00:00:00.000000"}}

?

?

maxwell_ddl topic里面的數據;類似這樣:

{"type":"table-create","database":"leaf","table":"d2sf","def":{"database":"leaf","charset":"utf8mb4","table":"d2sf","columns":[{"type":"varchar","name":"biz_tag","charset":"utf8mb4"},{"type":"bigint","name":"max_id","signed":true},{"type":"int","name":"step","signed":true},{"type":"varchar","name":"description","charset":"utf8mb4"},{"type":"timestamp","name":"update_time","column-length":0}],"primary-key":["biz_tag"]},"ts":1571642076000,"sql":"create? table d2sf like leaf_alloc","position":"mysql-bin.000003:172413504","gtid":"fd2adbd9-e263-11e8-847a-141877487b3d:1386014"}

?



搭建MongoDB復制集

不是這里的重點步驟。

我這里是在 192.168.2.4上,部署的單機多實例的mongodb復制集。

192.168.2.4:27017 standby

192.168.2.4:27017 primary

192.168.2.4:27019 ARBITER

?

沒有設置密碼登錄。

然后,創建個測試用的數據庫和表

production:PRIMARY> use testdb

production:PRIMARY> db.createCollection("maxwell")


搭建NIFI 這里是關鍵

NIFI是一個ETL工具,比較簡單。

?

cd /root/

tar xf nifi-1.9.2.tar.gz -C ./

cd /root/nifi-1.9.2

?

我們這里也不優化相關參數了,先嘗試跑起來看看效果

./bin/nifi.sh start

?

稍等3分鐘,查看下狀態

./bin/nifi.sh status

Java home: /usr/local/jdk

NiFi home: /root/nifi-1.9.2

?

Bootstrap Config File: /root/nifi-1.9.2/conf/bootstrap.conf

?

2019-10-21 17:46:48,372 INFO [main] org.apache.nifi.bootstrap.Command Apache NiFi is currently running, listening to Bootstrap on port 43024, PID=130790

?

?

訪問web界面

http://192.168.2.4:8080/nifi/


拖動 "process group" 這個按鈕,到網頁中間,創建一個名為test? "process group"

然后雙擊 test這個方框,在這個頁面上,創建一個2processpor,并用線條連接起來??

高能預警: 下面的配置操作,有點難度,我貼的圖也不太好敘述,不一定能幫到您,如果有問題需要自己再摸索下!

基于binlog的離線分析平臺的一些初步實踐

基于binlog的離線分析平臺的一些初步實踐

基于binlog的離線分析平臺的一些初步實踐

基于binlog的離線分析平臺的一些初步實踐

基于binlog的離線分析平臺的一些初步實踐

基于binlog的離線分析平臺的一些初步實踐

基于binlog的離線分析平臺的一些初步實踐

基于binlog的離線分析平臺的一些初步實踐

基于binlog的離線分析平臺的一些初步實踐


然后,我們再 192.168.2.4 上,隨便的crud些數據, 看看 NIFI 界面上是否有數值的變化。

?

如果,這里沒問題后。我們到mongodb數據庫里面看看數據是否進去了。

?

?

驗證數據及后續數據的加工處理

到mongodb里面,查看是否有數據進來

?

use maxwell

db.maxwell.findOne()

?

有數據后,我們就可以繼續基于mongodb的各種操作了

db.maxwell.createIndex({ts:1},{background:true})

db.maxwell.createIndex({table:1},{background:true})

db.maxwell.createIndex({database:1},{background:true})

db.maxwell.createIndex({database:1,table:1},{background:true})

?

?

?

?

db.maxwell.find({"table":"tbsdb"}).pretty()

?

db.maxwell.find({"table":"leaf_alloc"}).pretty()

?

?

?

db.maxwell.find({"database":"leaf"}).pretty()

db.maxwell.find({"database":"test"}).pretty() 日志類似這樣:

基于binlog的離線分析平臺的一些初步實踐


統計某個時間范圍內的操作:

db.maxwell.count({'ts':{$lt:1571673600,$gt:1571587200},"database":"test","type":"delete"})

db.maxwell.count({'ts':{$lt:1571673600,$gt:1571587200},"database":"test","type":"update"})

db.maxwell.count({'ts':{$lt:1571673600,$gt:1571587200},"database":"test","type":"insert"})




向AI問一下細節

免責聲明:本站發布的內容(圖片、視頻和文字)以原創、轉載和分享為主,文章觀點不代表本網站立場,如果涉及侵權請聯系站長郵箱:is@yisu.com進行舉報,并提供相關證據,一經查實,將立刻刪除涉嫌侵權內容。

AI

木兰县| 龙门县| 民勤县| 德令哈市| 江西省| 泰宁县| 徐闻县| 上栗县| 巴彦淖尔市| 无锡市| 赤城县| 酒泉市| 象山县| 尼玛县| 鄂伦春自治旗| 漳州市| 凤山县| 汽车| 长乐市| 红安县| 三穗县| 定西市| 高碑店市| 文成县| 瓦房店市| 蚌埠市| 抚远县| 安福县| 公主岭市| 阿鲁科尔沁旗| 栾川县| 仙居县| 侯马市| 错那县| 广东省| 白山市| 咸阳市| 来安县| 石河子市| 田东县| 西乌珠穆沁旗|