検索
連載

リアクティブプログラミングにおけるPlay、Akka、Akka Streamsリアクティブプログラミング超入門(3)(2/2 ページ)

本連載では、リアクティブプログラミング(RP)の概要や、それに関連する技術、RPでアプリを作成するための手法について解説します。今回は、Lightbend Reactive Platformを構成するPlay framework、Akka、Akka Streamsについて、実際にサンプルを動かしながら解説します

PC用表示 関連情報
Share
Tweet
LINE
Hatena
前のページへ |       

Reactive StreamsのAkka実装「Akka Streams」とは

 現在のWebでは膨大な量のデータをリアルタイムでやりとりしています。こういったデータは普通にダウンロード/アップロードしていては時間がかかり過ぎてしまうので、ストリーム処理(無限に発生するデータを継続的に処理する仕組み)が必要になります。

 例えば、下記などはストリームが妥当な処理方法です。

  • テラバイト級ファイルの処理(メモリに展開し切れない)
  • 通信処理(処理が継続的に発生し続ける)

 Stream処理では、受信側(Subscriber)が処理しきれないデータを送信し続けるとバッファーがあふれてしまうため、送信側(Publisher)がデータを送り過ぎないようにする必要があります。

 これを回避するために「Back Pressure」という仕組みでデータ量を調整するストリームが、「Reactive Streams」(ノンブロッキングで非同期なストリーム処理の仕様)の基本方針です。そのReactive StreamsのAkka実装が、前回の記事でも少しだけ紹介したAkka Streamsなのです。

 Akka Streamsは処理する要素の入力と出力がはっきり決まっているため、図を作成するように分かりやすく処理フローを記述できます。また、処理については副作用がなく、再利用も可能になっています。

 ここからは、Akka Streamsを使うために覚えておきたいキーワードについて解説します。

Source

 データの源泉であり、出力を行う役割を持ちます(Publisher)。そのため、入力用チャンネルは持たず、出力チャンネルを1つだけ持っています。


図1 ource

Sink

 データの出力先です。1つの入力用チャンネルを持ち、出力用チャンネルは持っていません。


図2 Sink

Flow

 入力チャンネルと出力チャンネルを1つずつ持っており、SourceとSinkの間で、データ処理を行えます。SourceとFlowと接続すれば新たなSourceに、SinkとFlowを接続すれば新たなSinkになります。


図3 Flow

Materializer

 Akka StreamsでStream処理を実行する環境の抽象モデルです。「ActorMaterializer」クラスを使うと、アクターでの実行が可能になります。

RunnableGraph

 Source、Flow、Sinkを接続した結果が図4の「RunnableGraph」です。RunnableGraphとは実行可能な状態のGraphです。このRunnableGraphにrun関数を実行することで、Source、Flow、Sinkの処理が評価されます。


図4 RunnableGraph

Akka Streamsを使ってみる

 Akka Streamsを使ってみましょう。必要なクラスをインポートし、ActorMaterializerのインスタンスを作成します。

import akka.actor.ActorSystem
import akka.stream._
import akka.stream.scaladsl._
import scala.concurrent._
 
implicit val system = ActorSystem("SampleActor")
implicit val materializer = ActorMaterializer()

 次にSourceを作ります。Sourceは下記のように、幾つかの方法で作れます。

//単一の要素を持ったSourceを作成する場合はsingle関数を使用
val src1 = Source.single("string value")
 
//Rangeを受け取ることも可能
val src2 = Source(1 to 5)
 
//Futureを使うこともできる
val src3 = Source.fromFuture(Future.successful("Future Streams"))

コラム「Future型とは」

 Futureは、まだ存在しない処理結果を抽象化した型です。通常、Futureは並行処理され後で取り出すことが可能になっています。このため、Futureを使用した処理は非同期でノンブロッキングになることが多くあります。


 次にFlowを作ります。ここでは「Sourceから要素を受け取り、処理(値を2倍)した後Sinkへ渡す」処理を記述します。

val flow = Flow[Int].map(_ * 2)

 最後に、Sinkを作ります。ここでは受け取った要素全てに対して標準出力(println)を行います。

val sink = Sink.foreach[Int](println)

 Source→FLow→Sinkをつないで、RunnableGraphとして実行してみます。下記プログラムではSourceで1〜5のRangeオブジェクトを作成し、Flowでその値を2倍にし、Sinkで出力しています。

import scala.concurrent._
import akka._
import akka.actor._
import akka.stream._
import akka.stream.scaladsl._
import akka.util._
 
object Main extends App {
 
  implicit val system = ActorSystem("TestSystem")
  implicit val materializer = ActorMaterializer()
  import system.dispatcher
 
  val src = Source(1 to 5)
  val flow = Flow[Int].map(_ * 2)
  val sink = Sink.foreach[Int](println)
 
  val graph = src.via(flow).to(sink)
 
  graph.run()
}

 SourceとFlowはvia関数で接続し、Sinkはto関数で接続します。toはRunnableGraphを返すので、それに対してrunを実行することで実際の処理を行えます。

Akka Streamsを使ったHTTPサーバのサンプル

 もう少し具体的なサンプルプログラムを作ってみましょう。

 サンプルではAkka Streamsを使ってファイルのコピーを行います。入力ストリームから受け取ったデータを全て出力ストリームへ書き込むだけの単純なサンプルです。

 Source作成はPaths.getを使用してパスを取得し、akka.stream.scaladsl.FileIOクラスのfromPathを使用して作成します。

val inPath = Paths.get("src/main/resources/test.txt")
val source:Source[ByteString,Future[IOResult]] = FileIO.fromPath(inPath)

 同じようにSinkも作ります。

val outPath = Paths.get("<path/your/output file>")
val sink:Sink[ByteString,Future[IOResult]] =
  FileIO.toPath(outPath,Set(CREATE,WRITE,APPEND))

 SourceとSinkは、どちらもストリームのエンドポイントです。今回はSourceとSinkを接続し、RunnableGraphを生成します。

val graph:RunnableGraph[Future[IOResult]] = source.to(sink)

 これでSourceからデータを受け取ってSinkへ送るRunnableGraphができましたが、これだけではまだコピーは実行されません。run関数を実行することでファイルのコピーが実行されます。

graph.run().foreach { result =>
  println(s"${result.count} bytes copy.")
  system.terminate()
}

 Akka Streamsを使ったHTTPサーバのサンプル全体のソースコードは下記です。

import java.nio.file.StandardOpenOption._
import java.nio.file.Paths
 
import akka.actor.ActorSystem
import akka.stream.{ActorMaterializer, IOResult}
import akka.stream.scaladsl._
import akka.util.ByteString
 
import scala.concurrent.Future
 
object FileCopy {
  def main(args: Array[String]): Unit = {
    implicit val system = ActorSystem()
    implicit val materializer = ActorMaterializer()
    implicit val executionContext = system.dispatcher
 
    val inPath = Paths.get("<path/your/input file>")
    val source:Source[ByteString,Future[IOResult]] = FileIO.fromPath(inPath)
 
    val outPath = Paths.get("<path/your/output file>")
    val sink:Sink[ByteString,Future[IOResult]] =
      FileIO.toPath(outPath,Set(CREATE,WRITE,APPEND))
 
    val graph:RunnableGraph[Future[IOResult]] = source.to(sink)
 
    graph.run().foreach { result =>
      println(s"${result.count} bytes copy.")
      system.terminate()
    }
  }
}

次回はマイクロサービス向けフレームワーク「Lagom」について

 今回はLightbend PlatformのプロダクトであるPlay framework、Akka、Akka Streamsについて、実際にサンプルを動かしてみました。特にAkkaやAkka Streamsはリアクティブアプリを構築する上で重要です。

 なおPlay framewrokでは、標準でAkka Streamsを含んでいるため、依存関係を追加しなくても使えます。PlayとAkka Streamsの統合方法については次回以降で解説する予定ですが、次回はその前にマイクロサービス向けフレームワーク「Lagom」について解説します。

著者紹介

中村修太(なかむら しゅうた)

中村修太

クラスメソッド勤務の新しもの好きプログラマーです。数年前に東京から山口県に引っ越し、現在もリモート勤務しています。最近の趣味は空手とぬか漬け作り。


前のページへ |       

Copyright © ITmedia, Inc. All Rights Reserved.

ページトップに戻る