BlockingCollection<T>クラスを使うと、lock構文などを使うことなく、スレッドセーフなキューやスタックの操作を簡潔に記述できる。
キューやスタックは、マルチスレッドで利用したいことが多い。従来は、そのためにlock構文やMonitorクラス(System.Threading名前空間)などを使って、Queue<T>オブジェクト/Stack<T>オブジェクト(ともにSystem.Collections.Generic名前空間)などにアクセスするスレッド間の同期を取るための複雑なコーディングをしなければならなかった(.NET TIPS「キューを利用するには?[C#/VB]」を参照、以降「前記事」という)。
.NET Framework 4.0では、スレッドセーフなコードが簡潔に書けるジェネリックなコレクションが追加された。BlockingCollection<T>クラス(System.Collections.Concurrent名前空間)である。本稿では、BlockingCollection<T>クラスをマルチスレッドで使う方法を解説する。
特定のトピックをすぐに知りたいという方は以下のリンクを活用してほしい。
なお、本稿に掲載したサンプルコードをそのまま試すにはVisual Studio 2015以降が必要である。サンプルコードはコンソールアプリの一部であり、コードの冒頭に以下の宣言が必要となる。
using System;
using System.Collections.Concurrent;
using System.Threading.Tasks;
using static System.Console;
Imports System.Collections.Concurrent
Imports System.Console
BlockingCollection<T>クラスをインスタンス化するときに、その内部で利用するコレクションを指定できる(指定しなかった場合はキューになる)。また、コレクションに格納できるデータの最大個数もコンストラクタ引数で設定できる。例を次のコードに示す。
const int MAX_QUE = 3; // コレクションに保持する最大個数
// キュー(先入れ先出し)を作る
var que = new BlockingCollection<int>(MAX_QUE);
// または、
var que = new BlockingCollection<int>(
new ConcurrentQueue<int>(), MAX_QUE);
// スタック(後入れ先出し)を作る
var stack = new BlockingCollection<int>(
new ConcurrentStack<int>(), MAX_QUE);
// データバッグ(取り出し順序不定)を作る
var bag = new BlockingCollection<int>(
new ConcurrentBag<int>(), MAX_QUE);
Const MAX_QUE As Integer = 3 ' コレクションに保持する最大個数
' キュー(先入れ先出し)を作る
Dim que = New BlockingCollection(Of Integer)(MAX_QUE)
' または、
Dim que = New BlockingCollection(Of Integer)(
New ConcurrentQueue(Of Integer)(), MAX_QUE)
' スタック(後入れ先出し)を作る
Dim stack = New BlockingCollection(Of Integer)(
New ConcurrentStack(Of Integer)(), MAX_QUE)
' データバッグ(取り出し順序不定)を作る
Dim bag = New BlockingCollection(Of Integer)(
New ConcurrentBag(Of Integer)(), MAX_QUE)
BlockingCollection<T>クラスに対してマルチスレッドでデータの追加/取り出しを行うには、TryAddメソッド/TryTakeメソッドを使うのが基本となる(次のコード)。
int data = 1;
// キューへデータを投入する(ロック不要)
bool addSuccess = que.TryAdd(data, System.Threading.Timeout.Infinite);
// キューからデータを取り出す(ロック不要)
bool takeSuccess = que.TryTake(out data, System.Threading.Timeout.Infinite);
Dim data As Integer = 1
' キューへデータを投入する(ロック不要)
Dim addSuccess As Boolean = que.TryAdd(data, Threading.Timeout.Infinite)
' キューからデータを取り出す(ロック不要)
Dim takeSuccess As Boolean = que.TryTake(data, Threading.Timeout.Infinite)
また、データの取り出しは、次のコードのようにforeach(C#)/For Each(VB)ループでも可能だ。データが取り出せるまで(または処理が終わるまで)スレッドがブロックされて構わないなら、この方法がすっきりしていてよいだろう。
foreach (var data in que.GetConsumingEnumerable())
{
// 取り出したデータを使う
}
For Each dat In que.GetConsumingEnumerable()
' 取り出したデータを使う
Next
その他に、複数あるコレクションの中から適切なものを自動的に選んでデータの追加/取り出しを行ってくれるTryAddToAnyメソッド/TryTakeFromAnyメソッドというものも用意されている。
BlockingCollection<T>クラスへのデータの追加が完了したことを通知できる。
BlockingCollection<T>クラスのCompleteAddingメソッドを呼び出すと、直ちにIsAddingCompletedプロパティがtrueになり、それ以上はデータを追加できなくなる。その後でコレクションが空になると、今度はIsCompletedプロパティがtrueになり、ブロックされていたスレッドは(もしあれば)再開される(TryTakeメソッドはfalseを返し、GetConsumingEnumerableメソッドによるループは終了する)。
データを投入する側は、投入が完了したらCompleteAddingメソッドを呼び出すようにする。データを取り出す側は、IsCompletedプロパティをチェックして、trueに変わったら処理終了と判断すればよいわけだ。
なお、コレクションにデータが残っている状態でも処理を中止する必要があるときは、CancellationTokenを使うとよい。
BlockingCollection<T>クラスを使うときの注意事項となるが、使い終わったらDisposeメソッドを呼び出すべきである。List<T>クラスやDictionary<T>クラス、あるいはConcurrentQueue<T>クラスなど、他のコレクションでは不要だった処理なので注意してほしい。
可能ならusingステートメントを使うとよい。usingステートメントを抜けるときに自動的にDisposeメソッドが呼び出されるので、呼び出し忘れることがない。そうできないときは、try〜finallyを使ったり(例を後述)、プラットフォームのエラートラップ機構を使ったりして(「.NET TIPS:WPF:例外をまとめてトラップするには?[C#/VB]」参照)、確実にDisposeメソッドが呼び出されるようにしよう。
最後に、BlockingCollection<T>クラスを使ったProducer−Consumerパターンの実装例を紹介しておこう。前記事で実装するときの注意点を3つ挙げたが、BlockingCollection<T>クラスを使うと次のようにして解決される。
Producer−Consumerパターンの実装例を次のコードに示す。前記事のコードと見比べてみてほしい。とても簡潔なコードになっている。
const int MAX_QUE = 3; // キューに保持する最大個数
// キューを生成する
var que = new BlockingCollection<int>(MAX_QUE);
try // BlockingCollectionはDisposeしなければならないので、try〜finallyする
{
// Producerを定義して実行開始
var produceTask = Task.Run(async () =>
{
// 1から5まで順にデータを投入していく
for (int i = 1; i <= 5; i++)
{
// データ投入が間に合わない場合をシミュレート
if (i == 5)
{
WriteLine($"stop enqueue");
await Task.Delay(1000);
WriteLine($"restart enqueue");
}
// 投入データを作成する処理をシミュレート
await Task.Delay(10);
// キューへデータを投入する(ロック不要)
que.TryAdd(i, System.Threading.Timeout.Infinite);
// ↑Infinite(無限)指定は、キューが満杯のときは空くまでブロックされる
WriteLine($"Enqueue({i}), count={que.Count}");
}
que.CompleteAdding(); // データ投入完了を通知
WriteLine($"end enqueue");
});
// Consumerを定義(その1−TryTakeメソッドを使って取り出し)
Func<string, Task> consume1 = async (header) =>
{
while (!que.IsCompleted)
{
int data;
// キューからデータを取り出す(ロック不要)
bool success = que.TryTake(out data, System.Threading.Timeout.Infinite);
// ↑Infinite(無限)指定は、キューが空のとき、
// データが投入されるかque.IsCompletedがtrueになるまでブロックされる
if (success)
{
WriteLine($"{header} Dequeue({data}), count={que.Count}");
// キューから取り出したデータを使う処理をシミュレート
WriteLine($"{header} start process({data})");
await Task.Delay(300);
WriteLine($"{header} end process({data})");
}
}
WriteLine($"{header} EXIT");
};
// Consumerを定義(その2−GetConsumingEnumerableメソッドを使ってループ)
Func<string, Task> consume2 = async (header) =>
{
// キューからデータを取り出す(que.IsCompletedがtrueになるまで)
foreach (var data in que.GetConsumingEnumerable())
{
WriteLine($"{header} Dequeue({data}), count={que.Count}");
// キューから取り出したデータを使う処理をシミュレート
WriteLine($"{header} start process({data})");
await Task.Delay(300);
WriteLine($"{header} end process({data})");
}
WriteLine($"{header} EXIT");
};
System.Threading.Thread.Sleep(100);
// Consumerの実行を開始(2スレッド)
var consumeTask1 = Task.Run(() => consume1(" [1]"));
System.Threading.Thread.Sleep(10);
var consumeTask2 = Task.Run(() => consume2(" [2]"));
// 実行終了を待機
Task.WaitAll(produceTask, consumeTask1, consumeTask2);
}
finally
{
que.Dispose();
}
WriteLine("COMPLETED");
Const MAX_QUE As Integer = 3 ' キューに保持する最大個数
' キューを生成する
Dim que = New BlockingCollection(Of Integer)(MAX_QUE)
Try ' BlockingCollectionはDisposeしなければならないので、try〜finallyする
' Producerを定義して実行開始
Dim produceTask = Task.Run(
Async Function()
' 1から5まで順にデータを投入していく
For i As Integer = 1 To 5
' データ投入が間に合わない場合をシミュレート
If (i = 5) Then
WriteLine($"stop enqueue")
Await Task.Delay(1000)
WriteLine($"restart enqueue")
End If
' 投入データを作成する処理をシミュレート
Await Task.Delay(10)
' キューへデータを投入する(ロック不要)
que.TryAdd(i, Threading.Timeout.Infinite)
' ↑Infinite(無限)指定は、キューが満杯のときは空くまでブロックされる
WriteLine($"Enqueue({i}), count={que.Count}")
Next
que.CompleteAdding() ' データ投入完了を通知
WriteLine($"end enqueue")
End Function)
' Consumerを定義(その1−TryTakeメソッドを使って取り出し)
Dim consume1 As Func(Of String, Task) =
Async Function(header)
While (Not que.IsCompleted)
Dim data As Integer
' キューからデータを取り出す(ロック不要)
Dim success As Boolean = que.TryTake(data, Threading.Timeout.Infinite)
' ↑Infinite(無限)指定は、キューが空のとき、
' データが投入されるかque.IsCompletedがtrueになるまでブロックされる
If (success) Then
WriteLine($"{header} Dequeue({data}), count={que.Count}")
' キューから取り出したデータを使う処理をシミュレート
WriteLine($"{header} start process({data})")
Await Task.Delay(300)
WriteLine($"{header} end process({data})")
End If
End While
WriteLine($"{header} EXIT")
End Function
' Consumerを定義(その2−GetConsumingEnumerableメソッドを使ってループ)
Dim consume2 As Func(Of String, Task) =
Async Function(header)
' キューからデータを取り出す(que.IsCompletedがtrueになるまで)
For Each dat In que.GetConsumingEnumerable()
WriteLine($"{header} Dequeue({dat}), count={que.Count}")
' キューから取り出したデータを使う処理をシミュレート
WriteLine($"{header} start process({dat})")
Await Task.Delay(300)
WriteLine($"{header} end process({dat})")
Next
WriteLine($"{header} EXIT")
End Function
System.Threading.Thread.Sleep(100)
' Consumerの実行を開始(2スレッド)
Dim consumeTask1 = Task.Run(Function() consume1(" [1]"))
System.Threading.Thread.Sleep(10)
Dim consumeTask2 = Task.Run(Function() consume2(" [2]"))
' 実行終了を待機
Task.WaitAll(produceTask, consumeTask1, consumeTask2)
Finally
que.Dispose()
End Try
WriteLine("COMPLETED")
上のコードの実行例を次の画像に示す。
キューにもスタックにもなれるBlockingCollection<T>クラスは、スレッド間の排他制御を行ったり同期を取ったりする機能が組み込まれているため、マルチスレッドでも簡潔なコーディングができる。
利用可能バージョン:.NET Framework 4.0以降
カテゴリ:クラス・ライブラリ 処理対象:コレクション
カテゴリ:クラス・ライブラリ 処理対象:マルチスレッド
使用ライブラリ:BlockingCollectionクラス(System.Collections.Concurrent名前空間)
使用ライブラリ:ConcurrentQueueクラス(System.Collections.Concurrent名前空間)
関連TIPS:キューを利用するには?[C#/VB]
関連TIPS:構文:クラス名を書かずに静的メソッドを呼び出すには?[C# 6.0]
関連TIPS:VB.NETでクラス名を省略してメソッドや定数を利用するには?
関連TIPS:数値を右詰めや0埋めで文字列化するには?[C#、VB]
関連TIPS:Visual Studioでコンソール・アプリケーションのデバッグ実行時にコマンド・プロンプトを閉じないようにするには?
Copyright© Digital Advantage Corp. All Rights Reserved.