リアルタイム分散処理の常識をApache S4で身につける:ビッグデータ処理の常識をJavaで身につける(6)(2/2 ページ)
Hadoopをはじめ、Java言語を使って構築されることが多い「ビッグデータ」処理のためのフレームワーク/ライブラリを紹介しながら、大量データを活用するための技術の常識を身に付けていく連載
Apache S4のシステム構成
S4と外部システムとのイベントの送受信は、イベントの振り分けのための「ClientAdapter」が介在したシステム構成です。どのようなシステム構成かを示します。
シングル構成
シングル構成では、S4とClientAdapterが1台ずつ存在します。構成は定義ファイル「clusters.xml」にそれぞれが通信に用いるUDPのポート番号を指定します。また、定義ファイル「client-stub-conf.xml」ではClientAdapterが外部システムとの通信に用いるTCPのポート番号を指定します。
ユーザーアプリケーションはディレクトリ「s4-apps」に配置することで、S4起動時に認識されます。
クラスタ構成
クラスタ構成では、S4とClientAdapterが複数台存在し、負荷分散が可能です。構成は定義ファイル「clusters.xml」にS4とClientAdapterを複数台定義し、それぞれが通信に用いるUDPのポート番号を指定します。
また、定義ファイル「client-stub-conf.xml」に外部システムとの通信に用いるTCPのポート番号を指定します。負荷分散は、イベントのキーの値のハッシュ値によりそれを処理するノードを決定することで行われます。
可用性向上
障害対策として、クラスタ構成では「Apache Zookeeper」を利用したスタンバイ機を用意することで可用性向上が図れます。これは、環境設定ファイル「s4-core.properties-header」で「commlayer_mode=dynamic」とすることで、「clusters.xml」の内容をZookeeperから取得することを指定できます(clusters.xmlの内容は「task-setup.sh」によりZookeeperに登録します)。
障害発生時のスタンバイ機への切り替えは次のように行います(例:clusters.xmlでS4を2台定義し、S4を3台稼働させた場合)。
- 3台目は起動時にZookeeperを参照して2台がすでに稼働済みと判断し待ちに入り、Zookeeper上のS4の稼働数を定期的に監視
- 実際に稼働しているS4の2台のどちらかがダウンすると、Zookeeper上の稼働数が減る
- 待ち状態の3台目では、それを検知して待ちを解除し、サービス提供を開始
S4アプリケーションの作成、稼働
アプリケーションの作成は、S4とともに提供されるサンプルのtwittertopiccountを基にします。ディレクトリ構成はそのまま流用し、以下を参考にします。
- アプリケーションの定義ファイルは「twittertopiccount-conf.xml」
- 外部システムでのイベント作成は「TwitterFeedListener.java」
Twittertopiccountの稼働確認の手順を示します。まず、S4のモジュールをGitHubからダウンロードします。
git clone https://github.com/s4/s4.git
S4のバイナリを作成します。
./gradlew allImage
S4のバイナリの在り処を環境変数S4_IMAGEにセットします。
cd build/s4-image/ export S4_IMAGE=`pwd`
アプリケーションtwittertopiccountを入手します。
git clone https://github.com/s4/twittertopiccount.git
アプリケーションをコンパイルします。
./gradlew install
アプリケーションをS4のバイナリにデプロイします。
./gradlew deploy
S4システムを起動します。
$S4_IMAGE/scripts/start-s4.sh -r client-adapter
ClientAdapterを起動します。
$S4_IMAGE/scripts/run-client-adapter.sh -s client-adapter -g s4 -d $S4_IMAGE/s4-core/conf/default/client-stub-conf.xml
外部システムのTwitterFeedListenerを実行します。
cd build/install/twitter_feed_listener bin/twitter_feed_listener {twitterユーザー名} {twitterパスワード}
必要に応じてeclipseのプロジェクトにできます。
./gradlew eclipse
S4を使う際に知っておきたい4つの特徴
S4を活用するうえで認識が必要となる特徴を紹介します。
【1】外部システムでのキーの扱い
サンプル「twittertopiccount」では、外部システムはキーを指定せずにイベントをS4に送信します。この場合、ClientAdapterがクラスタ環境でどのノードに送信するかはランダムに決定され、そのノードでのPEのインスタンスは1つだけになります。
では、外部システムでキー付きのイベントを送信できるでしょうか。イベント作成時のコンストラクタをキー付きのものに変え、アプリケーションの定義ファイルでキーの指定するように変更すればよさそうですが、こうするとPEでイベントが受信できなくなります。
これは、外部システムのイベント作成時はキーは「keyNames」のプロパティにセットされますが、ClientAdapterでは「keys」のプロパティで検索することに起因します。
よって、S4のソースコードを修正し、このプロパティを統一すればキー付きでイベントを取り扱えます。これにより、クラスタ環境ではランダムではなくキーの値のハッシュ値で送信先ノードを決定し、キーの値が同じであれば、同じノードに送られ、その中の同じPEのインスタンスで処理されることが保証されます。
【2】PEのインスタンス管理
PEのインスタンスはキーの値ごとに存在しますが、どのように管理されるでしょうか? インスタンスはPEのクラスごとにPrototypeWrapperで管理され、以下を要素として持ちます。
プロパティ名 | 内容 |
---|---|
prototype | PEインスタンスのひな型。インスタンス生成時のコピー元として使用 |
lookupTable | インスタンス管理に使用。ConcurrentHashMapを用い、キーの値、PEのインスタンス+生成時刻+TTLを格納 |
S4はイベントを受信すると、それを処理するPEのインスタンスを特定するため、まずストリーム名より処理するPEのクラスを特定して、このPrototypeWrapperを取得し、キーの値でこのlookupTableを検索します。
存在すれば、そのインスタンスでイベントを処理し、存在しなければprototypeのPEインスタンスのひな型を「clone()」して、このキーの値を処理する専用のインスタンスとして生成時刻とともにHashMapに格納します。
アプリケーションの定義ファイルでは各PEのインスタンスの有効期限(TTL)が設定できます。生成されてからTTL以上経過したPEのインスタンスはHashMapから削除し、無駄にインスタンスが存続するのを防ぎます。
【3】障害対策の実現レベルは
S4とClientAdapterはUDPで通信しますが、その実装は「java.net.DatagramPacket」で送信データを作成し、「java.net.DatagramSocket#send」で送信処理を行います。このため、送信先のノードが停止していても送信処理は必ず正常終了と判断されるので、S4ではダウンしているノードにイベントを送信し、イベントがロスすることを防止/検出できません。
これは、可用性を高めるため、Zookeeperを用いてスタンバイ機を使用しても、ノードのダウン検知、スタンバイ機のサービス開始のタイムラグの間のイベントはロスを防げません。
また、S4内ではイベントはサイズが指定されたキュー「java.util.concurrent.LinkedBlockingQueue」に格納されます。このサイズは環境定義ファイル「s4-core.properties-header」で「pe_container_max_queue_size=8000(デフォルト値)」のように設定されます。高負荷時にイベントを受信しこのキューが満杯であると、このイベントは破棄されます。
【4】チューニング
イベントの処理状況はログファイル「build/s4-image/s4-core/logs/s4-core/s4-core_{プロセス番号}.mon」に以下などが出力されます。
- キューに正常にPUTできた回数「S4::S4CoreMetrics:pec_nq = 64」
- キューが満杯でPUTできなかった回数「S4::S4CoreMetrics:pec_dr = 79」
- キューの現在のサイズ「S4::S4CoreMetrics:pec_qsz_w = 0」
- イベントRawStatusの処理数「S4::S4EventMetrics:gl_in:et:RawStatus = 3」
- TopicCountAndReportPEの呼び出し数「S4::S4AppMetrics:pec_pe:at:TopicCountAndReportPE = 15」
- Javaヒープの空き容量「S4::S4CoreMetrics:s4_fmem = 73」
次のような点に着目し、S4の稼働状況を把握し、チューニングを検討します。
- S4の「pec_dr」が「0」になること(データのロスを防止するため)
- ClientAdapterの「lle_out」(ClientAdapterがS4に送信したイベント数)とS4のlle_out(S4のPEが別PEにイベントを送信した数)の和が、S4の「lll_in」(S4でのイベント受信数)が等しくなること
- 空きメモリ「s4_fmem」に余裕があるか
チューニングとしては、キューのサイズを増やすとともにJavaヒープサイズを増やす、またはS4のノードを追加することを検討します。
S4なら「ミッションクリティカル」でも分散処理できるか?
S4はHadoopと比較すると、データをオンメモリベースで1件ずつ処理するため、低レイテンシが期待できます。現在はアーキテクチャの前提でデータロストを許容しているので、ミッションクリティカルな分野への適用が困難なところはありますが、リアルタイムな処理が要求される場合はS4の使用を検討してみてはいかがでしょうか。
著者紹介
木村幸敏(きむら ゆきとし)
大規模案件での性能問題にかかわり、アプリのボトルネック特定/解消にかかわる。
また各種FWの適用による生産性向上に取り組む。
TIS先端技術センターでは、採れたての検証成果や知見などをWebサイトで発信中
Copyright © ITmedia, Inc. All Rights Reserved.
関連記事
- 次世代Hadoopの特徴は、MapReduce 2とGiraph
Hadoopの父に聞く、HadoopとClouderaの現在・未来 - テキストマイニングで始める実践Hadoop活用
- Javaで覚えるIT技術者の40の常識
新人プログラマ/SEは覚えておきたい“まとめ” - 実践! Rで学ぶ統計解析の基礎