Hadoop Streaming の使い方について(サンプルRuby)

スポンサーリンク

Java以外の言語を使用して MapReduce を実行することのできる Hadoop Streaming の使用についてです。ディストリビューションに付属しているユーティリティであり、データを標準入出力を介して受け渡すため、標準入出力が扱える言語であれば MapReduce ジョブを記述して実行することが可能です。

Hadoop Streaming の構文

Hadoop Streaming の基本的な構文はhadoop command [genericOptions] [streamingOptions]になります。通常の Java でのジョブと同様に jar コマンドにhadoop-streaming.jarを指定してください。genericOptionsstreamingOptionsの前に配置しないと失敗してしまいます。以下がコマンド実行例です。

$ hadoop jar /usr/lib/hadoop-mapreduce/hadoop-streaming.jar \
      -files /tmp/sample_mapper.rb, /tmp/sample_reducer.rb \
      -mapper sample_mapper.rb \
      -reducer sample_reducer.rb \
      -input test/input/test.txt \
      -output test/output

Generic Command Options

オプション 概要
-conf アプリケーション構成ファイルを指定
-Dproperty=value 指定されたプロパティの値を使用
-fs Namenodeを指定
-files コピーするファイルをカンマ区切りで指定
-libjars クラスパスに含めるjarファイルをカンマ区切りで指定
-archives アーカイブファイルをカンマ区切りで指定

Streaming Command Options

オプション 概要
-input [必須] 入力のディレクトリかファイルを指定
-output [必須] 出力のディレクトリを指定
-mapper [必須] Mapperの実行プログラム
-reducer [必須] Reducerの実行プログラム
-file ファイルをノードにコピー
-inputformat InputFormatを指定(JavaClass)デフォルトTextInputFormat
-outputformat OutputFormatを指定(JavaClass)デフォルトTextOutputFormat
-partitioner Partitionerを指定(JavaClass)
-combiner Combinerを指定
-cmdenv 環境変数を指定
-inputreader InputFormatの代わりのRecordReaderを指定
-verbose 詳細を表示
-lazyOutput 遅延した出力を作成?
-numReduceTasks Reducerの数を指定
-mapdebug Mapperが失敗したときに呼び出すスクリプト
-reducedebug Reducerが失敗したときに呼び出すスクリプト

オプションの補足

-fileオプションで実行すると非推奨ですと警告がでます。-filesオプションの方を使いましょう。

WARN streaming.StreamJob: -file option is deprecated, please use generic option -files instead.

-reducerオプションは必須となっていますが、Mapper のみ実行する場合は必要ありません。genericOptions に以下を指定してください。

-D mapreduce.job.reduces=0

他にもオプションは-infoで確認できます。

$ hadoop jar /usr/lib/hadoop-mapreduce/hadoop-streaming.jar -info

RubyによるMapReduceプログラムのサンプル

例としてワードカウントのサンプルになりますが、Mapper は次のように記述します。標準出力に対してキーと値をタブ区切りで出力してください。また、Javaの Mapper クラスではキーと値が引数として渡されますが、Hadoop Streaming では値のみが渡されます。

#!/usr/bin/env ruby

STDIN.each_line do |line|
  line.split.each do |word|
    puts "#{word}\t1"
  end
end

Reducer は次のように記述します。データはタブ区切りのキーと値がキーでソートされた順に1行ずつ渡されます。そのため同じキーのデータは必ず同じ Reducer で処理されるため、キーの値が変わる(キーブレイクする)まで一連の処理を行うようなプログラムにする必要があります。

#!/usr/bin/env ruby

currentKey = nil
total = 0

STDIN.each_line do |line|
  key, value = line.split("\t")
  value = value.to_i
  if currentKey && currentKey != key
    puts "#{currentKey}\t#{total}"
    currentKey = key
    total = value
  else
    currentKey = key
    total += value
  end
end
puts "#{currentKey}\t#{total}"

今回は標準入力の読み込みにSTDIN.each_lineを使用しましたが、他にも次のような方法があります。

ARGF.each do |line| ... end
while line = STDIN.gets ... end

PATHについて

Hadoop Streaming は-files-archivesで指定したファイルが各ノードにコピーされて実行されるわけですが、当然のことながら各ノードには実行するプログラムがインストールされていなければならず、PATHが通っていなければいけません。そうでなければコマンドが見つからず次のようなエラーがでます。

Error: java.lang.RuntimeException: PipeMapRed.waitOutputThreads(): subprocess failed with code 127

例えば Ruby のスクリプトで実行したい場合、1行目には shebang を記述しておきましょう。

#!/usr/bin/env ruby

それか、実は以下のように実行コマンドを指定してやることもできます。

-mapper "ruby sample_mapper.rb" -reducer "ruby sample_reducer.rb"

Rubyのbundlerを使用したい場合

単純なスクリプトの実行はわかったけど、bundler でインストールした Gem を使用しているスクリプトの場合どうすればいいのか?ここが一番嵌りましたが解決策はありました。

まず-archivesオプションを使用します。これはjartgzで圧縮したファイルを指定することでアプリを丸ごとコピーできます。そしてシェルをひとつはさんで実行してください。

まず実行コマンドは以下のようになります。-archivesにはbundle installしたアプリをtar zcf ../my-streaming.tar.gz .のようにパッケージングしたファイルを指定してください。#appとしているのは別名のシンボリックリンクを作成するためです。

Mapper の実行ファイルはシェルですが、引数に-archivesのファイル名(シンボリックリンク)と本当の実行プログラムを渡しています。

hadoop jar /usr/lib/hadoop-mapreduce/hadoop-streaming.jar \
      -Dmapred.reduce.tasks=0 \
      -files "/tmp/bundler_run.sh" \
      -archives "/tmp/my-streaming.tar.gz#app" \
      -mapper "bundler_run.sh app sample_mapper.rb" \
      -input test/input \
      -output test/output

実行するシェルは以下のとおりです。まず rbenv のパスを通します。あとは見たままですが、第1引数にcdしてbundle exec ruby 第2引数で実行していますね。これで bundler を使用したスクリプトも実行することが可能です。

#!/bin/sh
export RBENV_ROOT="/opt/rbenv"
export PATH="$RBENV_ROOT/bin:$PATH"
eval "$(rbenv init -)"

APP_NAME=$1
EXEC=$2

cd $APP_NAME
bundle exec ruby $EXEC

参考リンク

 Apache Hadoop MapReduce Streaming –
 Use Ruby Gems With Hadoop Streaming
 終了ステータス - Man page of BASH