您好,登錄后才能下訂單哦!
最近和一些同學交流的時候反饋說,在面試Kafka時,被問到Kafka組件組成部分、API使用、Consumer和Producer原理及作用等問題都能詳細作答。但是,問到一個平時不注意的問題,就是Kafka的冪等性,被卡主了。那么,今天筆者就為大家來剖析一下Kafka的冪等性原理及實現。
Producer在生產發送消息時,難免會重復發送消息。Producer進行retry時會產生重試機制,發生消息重復發送。而引入冪等性后,重復發送只會生成一條有效的消息。Kafka作為分布式消息系統,它的使用場景常見與分布式系統中,比如消息推送系統、業務平臺系統(如物流平臺、銀行結算平臺等)。以銀行結算平臺來說,業務方作為上游把數據上報到銀行結算平臺,如果一份數據被計算、處理多次,那么產生的影響會很嚴重。
在使用Kafka時,需要確保Exactly-Once語義。分布式系統中,一些不可控因素有很多,比如網絡、OOM、FullGC等。在Kafka Broker確認Ack時,出現網絡異常、FullGC、OOM等問題時導致Ack超時,Producer會進行重復發送。可能出現的情況如下:
Kafka為了實現冪等性,它在底層設計架構中引入了ProducerID和SequenceNumber。那這兩個概念的用途是什么呢?
Kafka在引入冪等性之前,Producer向Broker發送消息,然后Broker將消息追加到消息流中后給Producer返回Ack信號值。實現流程如下:
上圖的實現流程是一種理想狀態下的消息發送情況,但是實際情況中,會出現各種不確定的因素,比如在Producer在發送給Broker的時候出現網絡異常。比如以下這種異常情況的出現:
上圖這種情況,當Producer第一次發送消息給Broker時,Broker將消息(x2,y2)追加到了消息流中,但是在返回Ack信號給Producer時失敗了(比如網絡異常) 。此時,Producer端觸發重試機制,將消息(x2,y2)重新發送給Broker,Broker接收到消息后,再次將該消息追加到消息流中,然后成功返回Ack信號給Producer。這樣下來,消息流中就被重復追加了兩條相同的(x2,y2)的消息。
面對這樣的問題,Kafka引入了冪等性。那么冪等性是如何解決這類重復發送消息的問題的呢?下面我們可以先來看看流程圖:
?同樣,這是一種理想狀態下的發送流程。實際情況下,會有很多不確定的因素,比如Broker在發送Ack信號給Producer時出現網絡異常,導致發送失敗。異常情況如下圖所示:
?當Producer發送消息(x2,y2)給Broker時,Broker接收到消息并將其追加到消息流中。此時,Broker返回Ack信號給Producer時,發生異常導致Producer接收Ack信號失敗。對于Producer來說,會觸發重試機制,將消息(x2,y2)再次發送,但是,由于引入了冪等性,在每條消息中附帶了PID(ProducerID)和SequenceNumber。相同的PID和SequenceNumber發送給Broker,而之前Broker緩存過之前發送的相同的消息,那么在消息流中的消息就只有一條(x2,y2),不會出現重復發送的情況。
客戶端在生成Producer時,會實例化如下代碼:
// 實例化一個Producer對象
Producer<String, String> producer = new KafkaProducer<>(props);
在org.apache.kafka.clients.producer.internals.Sender類中,在run()中有一個maybeWaitForPid()方法,用來生成一個ProducerID,實現代碼如下:
private void maybeWaitForPid() {
if (transactionState == null)
return;
while (!transactionState.hasPid()) {
try {
Node node = awaitLeastLoadedNodeReady(requestTimeout);
if (node != null) {
ClientResponse response = sendAndAwaitInitPidRequest(node);
if (response.hasResponse() && (response.responseBody() instanceof InitPidResponse)) {
InitPidResponse initPidResponse = (InitPidResponse) response.responseBody();
transactionState.setPidAndEpoch(initPidResponse.producerId(), initPidResponse.epoch());
} else {
log.error("Received an unexpected response type for an InitPidRequest from {}. " +
"We will back off and try again.", node);
}
} else {
log.debug("Could not find an available broker to send InitPidRequest to. " +
"We will back off and try again.");
}
} catch (Exception e) {
log.warn("Received an exception while trying to get a pid. Will back off and retry.", e);
}
log.trace("Retry InitPidRequest in {}ms.", retryBackoffMs);
time.sleep(retryBackoffMs);
metadata.requestUpdate();
}
}
與冪等性有關的另外一個特性就是事務。Kafka中的事務與數據庫的事務類似,Kafka中的事務屬性是指一系列的Producer生產消息和消費消息提交Offsets的操作在一個事務中,即原子性操作。對應的結果是同時成功或者同時失敗。
這里需要與數據庫中事務進行區別,操作數據庫中的事務指一系列的增刪查改,對Kafka來說,操作事務是指一系列的生產和消費等原子性操作。
在事務屬性引入之前,先引入Producer的冪等性,它的作用為:
產生的場景有:
比如,在Consumer中Commit Offsets時,當Consumer在消費完成時Commit的Offsets為100(假設最近一次Commit的Offsets為50),那么執行觸發Balance時,其他Consumer就會重復消費消息(消費的Offsets介于50~100之間的消息)。
Producer提供了五種事務方法,它們分別是:initTransactions()、beginTransaction()、sendOffsetsToTransaction()、commitTransaction()、abortTransaction(),代碼定義在org.apache.kafka.clients.producer.Producer<K,V>接口中,具體定義接口如下:
// 初始化事務,需要注意確保transation.id屬性被分配
void initTransactions();
// 開啟事務
void beginTransaction() throws ProducerFencedException;
// 為Consumer提供的在事務內Commit Offsets的操作
void sendOffsetsToTransaction(Map<TopicPartition, OffsetAndMetadata> offsets,
String consumerGroupId) throws ProducerFencedException;
// 提交事務
void commitTransaction() throws ProducerFencedException;
// 放棄事務,類似于回滾事務的操作
void abortTransaction() throws ProducerFencedException;
在Kafka事務中,一個原子性操作,根據操作類型可以分為3種情況。情況如下:
Kafka的冪等性和事務是比較重要的特性,特別是在數據丟失和數據重復的問題上非常重要。Kafka引入冪等性,設計的原理也比較好理解。而事務與數據庫的事務特性類似,有數據庫使用的經驗對理解Kafka的事務也比較容易接受。
免責聲明:本站發布的內容(圖片、視頻和文字)以原創、轉載和分享為主,文章觀點不代表本網站立場,如果涉及侵權請聯系站長郵箱:is@yisu.com進行舉報,并提供相關證據,一經查實,將立刻刪除涉嫌侵權內容。