連載
» 2017年03月21日 05時00分 公開

Hadoopで処理を実装してみる──Hadoop Streamingでの処理、script-runner.jarの使い方きょうから試せる Hadoop“スモールスタート”ガイド(8)(3/4 ページ)

[佐々木達也,著]

script-runner.jarを使ってマスターノードで処理を走らせる

 実はEMRにはscript-runner.jarというjarが用意されています※6。これは--argsオプション(もしくは--argオプション)で指定したスクリプトをマスターノード上で実行してくれるというものです。

※6 Running a Script in a Job Flow - Amazon Elastic MapReduce

script-runner.jarのオプション
オプション 説明
--args ARGS スクリプトに渡す引数をカンマ区切りで指定する s3n://path/to/file,foo,bar
--arg ARG スクリプトに渡す引数を指定する

 この処理に限らずEMRのステップでは/mnt/var/lib/hadoop/steps/x(ステップ番号)ディレクトリが作業パスとなります。例えばこの例の場合、この作業パスに--argsオプションの先頭で指定しているfinalizer.rbがダウンロードされて配置され、残りが引数としてマスターノードではこのようなコマンドが実行されます。

 ここで利用しているfinalizer.rbはこのような内容となっています。

Sfinalizer.rb
#!/usr/bin/ruby
$KCODE = 'u'
require 'date'
 
keyword_counter = Hash.new {|h,k| h[k] = 0 }
 
@output_dir, @file_name = ARGV
 
def result_file_name
  "#{@file_name}_#{Date.today.strftime('%Y%m%d')}"
end
 
def concat_result
  system 'rm -rf result_dir'
  system 'mkdir result_dir'
  system "s3cmd sync s3://sasata299/output/#{@output_dir}/ result_dir"
 
  # part-xxxxxファイルを連結する
  `find result_dir -maxdepth 1 -type f -size +0`.split(/\n/).each do |file|
    system "cat #{file} >> result_dir/result"
  end
end
 
def upload_result
  system "s3cmd put result_dir/#{result_file_name} s3://sasata299/result/#{result_file_name}"
end
 
concat_result
 
File.open('result_dir/result') {|f|
  f.each_line do |line|
    line.chomp!
    keyword, count = line.split(/\t/)
    keyword_counter[keyword] = count
  end
}
 
sorted_keyword_counter = keyword_counter.sort {|a,b| b[1] <=> a[1] }
 
File.open("result_dir/#{result_file_name}", 'w') {|f|
  sorted_keyword_counter.each do |keyword, counter|
    next if keyword == '__TOTAL__'
 
    # キーワード、回数、表示された割合(%)
    f.puts "#{keyword},#{counter},#{counter.to_i * 100 / keyword_counter['__TOTAL__'].to_f}"
  end
}
 
upload_result

 処理の流れとしてはまず、S3上からEMRのステップの結果を取得してきて/mnt/var/lib/hadoop/steps/x/result_dir/に置いています。

system "s3cmd sync s3://sasata299/output/#{@output_dir}/ result_dir"

 そのディレクトリ内には複数の出力ファイルが存在しているはずなので、それらのファイルをひとつのファイル(この場合はresult)として連結します。

`find result_dir -maxdepth 1 -type f -size +0`.split(/\n/).each do |file|
  system "cat #{file} >> result_dir/result"
end

 あとはこのファイルを読み取って、検索回数順に並び替えます。

sorted_keyword_counter = keyword_counter.sort {|a,b| b[1] <=> a[1] }

 これを別ファイルに保存します。今回はキーワード、検索回数だけじゃなく__TOTAL__を利用して、そのキーワードが検索された割合(%)も一緒に出力するようにしてみました。

File.open("result_dir/#{result_file_name}", 'w') {|f|
  sorted_keyword_counter.each do |keyword, counter|
    next if keyword == '__TOTAL__'
 
    # キーワード、回数、表示された割合(%)
    f.puts "#{keyword},#{counter},#{counter.to_i * 100 / keyword_counter['__TOTAL__'].to_f}"
  end
}

 最後にこれをS3上に保存すれば完了です。

system "s3cmd put result_dir/#{result_file_name} s3://sasata299/result/#{result_file_name}"

 最終的にこのような結果が得られました。最初の結果よりもだいぶ扱いやすくなったのではないかと思います。

トマト,4,12.5
お弁当,3,9.375
カレー,2,6.25
ナス,2,6.25
キャベツ,2,6.25
きゅうり,2,6.25
豚肉,2,6.25
からあげ,1,3.125
サラダ,1,3.125
なす,1,3.125
じゃがいも,1,3.125
ハンバーグ,1,3.125
卵,1,3.125
たいやき,1,3.125
パスタ,1,3.125
にんじん,1,3.125
ピーマン,1,3.125
スープ,1,3.125
ズッキーニ,1,3.125
ウインナー,1,3.125
おかず,1,3.125
たまねぎ,1,3.125

 script-runner.jarはさまざまな処理をマスターノード上で走らせることができるので、ぜひ有効に活用してみてください。特にHadoopのジョブを走らせた後の終了処理に使うと便利だと思います。

「特定のユーザのみを対象」とする

 上記の例では全データをそのまま利用しましたが、例えば特定の条件を満たしたユーザに対してのみデータの集計を行いたい場合などもあるでしょう。そのような場合には--cacheオプションが便利です。

 このようなファイルを用意し、ここに含まれるユーザIDのみを対象としてみます。

user_id_list
1038733
547291
191938
90123
193032
402918

 それに併せてMapperの処理も少し変更します。user_id_listを参照するようにした修正後のMapperはこのようになりました。user_id_listに含まれていないユーザIDの場合には処理をスキップするようにしています。

keyword_mapper_filtering_user.rb

 では早速実行してみましょう。Reducerは先ほどと同じものを使っています。

 script-runner.jarは先ほどと同様に使えます。

 最終的な結果はこのようになりました。先ほどとは結果が異なり、特定のユーザのデータだけが利用されてランキングが変わっていることがわかると思います。

豚肉,2,18.1818181818182
キャベツ,1,9.09090909090909
ハンバーグ,1,9.09090909090909
じゃがいも,1,9.09090909090909
スープ,1,9.09090909090909
卵,1,9.09090909090909
たいやき,1,9.09090909090909
カレー,1,9.09090909090909
ナス,1,9.09090909090909
たまねぎ,1,9.09090909090909

ただし「処理が重くなりがち」なので注意する

 このような小さなuser_id_listであればまったく問題ないのですが、このuser_id_listが巨大なサイズとなってくるとこの部分の処理に時間が掛かるようになってきます。

next unless user_id_list.include?(user_id)

 呼ばれるのが数回とか数十回程度ならばあまり気になりませんが、Hadoopを利用するような処理の場合には膨大な回数で呼ばれるので注意が必要です。そのような場合にはuser_id_listを先頭3文字などで分割してあげるのもひとつの解決策だと思うので参考にしてみてください。

図4-5 毎回全体を確認するのはすごく時間がかかる 毎回全体を確認するのはすごく時間がかかる
図4-6 先頭3桁のキーを作ってその中にリストを持たせる 先頭3桁のキーを作ってその中にリストを持たせる

Copyright © ITmedia, Inc. All Rights Reserved.

RSSについて

アイティメディアIDについて

メールマガジン登録

@ITのメールマガジンは、 もちろん、すべて無料です。ぜひメールマガジンをご購読ください。