Apache Spark の ペアRDD(PairRDDFunctions)について。
ペアRDDの生成
キー/値のペア RDD に対して適用できる特別な関数がPairRDDFunctionsクラスに実装されています。この関数を使用するために タプル型の RDD が必要になってくるのですが、使用する言語によって取り扱いが違っています。
Working with Key-Value Pairs - Spark Programming Guide
Scalaの場合
Scala でペア RDD を生成するにはmap関数でタプル型を返してRDD[(K, V)]を生成します。そして、タプル型からなる RDD に対しては暗黙的な型変換によってPairRDDFunctionsの関数が使用できますので、使用に関してあまり意識する必要はありません
val lines = sc.textFile("data.txt") val pairs = lines.map(s => (s, 1)) val counts = pairs.reduceByKey((a, b) => a + b)
Javaの場合
Java でペア RDD を扱うにはJavaPairRDDというクラスに明示的な変換をする必要があります。JavaPairRDDからはPairRDDFunctionsの関数が使用できます。
変換にはmap関数の代わりにmapToPairやflatMapToPair関数を使用しなければいけません。これらは引数にタプル型を返すPairFunctionやPairFlatMapFunctionを受け取ります。
そして、Java には組み込みのタプル型がないので、scala.Table2クラスを使ってnew Tuple2(key, val)でタプルを生成します。要素にはtuble._1()およびtuble._2()というメソッドでアクセス可能です。
JavaRDD<String> lines = sc.textFile("data.txt"); JavaPairRDD<String, Integer> pairs = lines.mapToPair(s -> new Tuple2(s, 1)); JavaPairRDD<String, Integer> counts = pairs.reduceByKey((a, b) -> a + b);
JavaPairRDD - Java Doc
PairRDDFunctions - Java Doc
Pythonの場合
Python でペア RDD を生成するにはmap関数でタプル型を返す必要があります。Python は Java や Scala とは異なっており、全ての関数は RDD のベースクラスに実装されているので、ペア RDD 用の関数を実行しても RDD 内のデータ型が適切でなかった場合には実行時に失敗します。
lines = sc.textFile("data.txt") pairs = lines.map(lambda x: (x.split(" ")[0], 1)) counts = pairs.reduceByKey(lambda a, b: a + b)
ペアRDDの変換
ペアRDD では 標準の RDD で使えるすべての変換が使えます。ただし、ペアRDD にはタプルが含まれるので、個々の要素ではなくタプルを操作するものでなければいけません。
以下は、ペアRDD に対して適用できるPairRDDFunctionsクラスに実装されている変換で、データがキー/値形式になっている利点を生かすことができます。
| メソッド | 引数 | 内容 |
|---|---|---|
| reduceByKey | (func: (V, V) => V) | 同じキーの値を結合する。 |
| (func: (V, V) => V, numPartitions: Int) | ||
| (partitioner: Partitioner, func: (V, V) => V) | ||
| groupByKey | - | 同じキーの値をグループ化して返す。 戻り型: RDD[(K, Iterable[V])] |
| (numPartitions: Int) | ||
| (partitioner: Partitioner) | ||
| combineByKey | (createCombiner: (V) ⇒ C, mergeValue: (C, V) ⇒ C, mergeCombiners: (C, C) ⇒ C) |
キー毎に値を結合する。 |
| (createCombiner: (V) ⇒ C, mergeValue: (C, V) ⇒ C, mergeCombiners: (C, C) ⇒ C, numPartitions: Int) | ||
| (createCombiner: (V) ⇒ C, mergeValue: (C, V) ⇒ C, mergeCombiners: (C, C) ⇒ C, partitioner: Partitioner, mapSideCombine: Boolean = true, serializer: Serializer = null) | ||
| aggregateByKey | ||
| mapValues | (f: V => U) | 値のみに関数を適用する。 |
| flatMapValues | (f: (V) ⇒ TraversableOnce[U]) | イテレータを返す関数を値に適用する。 |
| keys | - | キーのみからなる`RDD[K]`を返す。 |
| values | - | 値のみからなる`RDD[V]`を返す。 |
| countApproxDistinctByKey | (relativeSD: Double = 0.05) | キー毎に値の種類数を返す。 戻り型: RDD[(K, Long)] |
| (relativeSD: Double, numPartitions: Int) | ||
| (relativeSD: Double, partitioner: Partitioner) | ||
| (p: Int, sp: Int, partitioner: Partitioner) | ||
| foldByKey | (zeroValue: V) (func: (V, V) ⇒ V) |
キー毎に値を畳み込む。 戻り型: RDD[(K, V)] |
| (zeroValue: V, numPartitions: Int) (func: (V, V) ⇒ V) | ||
| (zeroValue: V, partitioner: Partitioner) (func: (V, V) ⇒ V) | ||
| partitionBy | (partitioner: Partitioner) | partitionerで分割されたRDDを返す。 戻り型: RDD[(K, V)] |
| sampleByKey | (withReplacement: Boolean, fractions: Map[K, Double], seed: Long = Utils.random.nextLong) |
Basic Statistics - spark.mllib |
| sampleByKeyExact | (withReplacement: Boolean, fractions: Map[K, Double], seed: Long = Utils.random.nextLong) |
Basic Statistics - spark.mllib |
復数のペアRDDに対する変換
| メソッド | 引数 | 内容 |
|---|---|---|
| join | (other: RDD[(K, W)]) | 2つのRDDの内部結合(Inner Join)を行う。 戻り型: RDD[(K, (V, W))] |
| (other: RDD[(K, W)], numPartitions: Int | ||
| (other: RDD[(K, W)], partitioner: Partitioner) | ||
| leftOuterJoin | (other: RDD[(K, W)]) | 2つのRDDの外部結合(Left Outer Join)を行う。 戻り型: RDD[(K, (V, Option[W]))] |
| (other: RDD[(K, W)], numPartitions: Int) | ||
| (other: RDD[(K, W)], partitioner: Partitioner) | ||
| rightOuterJoin | (other: RDD[(K, W)]) | 2つのRDDの外部結合(Right Outer Join)を行う。 戻り型: RDD[(K, (V, Option[W]))] |
| (other: RDD[(K, W)], numPartitions: Int) | ||
| (other: RDD[(K, W)], partitioner: Partitioner) | ||
| fullOuterJoin | (other: RDD[(K, W)]) | 2つのRDDの完全外部結合(Full Outer Join)を行う。 戻り型: RDD[(K, (Option[V], Option[W]))] |
| (other: RDD[(K, W)], numPartitions: Int) | ||
| (other: RDD[(K, W)], partitioner: Partitioner) | ||
| subtractByKey | (other: RDD[(K, W)]) | 他のRDDに含まれているキーを持つ要素を取り除く。 戻り型: RDD[(K, V)] |
| (other: RDD[(K, W)], numPartitions: Int) | ||
| (other: RDD[(K, W)], partitioner: Partitioner) | ||
| cogroup | (other: RDD[(K, W)]) | 両方のRDDの同じキーを持つデータをグループ化する。 同名メソッドで引数に他のRDDを3つまで指定可能。 戻り型: RDD[(K, (Iterable[V], Iterable[W]))] |
| (other: RDD[(K, W)], numPartitions: Int) | ||
| (other: RDD[(K, W)], partitioner: Partitioner) | ||
| groupWith | (other: RDD[(K, W)]) | cogroup のエイリアス。他のRDDを3つまで指定可能。 戻り型: RDD[(K, (Iterable[V], Iterable[W]))] |
ペアRDDのアクション
変換と同様に、 標準の RDD で使えるすべてのアクションが使えます。
以下は、ペアRDD に対して適用できるPairRDDFunctionsクラスに実装されているアクションで、データがキー/値形式になっている利点を生かすことができます。
| メソッド | 引数 | 内容 |
|---|---|---|
| countByKey | - | キー毎に要素数をカウントする。 戻り型: Map[K, Long] |
| countByKeyApprox | (timeout: Long, confidence: Double = 0.95) |
タイムアウト付きの countByKey。 戻り型: PartialResult[Map[K, BoundedDouble]] |
| collectAsMap | - | 要素をMapに格納し直す。同じキーが復数ある場合は、 そのうちの1つだけ返される。戻り型: Map[K, V] |
| lookup | (key: K) | 指定したキーに関連づけられたすべての値を返す。 戻り型: Seq[V] |
| reduceByKeyLocally | (func: (V, V) ⇒ V) | キー毎に値の演算を行う。reduceByKeyToDriver は非推奨になり、reduceByKeyLocallyのエイリアスとなっている。 戻り型: Map[K, V] |