SparkのDoubleRDDについて

スポンサーリンク

Apache Spark の DoubleRDD(Functions)について。

数値型のRDD

関数によっては特定の型の RDD にしか適用できないものがあります。そして、数値型のRDD にしか適用できない関数がDoubleRDDFunctionsクラスに実装されています。 この関数を使用するために Double 型の RDD が必要になってくるのですが、使用する言語によって取り扱いが違ってきます。

ちなみにここで言ってる数値型とは Double のことであり、FloatRDD や LongRDD などはありません。

Scalaの場合

Scala はRDD[Double]からDoubleRDDFunctionsの暗黙的な型変換によって自動的に処理されますので、基本的にはあまり意識する必要はないです。 ただし、DoubleRDDFunctionsの関数は、当然標準的な RDD のクラスには存在しない関数なので、知らずにドキュメントを見ると混乱するかもしれません。

val intRDD = sc.parallelize(Seq(1,2,3,4,5,6,7,8,9))
intRDD.sum()  // RDD[Int] も RDD[Double] に変換される
val doubleRDD = intRDD.map(v => v * 1.08);
doubleRDD.mean()  // doubleRDD は RDD[Double] である

また、古い情報だと残っているかもしれないので注意点として書いておくと、Spark 1.3.0 以前では明示的なimport org.apache.spark.SparkContext._のインポートが無いと、コンパイラが暗黙的な型変換を検知できませんでした。今はインポートが不要になりましたので、この点を意識する必要がなくなっています。

 DoubleRDDFunctions - Scala Doc

Javaの場合

Java ではJavaDoubleRDDというクラスに明示的な変換をする必要があります。変換にはmapではなくてmapToDoubleflatMapToDoubleを使用しなければいけません。

List<Integer> nums = Arrays.asList(1,2,3,4,5,6,7,8,9);
JavaRDD<Integer> intRDD = sc.parallelize(nums);
JavaDoubleRDD doubleRDD = intRDD.mapToDouble(v -> v * 1.08);
doubleRDD.mean();  // 正常

mapを使用した場合はJavaRDD<Double>に変換されるだけで、DoubleRDDFunctionsクラスの関数は使用できないので注意して下さい。

JavaRDD<Double> doubleRDD = intRDD.map(v -> v * 1.08);
doubleRDD.mean();  // エラー

 JavaDoubleRDD - Java Doc
 DoubleRDDFunctions - Java Doc

Pythonの場合

Python は Java や Scala とは異なっており、全ての関数は RDD のベースクラスに実装されているので、RDD 内のデータ型が適切でなかった場合には実行時に失敗します。

intRDD = sc.parallelize([1,2,3,4,5,6,7,8,9])
intRDD.sum()
strRDD = sc.parallelize(["hoge","fuga","bar"])
strRDD.mean()  # 文字列は対応してないので失敗する

 pyspark.RDD - Python Doc

DoubleRDDFunctionsのメソッド

DoubleRDDFunctions で使用できる数値関数は下記の通りです。

メソッド 戻り値 説明
stats StatCounter 記述統計の計算結果が StatCounter として返される。
sum Double 要素の合計を計算する。
mean Double 要素の平均を計算する。
sumApprox PartialResult
  <BoundedDouble>
タイムアウト時間内におおよその平均を計算する?
meanApprox PartialResult
  <BoundedDouble>
タイムアウト時間内におおよその合計を計算する?
stdev Double 要素の標準偏差を計算する。
variance Double 要素の分散を計算する。
sampleStdev Double 要素の標本標準偏差を計算する。
sampleVariance Double 要素の標本分散を計算する。
histogram Array[Long] ヒストグラムを計算する。
histogram (Array[Double],
  Array[Long])
ヒストグラムを計算する。

statsを実行した結果取得できる StatCounter オブジェクトは、下記の要約統計量が取得できます。一部、上記のメソッドと被っていますが内部的にはstatsを呼んでいるので同じです。

メソッド 説明
count RDD 内の要素数
max 最大値
min 最小値
sum 合計
mean 平均
stdev 標準偏差
variance 分散
sampleStdev 標本標準偏差
sampleVariance 標本分散