並列処理について③(PLINQ)

PLINQ

ParallelEnumerable.AsParallel()

AsParallel()を使用することで、LINQのクエリを並列実行することができる。
ただし、必ずしも並列実行されるわけでなく、AsParallel()にてパフォーマンス向上が判定された場合のみ。 
上記を理解した上でコーディングしないと、場合によってはパフォーマンスの低下を招く可能性がある。 

並列実行方法のコントロール

ParallelEnumerableには以下のようなメソッドが用意されており、並列実行方法をコントロールできる。

  • AsOrdered() : 実行結果はインプットの順序を維持(パフォーマンスは落ちる)
  • WithDegreeOfParallelism() : 並列処理される最大タスク数を指定
  • WithExecutionMode() : 並列実行モードの指定(逐次/並列)
  • AsSequential() : 実行順はインプットの順序を維持
    • OrderBy()を並列実行後、その結果を逐次処理する場合などに有用

以下のコードを実行した場合、並列を強制して最大タスク数は4としているため、threadIdsの件数は1~4件になります。
itemsの件数が少ない場合、4タスクを並列実行する必要がないため、2件程度になります。

var items = Enumerable.Range(0, 1000000);
var threadIds = new HashSet();
var lockObj = new object();
items
    .AsParallel()
    .WithDegreeOfParallelism(4)
    .WithExecutionMode(ParallelExecutionMode.ForceParallelism)
    .Select(
        i =>
        {
            lock (lockObj)
            {
                threadIds.Add(Thread.CurrentThread.ManagedThreadId);
            }
            return i;
        })
    .ToArray();

Console.WriteLine($"Thread : {string.Join(",", threadIds)}");

その他詳細はMSDNライブラリを参照
PLINQ の概要 | Microsoft Docs

例外

PLINQでの並列処理中の例外は、クエリ完了後にまとめてAggregateExceptionとしてスローされる。

try
{
    var items = Enumerable.Range(0, 100);
    items
        .AsParallel()
        .Where(
            item =>
            {
                throw new InvalidOperationException($"Parallel Error at {item}");
            })
        .ToArray();
}
catch (AggregateException e)
{
    foreach (var innerException in e.InnerExceptions)
    {
        Console.WriteLine(innerException.Message);
    }
}

並列処理について②(Parallel.For/ForEachの管理)

ParallelLoopState

Prallel.For/ForEachにわたすAction/ラムダ式は、ParallelLoopStateを引数とするものも指定可能です。
ParallelLoopState.Stop()/Break()によって、並列ループの実行を終了させることができます。
上記メソッドを実行した場合、即時終了されるわけでなく、実行中の各ループ処理は終了が保証されます。
以下に違いをまとめます。

ParallelLoopState.Stop()

呼び出し以降、未実行のループ処理は一切処理されません。
以下コードを実行した場合、20以上のインデックスのループ処理が呼ばれたタイミングでループ処理が終了されます。
並列処理は順不同で実行されるため、Stop実行時点でも20以下のインデックスで未実行の処理が存在し得ます。
そういったものがある場合、processedの値は20未満となります。

int processed = 0;
Parallel.For(
    1,
    100,
    (i, loopState) =>
    {
        if (i > 20)
        {
            loopState.Stop();
        }
        else
        {
            Thread.Sleep(10);
            Interlocked.Increment(ref processed);
        }
    });

Console.WriteLine($"Processed : {processed}");

ParallelLoopState.Break()

呼び出し時のループのインデックスをParallelLoopState.LowestBreakIterationに設定します。
その値より大きいインデックスのループ処理は実行されず、その値より小さいインデックスのループ処理は確実に処理されます。
以下コードを実行した場合、20以下のインデックスのループ処理は確実に処理されるため、processedの値は必ず20になります。

int processed = 0;
Parallel.For(
1,
100,
(i, loopState) =>
{
    if (i > 20)
    {
        loopState.Break();
    }
    else
    {
        Thread.Sleep(10);
        Interlocked.Increment(ref processed);
    }
});

Console.WriteLine($"Processed : {processed}");

ParallelLoopResult

Parallel.For/ForEachは戻り値としてParallelLoopResultを返します。
ParallelLoopResultは並列ループ処理の終了ステータスを保持します。
並列ループ処理が中断されずにすべて完了した場合、ParallelLoopResult.IsCompletedはTrueになります。
ParallelLoopState.Break()が呼び出された場合、ParallelLoopResult.LowestBreakIterationに中断時のインデックスが設定されます。

並列処理について①(Parallel)

並列処理のポイント

  • タスクは特定のプロセッサに割り当てることはできない。空いたものに自動的に割り当てられる。
  • 開発者はどのプロセスがアクティブなのか、プロセスの実行時間がどれだけなのか、処理がいつ完了するのかを仮定することはできない。
  • .Netにおける並列処理はTPL(Task Parallel Library)によって提供される。

Parallel

Parallel.Invoke

  • 引数に複数のAction/ラムダ式を受け、それぞれに対してTaskを生成する
  • 各Taskの処理順、プロセッサへの割当はコントロール不可
Parallel.Invoke(
    () => Console.WriteLine("Execute 1"),
    () => Console.WriteLine("Execute 2"),
    () => Console.WriteLine("Execute 3"));

Parallel.ForEach

  • foreachの並列実行版
  • 第一引数にループ対象のIEnumerableオブジェクト(Listなど)
  • 第二引数にListの各要素に対する処理を表すAction/ラムダ式
  • 各Taskは開始順に処理されないことに注意
var array = new[] { 1, 2, 3 };
Parallel.ForEach(
    array,
    item => Console.WriteLine($"Execute {item}"));

Parallel.For

  • forの並列実行版
  • 第一引数にカウンタ初期値
  • 第二引数にカウンタ最大値
  • 第三引数にカウンタの値を受けるAction/ラムダ式
Parallel.For(
    1,
    3,
    counter => Console.WriteLine($"Execute {counter}"));