您好,登錄后才能下訂單哦!
今天就跟大家聊聊有關如何實現從庫MTS多線程并行回放,可能很多人都不太了解,為了讓大家更加了解,小編給大家總結了以下內容,希望大家根據這篇文章可以有所收獲。
實際上協調線程只是將Event分發到了工作線程的執行隊列中。那么工作線程執行Event就需要從執行隊列中拿出這些Event,然后進行執行。整個過程可以參考函數slave_worker_exec_job_group。因為這個流程比較簡單,因此就不需要畫圖了,但是我們需要關注一些點如下:
(1)從執行隊列中讀取Event。注意這里如果執行隊列中沒有Event那么就進入空閑等待,也就是工作線程處于無事可做的狀態,等待狀態為‘Waiting for an event from Coordinator’。
(2)如果執行到XID_EVENT那么說明事務已經結束了那么需要完成內存信息更新操作。可參考Slave_worker::slave_worker_exec_event和Xid_apply_log_event::do_apply_event_worker函數。更新內存相關信息可參考函數commit_positions函數。下面是一些更新的信息,我們可以看到和slave_worker_info表中的信息基本一致,如下:
1、更新當前信息 strmake(group_relay_log_name, ptr_g->group_relay_log_name, sizeof(group_relay_log_name) - 1); group_relay_log_pos= ev->future_event_relay_log_pos; set_group_master_log_pos(ev->common_header->log_pos); set_group_master_log_name(c_rli->get_group_master_log_name()); 2、將檢查點信息進行寫入: strmake(checkpoint_relay_log_name, ptr_g- >checkpoint_relay_log_name,sizeof(checkpoint_relay_log_name) - 1); checkpoint_relay_log_pos= ptr_g->checkpoint_relay_log_pos; strmake(checkpoint_master_log_name, ptr_g- >checkpoint_log_name,sizeof(checkpoint_master_log_name) - 1); checkpoint_master_log_pos= ptr_g->checkpoint_log_pos; 3、設置GAQ序號: checkpoint_seqno= ptr_g->checkpoint_seqno; 更新整個BITMAP,可能已經由檢查點進行GAQ出隊: for (uint pos= ptr_g->shifted; pos < c_rli->checkpoint_group; pos++) //重新設置位圖 因為checkpoint已經 { //ptr_g->shifted是GAQ中出隊的事務個數 if (bitmap_is_set(&group_shifted, pos)) //這里就需要偏移掉出隊的事務,恢復已經不需要了 bitmap_set_bit(&group_executed, pos - ptr_g->shifted); } 4、設置位圖: bitmap_set_bit(&group_executed, ptr_g->checkpoint_seqno); //在本次事務相應的位置設置為1
(3)如果執行到XID_EVENT那么說明事務已經結束了那么需要完成內存信息的持久化,即強制刷內存信息持久化到slave_worker_info表中(relay_log_info_repository設置為TABLE)。可參考函數commit_positions函數,如下:
if ((error= w->commit_positions(this, ptr_group, w->is_transactional())))
(4)如果執行到XID_EVENT還需要進行事務的提交操作,也就是進行Innodb層事務的提交。
從上面我們可以看到MTS中每次事務的提交并不會更新slave_relay_log_info表,而是進行slave_worker_info表的更新,將最新的信息寫入到slave_worker_info表中。
我們前面也說過SQL線程已經蛻變為協調線程,那么slave_relay_log_info表什么時候更新呢?下面我們就能看到slave_relay_log_info表的更新實際上由協調線程在做完檢查點之后更新。
總的說來MTS中的檢查點是MTS進行異常恢復的起點。實際上就是代表到這個位置之前(包含自身)事務都是已經在從庫執行過了,但之后的事務可能執行完成了也可能沒有執行完成。檢查點由協調線程進行。
前面我們已經知道MTS中為每個工作線程維護了一個Event的分發隊列。除此之外協調線程還維護了一個非常的重要的隊列GAQ,它是一個環形隊列。下面是源碼中的定義:
/* master-binlog ordered queue of Slave_job_group descriptors of groups that are under processing. The queue size is @c checkpoint_group. Group assigned */ Slave_committed_queue *gaq;
每次協調線程分發事務的時候都會將事務記錄到GAQ隊列中,因此GAQ中事務的順序總是和relay log文件中事務的順序一致的。檢查點正是作用在GAQ隊列上的,每次檢查點的位置稱為LWM,還記得上一節我叫大家先忽略的LWM嗎?就是這個。源碼中定義也正是如此,它在GAQ隊列中進行維護。如下:
/* The last checkpoint time Low-Water-Mark */ Slave_job_group lwm;
在GAQ隊列中還維護有一個叫做checkpoint_seqno的序號,它是最后一次檢查點以來每個分配事務的序號,下面是源碼中的定義:
uint checkpoint_seqno; // counter of groups executed after the most recent CP
在協調線程讀取到GTID_LOG_EVENT后為其分配序號,記做checkpoint_seqno,如下:
rli->checkpoint_seqno++;//增加seqno
當協調線程進行檢查點的時候checkpoint_seqno序號會減去出隊的事務數量,如下:
checkpoint_seqno= checkpoint_seqno - shift; //這里減去出隊的事務
在MTS異常恢復的時候也會用到這個序號,每個工作線程會通過這個序號來確認本工作線程執行事務的上限,如下:
for (uint i= (w->checkpoint_seqno + 1) - recovery_group_cnt, j= 0; i <= w->checkpoint_seqno; i++, j++) { if (bitmap_is_set(&w->group_executed, i)) //如果這一位 已經設置 { DBUG_PRINT("mts", ("Setting bit %u.", j)); bitmap_fast_test_and_set(groups, j); //那么GTOUPS 這個 bitmap中應該設置,最終GTOUPS會包含全的需要恢復的事務 } }
關于詳細的異常恢復流程將在第25節描述。
有了GAQ隊列和檢查點就知道異常恢復開始的位置了。但是我們并不知道每一個工作線程都完成了哪些事務,哪些又沒有執行完成,因此就不能確認哪些事務需要恢復。在MTS中并行回放事務的提交并不是按分發順序的進行的,某些大事務(或者其他原因比鎖堵塞)可能遲遲不能提交,而一些小事務卻會很快提交完成。這些遲遲不能提交的事務就成為了所謂的’gap’,如果使用了GTID那么在查看已經執行GTID SET的時候可能出現一些‘空洞’,為了防止’gap’的發生通常需要設置參數slave_preserve_commit_order。下一節我們將會看到這種‘空洞’以及slave_preserve_commit_order的作用。但是如果要設置了slave_preserve_commit_order參數就需要開啟從庫記錄binary log的功能,因此必須開啟log_slave_updates參數。下面是源碼的判斷:
if (opt_slave_preserve_commit_order && rli->opt_slave_parallel_workers > 0 && opt_bin_log && opt_log_slave_updates) commit_order_mngr= new Commit_order_manager(rli->opt_slave_parallel_workers); //order commit 管理器
這里先提前說一下MTS恢復的會有兩個關鍵階段:
掃描階段
通過掃描檢查點以后的relay log。通過每個工作線程的Bitmap區分出哪些事務已經執行完成,哪些事務沒有執行完成,并且匯總形成恢復Bitmap,同時得到需要恢復的事務總量。
執行階段
通過這個匯總的恢復Bitmap,將這些沒有執行完成事務讀取relay log再次執行。
這個Bitmap位圖和GAQ中的事務一一對應。當執行XID_EVENT完成提交后這一位將會被設置為‘1’。
這個已經在前面提到過,實際上每次進行檢查點的時候都需要將檢查點的位置固化到slave_relay_log_info表中(relay_log_info_repository設置為TABLE)。因此slave_relay_log_info中存儲的實際上不是實時的信息而是檢查點的信息。下面就是slave_relay_log_info表的表結構:
mysql> desc slave_relay_log_info; +-------------------+---------------------+------+-----+---------+-------+ | Field | Type | Null | Key | Default | Extra | +-------------------+---------------------+------+-----+---------+-------+ | Number_of_lines | int(10) unsigned | NO | | NULL | | | Relay_log_name | text | NO | | NULL | | | Relay_log_pos | bigint(20) unsigned | NO | | NULL | | | Master_log_name | text | NO | | NULL | | | Master_log_pos | bigint(20) unsigned | NO | | NULL | | | Sql_delay | int(11) | NO | | NULL | | | Number_of_workers | int(10) unsigned | NO | | NULL | | | Id | int(10) unsigned | NO | | NULL | | | Channel_name | char(64) | NO | PRI | NULL | | +-------------------+---------------------+------+-----+---------+-------+
與此同時show slave status中的某些信息也是檢查點的內存信息。下面的信息將是來自檢查點:
Relay_Log_File :最新一次檢查點的relay log文件名。
Relay_Log_Pos :最新一次檢查點的relay log位點。
Relay_Master_Log_File:最新一次檢查點的主庫binary log文件名。
Exec_Master_Log_Pos:最新一次檢查點的主庫binary log位點。
Seconds_Behind_Master:根據檢查點指向事務的提交時間計算的延遲。
需要注意的是我們的GTID模塊獨立在這一套理論之外,在第3節我們講GTID模塊的初始化的時候我們就說過GTID模塊的初始化是在從庫信息初始化之前就完成了。因此在做MTS異常恢復的時候使用GTID AUTO_POSITION MODE模式將會變得更加簡單和安全,細節將在第25節描述。
工作線程的信息就持久化在slave_worker_info 表中,前面我們描述工作線程執行Event注意點的時候已經做了相應的描述。執行XID_EVENT完成事務提交之后會將信息寫入到slave_worker_info 表中(relay_log_info_repository設置為TABLE)。其中包括信息:
Relay_log_name:工作線程最后一個提交事務的relay log文件名。
Relay_log_pos:工作線程最后一個提交事務的relay log位點。
Master_log_name:工作線程最后一個提交事務的主庫binary log文件名。
Master_log_pos:工作線程最后一個提交事務的主庫binary log文件位點。
Checkpoint_relay_log_name:工作線程最后一個提交事務對應檢查點的relay log文件名。
Checkpoint_relay_log_pos:工作線程最后一個提交事務對應檢查點的relay log位點。
Checkpoint_master_log_name:工作線程最后一個提交事務對應檢查點的主庫binary log文件名。
Checkpoint_master_log_pos:工作線程最后一個提交事務對應檢查點的主庫binary log位點。
Checkpoint_seqno:工作線程最后一個提交事務對應checkpoint_seqno序號。
Checkpoint_group_size:工作線程的Bitmap字節數,約等于 GAQ隊列大小/8,因為1個字節為8位。
Checkpoint_group_bitmap:工作線程對應的Bitmap位圖信息。
關于Checkpoint_group_size的換算參考函數Slave_worker::write_info。
slave_checkpoint_group:GAQ隊列大小。
slave_checkpoint_period:多久執行一次檢查點,默認300毫秒。
超過slave_checkpoint_period配置。可參考next_event函數如下:
if (rli->is_parallel_exec() && (opt_mts_checkpoint_period != 0 || force)) { ulonglong period= static_cast<ulonglong>(opt_mts_checkpoint_period * 1000000ULL); ... (void) mts_checkpoint_routine(rli, period, force, true/*need_data_lock=true*/); ... }
達到GAQ隊列已滿,如下:
//如果達到了 GAQ的大小 設置為force 強制checkpoint bool force= (rli->checkpoint_seqno > (rli->checkpoint_group - 1));
正常stop slave。
通常有壓力的情況下的slave_worker_info中的所有工作線程最大的Checkpoint_master_log_pos應該和slave_relay_log_info中的Master_log_pos 相等,因為這是最后一個檢查點的位點信息,如下:
這一部分將詳細描述一下檢查點的步驟,關于檢查點可以參考函數mts_checkpoint_routine。
假設現在有7個事務是可以并行執行的,工作線程數量為4個。當前協調線程已經分發了5個,前面4個事務都已經執行完成,其中第5的一個事務是大事務。那么可能當前的狀態圖如下(圖20-1,高清原圖包含在文末原圖中):
前面4個事務每個工作線程都分到一個,最后一個大事務這里假設由工作線程2進行執行,圖中用紅色部分表示。
if (!force && diff < period) //是否需要進行檢查點是否超過了slave_checkpoint_period的設置 { /* We do not need to execute the checkpoint now because the time elapsed is not enough. */ DBUG_RETURN(FALSE); }
cnt= rli->gaq->move_queue_head(&rli->workers); //work數組 返回出隊的個數
move_queue_head部分代碼如下:
if (ptr_g->worker_id == MTS_WORKER_UNDEF || my_atomic_load32(&ptr_g->done) == 0) //當前GROUP是否已經執行完成 如果沒有執行完成就需要 停止本次檢查點 break; /* 'gap' at i'th */
先更新內存信息,也就是我們show slave status中看到的信息:
rli->set_group_master_log_pos(rli->gaq->lwm.group_master_log_pos); rli->set_group_relay_log_pos(rli->gaq->lwm.group_relay_log_pos); rli->set_group_relay_log_name(rli->gaq->lwm.group_relay_log_name);
然后強制寫入表slave_relay_log_info中:
error= rli->flush_info(TRUE); //將本次檢查點信息 寫入到relay_log_info_repository表中
這個值在第27節中會詳細描述,它是計算Seconds_behind_master的一個因素:
/* Update the rli->last_master_timestamp for reporting correct Seconds_behind_master. If GAQ is empty, set it to zero. Else, update it with the timestamp of the first job of the Slave_job_queue which was assigned in the Log_event::get_slave_worker() function. */ ts= rli->gaq->empty()? 0 : reinterpret_cast<Slave_job_group*>(rli->gaq->head_queue())->ts; //rli->gaq->head_queue 檢查點位置的GROUP的時間 rli->reset_notified_checkpoint(cnt, ts, need_data_lock, true); reset_notified_checkpoint函數中有: last_master_timestamp= new_ts;
因此MTS中Seconds_behind_master的計算和檢查點息息相關。
這個操作也是在函數Relay_log_info::reset_notified_checkpoint中完成的,實際上很簡單部分代碼如下:
for (Slave_worker **it= workers.begin(); it != workers.end(); ++it) //循環每個woker w->bitmap_shifted= w->bitmap_shifted + shift; //每個worker線程都會增加 這個偏移量 checkpoint_seqno= checkpoint_seqno - shift; //這里減去 移動的個數
到這里整個檢查點的基本操作就完成了。我們看到實際上步驟并不多,拿到Bitmap偏移量后每個工作線程就會在隨后的第一個事務提交的時候進行位圖的偏移,checkpoint_seqno 計數也會更新。
我們前面的假設環境中,如果觸發了一次檢查點,并且協調線程將后兩個可以并行的事務發給了工作線程1和3進行處理并且處理完成。那么我們的圖會變成如下(圖20-2,高清原圖包含在文末原圖中):
這張圖中我用不同樣色表示了不同線條,因為它們交叉比較多。GAQ中的紅色事務就是我們假設的大事務它仍然沒有執行完成,它也是我們所謂的‘gap’。如果這個時候MySQL實例異常重啟,那么這個紅色‘gap’就是我們啟動后需要找到的事務,方式就是通過Bitmap位圖進行比對,后面說異常恢復的時候再詳細討論。如果是開啟了GTID,這種‘gap’很容易就能觀察到,下一節將進行測試。
同時我們需要注意這個時候工作線程2并沒有分發新的事務執行,因為工作線程2沒有執行完大事務, 因此在slave_woker_info表中它的信息仍然顯示為上一次提交事務的信息。而工作線程4因為沒有分配到新的事務,因此slave_woker_info表中它的信息也顯示為上一次提交事務的信息。因此在slave_woker_info中工作線程2和工作線程4的檢查點信息、Bitmap信息、checkpoint_seqno都是老的信息。
好了到這里我已經說明了MTS中三個關鍵點
協調線程是根據什么規則進行事務分發的。
工作線程如何拿到分發的事務。
MTS中的檢查點是如何進行的。
看完上述內容,你們對如何實現從庫MTS多線程并行回放有進一步的了解嗎?如果還想了解更多知識或者相關內容,請關注億速云行業資訊頻道,感謝大家的支持。
免責聲明:本站發布的內容(圖片、視頻和文字)以原創、轉載和分享為主,文章觀點不代表本網站立場,如果涉及侵權請聯系站長郵箱:is@yisu.com進行舉報,并提供相關證據,一經查實,將立刻刪除涉嫌侵權內容。