您好,登錄后才能下訂單哦!
這篇文章主要介紹了Ruby3多線程并行Ractor怎么使用的相關知識,內容詳細易懂,操作簡單快捷,具有一定借鑒價值,相信大家閱讀完這篇Ruby3多線程并行Ractor怎么使用文章都會有所收獲,下面我們一起來看看吧。
在Ruby3之前,使用Thread來創建新的線程,但這種方式創建的多線程是并發而非并行的,MRI有一個全局解釋器鎖GIL來控制同一時刻只能有一個線程在執行:
# main Thread t1 = Thread.new do # new Thread sleep 3 end t1.join
Ruby3通過Ractor(Ruby Actor,Actor模型通過消息傳遞的方式來修改狀態)支持真正的多線程并行,多個Ractor之間可并行獨立運行。
# main Ractor # 創建一個可與main Ractor并行運行的Ractor r = Ractor.new do sleep 2 Ractor.yield "hello" end puts r.take
需注意,每個Ractor中至少有一個原生Ruby線程,但每個Ractor內部都擁有獨立的GIL,使得Ractor內部在同一時刻最多只能有一個線程在運行。從這個角度來看,Ractor實際上是解釋器線程,每個解釋器線程擁有一個全局解釋器鎖。
如果main Ractor退出,則其他Ractor也會收到退出信號,就像main Thread退出時,其他Thread也會退出一樣。
使用Ractor.new
創建一個Ractor實例,創建實例時需指定一個語句塊,該語句塊中的代碼會在該Ractor中運行。
r = Ractor.new do puts "new Ractor" end
可在new方法的參數上為該Ractor實例指定名稱:
r = Ractor.new(name: "ractor1") do puts "new Ractor" end puts r.name # ractor 1
new方法也可指定其他參數,這些參數必須在name參數之前,且這些參數將直接原樣傳遞給語句塊參數:
arr = [11, 22, 33] r = Ractor.new(arr, name: "r1") do |arr| puts "arr" end sleep 1
關于new的參數,稍后還會有解釋。
可使用Ractor.current
獲取當前的Ractor實例,使用Ractor.count
獲取當前存活的Ractor實例數量。
Ractor傳遞消息的方式分兩種:
Push方式:向某個特定的Ractor實例推送消息,可使用r.send(Msg)
或別名r << Msg
向該Ractor實例傳送消息,并在該Ractor實例內部使用Ractor.receive
或別名Ractor.recv
或它們的同名私有方法來接收推送進來的消息
Ractor還提供了Ractor.receive_if {expr}
方法,表示只在expr為true時才接收消息,receive
等價于receive_if {true}
Pull方式:從某個特定的Ractor實例拉取消息,可在該Ractor實例內部使用Ractor.yield
向外傳送消息,并在需要的地方使用r.take
獲取傳輸出來的消息
Ractor.new
的語句塊返回值,相當于Ractor.yield
,它也可被r.take
接收
因此,對于Push方式,要求知道消息傳遞的目標Ractor,對于Pull方式,要求知道消息的來源Ractor。
# yield + take r = Ractor.new {Ractor.yield "hello"} puts r.take # send + receive r1 = Ractor.new do # Ractor.receive或Ractor.recv # 或同名私有方法:receive、recv puts Ractor.receive end r1.send("hello") r1.take # 本次take取得r1語句塊的返回值,即puts的返回值nil
使用new方法創建Ractor實例時,可指定new的參數,這些參數會被原樣傳遞給Ractor的語句塊參數。
arr = [11, 22, 33] r = Ractor.new(arr) { |arr| ...}
實際上,new的參數等價于在Ractor語句塊的開頭使用了Ractor.receive
接收消息:
r = Ractor.new 'ok' { |msg| msg } r.take #=> 'ok' # 基本等價于 r = Ractor.new do msg = Ractor.receive msg end r.send 'ok' r.take #=> 'ok'
Ractor之間傳遞消息時,實際上是通過Ractor的消息端口進行傳遞的。
每個Ractor都有自己的incoming port和outgoing port:
incoming port:是該Ractor接收消息的端口,r.send
和Ractor.receive
使用該端口
每個incoming port都連接到一個大小不限的隊列上
r.send
傳入的消息都會寫入該隊列,由于該隊列大小不限,因此r.send
從不阻塞
Ractor.receive
從該隊列彈出消息,當隊列為空時,Ractor.receive
被阻塞直到新消息出現
可使用r.close_incoming
關閉incoming port,關閉該端口后,r.send
將直接報錯,Ractor.receive
將先從隊列中取數據,當隊列為空后,再調用Ractor.receive
將報錯
outgoing port:是該Ractor向外傳出消息的端口,Ractor.yield
和r.take
使用該端口
Ractor.yield
或Ractor語句塊返回時,消息從outgoing port流出
當沒有r.take
接收消息時,r內部的Ractor.yield
將被阻塞
當r內部沒有Ractor.yield
時,r.take
將被阻塞
Ractor.yield
從outgoing port傳出的消息可被任意多個r.take
等待,但只有一個r.take
可獲取到該消息
可使用r.close_outgoing
關閉outgoing port,關閉該端口后,再調用r.take
和Ractor.yield
將直接報錯。如果r.take
正被阻塞(等待Ractor.yield
傳出消息),關閉outgoing port操作將取消所有等待中的take并報錯
可使用Ractor.select(r1,r2,r3...)
等待一個或多個Ractor實例outgoing port上的消息(因此,select主要用于等待Ractor.yield
的消息),等待到第一個消息后立即返回。
Ractor.select
的返回值格式為[r, obj]
,其中:
r表示等待到的那個Ractor實例
obj表示接收到的消息對象
例如:
r1 = Ractor.new{'r1'} r2 = Ractor.new{'r2'} rs = [r1, r2] as = [] # Wait for r1 or r2's Ractor.yield r, obj = Ractor.select(*rs) rs.delete(r) as << obj # Second try (rs only contain not-closed ractors) r, obj = Ractor.select(*rs) rs.delete(r) as << obj as.sort == ['r1', 'r2'] #=> true
通常來說,會使用Ractor.select
來輪詢等待多個Ractor實例的消息,通用化的處理流程參考如下:
# 充當管道功能的Ractor:接收消息并發送出去,并不斷循環 pipe = Ractor.new do loop do Ractor.yield Ractor.receive end end RN = 10 # rs變量保存了10個Ractor實例 # 每個Ractor實例都從管道pipe中取一次消息然后由本Ractor發送出去 rs = RN.times.map{|i| Ractor.new pipe, i do |pipe, i| msg = pipe.take msg # ping-pong end } # 向管道中發送10個數據 RN.times{|i| pipe << i} # 輪詢等待10個Ractor實例的outgoing port # 每等待成功一次,從rs中刪除所等待到的Ractor實例, # 然后繼續等待剩下的Ractor實例 RN.times.map{ r, n = Ractor.select(*rs) rs.delete r n }.sort #=> [0, 1, 2, 3, 4, 5, 6, 7, 8, 9]
此外,Ractor.select
除了可等待消息外,也可以用來yield傳遞消息,更多用法參考官方手冊:Ractor.select。
多個Ractor之間是可并行運行的,為了避免Ractor之間傳遞數據時出現競態問題,Ractor采取了一些措施:
對于不可變對象,它們可直接在Ractor之間共享,此時傳遞它們的引用
對于可變對象,它們不可直接在Ractor之間共享,此時傳遞數據時,默認先按字節逐字節拷貝,然后后傳遞副本
也可以顯式指定移動數據,將某份數據從Ractor1移動到另一個Ractor2中,即轉移數據的所有權(參考Rust的所有權規則),轉移所有權后,原始所有者Ractor中將無法再訪問該數據
可共享的對象:自動傳遞它們的引用,效率高
不可變對象可在Ractor之間直接共享(如Integer、symbol、true/false、nil),如:
i=123
:i是可共享的
s="str".freeze
:s是可共享的
h={c: Object}.freeze
:h是可共享的,因為Object是一個類對象,類對象是可共享的
a=[1,[2],3].freeze
:a不可共享,因為凍結后仍然包含可變的[2]
Class/Module對象,即類對象自身和模塊對象自身是可共享的
Ractor對象自身是可共享的
例如:
i = 33 r = Ractor.new do m = recv puts m.object_id end r.send(i) # 傳遞i r.take # 等待Ractor執行結束(語句塊返回) puts i.object_id # i傳遞后仍然可用 =begin 67 67 =end
值得注意的是,Ractor對象是可共享的,因此可將某個Ractor實例傳遞給另一個Ractor實例。例如:
pipe = Ractor.new do loop do Ractor.yield Ractor.receive end end RN = 10 rs = RN.times.map{|i| # pipe是一個Ractor實例,這里作為參數傳遞給其他的Ractor實例 Ractor.new pipe, i do |pipe, i| pipe << i end } RN.times.map{ pipe.take }.sort #=> [0, 1, 2, 3, 4, 5, 6, 7, 8, 9]
絕大多數對象不是可直接共享的。在Ractor之間傳遞不可共享的對象時,默認會傳遞deep-copy后的副本,即按字節拷貝的方式拷貝該對象的每一個字節。這種方式效率較低。
例如:
arr = [11, 22, 33] # 數組是可變的,不可共享 r = Ractor.new do m = recv puts "copied: #{m.object_id}" end r.send(arr) # 傳遞數組,此時將逐字節拷貝數組 r.take puts "origin: #{arr.object_id}" =begin copied: 60 origin: 80 =end
從結果看,兩個Ractor內的arr不是同一個對象。
需注意,對于全局唯一的對象來說(比如數值、nil、false、true、symbol),逐字節拷貝時并不會拷貝它們。例如:
arr = %i[lang action sub] r = Ractor.new do m = recv puts "copied: #{m.object_id}, #{m[0].object_id}, #{m[1].object_id}" end r.send(arr) r.take puts "origin: #{arr.object_id}, #{arr[0].object_id}, #{arr[1].object_id}" =begin copied: 60, 80, 1046748 origin: 100, 80, 1046748 =end
注意,Thread對象無法拷貝,因此無法在Ractor之間傳遞。
還可以讓r.send(msg, move: true)
和Ractor.yield(msg, move: true)
傳遞數據時,明確表示要移動而非拷貝數據,即轉移數據的所有權(從原來的所有者Ractor實例轉移到目標Ractor實例)。
無論是可共享還是不可共享的對象,都可以轉移所有權,只不過轉移可共享對象的所有權沒有意義,因為轉移之后,原所有者仍然擁有所有權。
因此,通常只對不可共享的數據來轉移所有權,轉移所有權后,原所有者將無法訪問該數據。
str = "hello" puts str.object_id r = Ractor.new do m = recv puts m.object_id end r.send(str, move: true) # 轉移str的所有權 r.take #puts str.object_id # 轉移所有權后再訪問str,將報錯 =begin 60 80 =end
值得注意的是,移動的本質是內存拷貝,它底層也一樣是逐字節拷貝原始數據的過程,所以移動傳遞數據的效率和傳遞副本數據的效率是類似的。移動傳遞和傳遞副本的區別之處在于所有權,移動傳遞后,原所有者Ractor實例將無法訪問該數據,而拷貝傳遞方式則允許原所有者訪問。
注意,Thread對象無法轉移所有權,因此無法在Ractor之間傳遞。
對于不可共享的數據obj,可通過Ractor.make_shareable(obj)
方法將其轉變為可共享的數據,默認轉變的方式是逐層次地遞歸凍結obj。也可指定額外的參數Ractor.make_shareable(obj, copy: true)
,此時將深拷貝obj得其副本,再讓副本(逐層遞歸凍結)轉變為可共享數據。
例如:
arr = %w[lang action sub] puts arr.object_id r = Ractor.new do m = recv puts m.object_id end r.send(Ractor.make_shareable(arr)) r.take puts arr.object_id puts arr.frozen?
輸出:
60 60 60 true
工作者線程池:
require 'prime' pipe = Ractor.new do loop do Ractor.yield Ractor.receive end end N = 1000 RN = 10 workers = (1..RN).map do Ractor.new pipe do |pipe| while n = pipe.take Ractor.yield [n, n.prime?] end end end (1..N).each{|i| pipe << i } pp (1..N).map{ _r, (n, b) = Ractor.select(*workers) [n, b] }.sort_by{|(n, b)| n}
Pipeline:
# pipeline with yield/take r1 = Ractor.new do 'r1' end r2 = Ractor.new r1 do |r1| r1.take + 'r2' end r3 = Ractor.new r2 do |r2| r2.take + 'r3' end p r3.take #=> 'r1r2r3'
關于“Ruby3多線程并行Ractor怎么使用”這篇文章的內容就介紹到這里,感謝各位的閱讀!相信大家對“Ruby3多線程并行Ractor怎么使用”知識都有一定的了解,大家如果還想學習更多知識,歡迎關注億速云行業資訊頻道。
免責聲明:本站發布的內容(圖片、視頻和文字)以原創、轉載和分享為主,文章觀點不代表本網站立場,如果涉及侵權請聯系站長郵箱:is@yisu.com進行舉報,并提供相關證據,一經查實,將立刻刪除涉嫌侵權內容。