マルチスレッドでキューやスタックなどを利用するには?[.NET 4.0以降、C#/VB].NET TIPS

BlockingCollection<T>クラスを使うと、lock構文などを使うことなく、スレッドセーフなキューやスタックの操作を簡潔に記述できる。

» 2018年02月07日 05時00分 公開
[山本康彦BluewaterSoft/Microsoft MVP for Windows Development]
「.NET TIPS」のインデックス

連載「.NET TIPS」

 キューやスタックは、マルチスレッドで利用したいことが多い。従来は、そのためにlock構文やMonitorクラス(System.Threading名前空間)などを使って、Queue<T>オブジェクト/Stack<T>オブジェクト(ともにSystem.Collections.Generic名前空間)などにアクセスするスレッド間の同期を取るための複雑なコーディングをしなければならなかった(.NET TIPS「キューを利用するには?[C#/VB]」を参照、以降「前記事」という)。

 .NET Framework 4.0では、スレッドセーフなコードが簡潔に書けるジェネリックなコレクションが追加された。BlockingCollection<T>クラス(System.Collections.Concurrent名前空間)である。本稿では、BlockingCollection<T>クラスをマルチスレッドで使う方法を解説する。

POINT BlockingCollection<T>クラスの基本的な使い方

BlockingCollection&lt;T&gt;クラスの基本的な使い方まとめ BlockingCollection<T>クラスの基本的な使い方まとめ
コード例はC#のみ示してあるが、VBでも同様である。


 特定のトピックをすぐに知りたいという方は以下のリンクを活用してほしい。

 なお、本稿に掲載したサンプルコードをそのまま試すにはVisual Studio 2015以降が必要である。サンプルコードはコンソールアプリの一部であり、コードの冒頭に以下の宣言が必要となる。

using System;
using System.Collections.Concurrent;
using System.Threading.Tasks;
using static System.Console;

Imports System.Collections.Concurrent
Imports System.Console

本稿のサンプルコードに必要な宣言(上:C#、下:VB)

BlockingCollectionのインスタンスを作るには?

 BlockingCollection<T>クラスをインスタンス化するときに、その内部で利用するコレクションを指定できる(指定しなかった場合はキューになる)。また、コレクションに格納できるデータの最大個数もコンストラクタ引数で設定できる。例を次のコードに示す。

const int MAX_QUE = 3; // コレクションに保持する最大個数

// キュー(先入れ先出し)を作る
var que = new BlockingCollection<int>(MAX_QUE);
// または、
var que = new BlockingCollection<int>(
                new ConcurrentQueue<int>(), MAX_QUE);

// スタック(後入れ先出し)を作る
var stack = new BlockingCollection<int>(
                  new ConcurrentStack<int>(), MAX_QUE);

// データバッグ(取り出し順序不定)を作る
var bag = new BlockingCollection<int>(
                new ConcurrentBag<int>(), MAX_QUE);

Const MAX_QUE As Integer = 3 ' コレクションに保持する最大個数

' キュー(先入れ先出し)を作る
Dim que = New BlockingCollection(Of Integer)(MAX_QUE)
' または、
Dim que = New BlockingCollection(Of Integer)(
                New ConcurrentQueue(Of Integer)(), MAX_QUE)

' スタック(後入れ先出し)を作る
Dim stack = New BlockingCollection(Of Integer)(
                  New ConcurrentStack(Of Integer)(), MAX_QUE)

' データバッグ(取り出し順序不定)を作る
Dim bag = New BlockingCollection(Of Integer)(
                New ConcurrentBag(Of Integer)(), MAX_QUE)

BlockingCollection<T>クラスのインスタンスを作る例(上:C#、下:VB)
コンストラクタ引数に、内部的に利用するコレクションのインスタンスと、コレクションに格納するデータの最大個数を渡すのが基本だ。コレクションのインスタンスを渡さなかった場合には、内部的にConcurrentQueue<T>クラス(System.Collections.Concurrent名前空間)のインスタンスが生成される。最大個数を省略した場合は、際限なくデータを格納する(メモリを使い切ってしまう可能性がある)。
コンストラクタ引数に渡せるのはIProducerConsumerCollection<T>インタフェース(System.Collections.Concurrent名前空間)である。それを実装しているクラスには、ここに示したようにConcurrentQueue<T>クラス/ConcurrentStack<T>クラス/ConcurrentBag<T>クラス(いずれもSystem.Collections.Concurrent名前空間)の3種類がある(本稿執筆時点)。
余談だが、「IProducerConsumerCollection<T>インタフェース」という名前にも注目してほしい。BlockingCollection<T>クラスは、Producer−Consumerパターン(前記事参照)を想定しているということなのだろう。

データの追加/取り出しを行うには?

 BlockingCollection<T>クラスに対してマルチスレッドでデータの追加/取り出しを行うには、TryAddメソッド/TryTakeメソッドを使うのが基本となる(次のコード)。

int data = 1;

// キューへデータを投入する(ロック不要)
bool addSuccess = que.TryAdd(data, System.Threading.Timeout.Infinite);

// キューからデータを取り出す(ロック不要)
bool takeSuccess = que.TryTake(out data, System.Threading.Timeout.Infinite);

Dim data As Integer = 1

' キューへデータを投入する(ロック不要)
Dim addSuccess As Boolean = que.TryAdd(data, Threading.Timeout.Infinite)

' キューからデータを取り出す(ロック不要)
Dim takeSuccess As Boolean = que.TryTake(data, Threading.Timeout.Infinite)

BlockingCollectionに対してデータの追加/取り出しを行う例(上:C#、下:VB)
BlockingCollection<T>クラスのTryAddメソッド/TryTakeメソッドは、どちらも成功するとtrueを返す。
TryAddメソッドの第1引数は、投入するデータである。
TryTakeメソッドの第1引数は、取り出したデータが格納される変数である。
第2引数は、メソッドがこのスレッドをブロックする最大時間(単位:ミリ秒)である。この例のようにInfinite(無限)を指定した場合は、成功するまで(または処理が終わるまで)ブロックされ続ける。追加/取り出しをできなかったときに他の処理を実行したいのであれば、第2引数には短い時間を指定する(または、第2引数をなしにする)。

 また、データの取り出しは、次のコードのようにforeach(C#)/For Each(VB)ループでも可能だ。データが取り出せるまで(または処理が終わるまで)スレッドがブロックされて構わないなら、この方法がすっきりしていてよいだろう。

foreach (var data in que.GetConsumingEnumerable())
{
  // 取り出したデータを使う
}

For Each dat In que.GetConsumingEnumerable()
  ' 取り出したデータを使う
Next

ループを使ってBlockingCollectionからデータを取り出す例(上:C#、下:VB)
BlockingCollection<T>クラスのGetConsumingEnumerableメソッドで得られるIEnumerable<T>型はちょっと特殊で、foreach(C#)/For Each(VB)の各ループに与えられるデータはコレクションから取り出されたものだ(コレクションからは削除される)。
処理中にコレクションが空になると、このループはブロックされる。
次に説明する完了通知が行われた後でコレクションが空になると、このループは終了する。

 その他に、複数あるコレクションの中から適切なものを自動的に選んでデータの追加/取り出しを行ってくれるTryAddToAnyメソッド/TryTakeFromAnyメソッドというものも用意されている。

完了を通知するには?

 BlockingCollection<T>クラスへのデータの追加が完了したことを通知できる。

 BlockingCollection<T>クラスのCompleteAddingメソッドを呼び出すと、直ちにIsAddingCompletedプロパティがtrueになり、それ以上はデータを追加できなくなる。その後でコレクションが空になると、今度はIsCompletedプロパティがtrueになり、ブロックされていたスレッドは(もしあれば)再開される(TryTakeメソッドはfalseを返し、GetConsumingEnumerableメソッドによるループは終了する)。

 データを投入する側は、投入が完了したらCompleteAddingメソッドを呼び出すようにする。データを取り出す側は、IsCompletedプロパティをチェックして、trueに変わったら処理終了と判断すればよいわけだ。

 なお、コレクションにデータが残っている状態でも処理を中止する必要があるときは、CancellationTokenを使うとよい。

使い終わったときには?

 BlockingCollection<T>クラスを使うときの注意事項となるが、使い終わったらDisposeメソッドを呼び出すべきである。List<T>クラスやDictionary<T>クラス、あるいはConcurrentQueue<T>クラスなど、他のコレクションでは不要だった処理なので注意してほしい。

 可能ならusingステートメントを使うとよい。usingステートメントを抜けるときに自動的にDisposeメソッドが呼び出されるので、呼び出し忘れることがない。そうできないときは、try〜finallyを使ったり(例を後述)、プラットフォームのエラートラップ機構を使ったりして(「.NET TIPS:WPF:例外をまとめてトラップするには?[C#/VB]」参照)、確実にDisposeメソッドが呼び出されるようにしよう。

Producer−Consumerパターンを実装するには?

 最後に、BlockingCollection<T>クラスを使ったProducer−Consumerパターンの実装例を紹介しておこう。前記事で実装するときの注意点を3つ挙げたが、BlockingCollection<T>クラスを使うと次のようにして解決される。

  • キューに対するアクセスを排他にする:TryAddメソッド/TryTakeメソッドを使う。データの取り出しは、GetConsumingEnumerableメソッドを用いたforeach(C#)/For Each(VB)ループで行ってもよい。
  • キューのサイズを制限する:BlockingCollection<T>クラスをインスタンス化するときに最大サイズを指定すればよい。キューが満杯のときにTryAddメソッドを呼び出すと、falseが返されるか、(タイムアウトを指定した場合は)ブロックされる。
  • 生産または消費の速度が間に合わないときは他方の処理を一時的に止める:間に合っていないときは、他方のスレッドでTryAddメソッド/TryTakeメソッドがfalseを返してくる。または、タイムアウトにInfiniteを指定すれば完全にブロックされる。GetConsumingEnumerableメソッドを用いたループは、ブロックされる。

 Producer−Consumerパターンの実装例を次のコードに示す。前記事のコードと見比べてみてほしい。とても簡潔なコードになっている。

const int MAX_QUE = 3; // キューに保持する最大個数

// キューを生成する
var que = new BlockingCollection<int>(MAX_QUE);

try // BlockingCollectionはDisposeしなければならないので、try〜finallyする
{
  // Producerを定義して実行開始
  var produceTask = Task.Run(async () =>
  {
    // 1から5まで順にデータを投入していく
    for (int i = 1; i <= 5; i++)
    {
      // データ投入が間に合わない場合をシミュレート
      if (i == 5)
      {
        WriteLine($"stop enqueue");
        await Task.Delay(1000);
        WriteLine($"restart enqueue");
      }

      // 投入データを作成する処理をシミュレート
      await Task.Delay(10);

      // キューへデータを投入する(ロック不要)
      que.TryAdd(i, System.Threading.Timeout.Infinite);
      // ↑Infinite(無限)指定は、キューが満杯のときは空くまでブロックされる
      WriteLine($"Enqueue({i}), count={que.Count}");
    }
    que.CompleteAdding(); // データ投入完了を通知
    WriteLine($"end enqueue");
  });

  // Consumerを定義(その1−TryTakeメソッドを使って取り出し)
  Func<string, Task> consume1 = async (header) =>
  {
    while (!que.IsCompleted)
    {
      int data;
      // キューからデータを取り出す(ロック不要)
      bool success = que.TryTake(out data, System.Threading.Timeout.Infinite);
      // ↑Infinite(無限)指定は、キューが空のとき、
      // データが投入されるかque.IsCompletedがtrueになるまでブロックされる
      if (success)
      {
        WriteLine($"{header} Dequeue({data}), count={que.Count}");

        // キューから取り出したデータを使う処理をシミュレート
        WriteLine($"{header} start process({data})");
        await Task.Delay(300);
        WriteLine($"{header} end process({data})");
      }
    }
    WriteLine($"{header} EXIT");
  };

  // Consumerを定義(その2−GetConsumingEnumerableメソッドを使ってループ)
  Func<string, Task> consume2 = async (header) =>
  {
    // キューからデータを取り出す(que.IsCompletedがtrueになるまで)
    foreach (var data in que.GetConsumingEnumerable())
    {
      WriteLine($"{header} Dequeue({data}), count={que.Count}");

      // キューから取り出したデータを使う処理をシミュレート
      WriteLine($"{header} start process({data})");
      await Task.Delay(300);
      WriteLine($"{header} end process({data})");
    }
    WriteLine($"{header} EXIT");
  };

  System.Threading.Thread.Sleep(100);

  // Consumerの実行を開始(2スレッド)
  var consumeTask1 = Task.Run(() => consume1("  [1]"));
  System.Threading.Thread.Sleep(10);
  var consumeTask2 = Task.Run(() => consume2("    [2]"));

  // 実行終了を待機
  Task.WaitAll(produceTask, consumeTask1, consumeTask2);
}
finally
{
  que.Dispose();
}
WriteLine("COMPLETED");

Const MAX_QUE As Integer = 3 ' キューに保持する最大個数

' キューを生成する
Dim que = New BlockingCollection(Of Integer)(MAX_QUE)

Try ' BlockingCollectionはDisposeしなければならないので、try〜finallyする

  ' Producerを定義して実行開始
  Dim produceTask = Task.Run(
  Async Function()
    ' 1から5まで順にデータを投入していく
    For i As Integer = 1 To 5
      ' データ投入が間に合わない場合をシミュレート
      If (i = 5) Then
        WriteLine($"stop enqueue")
        Await Task.Delay(1000)
        WriteLine($"restart enqueue")
      End If

      ' 投入データを作成する処理をシミュレート
      Await Task.Delay(10)

      ' キューへデータを投入する(ロック不要)
      que.TryAdd(i, Threading.Timeout.Infinite)
      ' ↑Infinite(無限)指定は、キューが満杯のときは空くまでブロックされる
      WriteLine($"Enqueue({i}), count={que.Count}")
    Next
    que.CompleteAdding() ' データ投入完了を通知
    WriteLine($"end enqueue")
  End Function)

  ' Consumerを定義(その1−TryTakeメソッドを使って取り出し)
  Dim consume1 As Func(Of String, Task) =
  Async Function(header)
    While (Not que.IsCompleted)
      Dim data As Integer
      ' キューからデータを取り出す(ロック不要)
      Dim success As Boolean = que.TryTake(data, Threading.Timeout.Infinite)
      ' ↑Infinite(無限)指定は、キューが空のとき、
      ' データが投入されるかque.IsCompletedがtrueになるまでブロックされる
      If (success) Then
        WriteLine($"{header} Dequeue({data}), count={que.Count}")

        ' キューから取り出したデータを使う処理をシミュレート
        WriteLine($"{header} start process({data})")
        Await Task.Delay(300)
        WriteLine($"{header} end process({data})")
      End If
    End While
    WriteLine($"{header} EXIT")
  End Function

  ' Consumerを定義(その2−GetConsumingEnumerableメソッドを使ってループ)
  Dim consume2 As Func(Of String, Task) =
  Async Function(header)
    ' キューからデータを取り出す(que.IsCompletedがtrueになるまで)
    For Each dat In que.GetConsumingEnumerable()
      WriteLine($"{header} Dequeue({dat}), count={que.Count}")

      ' キューから取り出したデータを使う処理をシミュレート
      WriteLine($"{header} start process({dat})")
      Await Task.Delay(300)
      WriteLine($"{header} end process({dat})")
    Next
    WriteLine($"{header} EXIT")
  End Function

  System.Threading.Thread.Sleep(100)

  ' Consumerの実行を開始(2スレッド)
  Dim consumeTask1 = Task.Run(Function() consume1("  [1]"))
  System.Threading.Thread.Sleep(10)
  Dim consumeTask2 = Task.Run(Function() consume2("    [2]"))

  ' 実行終了を待機
  Task.WaitAll(produceTask, consumeTask1, consumeTask2)

Finally
  que.Dispose()
End Try
WriteLine("COMPLETED")

BlockingCollectionでProducer−Consumerパターンを実装した例(上:C#、下:VB)
動作確認のために入れたコンソール出力のコードが多いが、データの追加と取り出しの部分に注目してほしい。Producer/Consumerそれぞれのコードは、スレッド間の排他制御を行ったり同期を取ったりするコードがBlockingCollectionの内部に隠蔽(いんぺい)され、前記事のコードに比べてとても簡潔になった。タイムアウトの指定がなければ、まるでシングルスレッド用のコードのようだ。

 上のコードの実行例を次の画像に示す。

Producer−Consumerパターンの実行例 Producer−Consumerパターンの実行例
インデントなしの出力がProducer、1段インデントした出力は1つ目のConsumer、そして2段インデントした出力が2つ目のConsumerからのものである。
出力を見ると、データの投入や処理が間に合わないときに他方のスレッドが止まっているのが分かる。このようにして、複数のProducer(データ生成)とConsumer(データ処理)のスレッドが協調しながら可能な限り効率よく処理をしていく。

まとめ

 キューにもスタックにもなれるBlockingCollection<T>クラスは、スレッド間の排他制御を行ったり同期を取ったりする機能が組み込まれているため、マルチスレッドでも簡潔なコーディングができる。

利用可能バージョン:.NET Framework 4.0以降
カテゴリ:クラス・ライブラリ 処理対象:コレクション
カテゴリ:クラス・ライブラリ 処理対象:マルチスレッド
使用ライブラリ:BlockingCollectionクラス(System.Collections.Concurrent名前空間)
使用ライブラリ:ConcurrentQueueクラス(System.Collections.Concurrent名前空間)
関連TIPS:キューを利用するには?[C#/VB]
関連TIPS:構文:クラス名を書かずに静的メソッドを呼び出すには?[C# 6.0]
関連TIPS:VB.NETでクラス名を省略してメソッドや定数を利用するには?
関連TIPS:数値を右詰めや0埋めで文字列化するには?[C#、VB]
関連TIPS:Visual Studioでコンソール・アプリケーションのデバッグ実行時にコマンド・プロンプトを閉じないようにするには?


「.NET TIPS」のインデックス

.NET TIPS

Copyright© Digital Advantage Corp. All Rights Reserved.

スポンサーからのお知らせPR

注目のテーマ

AI for エンジニアリング
「サプライチェーン攻撃」対策
1P情シスのための脆弱性管理/対策の現実解
OSSのサプライチェーン管理、取るべきアクションとは
Microsoft & Windows最前線2024
システム開発ノウハウ 【発注ナビ】PR
あなたにおすすめの記事PR

RSSについて

アイティメディアIDについて

メールマガジン登録

@ITのメールマガジンは、 もちろん、すべて無料です。ぜひメールマガジンをご購読ください。