データフローのベースクラスであるFlowDescriptionを継承したJavaのクラスとして宣言します。
package tc.asakusa.day2.jobflow;
import tc.asakusa.day2.modelgen.dmdl.model.MstItem;
import tc.asakusa.day2.modelgen.dmdl.model.TrnSalesDetail;
import tc.asakusa.day2.modelgen.dmdl.model.ItemSalesSummary;
import tc.asakusa.day2.operator.ItemSalesSummaryOperatorFactory;
import tc.asakusa.day2.operator.ItemSalesSummaryOperatorFactory.SummarizeByItem;
import tc.asakusa.day2.operator.ItemSalesSummaryOperatorFactory.JoinItem;
import com.asakusafw.vocabulary.flow.Export;
import com.asakusafw.vocabulary.flow.FlowDescription;
import com.asakusafw.vocabulary.flow.Import;
import com.asakusafw.vocabulary.flow.In;
import com.asakusafw.vocabulary.flow.JobFlow;
import com.asakusafw.vocabulary.flow.Out;
import com.asakusafw.vocabulary.flow.util.CoreOperatorFactory;
/**
* 商品別に販売数量の集計を行う。
*/
@JobFlow(name = "JFItemSummary") // ジョブフローID
public class JfItemSalesSummary extends FlowDescription {
final In<MstItem> mstItem;
final In<TrnSalesDetail> trnSalesDetail;
final Out<ItemSalesSummary> itemSalesSummary;
public JfItemSalesSummary(
// 入力データの定義
@Import(name = "MstItem", description = MstItemFromCsv.class)
In<MstItem> mstItem,
@Import(name = "TrnSalesDetail", description = TrnSalesDetailFromCsv.class)
In<TrnSalesDetail> trnSalesDetail,
// 出力データの定義
@Export(name = "ItemSalesSummary", description = ItemSalesSummaryToCsv.class)
Out<ItemSalesSummary> itemSalesSummary) {
this.mstItem= mstItem;
this.trnSalesDetail= trnSalesDetail;
this.itemSalesSummary = itemSalesSummary;
}
@Override
protected void describe() {
ItemSalesSummaryOperatorFactory operators = new ItemSalesSummaryOperatorFactory(); // 作成したOperatorのFactoryクラスをインスタンス化して使用
CoreOperatorFactory core = new CoreOperatorFactory(); // AsakusaFWに用意されているコア演算子を使用する場合
// 売上伝票明細と商品マスタをジョインする
JoinItem joinItem = operators.joinItem(mstItem, trnSalesDetail);
// ジョインできなかったレコードは捨てる
core.stop(joinItem.missed); // 不要な出力は、stop演算子で止める
(stopはコア演算子)
// 売上をカテゴリ別に集計
SummarizeByItem summarize = operators.summarizeByItem(joinItem.joined); // 結合演算子の結合結果joinItemを単純集計演算子summarizeByItemのパラメタに引き渡す
// 集計結果を出力
itemSalesSummary.add(summarize.out);// 単純集計演算子の出力をこのジョブの出力にadd
}
}
今回作成したジョブフローは1つなので、下記のように、「run({ジョブフロークラス}).soon()」で、すぐに実行するように記述します。
package tc.asakusa.day2.batch;
import tc.asakusa.day2.jobflow.JfItemSalesSummary;
import com.asakusafw.vocabulary.batch.Batch;
import com.asakusafw.vocabulary.batch.BatchDescription;
@Batch(name = "Day2ItemSummary")
public class BtSummarize extends BatchDescription {
@Override
protected void describe() {
run(JfItemSalesSummary.class).soon();
}
}
例えば、Job1の終了後、Job2を起動するような場合は、「run(Job2.class).after(job1)」のように記述します。図のようにJob1の終了後、Job2とJob3を並列で起動して、Job2とJob3の両方が終了した後、Job4を起動したい場合は、(例)のように記述します。
Work job1 = run(Job1.class).soon();
Work job2 = run(Job2.class).after(job1);
Work job3 = run(Job3.class).after(job1);
Work job4 = run(Job4.class).after(job2, job3);
(例)
さて、ようやく1本のバッチが完成しました。早速ビルドしてみましょう。アプリケーションのビルドは、pom.xmlを右クリックして[Run As]→[Maven package]を選択します。
コマンドラインからビルドする場合は、プロジェクトのディレクトリ(pom.xmlがあるディレクトリ)に移動してから下記のMavenコマンドを実行します。
$ cd ~/workspace/プロジェクトのディレクトリ
$ mvn package
mvn packageを実行すると、テストコードをすべて実行されます。すべてのテストにパスすれば、プロジェクトのターゲットディレクトリにパッケージが作成されます。
$ cd ~/workspace/プロジェクトのディレクトリ
$ mvn package
:
: (略)
[INFO] ------------------------------------------------------------------------
[INFO] BUILD SUCCESS
[INFO] ------------------------------------------------------------------------
[INFO] Total time: 16.061s
[INFO] Finished at: Tue Mar 13 11:22:19 JST 2012
[INFO] Final Memory: 7M/16M
[INFO] ------------------------------------------------------------------------
$ cd target/
$ ls -l
合計 328
drwxrwxr-x 2 asakusa asakusa 4096 2月 20 11:50 antrun/
drwxrwxr-x 3 asakusa asakusa 4096 3月 13 11:22 batchc/
drwxrwxr-x 3 asakusa asakusa 4096 3月 13 11:22 batchcwork/
drwxrwxr-x 3 asakusa asakusa 4096 2月 20 11:51 classes/
drwxrwxr-x 2 asakusa asakusa 4096 3月 13 11:22 excel/
drwxrwxr-x 4 asakusa asakusa 4096 2月 20 11:50 generated-sources/
-rw-rw-r-- 1 asakusa asakusa 52242 3月 13 11:22 hands-on-asakusa-day2-1.0-SNAPSHOT-sources.jar
-rw-rw-r-- 1 asakusa asakusa 90902 3月 13 11:22 hands-on-asakusa-day2-1.0-SNAPSHOT.jar
-rw-rw-r-- 1 asakusa asakusa 127984 3月 13 11:22 hands-on-asakusa-day2-batchapps-1.0-SNAPSHOT.jar
drwxrwxr-x 2 asakusa asakusa 4096 3月 13 11:22 maven-archiver/
drwxrwxr-x 2 asakusa asakusa 4096 3月 13 11:21 surefire/
drwxrwxr-x 2 asakusa asakusa 4096 3月 13 11:21 surefire-reports/
drwxrwxr-x 3 asakusa asakusa 4096 2月 20 11:51 test-classes/
drwxrwxr-x 3 asakusa asakusa 4096 3月 13 11:20 testdriver/
$
大量にテストコードがあり、テストの実行だけで数十分かかってしまうということもあるでしょう。ちょっと急いでいるので、テストはスキップしたいということもあると思います。そんな場合は、下記のコマンドで、テストをスキップしてパッケージを作成できます。
$ cd ~/workspace/プロジェクトのディレクトリ
$ mvn -Dmaven.test.skip=true package
ただし、あまり乱用しないようにしてください。「これぐらいの修正であれば、影響ないだろう」という妄信は、トラブルの元です。
バッチのデプロイ先は、「$ASAKUSA_HOME/batchapps」です。デプロイは、ここにjarファイルを展開します。
$ cd $ASAKUSA_HOME/batchapps
$ jar xvf ~/workspace_asakusa/hands-on-asakusa-example/target/hands-on-asakusa-batchapps-1.0-SNAPSHOT.jar
$ rm -rf META-INF
デプロイが完了したら、「$ASAKUSA_HOME/batchapps」ディレクトリの下に、バッチIDのディレクトリが作成されます。
├ Day2ItemSummary ←バッチIDのディレクトリ
│ ├ bin
│ │ └ experimental.sh ←テスト実行用のシェル(deprecated予定)
│ ├ etc
│ │ ├ build.log
│ │ └ yaess-script.properties
│ └─ lib
│ ├ jobflow-JFItemSummary-sources.jar
│ └ jobflow-JFItemSummary.jar
jarファイルを展開すると、テスト実行用のシェルスクリプト(experimental.sh)ができますが、このシェルスクリプトは、deprecated 予定なので、バッチの実行にはYAESSを使用することが推奨されます。YAESSを使ったバッチの実行は、「$ASAKUSA_HOME/batchapps」ディレクトリで「yaess-batch.sh」にバッチIDを引数に指定して起動します。
YAESSで実行するバッチに引数を渡したい場合は、yaess-batch.sh実行時に「 -A {変数名}={値}」のように指定します。
$ $ASAKUSA_HOME/yaess/bin/yaess-batch.sh SampleBatch01 -A shop_code=330 sum_month=201203
※プログラムの中からバッチ引数の値を参照するには、コンテキストAPIを使用します。
:
Import com.asakusafw.runtime.core.BatchContext;
:
:
String shop_code = BatchContext.get("shop_code");
String sum_month = BatchContext.get("sum_month");
:
現行で5分かかっている「バッチ処理」が1分になったところで、そんなにうれしいことはないでしょう。例えば、8時間かかるので月次でしか実行できない商品別販売数の集計バッチ処理が20分で終わるようになれば、日次で実行することも可能になるかもしれません。
意思決定の周期が短くなれば、適宜、必要な商品を必要なだけ仕入れられるようになり、過剰在庫をかかえて腐らせてしまうこともありません。これは、一例でしかありませんが、皆さんがお使いになっているシステムや、お客さまに提供しているシステムでも「バッチ処理」が早く終われば、「機会損失が少なくなる」「もっとダイナミックな営業展開ができる」「いままであきらめていたことが、できるようになる」といったことがあるのではないかと思います。
また、データ量の増大に伴い、「バッチ処理」が実行できる時間帯で、実行が終わらなくなりそうなので、「そろそろハードウェアを増強しないと」みたいな話もあると思います。高価で高性能なハードウェアを増強するよりも、コモディティなマシンで構成したHadoopクラスタで、長時間バッチの実行時間を短縮するという解決法も考えられます。
とはいえ、自社でHadoopクラスタ構築、運用するというのも、若干、ハードルが高かったりもします。そんなに「たくさんのマシンを導入して管理しきれるのか」であるとか、「1台故障したときにどのように対処すればいいのだろう」であるとか、Hadoopクラスタを導入するのであればそれなりに検討しておかないといけないことがあります。
最近では、AWS(Amazon Web Serves)などのパブリッククラウドを必要なときに必要な時間だけ借りて、並列分散で「バッチ処理」を行うような事例も発表されています。これなら、時間単位で使用した分だけ課金されるので、比較的少ないコストで、「バッチ処理の時間短縮することによる新しい価値」を得られます。本当に時間短縮できるのか、試験的にHadoopクラスタを使ってみるのにも適しています。
「ビッグデータ」を安価な「クラウド環境」で高速に処理して「新しい価値」を得られるという、うれしい時代になってきたのです。
笹尾 一夫(ささお かずお)
ビッグデータの分析ツールの調査、検証に従事。本番業務システムにおいて300時間かかっていたバッチ処理をAsakusaFWで設計・実装し直し5時間弱に時間短縮可能であることを検証した。
TIS先端技術センターでは、採れたての検証成果や知見などをWebサイトで発信中