您好,登錄后才能下訂單哦!
一、為什么選擇Kafka
為什么選Kafka?鑒于龐大的數據量,需要將其做成分布式,這時需要將Q里面的數據分到許多機器上進行存儲,除此之外還有分布式的計算需求。同時需要支持多語言,如Java、GO、php等,另外還有高可用的需求。
二、Kafka集群
Realtime的Kafka集群通過Mirror Maker將數據全部同步到Analysis的Kafka集群。
Realtime的Kafka集群主要負責在線實時讀寫,Analysis負責很多工作,諸如數據的導入導出,數據的多次流出給集群和網絡硬盤帶來了較大壓力。為保證線上的穩定性,要保證兩邊是隔開的。另外關于Topic目前有五萬多,每秒可能會有100多萬的數據流入流出。
三、Kafka的用戶使用問題
1.參數配置問題, Kafka有很多參數需要配置,常用的集群配置,延遲,重要性等,需要封裝。
開發測試不方便,使用者通常會有這樣的需求:我的數據寫進去沒,消費沒,他寫的數據長什么樣子,結構化的數據還需自己寫代碼來解析,等等。這些問題沒有工具和平臺來解決,會大大降低開發效率。
Topic申請不方便,topic是不能開放自己創建的,我們曾在測試環境開放過Topic,發現一周內漲到了好幾萬,而且參數千奇百怪,有全用默認參數的,有根據文檔,時間先來10個9的,也有partition直接來100的。工單方式對管理者很不友好,需要登上服務器敲命令,效率低下,且容易出錯。
結構化數據查詢不方便,瓜子的結構化使用的是AVRO, 序列化之后的數據很難查看原始數據。
消費異常定位不便,比如消費的數據或者位置不對,如果想要回滾重新消費或跳過臟數據就面臨各種問題。從哪個offset開始重新消費呢?或者跳到之后的哪個offset呢?另外就是滾動重啟了一個服務,結果發現消費的數據少了一批,很有可能是某一個隱藏的consumer同時在用這個consumer group,但是找了一圈沒找到哪個服務還沒關掉。
不知道下游,如果寫了生產者生產的Topic數據,卻不知道有哪些consumer,如果要對Topic信息發生改變時,不知該通知誰,這是很復雜的事情。要么先上,下游出問題了自己叫,要么躊躇不前,先收集下游,當然實際情況一般是前者,經常雞飛狗跳。
四、解決方案:Kafka platform
為解決上述問題,瓜子上線了Kafka platform,主要面向用戶和管理兩方面的功能。
面向用戶包括:查看數據,了解消費情況,方便地添加監控報警,以及如果出現問題后,快速查錯的工具。
管理方面包括: 權限管理, 申請審批,還有一些常用操作。比如,seek offset, 或是刪掉一個Topic,對partitions進行擴容等。
可以通過offset查詢對應offset的數據,也可以通過進入Kafka的大致時間,查詢那段內的數據,可以看到每條信息的partition,offset,入Kafka的時間,AVRO的版本信息等。
通過下圖顯示的界面可以查看一條消息,了解哪些consumer group已經消費了,哪些沒有消費。
同時可以查看它現在正在被哪個IP進行消費,這時我們可以方便地定位到有個consumer沒有關閉,它是在哪臺機器上,這些來自于我們的實踐經驗。還可以看到每個consumer group的消費延遲情況,精確到條數,partition的延遲。也可以看到partition消息總數,可以排查一些消息不均的問題。
下圖為監控報警,可以了解Topic的流入、流出數據,每秒寫入多少條消息,多大的size,每秒流出的情況。
報警是對Topic建一些流量報警,或是一些延遲報警,建好之后只需要訂閱一下即可,非常方便。
五、瓜子結構化數據流
目前有許多使用場景,比如前端埋點,tracking日志,Mysql數據同步,操作日志,一些諸如服務之間的交換,基于SQL的streaming,APM的數據,還有基于SQL的ETL等,都可以通過結構化將其快速同步到大數據中做后續分析。
我們是通過confluent提供的一整套解決方案來實現的。其中最主要的兩個組件是:Schema Registry和Kafka Connect。Schema Registry用于存儲schema信息,Kafka connect用于數據轉移。
目前,瓜子除日志部分外,90%以上為結構化。為什么選擇Avro?因為Avro速度快,并且跨語言支持,所有的Schema AVSC都是用JSON做的,對JSON支持的特別好,如果可能沒人想為一個schema定義再學一門語言吧。而且通過JSON無需code generation。
但僅有avro還不夠,我們在使用中會面臨更多的問題,比如:
統一的schema中心,這與配置中心的必要性是一樣的道理,沒有統一的地方,口口相傳,配置亂飛不是我們想看到的。
多版本的需求,schema是肯定會有更新需求的,也肯定有回滾需求,也會有兼容需求,所以多版本是需要滿足的。
版本兼容性檢查,設想一下上游改了一個schema的列名,下游寫到hive的時候就蒙了,歷史數據咋辦啊,現在這列數據又怎么處理。所以得有版本兼容,而且最好滿足下游所有組件的需求。
為解決這些問題,我們引入了confluence的Schema Registry。Confluence的Schema registry,通過RESTful接口,提供了類似配置中心的能力,還有完善的UI,支持版本兼容性檢查,支持多版本等,完全滿足了我們的需求。而且自帶HA,通過Kafka存儲配置信息,保證一致性。
五、瓜子的實踐
當然,僅有這些還不夠,我們在實踐中遇到了很多問題,比如schema注冊不可能完全開放,歷史告訴我們完全的自由意味著混亂。為在實踐中利用好avro,我們前后改了兩個方案,來保證schema可控。
為實現統一管控,所有schema會通過git來管理,如果需要使用可以fork該git。如果要做一個上線,更新或添加一個schema,可以通過提merge request提交給管理員。管理員檢查沒有問題后直接通過gitlab-ci自動注冊,管理員只需完成確認的操作。
但這樣會出現一些問題,首先是上線流程太長,要上線或更新一個schema時,需要提交merge request,要等管理員收到郵件后才可查看,屆時如果管理員發現schema寫的不對,還需重新再走一次流程,中間可能花一天時間。且回滾復雜,沒有權限管理。而且很多人會犯同樣的錯誤,客服表示相當的浪費時間。
六、平臺化解決方案
通過平臺化解決方案,我們提供了一個類似于git的頁面,可在上面直接提交schema,在下面直接點擊校驗,在評估新上線的schema是否有問題后,等待后臺審批即可。其中可以加諸如權限管理等一些功能。
七、為什么用到Kafka connect
Kafka connect專注copy數據,把一個數據從data source轉到Kafka,再從Kafka轉到其它地方。它支持批和流,同時支持實時和批處理,比如5min同步一次。
另外,它支持多個系統之間互相copy,數據源可能是Mysql、SQL Server 也可能是Oracle 。sink可以是Hbase、Hive等。它自己定義了一套plugin接口,可以自己寫很多數據源和不支持的sink。
并且他自己做到了分布式并行,支持完善的HA和load balance,還提供方便的RESTful 接口。
在沒有Kafka connect之前,運維ETL非常麻煩。拿canal來說,canal有server和client,都需手動部署,如果你有100臺canal節點1000個數據庫,想想看吧,管理員如何知道哪臺機器上跑了哪些庫表,新增的任務又放在哪臺機來運行。
此外,如果Mysql修改了一個字段,還需要讓程序員去機器上看一下那張表是如何修改的,相應的所有下游都需相應的完成表結構修改之后, 才能跑起來,響應速度非常慢。
目前Kafka connect已經解決了這些問題。其具備一個非常重要的特性,如果上游數據根據AVRO兼容性進行的修改,connect會在下游同樣做一些兼容性的修改,自動更改下游表結構,減輕了運維負擔。
我們來看看Kafka connect 的架構,Kafka connect會把所有信息存到Kafka 中,其中config topic存元數據,Stutas topic指當前哪些節點正在跑什么樣的job,offset topic指當前比如某個Topic的某個partitions到底消費到哪條數據。
WorKer都是無狀態的,在上面可以跑許多task,同樣一個task1,可能對應5個partitions,如果只給它三個并發,它會分布在三臺機器上。如果一臺機器掛了,這些job都會分配到另外兩臺機器,而且是實時同步的。
八、瓜子Plugins
瓜子對Kafka connect的很多plugins做了修改。
其中我們把canal通過maxwell替換,并且把maxwell做成了Kafka connect的plugin。
原生的Maxwell不支持AVRO,瓜子通過debezium思想對Maxwell進行了修改,使其支持avro格式,并用Mysql管理meta,并且支持Mysql的數據庫切換。
我們采用的是confluence公司的hdfs插件,但是其本身存在很多問題,比如寫入hive的時候會把當做partition的列也寫到主表數據中,雖然不影響hive的使用,但是影響presto讀取hive,這里我們改了源碼,去掉了主表中的這些列。
Hdfs在插件重啟時會去hdfs中讀取所有文件來確定從哪個offset繼續,這里會有兩個問題:耗時太長,切換集群時offset無法接續,我們也對他做了修改。
plugin寫入hive時支持用Kafka的timestamp做分區,也支持用數據內的某些列做分區,但是不支持兩者同時用,我們也修改了一下。
Hbase的plugin只支持最原始的導出,我們會有些特殊的需求,比如對rowkey自定義一下,通常mysql主鍵是自增ID,hbase不推薦用自增ID做rowkey,我們會有reverse的需求,還有多列聯合做rowkey的需求等,這個我們也改了源碼,支持通過配置自定義rowkey生成。
原始plugin不支持kerberos,而我們online hbase是帶權限的,所以也改了一下
還有一些小功能,比如把所有類型都先轉成string再存,支持delete,支持json等。
我們對kudu的使用很多,kudu開源的plugin有些bug,我們發現后也fix了一下。
Kudu的數據來源都是mysql,但是經常會有mysql刷庫的情況,這時量就會很大,kudu sink會有較大的延時,我們改了一下plugin,添加了自適應流量控制,自動擴充成多線程處理,也會在流量小時,自動縮容。
九、瓜子數據庫的Data Pipeline
瓜子的數據倉庫完全是基于Kafka、AVRO的結構化數據來做的。數據源非常多,需要將多個業務線的幾千張表同步到數倉,對外提供服務。
整個數據倉庫采用Lambda架構,分為T+1的存量處理和T+0.1的增量處理兩個流程。
先說T+1的存量處理部分,目前瓜子將所有mysql表通過Maxwell插件放到Kafka中,再通過Kafka connect寫到Hbase里,Hbase每天晚上做一次snapshot(快照),寫到hive中,然后經過多輪ETL:DWB-->DWD-->DW-->DM,最后將DM層數據導入Kudu中,對外提供BI分析服務,當然離線olap分析還是通過presto直接訪問Hive查詢。
再說T+0.1的增量流程,同T+1一樣,數據通過maxwell進入Kafka,這部分流程共用,然后增量數據會實時通過kudu的插件寫入kudu中,再通過impala做ETL,生成數據對外提供T+0.1的查詢,對外提供的查詢是通過另一套impala來做的。 未來我們還會考慮通過Flink直接讀取Kafka中數據來做實時ETL,提高實時性。
這是我們數倉架構的整體架構圖
免責聲明:本站發布的內容(圖片、視頻和文字)以原創、轉載和分享為主,文章觀點不代表本網站立場,如果涉及侵權請聯系站長郵箱:is@yisu.com進行舉報,并提供相關證據,一經查實,將立刻刪除涉嫌侵權內容。