並列処理について③(PLINQ)

PLINQ

ParallelEnumerable.AsParallel()

AsParallel()を使用することで、LINQのクエリを並列実行することができる。
ただし、必ずしも並列実行されるわけでなく、AsParallel()にてパフォーマンス向上が判定された場合のみ。 
上記を理解した上でコーディングしないと、場合によってはパフォーマンスの低下を招く可能性がある。 

並列実行方法のコントロール

ParallelEnumerableには以下のようなメソッドが用意されており、並列実行方法をコントロールできる。

  • AsOrdered() : 実行結果はインプットの順序を維持(パフォーマンスは落ちる)
  • WithDegreeOfParallelism() : 並列処理される最大タスク数を指定
  • WithExecutionMode() : 並列実行モードの指定(逐次/並列)
  • AsSequential() : 実行順はインプットの順序を維持
    • OrderBy()を並列実行後、その結果を逐次処理する場合などに有用

以下のコードを実行した場合、並列を強制して最大タスク数は4としているため、threadIdsの件数は1~4件になります。
itemsの件数が少ない場合、4タスクを並列実行する必要がないため、2件程度になります。

var items = Enumerable.Range(0, 1000000);
var threadIds = new HashSet();
var lockObj = new object();
items
    .AsParallel()
    .WithDegreeOfParallelism(4)
    .WithExecutionMode(ParallelExecutionMode.ForceParallelism)
    .Select(
        i =>
        {
            lock (lockObj)
            {
                threadIds.Add(Thread.CurrentThread.ManagedThreadId);
            }
            return i;
        })
    .ToArray();

Console.WriteLine($"Thread : {string.Join(",", threadIds)}");

その他詳細はMSDNライブラリを参照
PLINQ の概要 | Microsoft Docs

例外

PLINQでの並列処理中の例外は、クエリ完了後にまとめてAggregateExceptionとしてスローされる。

try
{
    var items = Enumerable.Range(0, 100);
    items
        .AsParallel()
        .Where(
            item =>
            {
                throw new InvalidOperationException($"Parallel Error at {item}");
            })
        .ToArray();
}
catch (AggregateException e)
{
    foreach (var innerException in e.InnerExceptions)
    {
        Console.WriteLine(innerException.Message);
    }
}