您好,登錄后才能下訂單哦!
在面試中關于多線程同步,你必須要思考的問題 一文中,我們知道glibc的pthread_cond_timedwait
底層是用linux futex機制實現的。
理想的同步機制應該是沒有鎖沖突時在用戶態利用原子指令就解決問題,而需要掛起等待時再使用內核提供的系統調用進行睡眠與喚醒。換句話說,在用戶態的自旋失敗時,能不能讓進程掛起,由持有鎖的線程釋放鎖時將其喚醒?
如果你沒有較深入地考慮過這個問題,很可能想當然的認為類似于這樣就行了(偽代碼):
void?lock(int?lockval)?{ //trylock是用戶級的自旋鎖 while(!trylock(lockval))?{ wait();//釋放cpu,并將當期線程加入等待隊列,是系統調用 } }boolean?trylock(int?lockval){ int?i=0;? //localval=1代表上鎖成功 while(!compareAndSet(lockval,0,1)){ if(++i>10){ return?false; } } return?true; }void?unlock(int?lockval)?{ ?compareAndSet(lockval,1,0); ?notify(); }
上述代碼的問題是trylock和wait兩個調用之間存在一個窗口:
如果一個線程trylock失敗,在調用wait時持有鎖的線程釋放了鎖,當前線程還是會調用wait進行等待,但之后就沒有人再喚醒該線程了。
為了解決上述問題,linux內核引入了futex機制,futex主要包括等待和喚醒兩個方法:futex_wait
和futex_wake
,其定義如下
//uaddr指向一個地址,val代表這個地址期待的值,當*uaddr==val時,才會進行waitint?futex_wait(int?*uaddr,?int?val);//喚醒n個在uaddr指向的鎖變量上掛起等待的進程int?futex_wake(int?*uaddr,?int?n);
futex在真正將進程掛起之前會檢查addr指向的地址的值是否等于val,如果不相等則會立即返回,由用戶態繼續trylock。否則將當期線程插入到一個隊列中去,并掛起。
在關于同步的一點思考-上文章中對futex的背景與基本原理有介紹,對futex不熟悉的人可以先看下。
本文將深入分析futex的實現,讓讀者對于鎖的最底層實現方式有直觀認識,再結合之前的兩篇文章(關于同步的一點思考-上和關于同步的一點思考-下)能對操作系統的同步機制有個全面的理解。
下文中的進程一詞包括常規進程與線程。
在看下面的源碼分析前,先思考一個問題:如何確保掛起進程時,val的值是沒有被其他進程修改過的?
代碼在kernel/futex.c中
static?int?futex_wait(u32?__user?*uaddr,?int?fshared, ??????u32?val,?ktime_t?*abs_time,?u32?bitset,?int?clockrt){ struct?hrtimer_sleeper?timeout,?*to?=?NULL; struct?restart_block?*restart; struct?futex_hash_bucket?*hb; struct?futex_q?q; int?ret; ... //設置hrtimer定時任務:在一定時間(abs_time)后,如果進程還沒被喚醒則喚醒wait的進程 if?(abs_time)?{ ????... hrtimer_init_sleeper(to,?current); ... } retry: //該函數中判斷uaddr指向的值是否等于val,以及一些初始化操作 ret?=?futex_wait_setup(uaddr,?val,?fshared,?&q,?&hb); //如果val發生了改變,則直接返回 if?(ret) goto?out; //將當前進程狀態改為TASK_INTERRUPTIBLE,并插入到futex等待隊列,然后重新調度。 futex_wait_queue_me(hb,?&q,?to); /*?If?we?were?woken?(and?unqueued),?we?succeeded,?whatever.?*/ ret?=?0; //如果unqueue_me成功,則說明是超時觸發(因為futex_wake喚醒時,會將該進程移出等待隊列,所以這里會失敗) if?(!unqueue_me(&q)) goto?out_put_key; ret?=?-ETIMEDOUT; if?(to?&&?!to->task) goto?out_put_key; /* ?*?We?expect?signal_pending(current),?but?we?might?be?the ?*?victim?of?a?spurious?wakeup?as?well. ?*/ if?(!signal_pending(current))?{ put_futex_key(fshared,?&q.key); goto?retry; } ret?=?-ERESTARTSYS; if?(!abs_time) goto?out_put_key; ... out_put_key: put_futex_key(fshared,?&q.key); out: if?(to)?{ //取消定時任務 hrtimer_cancel(&to->timer); destroy_hrtimer_on_stack(&to->timer); } return?ret; }
在將進程阻塞前會將當期進程插入到一個等待隊列中,需要注意的是這里說的等待隊列其實是一個類似Java HashMap的結構,全局唯一。
struct?futex_hash_bucket?{ spinlock_t?lock; //雙向鏈表 struct?plist_head?chain;};static?struct?futex_hash_bucket?futex_queues[1<<FUTEX_HASHBITS];
著重看futex_wait_setup
和兩個函數futex_wait_queue_me
static?int?futex_wait_setup(u32?__user?*uaddr,?u32?val,?int?fshared, ???struct?futex_q?*q,?struct?futex_hash_bucket?**hb){ u32?uval; int?ret; retry: q->key?=?FUTEX_KEY_INIT; //初始化futex_q ret?=?get_futex_key(uaddr,?fshared,?&q->key,?VERIFY_READ); if?(unlikely(ret?!=?0)) return?ret; retry_private: //獲得自旋鎖 *hb?=?queue_lock(q); //原子的將uaddr的值設置到uval中 ret?=?get_futex_value_locked(&uval,?uaddr); ???...? //如果當期uaddr指向的值不等于val,即說明其他進程修改了 //uaddr指向的值,等待條件不再成立,不用阻塞直接返回。 if?(uval?!=?val)?{ //釋放鎖 queue_unlock(q,?*hb); ret?=?-EWOULDBLOCK; } ???... return?ret; }
函數futex_wait_setup
中主要做了兩件事,一是獲得自旋鎖,二是判斷*uaddr是否為預期值。
static?void?futex_wait_queue_me(struct?futex_hash_bucket?*hb,?struct?futex_q?*q, struct?hrtimer_sleeper?*timeout) { //設置進程狀態為TASK_INTERRUPTIBLE,cpu調度時只會選擇 //狀態為TASK_RUNNING的進程 set_current_state(TASK_INTERRUPTIBLE); //將當期進程(q封裝)插入到等待隊列中去,然后釋放自旋鎖 queue_me(q,?hb); //啟動定時任務 if?(timeout)?{ hrtimer_start_expires(&timeout->timer,?HRTIMER_MODE_ABS); if?(!hrtimer_active(&timeout->timer)) timeout->task?=?NULL; } /* ?*?If?we?have?been?removed?from?the?hash?list,?then?another?task ?*?has?tried?to?wake?us,?and?we?can?skip?the?call?to?schedule(). ?*/ if?(likely(!plist_node_empty(&q->list)))?{ ? ?//如果沒有設置過期時間?||?設置了過期時間且還沒過期 if?(!timeout?||?timeout->task) //系統重新進行進程調度,這個時候cpu會去執行其他進程,該進程會阻塞在這里 schedule(); } //走到這里說明又被cpu選中運行了 __set_current_state(TASK_RUNNING); }
futex_wait_queue_me
中主要做幾件事:
將當期進程插入到等待隊列
啟動定時任務
重新調度進程
在futex_wait_setup
方法中會加自旋鎖;在futex_wait_queue_me
中將狀態設置為TASK_INTERRUPTIBLE
,調用queue_me
將當期線程插入到等待隊列中,然后才釋放自旋鎖。也就是說檢查uaddr的值的過程跟進程掛起的過程放在同一個臨界區中。當釋放自旋鎖后,這時再更改addr地址的值已經沒有關系了,因為當期進程已經加入到等待隊列中,能被wake喚醒,不會出現本文開頭提到的沒人喚醒的問題。
總結下futex_wait
流程:
加自旋鎖
檢測*uaddr是否等于val,如果不相等則會立即返回
將進程狀態設置為TASK_INTERRUPTIBLE
將當期進程插入到等待隊列中
釋放自旋鎖
創建定時任務:當超過一定時間還沒被喚醒時,將進程喚醒
掛起當前進程
futex_wake
static?int?futex_wake(u32?__user?*uaddr,?int?fshared,?int?nr_wake,?u32?bitset){ struct?futex_hash_bucket?*hb; struct?futex_q?*this,?*next; struct?plist_head?*head; union?futex_key?key?=?FUTEX_KEY_INIT; int?ret; ... //根據uaddr的值填充&key的內容 ret?=?get_futex_key(uaddr,?fshared,?&key,?VERIFY_READ); if?(unlikely(ret?!=?0)) goto?out; //根據&key獲得對應uaddr所在的futex_hash_bucket hb?=?hash_futex(&key); //對該hb加自旋鎖 spin_lock(&hb->lock); head?=?&hb->chain; //遍歷該hb的鏈表,注意鏈表中存儲的節點是plist_node類型,而而這里的this卻是futex_q類型,這種類型轉換是通過c中的container_of機制實現的 plist_for_each_entry_safe(this,?next,?head,?list)?{ if?(match_futex?(&this->key,?&key))?{ ... //喚醒對應進程 wake_futex(this); if?(++ret?>=?nr_wake) break; } } //釋放自旋鎖 spin_unlock(&hb->lock); put_futex_key(fshared,?&key); out: return?ret; }
futex_wake
流程如下:
找到uaddr對應的futex_hash_bucket
,即代碼中的hb
對hb加自旋鎖
遍歷fb的鏈表,找到uaddr對應的節點
調用wake_futex
喚起等待的進程
釋放自旋鎖
wake_futex
中將制定進程狀態設置為TASK_RUNNING
并加入到系統調度列表中,同時將進程從futex的等待隊列中移除掉,具體代碼就不分析了,有興趣的可以自行研究。
Java中的ReentrantLock,Object.wait和Thread.sleep等等底層都是用futex進行線程同步,理解futex的實現能幫助你更好的理解與使用這些上層的同步機制。另外因篇幅與精力有限,涉及到進程調度的相關內容沒有具體分析,不過并不妨礙理解文章內容。
免責聲明:本站發布的內容(圖片、視頻和文字)以原創、轉載和分享為主,文章觀點不代表本網站立場,如果涉及侵權請聯系站長郵箱:is@yisu.com進行舉報,并提供相關證據,一經查實,將立刻刪除涉嫌侵權內容。