現在のWebでは膨大な量のデータをリアルタイムでやりとりしています。こういったデータは普通にダウンロード/アップロードしていては時間がかかり過ぎてしまうので、ストリーム処理(無限に発生するデータを継続的に処理する仕組み)が必要になります。
例えば、下記などはストリームが妥当な処理方法です。
Stream処理では、受信側(Subscriber)が処理しきれないデータを送信し続けるとバッファーがあふれてしまうため、送信側(Publisher)がデータを送り過ぎないようにする必要があります。
これを回避するために「Back Pressure」という仕組みでデータ量を調整するストリームが、「Reactive Streams」(ノンブロッキングで非同期なストリーム処理の仕様)の基本方針です。そのReactive StreamsのAkka実装が、前回の記事でも少しだけ紹介したAkka Streamsなのです。
Akka Streamsは処理する要素の入力と出力がはっきり決まっているため、図を作成するように分かりやすく処理フローを記述できます。また、処理については副作用がなく、再利用も可能になっています。
ここからは、Akka Streamsを使うために覚えておきたいキーワードについて解説します。
データの源泉であり、出力を行う役割を持ちます(Publisher)。そのため、入力用チャンネルは持たず、出力チャンネルを1つだけ持っています。
データの出力先です。1つの入力用チャンネルを持ち、出力用チャンネルは持っていません。
入力チャンネルと出力チャンネルを1つずつ持っており、SourceとSinkの間で、データ処理を行えます。SourceとFlowと接続すれば新たなSourceに、SinkとFlowを接続すれば新たなSinkになります。
Akka StreamsでStream処理を実行する環境の抽象モデルです。「ActorMaterializer」クラスを使うと、アクターでの実行が可能になります。
Source、Flow、Sinkを接続した結果が図4の「RunnableGraph」です。RunnableGraphとは実行可能な状態のGraphです。このRunnableGraphにrun関数を実行することで、Source、Flow、Sinkの処理が評価されます。
Akka Streamsを使ってみましょう。必要なクラスをインポートし、ActorMaterializerのインスタンスを作成します。
import akka.actor.ActorSystem import akka.stream._ import akka.stream.scaladsl._ import scala.concurrent._ implicit val system = ActorSystem("SampleActor") implicit val materializer = ActorMaterializer()
次にSourceを作ります。Sourceは下記のように、幾つかの方法で作れます。
//単一の要素を持ったSourceを作成する場合はsingle関数を使用 val src1 = Source.single("string value") //Rangeを受け取ることも可能 val src2 = Source(1 to 5) //Futureを使うこともできる val src3 = Source.fromFuture(Future.successful("Future Streams"))
Futureは、まだ存在しない処理結果を抽象化した型です。通常、Futureは並行処理され後で取り出すことが可能になっています。このため、Futureを使用した処理は非同期でノンブロッキングになることが多くあります。
次にFlowを作ります。ここでは「Sourceから要素を受け取り、処理(値を2倍)した後Sinkへ渡す」処理を記述します。
val flow = Flow[Int].map(_ * 2)
最後に、Sinkを作ります。ここでは受け取った要素全てに対して標準出力(println)を行います。
val sink = Sink.foreach[Int](println)
Source→FLow→Sinkをつないで、RunnableGraphとして実行してみます。下記プログラムではSourceで1〜5のRangeオブジェクトを作成し、Flowでその値を2倍にし、Sinkで出力しています。
import scala.concurrent._ import akka._ import akka.actor._ import akka.stream._ import akka.stream.scaladsl._ import akka.util._ object Main extends App { implicit val system = ActorSystem("TestSystem") implicit val materializer = ActorMaterializer() import system.dispatcher val src = Source(1 to 5) val flow = Flow[Int].map(_ * 2) val sink = Sink.foreach[Int](println) val graph = src.via(flow).to(sink) graph.run() }
SourceとFlowはvia関数で接続し、Sinkはto関数で接続します。toはRunnableGraphを返すので、それに対してrunを実行することで実際の処理を行えます。
もう少し具体的なサンプルプログラムを作ってみましょう。
サンプルではAkka Streamsを使ってファイルのコピーを行います。入力ストリームから受け取ったデータを全て出力ストリームへ書き込むだけの単純なサンプルです。
Source作成はPaths.getを使用してパスを取得し、akka.stream.scaladsl.FileIOクラスのfromPathを使用して作成します。
val inPath = Paths.get("src/main/resources/test.txt") val source:Source[ByteString,Future[IOResult]] = FileIO.fromPath(inPath)
同じようにSinkも作ります。
val outPath = Paths.get("<path/your/output file>") val sink:Sink[ByteString,Future[IOResult]] = FileIO.toPath(outPath,Set(CREATE,WRITE,APPEND))
SourceとSinkは、どちらもストリームのエンドポイントです。今回はSourceとSinkを接続し、RunnableGraphを生成します。
val graph:RunnableGraph[Future[IOResult]] = source.to(sink)
これでSourceからデータを受け取ってSinkへ送るRunnableGraphができましたが、これだけではまだコピーは実行されません。run関数を実行することでファイルのコピーが実行されます。
graph.run().foreach { result => println(s"${result.count} bytes copy.") system.terminate() }
Akka Streamsを使ったHTTPサーバのサンプル全体のソースコードは下記です。
import java.nio.file.StandardOpenOption._ import java.nio.file.Paths import akka.actor.ActorSystem import akka.stream.{ActorMaterializer, IOResult} import akka.stream.scaladsl._ import akka.util.ByteString import scala.concurrent.Future object FileCopy { def main(args: Array[String]): Unit = { implicit val system = ActorSystem() implicit val materializer = ActorMaterializer() implicit val executionContext = system.dispatcher val inPath = Paths.get("<path/your/input file>") val source:Source[ByteString,Future[IOResult]] = FileIO.fromPath(inPath) val outPath = Paths.get("<path/your/output file>") val sink:Sink[ByteString,Future[IOResult]] = FileIO.toPath(outPath,Set(CREATE,WRITE,APPEND)) val graph:RunnableGraph[Future[IOResult]] = source.to(sink) graph.run().foreach { result => println(s"${result.count} bytes copy.") system.terminate() } } }
今回はLightbend PlatformのプロダクトであるPlay framework、Akka、Akka Streamsについて、実際にサンプルを動かしてみました。特にAkkaやAkka Streamsはリアクティブアプリを構築する上で重要です。
なおPlay framewrokでは、標準でAkka Streamsを含んでいるため、依存関係を追加しなくても使えます。PlayとAkka Streamsの統合方法については次回以降で解説する予定ですが、次回はその前にマイクロサービス向けフレームワーク「Lagom」について解説します。
Copyright © ITmedia, Inc. All Rights Reserved.