検索
連載

「ブートストラップアクション」でできることきょうから試せる Hadoop“スモールスタート”ガイド(7)(2/3 ページ)

実際にHadoopで処理を実装していきながら「Hadoopは、誰にだって扱える」を体感しましょう。今回は「ブートストラップアクションの利用方法」を解説します。

Share
Tweet
LINE
Hatena

Hadoopの設定を上書きする

 まず、Hadoopの設定を上書きしたい場合を考えてみます。そもそもデフォルトの設定はEMRによって最適化されていますが※1、より特定の用途に対応した設定にチューニングしたい場合などもあると思います。そういったときにはブートストラップアクションを利用してHadoopの設定を上書きすることが可能です。そういった用途のためにconfigure-hadoopというスクリプトがEMRから提供されているのでこれを使ってみましょう。

 ※1 Hadoopの設定ファイルは/home/hadoop/confディレクトリに配置されています


s3n://elasticmapreduce/bootstrap-actions/configure-hadoop
#!/usr/bin/ruby
 
require 'hpricot'
require 'tempfile'
 
CONFIG_HEADER = "<?xml version=\"1.0\"?>\n<?xml-stylesheet type=\"text/xsl\" href=\"configuration.xsl\"?>"
 
LETTER_TO_FILE = {
  "s" => "/home/hadoop/conf/hadoop-site.xml",
  "d" => "/home/hadoop/conf/hadoop-default.xml",
  "c" => "/home/hadoop/conf/core-site.xml",
  "h" => "/home/hadoop/conf/hdfs-site.xml",
  "m" => "/home/hadoop/conf/mapred-site.xml"
}
 
OPT_TO_FILE = {
  "site" => "/home/hadoop/conf/hadoop-site.xml",
  "default" => "/home/hadoop/conf/hadoop-default.xml",
  "core" => "/home/hadoop/conf/core-site.xml",
  "hdfs" => "/home/hadoop/conf/hdfs-site.xml",
  "mapred" => "/home/hadoop/conf/mapred-site.xml"
}
 
def usage
  puts "-<f>, --<file>-key-value"
  puts " Key/value pair that will be merged into the specified config file."
  puts ""
  puts "-<F>, --<file>-config-file"
  puts " Config file in Amazon S3 or locally that will be merged with the specified config file."
  puts ""
  puts "Acceptable config files:"
  puts " s/S site hadoop-site.xml"
  puts " d/D default hadoop-default.xml"
  puts " c/C core core-site.xml"
  puts " h/H hdfs hdfs-site.xml"
  puts " m/M mapred mapred-site.xml"
  puts ""
  puts ""
  puts "Example Usage"
  puts " --site-config-file s3://bucket/config.xml -s mapred.tasktracker.map.tasks.maximum=2"
  exit -1
end
 
def parse_config_file(config_file_path)
  ret = []
  if File.exist?(config_file_path) then
    doc = open(config_file_path) { |f| Hpricot(f) }
    (doc/"configuration"/"property").each do |property|
      val = {:name => (property/"name").inner_html, :value => (property/"value").inner_html }
      if (property/"final").inner_html != "" then
        val[:final] = (property/"final").inner_html
      end
      ret << val
    end
  else
    puts "#{config_file_path} does not exist, assuming empty configuration"
  end
  return ret
end
 
def dump_config_file(file_name, config)
  open(file_name, 'w') do |f|
    f.puts CONFIG_HEADER
    f.puts '<configuration>'
    for entry in config
      f.print " <property><name>#{entry[:name]}</name><value>#{entry[:value]}</value>"
      if entry[:final] then
        f.print "<final>#{entry[:final]}</final>"
      end
      f.puts '</property>'
    end
    f.puts '</configuration>'
  end
end
 
def merge_config(default, overwrite)
  for entry in overwrite
    cells = default.select { |x| x[:name] == entry[:name]}
    if cells.size == 0 then
      puts "'#{entry[:name]}': default does not have key, appending value '#{entry[:value]}'"
      default << entry
    elsif cells.size == 1 then
      puts "'#{entry[:name]}': new value '#{entry[:value]}' overwriting '#{cells[0][:value]}'"
      cells[0].replace(entry)
    else
      raise "'#{entry[:name]}': default has #{cells.size} keys"
    end
  end
end
 
def run_command(command)
  puts "Running command: '#{command}'"
  results = `#{command} 2>&1`
  failure = $?
  puts "Command output: " + results
  if failure != 0 then
    raise "Got failure status #{failure} running #{command}"
  end
  return results
end
 
def get_local_file(file_uri, temp)
  if file_uri.include?("://") then
    puts "File #{file_uri} is non local, fetching from remote"
    File.unlink(temp) if File::exists?(temp)
    run_command("/home/hadoop/bin/hadoop dfs -cp #{file_uri} file://#{temp}")
    puts "Remote file fetched into #{temp}"
    return temp
  else
    return file_uri
  end
end
 
def do_overwrites(list)
  for keyvalue, default_file, arg in list
    if keyvalue then
      puts "Processing default file #{default_file} with overwrite #{arg}"
      key = arg.split('=', 2)[0]
      value = arg.split('=', 2)[1]
      overwrite = [{:name => key, :value => value }]
    else
      puts "Processing default file #{default_file} with overwrites from #{arg}"
      local_overwrite_file = get_local_file(arg, default_file + ".overwrite")
      overwrite = parse_config_file(local_overwrite_file)
    end
 
    default = parse_config_file(default_file)
    merge_config(default,overwrite)
    dump_config_file(default_file + ".new", default)
    if File.exist?(default_file) then
      File.rename(default_file, default_file + ".old")
    end
    File.rename(default_file + ".new", default_file)
    puts "Saved #{default_file} with overwrites. Original saved to #{default_file}.old"
  end
end
 
# Returns:
# boolean - true if this is a key/value option (as compared to file)
# string - config file location to merge with
def opt_to_file(arg)
  if arg.match(/^-[sdchm]$/) then
    return true, LETTER_TO_FILE[arg[1..1].downcase]
  elsif arg.match(/^-[SDCHM]$/) then
    return false, LETTER_TO_FILE[arg[1..1].downcase]
  elsif arg.match(/^-+[^-]*-key-value$/) then
    file = OPT_TO_FILE[arg.match(/^-+([^-]*)-key-value$/)[1]]
    raise "Unrecognized argument" if !file
    return true, file
  elsif arg.match(/^-+[^-]*-config-file$/) then
    file = OPT_TO_FILE[arg.match(/^-+([^-]*)-config-file$/)[1]]
    raise "Unrecognized argument" if !file
    return false, file
  end
  raise "Unrecognized argument #{arg}"
end
 
def unit_test
  default_string = <<HERE
#{CONFIG_HEADER}
<configuration>
  <property><name>a</name><value>not_changed</value></property>
  <property><name>b</name><value>final_not_changed</value><final>true</final></property>
  <property><name>c</name><value>old_value_to_change</value></property>
  <property><name>d</name><value>old_value_to_change</value><final>true</final></property>
</configuration>
HERE
  default_file = Tempfile.new('default')
  default_file << default_string
  default_file.close
 
  overwrite_string = <<HERE
#{CONFIG_HEADER}
<configuration>
  <property><name>c</name><value>new_value</value></property>
  <property><name>d</name><value>new_value_no_final</value></property>
  <property><name>e</name><value>new_entry</value><final>true</final></property></configuration>
HERE
  overwrite_file = Tempfile.new('default')
  overwrite_file << overwrite_string
  overwrite_file.close
  do_overwrites([[false, default_file.path, overwrite_file.path]])
  do_overwrites([[true, default_file.path, "f=new_kv"]])
 
  default_file.open
  actual = default_file.read
  default_file.close
  puts "actual = #{actual}"
 
  expected = <<HERE
#{CONFIG_HEADER}
<configuration>
  <property><name>a</name><value>not_changed</value></property>
  <property><name>b</name><value>final_not_changed</value><final>true</final></property>
  <property><name>c</name><value>new_value</value></property>
  <property><name>d</name><value>new_value_no_final</value></property>
  <property><name>e</name><value>new_entry</value><final>true</final></property>
  <property><name>f</name><value>new_kv</value></property>
</configuration>
HERE
  if actual == expected then
    puts "TEST PASSED!"
  else
    raise "TEST_FAILED!!!"
  end
end
 
##
## BEGIN MAIN
##
if ARGV.size == 1 && ARGV[0] == "--test" then
  unit_test
elsif ARGV.size == 0 then
  usage
else
  list = []
  while ARGV.size > 0
    arg = ARGV.shift
    if ARGV.size == 0 then
      usage
    end
    begin
      keyvalue, file = opt_to_file(arg)
      list << [keyvalue, file, ARGV.shift]
    rescue
      print "Unrecognized argument: #{arg}\n\n"
      usage
    end
  end
  do_overwrites(list)
end

 このスクリプトを利用して、mapred-site.xmlに設定されているmapred.tasktracker.map.tasks.maximumの値を変更してみたいと思います。m1.smallを利用した場合、デフォルトではこの値は2と設定されていますが、それを3に変更します。

 ちなみに、--argsオプションで引数の先頭に指定している-sというのは何なのか気になるかもしれません。これはconfigure-hadoopのソースを読むとわかりますが、設定をhadoop-site.xmlというファイルに書き込むという意味になっています。

ブートストラップアクションのオプション
オプション 説明
--bootstrap-action SCRIPT ブートストラップアクションで実行させたいスクリプトのパスを指定する s3n://path/to/xxx
--bootstrap-name NAME ブートストラップアクションに名前を付ける
--args ブートストラップアクションに渡す引数をカンマ区切りで指定する -s,mapred.tasktracker.map.tasks.maximum=3

 EMRクラスタが起動したらマスターノードにSSHでリモートログインしてみましょう。通常この値が設定されているmapred-site.xmlの設定にはそのままデフォルトのものが残っています。

conf/mapred-site.xml
<?xml version="1.0"?>
<?xml-stylesheet type="text/xsl" href="configuration.xsl"?>
<configuration>
  ...
  <property>
    <name>mapred.tasktracker.map.tasks.maximum</name><value>2</value>
  </property>
  …
</configuration>

 ブートストラップアクションで指定した内容は-sで指定したhadoop-site.xmlに書かれています。hadoop-site.xmlは通常は存在しない設定ファイルですが、ブートストラップアクションでの設定時に作成され、かつ、この設定ファイルの値がデフォルトの値を上書きする形で利用されます。この値が上書きして利用されるため、結果としてmapred.tasktracker.map.tasks.maximumの値は3として動作する、というわけです。

conf/hadoop-site.xml
<?xml version="1.0"?>
<?xml-stylesheet type="text/xsl" href="configuration.xsl"?>
<configuration>
  <property>
    <name>mapred.tasktracker.map.tasks.maximum</name><value>3</value>
  </property>
</configuration>

 ブートストラップアクションのログはマスターノードの/mnt/var/log/bootstrapactions以下に記録されています。master.logはブートストラップアクション全体のログです。

master.log
2012-07-23 10:13:06,240 INFO i-4cbc9034: new instance started
2012-07-23 10:13:07,320 INFO i-4cbc9034: bootstrap action 1 completed
2012-07-23 10:14:18,944 INFO i-4cbc9034: all bootstrap actions complete and
instance ready

 さらに、1や2などのようなディレクトリが存在すると思います。これはn番目のブートストラップ実行時のログディレクトリを表します。例えば1/controllerというログファイルを見ると、ブートストラップアクションで実行するファイルをS3から取得して、/mnt/var/lib/bootstrap-actions/1に置き、引数を付けて実行している様子がわかります。

1/controller
012-07-23T10:13:06.262Z INFO Fetching file 's3n://elasticmapreduce/bootstrapactions/configure-hadoop'
2012-07-23T10:13:06.810Z INFO Working dir /mnt/var/lib/bootstrap-actions/1
2012-07-23T10:13:06.811Z INFO Executing /mnt/var/lib/bootstrap-actions/1/
configure-hadoop -s mapred.tasktracker.map.tasks.maximum=3
2012-07-23T10:13:07.318Z INFO Execution ended with ret val 0
2012-07-23T10:13:07.319Z INFO Execution succeeded

Copyright © ITmedia, Inc. All Rights Reserved.

ページトップに戻る