Apache Spark の DoubleRDD(Functions)について。
数値型のRDD
関数によっては特定の型の RDD にしか適用できないものがあります。そして、数値型のRDD にしか適用できない関数がDoubleRDDFunctionsクラスに実装されています。
この関数を使用するために Double 型の RDD が必要になってくるのですが、使用する言語によって取り扱いが違ってきます。
ちなみにここで言ってる数値型とは Double のことであり、FloatRDD や LongRDD などはありません。
Scalaの場合
Scala はRDD[Double]からDoubleRDDFunctionsの暗黙的な型変換によって自動的に処理されますので、基本的にはあまり意識する必要はないです。
ただし、DoubleRDDFunctionsの関数は、当然標準的な RDD のクラスには存在しない関数なので、知らずにドキュメントを見ると混乱するかもしれません。
val intRDD = sc.parallelize(Seq(1,2,3,4,5,6,7,8,9)) intRDD.sum() // RDD[Int] も RDD[Double] に変換される val doubleRDD = intRDD.map(v => v * 1.08); doubleRDD.mean() // doubleRDD は RDD[Double] である
また、古い情報だと残っているかもしれないので注意点として書いておくと、Spark 1.3.0 以前では明示的なimport org.apache.spark.SparkContext._のインポートが無いと、コンパイラが暗黙的な型変換を検知できませんでした。今はインポートが不要になりましたので、この点を意識する必要がなくなっています。
DoubleRDDFunctions - Scala Doc
Javaの場合
Java ではJavaDoubleRDDというクラスに明示的な変換をする必要があります。変換にはmapではなくてmapToDoubleやflatMapToDoubleを使用しなければいけません。
List<Integer> nums = Arrays.asList(1,2,3,4,5,6,7,8,9); JavaRDD<Integer> intRDD = sc.parallelize(nums); JavaDoubleRDD doubleRDD = intRDD.mapToDouble(v -> v * 1.08); doubleRDD.mean(); // 正常
mapを使用した場合はJavaRDD<Double>に変換されるだけで、DoubleRDDFunctionsクラスの関数は使用できないので注意して下さい。
JavaRDD<Double> doubleRDD = intRDD.map(v -> v * 1.08); doubleRDD.mean(); // エラー
JavaDoubleRDD - Java Doc
DoubleRDDFunctions - Java Doc
Pythonの場合
Python は Java や Scala とは異なっており、全ての関数は RDD のベースクラスに実装されているので、RDD 内のデータ型が適切でなかった場合には実行時に失敗します。
intRDD = sc.parallelize([1,2,3,4,5,6,7,8,9]) intRDD.sum() strRDD = sc.parallelize(["hoge","fuga","bar"]) strRDD.mean() # 文字列は対応してないので失敗する
DoubleRDDFunctionsのメソッド
DoubleRDDFunctions で使用できる数値関数は下記の通りです。
| メソッド | 戻り値 | 説明 |
|---|---|---|
| stats | StatCounter | 記述統計の計算結果が StatCounter として返される。 |
| sum | Double | 要素の合計を計算する。 |
| mean | Double | 要素の平均を計算する。 |
| sumApprox | PartialResult <BoundedDouble> |
タイムアウト時間内におおよその平均を計算する? |
| meanApprox | PartialResult <BoundedDouble> |
タイムアウト時間内におおよその合計を計算する? |
| stdev | Double | 要素の標準偏差を計算する。 |
| variance | Double | 要素の分散を計算する。 |
| sampleStdev | Double | 要素の標本標準偏差を計算する。 |
| sampleVariance | Double | 要素の標本分散を計算する。 |
| histogram | Array[Long] | ヒストグラムを計算する。 |
| histogram | (Array[Double], Array[Long]) |
ヒストグラムを計算する。 |
statsを実行した結果取得できる StatCounter オブジェクトは、下記の要約統計量が取得できます。一部、上記のメソッドと被っていますが内部的にはstatsを呼んでいるので同じです。
| メソッド | 説明 |
|---|---|
| count | RDD 内の要素数 |
| max | 最大値 |
| min | 最小値 |
| sum | 合計 |
| mean | 平均 |
| stdev | 標準偏差 |
| variance | 分散 |
| sampleStdev | 標本標準偏差 |
| sampleVariance | 標本分散 |