S4と外部システムとのイベントの送受信は、イベントの振り分けのための「ClientAdapter」が介在したシステム構成です。どのようなシステム構成かを示します。
シングル構成では、S4とClientAdapterが1台ずつ存在します。構成は定義ファイル「clusters.xml」にそれぞれが通信に用いるUDPのポート番号を指定します。また、定義ファイル「client-stub-conf.xml」ではClientAdapterが外部システムとの通信に用いるTCPのポート番号を指定します。
ユーザーアプリケーションはディレクトリ「s4-apps」に配置することで、S4起動時に認識されます。
クラスタ構成では、S4とClientAdapterが複数台存在し、負荷分散が可能です。構成は定義ファイル「clusters.xml」にS4とClientAdapterを複数台定義し、それぞれが通信に用いるUDPのポート番号を指定します。
また、定義ファイル「client-stub-conf.xml」に外部システムとの通信に用いるTCPのポート番号を指定します。負荷分散は、イベントのキーの値のハッシュ値によりそれを処理するノードを決定することで行われます。
障害対策として、クラスタ構成では「Apache Zookeeper」を利用したスタンバイ機を用意することで可用性向上が図れます。これは、環境設定ファイル「s4-core.properties-header」で「commlayer_mode=dynamic」とすることで、「clusters.xml」の内容をZookeeperから取得することを指定できます(clusters.xmlの内容は「task-setup.sh」によりZookeeperに登録します)。
障害発生時のスタンバイ機への切り替えは次のように行います(例:clusters.xmlでS4を2台定義し、S4を3台稼働させた場合)。
アプリケーションの作成は、S4とともに提供されるサンプルのtwittertopiccountを基にします。ディレクトリ構成はそのまま流用し、以下を参考にします。
Twittertopiccountの稼働確認の手順を示します。まず、S4のモジュールをGitHubからダウンロードします。
git clone https://github.com/s4/s4.git
S4のバイナリを作成します。
./gradlew allImage
S4のバイナリの在り処を環境変数S4_IMAGEにセットします。
cd build/s4-image/ export S4_IMAGE=`pwd`
アプリケーションtwittertopiccountを入手します。
git clone https://github.com/s4/twittertopiccount.git
アプリケーションをコンパイルします。
./gradlew install
アプリケーションをS4のバイナリにデプロイします。
./gradlew deploy
S4システムを起動します。
$S4_IMAGE/scripts/start-s4.sh -r client-adapter
ClientAdapterを起動します。
$S4_IMAGE/scripts/run-client-adapter.sh -s client-adapter -g s4 -d $S4_IMAGE/s4-core/conf/default/client-stub-conf.xml
外部システムのTwitterFeedListenerを実行します。
cd build/install/twitter_feed_listener bin/twitter_feed_listener {twitterユーザー名} {twitterパスワード}
必要に応じてeclipseのプロジェクトにできます。
./gradlew eclipse
S4を活用するうえで認識が必要となる特徴を紹介します。
サンプル「twittertopiccount」では、外部システムはキーを指定せずにイベントをS4に送信します。この場合、ClientAdapterがクラスタ環境でどのノードに送信するかはランダムに決定され、そのノードでのPEのインスタンスは1つだけになります。
では、外部システムでキー付きのイベントを送信できるでしょうか。イベント作成時のコンストラクタをキー付きのものに変え、アプリケーションの定義ファイルでキーの指定するように変更すればよさそうですが、こうするとPEでイベントが受信できなくなります。
これは、外部システムのイベント作成時はキーは「keyNames」のプロパティにセットされますが、ClientAdapterでは「keys」のプロパティで検索することに起因します。
よって、S4のソースコードを修正し、このプロパティを統一すればキー付きでイベントを取り扱えます。これにより、クラスタ環境ではランダムではなくキーの値のハッシュ値で送信先ノードを決定し、キーの値が同じであれば、同じノードに送られ、その中の同じPEのインスタンスで処理されることが保証されます。
PEのインスタンスはキーの値ごとに存在しますが、どのように管理されるでしょうか? インスタンスはPEのクラスごとにPrototypeWrapperで管理され、以下を要素として持ちます。
プロパティ名 | 内容 |
---|---|
prototype | PEインスタンスのひな型。インスタンス生成時のコピー元として使用 |
lookupTable | インスタンス管理に使用。ConcurrentHashMapを用い、キーの値、PEのインスタンス+生成時刻+TTLを格納 |
S4はイベントを受信すると、それを処理するPEのインスタンスを特定するため、まずストリーム名より処理するPEのクラスを特定して、このPrototypeWrapperを取得し、キーの値でこのlookupTableを検索します。
存在すれば、そのインスタンスでイベントを処理し、存在しなければprototypeのPEインスタンスのひな型を「clone()」して、このキーの値を処理する専用のインスタンスとして生成時刻とともにHashMapに格納します。
アプリケーションの定義ファイルでは各PEのインスタンスの有効期限(TTL)が設定できます。生成されてからTTL以上経過したPEのインスタンスはHashMapから削除し、無駄にインスタンスが存続するのを防ぎます。
S4とClientAdapterはUDPで通信しますが、その実装は「java.net.DatagramPacket」で送信データを作成し、「java.net.DatagramSocket#send」で送信処理を行います。このため、送信先のノードが停止していても送信処理は必ず正常終了と判断されるので、S4ではダウンしているノードにイベントを送信し、イベントがロスすることを防止/検出できません。
これは、可用性を高めるため、Zookeeperを用いてスタンバイ機を使用しても、ノードのダウン検知、スタンバイ機のサービス開始のタイムラグの間のイベントはロスを防げません。
また、S4内ではイベントはサイズが指定されたキュー「java.util.concurrent.LinkedBlockingQueue」に格納されます。このサイズは環境定義ファイル「s4-core.properties-header」で「pe_container_max_queue_size=8000(デフォルト値)」のように設定されます。高負荷時にイベントを受信しこのキューが満杯であると、このイベントは破棄されます。
イベントの処理状況はログファイル「build/s4-image/s4-core/logs/s4-core/s4-core_{プロセス番号}.mon」に以下などが出力されます。
次のような点に着目し、S4の稼働状況を把握し、チューニングを検討します。
チューニングとしては、キューのサイズを増やすとともにJavaヒープサイズを増やす、またはS4のノードを追加することを検討します。
S4はHadoopと比較すると、データをオンメモリベースで1件ずつ処理するため、低レイテンシが期待できます。現在はアーキテクチャの前提でデータロストを許容しているので、ミッションクリティカルな分野への適用が困難なところはありますが、リアルタイムな処理が要求される場合はS4の使用を検討してみてはいかがでしょうか。
木村幸敏(きむら ゆきとし)
大規模案件での性能問題にかかわり、アプリのボトルネック特定/解消にかかわる。
また各種FWの適用による生産性向上に取り組む。
TIS先端技術センターでは、採れたての検証成果や知見などをWebサイトで発信中
Copyright © ITmedia, Inc. All Rights Reserved.