並列処理について③(PLINQ)
PLINQ
ParallelEnumerable.AsParallel()
AsParallel()を使用することで、LINQのクエリを並列実行することができる。
ただし、必ずしも並列実行されるわけでなく、AsParallel()にてパフォーマンス向上が判定された場合のみ。
上記を理解した上でコーディングしないと、場合によってはパフォーマンスの低下を招く可能性がある。
並列実行方法のコントロール
ParallelEnumerableには以下のようなメソッドが用意されており、並列実行方法をコントロールできる。
- AsOrdered() : 実行結果はインプットの順序を維持(パフォーマンスは落ちる)
- WithDegreeOfParallelism() : 並列処理される最大タスク数を指定
- WithExecutionMode() : 並列実行モードの指定(逐次/並列)
- AsSequential() : 実行順はインプットの順序を維持
- OrderBy()を並列実行後、その結果を逐次処理する場合などに有用
以下のコードを実行した場合、並列を強制して最大タスク数は4としているため、threadIdsの件数は1~4件になります。
itemsの件数が少ない場合、4タスクを並列実行する必要がないため、2件程度になります。
var items = Enumerable.Range(0, 1000000); var threadIds = new HashSet(); var lockObj = new object(); items .AsParallel() .WithDegreeOfParallelism(4) .WithExecutionMode(ParallelExecutionMode.ForceParallelism) .Select( i => { lock (lockObj) { threadIds.Add(Thread.CurrentThread.ManagedThreadId); } return i; }) .ToArray(); Console.WriteLine($"Thread : {string.Join(",", threadIds)}");
その他詳細はMSDNライブラリを参照
PLINQ の概要 | Microsoft Docs
例外
PLINQでの並列処理中の例外は、クエリ完了後にまとめてAggregateExceptionとしてスローされる。
try { var items = Enumerable.Range(0, 100); items .AsParallel() .Where( item => { throw new InvalidOperationException($"Parallel Error at {item}"); }) .ToArray(); } catch (AggregateException e) { foreach (var innerException in e.InnerExceptions) { Console.WriteLine(innerException.Message); } }
並列処理について②(Parallel.For/ForEachの管理)
ParallelLoopState
Prallel.For/ForEachにわたすAction/ラムダ式は、ParallelLoopStateを引数とするものも指定可能です。
ParallelLoopState.Stop()/Break()によって、並列ループの実行を終了させることができます。
上記メソッドを実行した場合、即時終了されるわけでなく、実行中の各ループ処理は終了が保証されます。
以下に違いをまとめます。
ParallelLoopState.Stop()
呼び出し以降、未実行のループ処理は一切処理されません。
以下コードを実行した場合、20以上のインデックスのループ処理が呼ばれたタイミングでループ処理が終了されます。
並列処理は順不同で実行されるため、Stop実行時点でも20以下のインデックスで未実行の処理が存在し得ます。
そういったものがある場合、processedの値は20未満となります。
int processed = 0; Parallel.For( 1, 100, (i, loopState) => { if (i > 20) { loopState.Stop(); } else { Thread.Sleep(10); Interlocked.Increment(ref processed); } }); Console.WriteLine($"Processed : {processed}");
ParallelLoopState.Break()
呼び出し時のループのインデックスをParallelLoopState.LowestBreakIterationに設定します。
その値より大きいインデックスのループ処理は実行されず、その値より小さいインデックスのループ処理は確実に処理されます。
以下コードを実行した場合、20以下のインデックスのループ処理は確実に処理されるため、processedの値は必ず20になります。
int processed = 0; Parallel.For( 1, 100, (i, loopState) => { if (i > 20) { loopState.Break(); } else { Thread.Sleep(10); Interlocked.Increment(ref processed); } }); Console.WriteLine($"Processed : {processed}");
ParallelLoopResult
Parallel.For/ForEachは戻り値としてParallelLoopResultを返します。
ParallelLoopResultは並列ループ処理の終了ステータスを保持します。
並列ループ処理が中断されずにすべて完了した場合、ParallelLoopResult.IsCompletedはTrueになります。
ParallelLoopState.Break()が呼び出された場合、ParallelLoopResult.LowestBreakIterationに中断時のインデックスが設定されます。
並列処理について①(Parallel)
並列処理のポイント
- タスクは特定のプロセッサに割り当てることはできない。空いたものに自動的に割り当てられる。
- 開発者はどのプロセスがアクティブなのか、プロセスの実行時間がどれだけなのか、処理がいつ完了するのかを仮定することはできない。
- .Netにおける並列処理はTPL(Task Parallel Library)によって提供される。
Parallel
Parallel.Invoke
Parallel.Invoke( () => Console.WriteLine("Execute 1"), () => Console.WriteLine("Execute 2"), () => Console.WriteLine("Execute 3"));
Parallel.ForEach
- foreachの並列実行版
- 第一引数にループ対象のIEnumerableオブジェクト(Listなど)
- 第二引数にListの各要素に対する処理を表すAction/ラムダ式
- 各Taskは開始順に処理されないことに注意
var array = new[] { 1, 2, 3 }; Parallel.ForEach( array, item => Console.WriteLine($"Execute {item}"));
Parallel.For
- forの並列実行版
- 第一引数にカウンタ初期値
- 第二引数にカウンタ最大値
- 第三引数にカウンタの値を受けるAction/ラムダ式
Parallel.For( 1, 3, counter => Console.WriteLine($"Execute {counter}"));