script-runner.jarを使ってマスターノードで処理を走らせる
実はEMRにはscript-runner.jarというjarが用意されています※6。これは--argsオプション(もしくは--argオプション)で指定したスクリプトをマスターノード上で実行してくれるというものです。
※6 Running a Script in a Job Flow - Amazon Elastic MapReduce
オプション | 説明 | 例 | |
---|---|---|---|
--args ARGS | スクリプトに渡す引数をカンマ区切りで指定する | s3n://path/to/file,foo,bar | |
--arg ARG | スクリプトに渡す引数を指定する | ||
この処理に限らずEMRのステップでは/mnt/var/lib/hadoop/steps/x(ステップ番号)ディレクトリが作業パスとなります。例えばこの例の場合、この作業パスに--argsオプションの先頭で指定しているfinalizer.rbがダウンロードされて配置され、残りが引数としてマスターノードではこのようなコマンドが実行されます。
ここで利用しているfinalizer.rbはこのような内容となっています。
#!/usr/bin/ruby $KCODE = 'u' require 'date' keyword_counter = Hash.new {|h,k| h[k] = 0 } @output_dir, @file_name = ARGV def result_file_name "#{@file_name}_#{Date.today.strftime('%Y%m%d')}" end def concat_result system 'rm -rf result_dir' system 'mkdir result_dir' system "s3cmd sync s3://sasata299/output/#{@output_dir}/ result_dir" # part-xxxxxファイルを連結する `find result_dir -maxdepth 1 -type f -size +0`.split(/\n/).each do |file| system "cat #{file} >> result_dir/result" end end def upload_result system "s3cmd put result_dir/#{result_file_name} s3://sasata299/result/#{result_file_name}" end concat_result File.open('result_dir/result') {|f| f.each_line do |line| line.chomp! keyword, count = line.split(/\t/) keyword_counter[keyword] = count end } sorted_keyword_counter = keyword_counter.sort {|a,b| b[1] <=> a[1] } File.open("result_dir/#{result_file_name}", 'w') {|f| sorted_keyword_counter.each do |keyword, counter| next if keyword == '__TOTAL__' # キーワード、回数、表示された割合(%) f.puts "#{keyword},#{counter},#{counter.to_i * 100 / keyword_counter['__TOTAL__'].to_f}" end } upload_result
処理の流れとしてはまず、S3上からEMRのステップの結果を取得してきて/mnt/var/lib/hadoop/steps/x/result_dir/に置いています。
system "s3cmd sync s3://sasata299/output/#{@output_dir}/ result_dir"
そのディレクトリ内には複数の出力ファイルが存在しているはずなので、それらのファイルをひとつのファイル(この場合はresult)として連結します。
`find result_dir -maxdepth 1 -type f -size +0`.split(/\n/).each do |file| system "cat #{file} >> result_dir/result" end
あとはこのファイルを読み取って、検索回数順に並び替えます。
sorted_keyword_counter = keyword_counter.sort {|a,b| b[1] <=> a[1] }
これを別ファイルに保存します。今回はキーワード、検索回数だけじゃなく__TOTAL__を利用して、そのキーワードが検索された割合(%)も一緒に出力するようにしてみました。
File.open("result_dir/#{result_file_name}", 'w') {|f| sorted_keyword_counter.each do |keyword, counter| next if keyword == '__TOTAL__' # キーワード、回数、表示された割合(%) f.puts "#{keyword},#{counter},#{counter.to_i * 100 / keyword_counter['__TOTAL__'].to_f}" end }
最後にこれをS3上に保存すれば完了です。
system "s3cmd put result_dir/#{result_file_name} s3://sasata299/result/#{result_file_name}"
最終的にこのような結果が得られました。最初の結果よりもだいぶ扱いやすくなったのではないかと思います。
トマト,4,12.5 お弁当,3,9.375 カレー,2,6.25 ナス,2,6.25 キャベツ,2,6.25 きゅうり,2,6.25 豚肉,2,6.25 からあげ,1,3.125 サラダ,1,3.125 なす,1,3.125 じゃがいも,1,3.125 ハンバーグ,1,3.125 卵,1,3.125 たいやき,1,3.125 パスタ,1,3.125 にんじん,1,3.125 ピーマン,1,3.125 スープ,1,3.125 ズッキーニ,1,3.125 ウインナー,1,3.125 おかず,1,3.125 たまねぎ,1,3.125
script-runner.jarはさまざまな処理をマスターノード上で走らせることができるので、ぜひ有効に活用してみてください。特にHadoopのジョブを走らせた後の終了処理に使うと便利だと思います。
「特定のユーザのみを対象」とする
上記の例では全データをそのまま利用しましたが、例えば特定の条件を満たしたユーザに対してのみデータの集計を行いたい場合などもあるでしょう。そのような場合には--cacheオプションが便利です。
このようなファイルを用意し、ここに含まれるユーザIDのみを対象としてみます。
1038733 547291 191938 90123 193032 402918
それに併せてMapperの処理も少し変更します。user_id_listを参照するようにした修正後のMapperはこのようになりました。user_id_listに含まれていないユーザIDの場合には処理をスキップするようにしています。
では早速実行してみましょう。Reducerは先ほどと同じものを使っています。
script-runner.jarは先ほどと同様に使えます。
最終的な結果はこのようになりました。先ほどとは結果が異なり、特定のユーザのデータだけが利用されてランキングが変わっていることがわかると思います。
豚肉,2,18.1818181818182 キャベツ,1,9.09090909090909 ハンバーグ,1,9.09090909090909 じゃがいも,1,9.09090909090909 スープ,1,9.09090909090909 卵,1,9.09090909090909 たいやき,1,9.09090909090909 カレー,1,9.09090909090909 ナス,1,9.09090909090909 たまねぎ,1,9.09090909090909
ただし「処理が重くなりがち」なので注意する
このような小さなuser_id_listであればまったく問題ないのですが、このuser_id_listが巨大なサイズとなってくるとこの部分の処理に時間が掛かるようになってきます。
next unless user_id_list.include?(user_id)
呼ばれるのが数回とか数十回程度ならばあまり気になりませんが、Hadoopを利用するような処理の場合には膨大な回数で呼ばれるので注意が必要です。そのような場合にはuser_id_listを先頭3文字などで分割してあげるのもひとつの解決策だと思うので参考にしてみてください。
Copyright © ITmedia, Inc. All Rights Reserved.