.NET Frameworkが提供するQueue<T>クラスの基本的な使い方と、マルチスレッドで同期を取りながらQueue<T>クラスを利用する方法を解説する。
キュー(待ち行列)とは入れたものが入れた順に出てくるコレクションのことだ。「先入れ先出し」(FIFO、"First In, First Out")のコレクションともいう。キューを実現するには、配列やList<T>コレクション(System.Collections.Generic名前空間)などを使って実装することも可能だが、.NET Framework 2.0以降ではQueue<T>コレクション(System.Collections.Generic名前空間)を利用するとよい。
本稿では、Queue<T>コレクションの基本的な使い方とマルチスレッドに対応する方法を解説する。
特定のトピックをすぐに知りたいという方は以下のリンクを活用してほしい。
なお、本稿に掲載したサンプルコードをそのまま試すにはVisual Studio 2015以降が必要である。サンプルコードはコンソールアプリの一部であり、コードの冒頭に以下の宣言が必要となる。
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading.Tasks;
using static System.Console;
Imports System.Console
キューに対する基本的な操作は、「キューにデータを入れる」(enqueue)/「キューからデータを取り出す」(dequeue)/「次のデータを見る」(peek)の3つだ。Queue<T>コレクションは、それらを次の3つのメソッドとして実装している。
Enqueueメソッドでデータをキューの末尾に追加するとき、キューに空きがなければ自動的にキューのサイズが拡張される。Dequeueメソッドで先頭のデータを取り出すと、残りのデータは先頭に向かって1つずつ移動する。
この他に、Queue<T>コレクションにはキューに入っているデータの数を返すCountプロパティもある。キューにデータが入っていないときにDequeue/Peekメソッドを呼び出すと例外が出るので、呼び出す前にCountプロパティをチェックすべきである。また、Queue<T>コレクションはIEnumerable<T>インタフェース(System.Collections.Generic名前空間)を実装しているので、LINQ拡張メソッドも利用できる。
Queue<T>コレクションの基本的な使い方の例を示す(次のコード)。
static void Main(string[] args)
{
// 整数を格納するキュー
var que = new Queue<int>();
// キューに1から3まで順に投入
for (int i = 1; i < 4; i++)
que.Enqueue(i);
// キューから全てを取り出す
while (que.Count > 0)
{
WriteLine($"キューの先頭を見る:{que.Peek()}");
WriteLine($"キューに入っている個数:{que.Count}");
int n = que.Dequeue();
WriteLine($"キューから取り出し:{n}");
WriteLine($"キューに入っている個数:{que.Count}");
}
// 出力:
// キューの先頭を見る:1
// キューに入っている個数:3
// キューから取り出し:1
// キューに入っている個数:2
// キューの先頭を見る:2
// キューに入っている個数:2
// キューから取り出し:2
// キューに入っている個数:1
// キューの先頭を見る:3
// キューに入っている個数:1
// キューから取り出し:3
// キューに入っている個数:0
#if DEBUG
ReadKey();
#endif
}
Sub Main()
' 整数を格納するキュー
Dim que = New Queue(Of Integer)()
' キューに1から3まで順に投入
For i As Integer = 1 To 3
que.Enqueue(i)
Next
' キューから全てを取り出す
While (que.Count > 0)
WriteLine($"キューの先頭を見る:{que.Peek()}")
WriteLine($"キューに入っている個数:{que.Count}")
Dim n As Integer = que.Dequeue()
WriteLine($"キューから取り出し:{n}")
WriteLine($"キューに入っている個数:{que.Count}")
End While
' 出力:
' キューの先頭を見る:1
' キューに入っている個数:3
' キューから取り出し:1
' キューに入っている個数:2
' キューの先頭を見る:2
' キューに入っている個数:2
' キューから取り出し:2
' キューに入っている個数:1
' キューの先頭を見る:3
' キューに入っている個数:1
' キューから取り出し:3
' キューに入っている個数:0
#If DEBUG Then
ReadKey()
#End If
End Sub
Queue<T>コレクションのIEnumerable<T>インタフェースとLINQ拡張メソッドを利用する例を示す(次のコード)。Queue<T>コレクションには最後の要素を読み取るメソッドは用意されていないが、LINQのLast拡張メソッドを使えばよいのである。
// 整数を格納するキュー
var que = new Queue<int>();
// キューに1から3まで順に投入
for (int i = 1; i < 4; i++)
que.Enqueue(i);
WriteLine($"IEnumerable<int>:キューの内容:{string.Join(", ", que)}");
// 出力:IEnumerable<int>:キューの内容:1, 2, 3
WriteLine($"LINQ:キューの最後の要素:{que.Last()}");
// 出力:LINQ:キューの最後の要素:3
WriteLine($"LINQ:キューの合計:{que.Sum()}");
// 出力:LINQ:キューの合計:6
WriteLine($"LINQ:奇数の個数:{que.Count(n => (n % 2 != 0))}");
// 出力:LINQ:奇数の個数:2
' 整数を格納するキュー
Dim que = New Queue(Of Integer)()
' キューに1から3まで順に投入
For i As Integer = 1 To 3
que.Enqueue(i)
Next
WriteLine($"IEnumerable<int>:キューの内容:{String.Join(", ", que)}")
' 出力:IEnumerable<int>:キューの内容:1, 2, 3
WriteLine($"LINQ:キューの最後の要素:{que.Last()}")
' 出力:LINQ:キューの最後の要素:3
WriteLine($"LINQ:キューの合計:{que.Sum()}")
' 出力:LINQ:キューの合計:6
WriteLine($"LINQ:奇数の個数:{que.AsEnumerable().Count(Function(n) n Mod 2 <> 0)}")
' 出力:LINQ:奇数の個数:2
Queue<T>コレクションはスレッドセーフではない。非同期/並列処理でQueue<T>コレクションにアクセスするにはスレッド間の排他処理が必要になる。
.NET Framework 4.0以降であれば、スレッドセーフなConcurrentQueue<T>コレクションやBlockingCollection<T>コレクション(ともにSystem.Collections.Concurrent名前空間)が用意されているので、Queueコレクションではなくそちらを使うべきである。以下、.NET Framework 4.0未満の場合にQueueコレクションでマルチスレッドに対応する方法を説明しよう。
キューはマルチスレッドで使われることが多い。有名なものに「Producer−Consumerパターン」(生産者−消費者パターン)がある。Producerが処理すべきデータを作ってどんどんキューに投入していく。一方、時間がかかるデータの処理は、別の複数のスレッドで動作しているConsumerがキューからデータを取り出して行うというものだ。
Producer−Consumerパターンを実装するには、次のような注意点がある。
ここでは、スレッド間の排他にはlock構文を使おう(Monitorクラスを使うコードにコンパイルされる)。また、処理を一時的に止めるにはMonitorクラス(System.Threading名前空間)のWaitメソッド/PulseAllメソッドを使うことにする。Queueコレクションはサイズ固定にはできないが、インスタンス化時にキューのサイズを設定し、以降はそれを超えないようにコーディングしよう。
Queue<T>コレクションを使ってそのようなProducer−Consumerパターンを実装した例を示す(次のコード)。全体を簡潔に書くためにTPL(タスク並列ライブラリ)を使っているが、Producer/Consumerのそれぞれの処理は.NET Framework 2.0の範囲内で作成してある。
const int MAX_QUE = 3; // キューに保持する最大個数
var que = new Queue<int>(MAX_QUE);
bool runningFlag = true; // falseにした後、キューが空になるとConsumerが止まる
// Producerを定義して実行開始
var produceTask = Task.Run(() => {
// 1から5まで順にデータを投入していく
for (int i = 1; i <= 5; i++)
{
// データ投入が間に合わない場合をシミュレート
if (i == 5)
{
WriteLine($"stop enqueue");
System.Threading.Thread.Sleep(1000);
WriteLine($"restart enqueue");
}
// 投入データを作成する処理をシミュレート
System.Threading.Thread.Sleep(10);
// キューへデータを投入する
lock (que) // 排他ロックを取得
{
while (que.Count >= MAX_QUE)
{
// MAX_QUEを超えないように投入を待機する
WriteLine($"wait enqueue");
System.Threading.Monitor.Wait(que); // 待機する(他でMonitor.PulseAllされると再開)
}
que.Enqueue(i);
WriteLine($"Enqueue({i}), count={que.Count}");
System.Threading.Monitor.PulseAll(que); // 待機中のスレッドがあったら、再開させる
} // 排他ロックを解放
}
runningFlag = false; // データ投入完了を通知
WriteLine($"end enqueue");
});
// Consumerを定義
Action<string> consume = (header) => {
while (true)
{
int? data = null;
lock (que) // 排他ロックを取得
{
while (que.Count == 0 && runningFlag)
{
// キューが空の間は待機
WriteLine($"{header} wait dequeue");
System.Threading.Monitor.Wait(que); // 待機する(他でMonitor.PulseAllされると再開)
}
// キューからデータを取り出す
if (que.Count > 0)
{
data = que.Dequeue();
WriteLine($"{header} Dequeue({data.Value}), count={que.Count}");
System.Threading.Monitor.PulseAll(que); // 待機中のスレッドがあったら、再開させる
}
else // キューが空で、runningFlag==falseのとき
break; // 無限ループを抜ける
} // 排他ロックを解放
// データの処理はロックを外してから行う
if (data.HasValue)
{
// キューから取り出したデータを使う処理をシミュレート
WriteLine($"{header} start process({data.Value})");
System.Threading.Thread.Sleep(300);
WriteLine($"{header} end process({data.Value})");
}
}
WriteLine($"{header} EXIT");
};
System.Threading.Thread.Sleep(100);
// Consumerの実行を開始(2スレッド)
var consumeTask1 = Task.Run(() => consume(" [1]"));
var consumeTask2 = Task.Run(() => consume(" [2]"));
// 実行終了を待機
Task.WaitAll(produceTask, consumeTask1, consumeTask2);
WriteLine("COMPLETED");
Const MAX_QUE As Integer = 3 ' キューに保持する最大個数
Dim que = New Queue(Of Integer)(MAX_QUE)
Dim runningFlag As Boolean = True ' falseにした後、キューが空になるとConsumerが止まる
' Producerを定義して実行開始
Dim produceTask = Task.Run(
Sub()
' 1から5まで順にデータを投入していく
For i As Integer = 1 To 5
' データ投入が間に合わない場合をシミュレート
If (i = 5) Then
WriteLine($"stop enqueue")
System.Threading.Thread.Sleep(1000)
WriteLine($"restart enqueue")
End If
' 投入データを作成する処理をシミュレート
System.Threading.Thread.Sleep(10)
' キューへデータを投入する
SyncLock que ' 排他ロックを取得
While (que.Count >= MAX_QUE)
' MAX_QUEを超えないように投入を待機する
WriteLine($"wait enqueue")
System.Threading.Monitor.Wait(que) ' 待機する(他でMonitor.PulseAllされると再開)
End While
que.Enqueue(i)
WriteLine($"Enqueue({i}), count={que.Count}")
System.Threading.Monitor.PulseAll(que) ' 待機中のスレッドがあったら、再開させる
End SyncLock '排他ロックを解放
Next
runningFlag = False ' データ投入完了を通知
WriteLine($"end enqueue")
End Sub)
' Consumerを定義
Dim consume As Action(Of String) =
Sub(header)
While (True)
Dim data As Integer? = Nothing
SyncLock que '排他ロックを取得
While (que.Count = 0 AndAlso runningFlag)
' キューが空の間は待機
WriteLine($"{header} wait dequeue")
System.Threading.Monitor.Wait(que) ' 待機する(他でMonitor.PulseAllされると再開)
End While
' キューからデータを取り出す
If (que.Count > 0) Then
data = que.Dequeue()
WriteLine($"{header} Dequeue({data.Value}), count={que.Count}")
System.Threading.Monitor.PulseAll(que) ' 待機中のスレッドがあったら、再開させる
Else ' キューが空で、runningFlag==falseのとき
Exit While ' 無限ループを抜ける
End If
End SyncLock ' 排他ロックを解放
' データの処理はロックを外してから行う
If (data.HasValue) Then
' キューから取り出したデータを使う処理をシミュレート
WriteLine($"{header} start process({data.Value})")
System.Threading.Thread.Sleep(300)
WriteLine($"{header} end process({data.Value})")
End If
End While
WriteLine($"{header} EXIT")
End Sub
System.Threading.Thread.Sleep(100)
' Consumerの実行を開始(2スレッド)
Dim consumeTask1 = Task.Run(Sub() consume(" [1]"))
Dim consumeTask2 = Task.Run(Sub() consume(" [2]"))
' 実行終了を待機
Task.WaitAll(produceTask, consumeTask1, consumeTask2)
WriteLine("COMPLETED")
上のコードの実行例を次の画像に示す。
ちなみに余談ではあるが、ジェネリック版ではないQueueコレクション(System.Collections名前空間)にあったSynchronizedメソッドは、Queue<T>コレクションでは削除された。Synchronizedメソッドで得られたラッパークラスのオブジェクトといえども、次のコードに示すような場合にはスレッド間の排他ロックが必要なのだが、それを忘れてコーディングしてしまう例が頻発したため、Synchronizedメソッドを廃止したということらしい。
var que = new Queue();
var syncQue = Queue.Synchronized(que);
……省略……
Object obj = null;
// 次のコードは正しくない
if (syncQue.Count > 0) {
// ↑↓この間に他スレッドでキューが空にされたら……
obj = syncQue.Dequeue(); // ←ここで例外が出てしまう
}
// 正しくは次のようにロックしなければならない
lock (syncQue.SyncRoot) {
if (syncQue.Count > 0)
obj = syncQue.Dequeue();
}
キュー(待ち行列)は、処理待ちのデータを一時的に蓄えておくのに適したコレクションだ。シングルスレッドの場合は、本稿で解説したQueue<T>クラスを使えばよい。マルチスレッドの場合、.NET Framework 4.0未満ではQueue<T>クラスを使うことになるが、本稿で示したように少々複雑になる(.NET Framework 4.0以降ならばBlockingCollection<T>クラスを使った方がよい)。
利用可能バージョン:.NET Framework 2.0以降
カテゴリ:クラス・ライブラリ 処理対象:コレクション
使用ライブラリ:Queueクラス(System.Collections.Generic名前空間)
関連TIPS:構文:クラス名を書かずに静的メソッドを呼び出すには?[C# 6.0]
関連TIPS:VB.NETでクラス名を省略してメソッドや定数を利用するには?
関連TIPS:数値を右詰めや0埋めで文字列化するには?[C#、VB]
関連TIPS:Visual Studioでコンソール・アプリケーションのデバッグ実行時にコマンド・プロンプトを閉じないようにするには?
Copyright© Digital Advantage Corp. All Rights Reserved.