您好,登錄后才能下訂單哦!
今天給大家介紹一下.Net中如何使用Parallel。文章的內容小編覺得不錯,現在給大家分享一下,覺得有需要的朋友可以了解一下,希望對大家有所幫助,下面跟著小編的思路一起來閱讀吧。
一、理解硬件線程和軟件線程
多核處理器帶有一個以上的物理內核--物理內核是真正的獨立處理單元,多個物理內核使得多條指令能夠同時并行運行。硬件線程也稱為邏輯內核,一個物理內 核可以使用超線程技術提供多個硬件線程。所以一個硬件線程并不代表一個物理內核;Windows中每個運行的程序都是一個進程,每一個進程都會創建并運行 一個或多個線程,這些線程稱為軟件線程。硬件線程就像是一條泳道,而軟件線程就是在其中游泳的人。
二、并行場合
.Net Framework4 引入了新的Task Parallel Library(任務并行庫,TPL),它支持數據并行、任務并行和流水線。讓開發人員應付不同的并行場合。
數據并行:有大量數據需要處理,并且必須對每一份數據執行同樣的操作。比如通過256bit的密鑰對100個Unicode字符串進行AES算法加密。
任務并行:通過任務并發運行不同的操作。例如生成文件散列碼,加密字符串,創建縮略圖。
流水線:這是任務并行和數據并行的結合體。
TPL引入了System.Threading.Tasks ,主類是Task,這個類表示一個異步的并發的操作,然而我們不一定要使用Task類的實例,可以使用Parallel靜態類。它提供了 Parallel.Invoke, Parallel.For Parallel.Forecah 三個方法。
三、Parallel.Invoke
試圖讓很多方法并行運行的最簡單的方法就是使用Parallel類的Invoke方法。例如有四個方法:
WatchMovie
HaveDinner
ReadBook
WriteBlog
通過下面的代碼就可以使用并行。
System.Threading.Tasks.Parallel.Invoke(WatchMovie, HaveDinner, ReadBook, WriteBlog);
這段代碼會創建指向每一個方法的委托。Invoke方法接受一個Action的參數組。
1 |
|
用lambda表達式或匿名委托可以達到同樣的效果。
System.Threading.Tasks.Parallel.Invoke(() => WatchMovie(), () => HaveDinner(), () => ReadBook(), delegate() { WriteBlog(); });
1.沒有特定的執行順序。
Parallel.Invoke方法只有在4個方法全部完成之后才會返回。它至少需要4個硬件線程才足以讓這4個方法并發運行。但并不保證這4個方法能夠同時啟動運行,如果一個或者多個內核處于繁忙狀態,那么底層的調度邏輯可能會延遲某些方法的初始化執行。
給方法加上延時,就可以看到必須等待最長的方法執行完成才回到主方法。
static void Main(string[] args) { System.Threading.Tasks.Parallel.Invoke(WatchMovie, HaveDinner, ReadBook, WriteBlog); Console.WriteLine("執行完成"); Console.ReadKey(); } static void WatchMovie() { Thread.Sleep(5000); Console.WriteLine("看電影"); } static void HaveDinner() { Thread.Sleep(1000); Console.WriteLine("吃晚飯"); } static void ReadBook() { Thread.Sleep(2000); Console.WriteLine("讀書"); } static void WriteBlog() { Thread.Sleep(3000); Console.WriteLine("寫博客"); }
這樣會造成很多邏輯內核處于長時間閑置狀態。
四、Parallel.For
Parallel.For為固定數目的獨立For循環迭代提供了負載均衡 (即將工作分發到不同的任務中執行,這樣所有的任務在大部分時間都可以保持繁忙) 的并行執行。從而能盡可能地充分利用所有的可用的內核。
我們比較下下面兩個方法,一個使用For循環,一個使用Parallel.For 都是生成密鑰在轉換為十六進制字符串。
private static void GenerateAESKeys() { var sw = Stopwatch.StartNew(); for (int i = 0; i < NUM_AES_KEYS; i++) { var aesM = new AesManaged(); aesM.GenerateKey(); byte[] result = aesM.Key; string hexStr = ConverToHexString(result); } Console.WriteLine("AES:"+sw.Elapsed.ToString()); } private static void ParallelGenerateAESKeys() { var sw = Stopwatch.StartNew(); System.Threading.Tasks.Parallel.For(1, NUM_AES_KEYS + 1, (int i) => { var aesM = new AesManaged(); aesM.GenerateKey(); byte[] result = aesM.Key; string hexStr = ConverToHexString(result); }); Console.WriteLine("Parallel_AES:" + sw.Elapsed.ToString()); }
private static int NUM_AES_KEYS = 100000;
static void Main(string[] args)
{
Console.WriteLine("執行"+NUM_AES_KEYS+"次:"); GenerateAESKeys();
ParallelGenerateAESKeys();
Console.ReadKey();
}
執行1000000次
這里并行的時間是串行的一半。
五、Parallel.ForEach
在Parallel.For中,有時候對既有循環進行優化可能會是一個非常復雜的任務。Parallel.ForEach為固定數目的獨立For Each循環迭代提供了負載均衡的并行執行,且支持自定義分區器,讓使用者可以完全掌握數據分發。實質就是將所有要處理的數據區分為多個部分,然后并行運 行這些串行循環。
修改上面的代碼:
System.Threading.Tasks.Parallel.ForEach(Partitioner.Create(1, NUM_AES_KEYS + 1), range => { var aesM = new AesManaged(); Console.WriteLine("AES Range({0},{1} 循環開始時間:{2})",range.Item1,range.Item2,DateTime.Now.TimeOfDay); for (int i = range.Item1; i < range.Item2; i++) { aesM.GenerateKey(); byte[] result = aesM.Key; string hexStr = ConverToHexString(result); } Console.WriteLine("AES:"+sw.Elapsed.ToString()); });
從執行結果可以看出,分了13個段執行的。
第二次執行還是13個段。速度上稍微有差異。開始沒有指定分區數,Partitioner.Create使用的是內置默認值。
而且我們發現這些分區并不是同時執行的,大致是分了三個時間段執行。而且執行順序是不同的。總的時間和Parallel.For的方法差不多。
public static ParallelLoopResult ForEach<TSource>(Partitioner<TSource> source, Action<TSource> body)
Parallel.ForEach方法定義了source和Body兩個參數。source是指分區器。提供了分解為多個分區的數據源。body是 要調用的委托。它接受每一個已定義的分區作為參數。一共有20多個重載,在上面的例子中,分區的類型為Tuple<int,int>,是一個 二元組類型。此外,返回一個ParallelLoopResult的值。
Partitioner.Create 創建分區是根據邏輯內核數及其他因素決定。
public static OrderablePartitioner<Tuple<int, int>> Create(int fromInclusive, int toExclusive) { int num = 3; if (toExclusive <= fromInclusive) throw new ArgumentOutOfRangeException("toExclusive"); int rangeSize = (toExclusive - fromInclusive) / (PlatformHelper.ProcessorCount * num); if (rangeSize == 0) rangeSize = 1; return Partitioner.Create<Tuple<int, int>>(Partitioner.CreateRanges(fromInclusive, toExclusive, rangeSize), EnumerablePartitionerOptions.NoBuffering); }
因此我們可以修改分區數目,rangesize大致為250000左右。也就是說我的邏輯內核是4.
var rangesize = (int) (NUM_AES_KEYS/Environment.ProcessorCount) + 1;
System.Threading.Tasks.Parallel.ForEach(Partitioner.Create(1, NUM_AES_KEYS + 1,rangesize), range =>
再次執行:
分區變成了四個,時間上沒有多大差別(***個時間是串行時間)。我們看見這四個分區幾乎是同時執行的。大部分情況下,TPL在幕后使用的負載均衡機制都是非常高效的,然而對分區的控制便于使用者對自己的工作負載進行分析,來改進整體的性能。
Parallel.ForEach也能對IEnumerable<int>集合進行重構。Enumerable.Range生產了序列化的數目。但這樣就沒有上面的分區效果。
private static void ParallelForEachGenerateMD5HasHes() { var sw = Stopwatch.StartNew(); System.Threading.Tasks.Parallel.ForEach(Enumerable.Range(1, NUM_AES_KEYS), number => { var md5M = MD5.Create(); byte[] data = Encoding.Unicode.GetBytes(Environment.UserName + number); byte[] result = md5M.ComputeHash(data); string hexString = ConverToHexString(result); }); Console.WriteLine("MD5:"+sw.Elapsed.ToString()); }
六、從循環中退出
和串行運行中的break不同,ParallelLoopState 提供了兩個方法用于停止Parallel.For 和 Parallel.ForEach的執行。
Break:讓循環在執行了當前迭代后盡快停止執行。比如執行到100了,那么循環會處理掉所有小于100的迭代。
Stop:讓循環盡快停止執行。如果執行到了100的迭代,那不能保證處理完所有小于100的迭代。
修改上面的方法:執行3秒后退出。
private static void ParallelLoopResult(ParallelLoopResult loopResult) { string text; if (loopResult.IsCompleted) { text = "循環完成"; } else { if (loopResult.LowestBreakIteration.HasValue) { text = "Break終止"; } else { text = "Stop 終止"; } } Console.WriteLine(text); } private static void ParallelForEachGenerateMD5HasHesBreak() { var sw = Stopwatch.StartNew(); var loopresult= System.Threading.Tasks.Parallel.ForEach(Enumerable.Range(1, NUM_AES_KEYS), (int number,ParallelLoopState loopState) => { var md5M = MD5.Create(); byte[] data = Encoding.Unicode.GetBytes(Environment.UserName + number); byte[] result = md5M.ComputeHash(data); string hexString = ConverToHexString(result); if (sw.Elapsed.Seconds > 3) { loopState.Stop(); } }); ParallelLoopResult(loopresult); Console.WriteLine("MD5:" + sw.Elapsed); }
七、捕捉并行循環中發生的異常。
當并行迭代中調用的委托拋出異常,這個異常沒有在委托中被捕獲到時,就會變成一組異常,新的System.AggregateException負責處理這一組異常。
private static void ParallelForEachGenerateMD5HasHesException() { var sw = Stopwatch.StartNew(); var loopresult = new ParallelLoopResult(); try { loopresult = System.Threading.Tasks.Parallel.ForEach(Enumerable.Range(1, NUM_AES_KEYS), (number, loopState) => { var md5M = MD5.Create(); byte[] data = Encoding.Unicode.GetBytes(Environment.UserName + number); byte[] result = md5M.ComputeHash(data); string hexString = ConverToHexString(result); if (sw.Elapsed.Seconds > 3) { throw new TimeoutException("執行超過三秒"); } }); } catch (AggregateException ex) { foreach (var innerEx in ex.InnerExceptions) { Console.WriteLine(innerEx.ToString()); } } ParallelLoopResult(loopresult); Console.WriteLine("MD5:" + sw.Elapsed); }
結果:
異常出現了好幾次。
八、指定并行度。
TPL的方法總會試圖利用所有可用的邏輯內核來實現***的結果,但有時候你并不希望在并行循環中使用所有的內核。比如你需要留出一個不參與并行計算 的內核,來創建能夠響應用戶的應用程序,而且這個內核需要幫助你運行代碼中的其他部分。這個時候一種好的解決方法就是指定***并行度。
這需要創建一個ParallelOptions的實例,設置MaxDegreeOfParallelism的值。
private static void ParallelMaxDegree(int maxDegree) { var parallelOptions = new ParallelOptions(); parallelOptions.MaxDegreeOfParallelism = maxDegree; var sw = Stopwatch.StartNew(); System.Threading.Tasks.Parallel.For(1, NUM_AES_KEYS + 1, parallelOptions, (int i) => { var aesM = new AesManaged(); aesM.GenerateKey(); byte[] result = aesM.Key; string hexStr = ConverToHexString(result); }); Console.WriteLine("AES:" + sw.Elapsed.ToString()); }
調用:如果在四核微處理器上運行,那么將使用3個內核。
ParallelMaxDegree(Environment.ProcessorCount - 1);
時間上大致慢了點(第一次Parallel.For 3.18s),但可以騰出一個內核來處理其他的事情。
以上就是.Net中如何使用Parallel的全部內容了,更多與.Net中如何使用Parallel相關的內容可以搜索億速云之前的文章或者瀏覽下面的文章進行學習哈!相信小編會給大家增添更多知識,希望大家能夠支持一下億速云!
免責聲明:本站發布的內容(圖片、視頻和文字)以原創、轉載和分享為主,文章觀點不代表本網站立場,如果涉及侵權請聯系站長郵箱:is@yisu.com進行舉報,并提供相關證據,一經查實,將立刻刪除涉嫌侵權內容。